Mercurial > code > home > repos > homeauto
view service/starArduino/starArduino.py @ 438:d453cdd8a86f
start dev mode nginx config. not working yet
Ignore-this: d18b11cd5566d844bf126b8a9e50b912
author | drewp@bigasterisk.com |
---|---|
date | Sat, 13 Apr 2019 20:56:30 -0700 |
parents | ca380454d176 |
children |
line wrap: on
line source
""" arduino driver for the nightlight+buttons+temp setup running on star """ from __future__ import division import sys,json from twisted.internet import reactor, task from rdflib import Namespace import cyclone.web from cyclone.httpclient import fetch sys.path.append("/my/proj/pixel/shiftweb") from drvarduino import ShiftbriteArduino from shiftweb import hexFromRgb, rgbFromHex sys.path.append("/my/proj/homeauto/lib") from cycloneerr import PrettyErrorHandler from logsetup import log sys.path.append("/my/proj/ariremote") from oscserver import ArduinoWatcher ROOM = Namespace("http://projects.bigasterisk.com/room/") class Index(PrettyErrorHandler, cyclone.web.RequestHandler): def get(self): self.settings.arduino.ping() self.set_header("Content-Type", "application/xhtml+xml") self.write(open("index.html").read()) class Temperature(PrettyErrorHandler, cyclone.web.RequestHandler): def get(self): f = self.settings.arduino.getTemperature() self.set_header("Content-Type", "application/json") self.write(json.dumps({"temp" : f})) class Brite(PrettyErrorHandler, cyclone.web.RequestHandler): def get(self, pos): self.set_header("Content-Type", "text/plain") self.write(hexFromRgb(self.settings.colors[int(pos)])) def put(self, pos): channel = int(pos) colors = self.settings.colors colors[channel] = rgbFromHex(self.request.body) self.settings.arduino.update(colors) self.set_header("Content-Type", "text/plain") self.write("updated %r" % colors) class Barcode(PrettyErrorHandler, cyclone.web.RequestHandler): def get(self): self.set_header("Content-Type", "text/plain") ard = self.settings.arduino ard.ser.write("\x60\x02") self.write(str(ard.readJson())) class BarcodeBeep(PrettyErrorHandler, cyclone.web.RequestHandler): def put(self): self.set_header("Content-Type", "text/plain") ard = self.settings.arduino ard.ser.write("\x60\x03") self.write(str(ard.readJson())) def barcodeWatch(arduino, postBarcode): arduino.ser.write("\xff\xfb") ret = arduino.readJson() if not ret['barcode']: return if ret['barcode'] == "27 80 48 13 ": return # scanner's beep response arduino.ser.write("\xff\xfc") arduino.readJson() # my beep response s = ''.join(chr(int(x)) for x in ret['barcode'].split()) for code in s.split('\x02'): if not code: continue if not code.endswith('\x03'): log.warn("couldn't read %r", code) return codeType = {'A':'UPC-A', 'B':'JAN-8', 'E':'UPC-E', 'N':'NW-7', 'C':'CODE39', 'I':'ITF', 'K':'CODE128', }[code[0]] code = code[1:-1] body = "%s %s %s ." % ( ROOM['barcodeScan'].n3(), ROOM['read'].n3(), ROOM['barcode/%s/%s' % (codeType, code)].n3()) body = body.encode('utf8') print "body: %r" % body fetch("http://bang:9071/oneShot", method='POST', timeout=1, postdata=body, headers={"content-type" : ["text/n3"]}, ).addErrback(log.error) class Graph(PrettyErrorHandler, cyclone.web.RequestHandler): def get(self): raise NotImplementedError if __name__ == '__main__': class A(ShiftbriteArduino): # from loggingserial.py def readJson(self): line = '' while True: c = self.ser.read(1) #print "gotchar", repr(c) if c: line += c if c == "\n": break else: raise ValueError("timed out waiting for chars") return json.loads(line) sb = A(numChannels=3) colors = [(0,0,0)] * sb.numChannels aw = ArduinoWatcher(sb) task.LoopingCall(aw.poll).start(1.0/20) postBarcode = 'http://star:9011/barcodeScan' task.LoopingCall(barcodeWatch, sb, postBarcode).start(interval=.5) reactor.listenTCP(9014, cyclone.web.Application([ (r'/', Index), (r'/temperature', Temperature), (r'/brite/(\d+)', Brite), (r'/barcode', Barcode), (r'/barcode/beep', BarcodeBeep), (r'/graph', Graph), ], arduino=sb, colors=colors)) reactor.run()