Mercurial > code > home > repos > homeauto
changeset 230:0aa54404df19
update arduinoNode to support streamed graph output
Ignore-this: fa35d1fae5b0e411b167650550c3e77d
author | drewp@bigasterisk.com |
---|---|
date | Thu, 28 Jan 2016 02:24:32 -0800 |
parents | 07ee72a042db |
children | c89f88cbdcc6 |
files | service/arduinoNode/arduinoNode.py service/arduinoNode/devices.py service/piNode/piNode.py |
diffstat | 3 files changed, 61 insertions(+), 66 deletions(-) [+] |
line wrap: on
line diff
--- a/service/arduinoNode/arduinoNode.py Thu Jan 28 00:21:31 2016 -0800 +++ b/service/arduinoNode/arduinoNode.py Thu Jan 28 02:24:32 2016 -0800 @@ -8,7 +8,7 @@ import shutil, json import serial import cyclone.web -from rdflib import Graph, Namespace, URIRef, Literal, RDF +from rdflib import Graph, Namespace, URIRef, Literal, RDF, ConjunctiveGraph from rdflib.parser import StringInputSource from twisted.internet import reactor, task from docopt import docopt @@ -23,8 +23,12 @@ from loggingserial import LoggingSerial -sys.path.append("/my/site/magma") -from stategraph import StateGraph +sys.path.append("../../lib") +from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler + +sys.path.append("/my/proj/light9") +from light9.rdfdb.patch import Patch +from light9.rdfdb.rdflibpatch import inContext sys.path.append("/my/proj/room") from carbondata import CarbonClient @@ -37,15 +41,19 @@ ACTION_BASE = 10 # higher than any of the fixed command numbers +CTX = ROOM['arduinosOn%s' % socket.gethostname()] + class Config(object): - def __init__(self): - self.graph = Graph() + def __init__(self, masterGraph): + self.graph = ConjunctiveGraph() log.info('read config') for f in os.listdir('config'): if f.startswith('.'): continue self.graph.parse('config/%s' % f, format='n3') self.graph.bind('', ROOM) # not working self.graph.bind('rdf', RDF) + # config graph is too noisy; maybe make it a separate resource + #masterGraph.patch(Patch(addGraph=self.graph)) def serialDevices(self): return dict([(row.dev, row.board) for row in self.graph.query( @@ -57,20 +65,22 @@ class Board(object): """an arduino connected to this computer""" baudrate = 115200 - def __init__(self, dev, graph, uri, onChange): + def __init__(self, dev, configGraph, masterGraph, uri): """ each connected thing has some pins. - - We'll call onChange when we know the currentGraph() has - changed (and not just in creation time). """ self.uri = uri - self.graph = graph + self.configGraph = configGraph + self.masterGraph = masterGraph self.dev = dev + + self.masterGraph.patch(Patch(addQuads=[ + (HOST[socket.gethostname()], ROOM['connectedTo'], self.uri, CTX), + ])) # The order of this list needs to be consistent between the # deployToArduino call and the poll call. - self._devs = devices.makeDevices(graph, self.uri) + self._devs = devices.makeDevices(configGraph, self.uri) self._devCommandNum = dict((dev.uri, ACTION_BASE + devIndex) for devIndex, dev in enumerate(self._devs)) self._polledDevs = [d for d in self._devs if d.generatePollCode()] @@ -78,6 +88,8 @@ self._statementsFromInputs = {} # input device uri: latest statements self._carbon = CarbonClient(serverHost='bang') self.open() + for d in self._devs: + self.syncMasterGraphToHostStatements(d) def description(self): """for web page""" @@ -107,13 +119,18 @@ reactor.crash() raise except Exception as e: + import traceback; traceback.print_exc() log.warn("poll: %r" % e) def _pollWork(self): t1 = time.time() self.ser.write("\x60\x00") for i in self._polledDevs: - self._statementsFromInputs[i.uri] = i.readFromPoll(self.ser.read) + prev = self._statementsFromInputs.get(i.uri, []) + new = self._statementsFromInputs[i.uri] = inContext( + i.readFromPoll(self.ser.read), i.uri) + self.masterGraph.patch(Patch.fromDiff(prev, new)) + #plus statements about succeeding or erroring on the last poll byte = self.ser.read(1) if byte != 'x': @@ -128,25 +145,12 @@ now = time.time() # objects of these statements are suitable as graphite values. graphitePredicates = {ROOM['temperatureF']} - for s, graphiteName in self.graph.subject_objects(ROOM['graphiteName']): + for s, graphiteName in self.configGraph.subject_objects(ROOM['graphiteName']): for group in self._statementsFromInputs.values(): for stmt in group: if stmt[0] == s and stmt[1] in graphitePredicates: self._carbon.send(graphiteName, stmt[2].toPython(), now) - def currentGraph(self): - g = Graph() - - g.add((HOST[socket.gethostname()], ROOM['connectedTo'], self.uri)) - - for si in self._statementsFromInputs.values(): - for s in si: - g.add(s) - for dev in self._devs: - for stmt in dev.hostStatements(): - g.add(stmt) - return g - def outputStatements(self, stmts): unused = set(stmts) for dev in self._devs: @@ -166,12 +170,23 @@ raise ValueError( "%s sendOutput/generateActionCode didn't use " "matching output bytes" % dev.__class__) + # Dev *could* change hostStatements at any time, and + # we're not currently tracking that, but the usual is + # to change them in response to sendOutput so this + # should be good enough. The right answer is to give + # each dev the masterGraph for it to write to. + self.syncMasterGraphToHostStatements(dev) log.info("success") if unused: log.info("Board %s doesn't care about these statements:", self.uri) for s in unused: log.info("%r", s) - + + def syncMasterGraphToHostStatements(self, dev): + hostStmtCtx = URIRef(dev.uri + '/host') + newQuads = inContext(dev.hostStatements(), hostStmtCtx) + self.masterGraph.patchSubgraph(hostStmtCtx, newQuads) + def generateArduinoCode(self): code = write_arduino_code.writeCode(self.baudrate, self._devs, self._devCommandNum) code = write_arduino_code.indent(code) @@ -224,7 +239,7 @@ with open(workDir + '/makefile', 'w') as makefile: makefile.write(write_arduino_code.writeMakefile( dev=self.dev, - tag=self.graph.value(self.uri, ROOM['boardTag']), + tag=self.configGraph.value(self.uri, ROOM['boardTag']), allLibs=sum((d.generateArduinoLibs() for d in self._devs), []))) with open(workDir + '/main.ino', 'w') as main: @@ -232,26 +247,15 @@ subprocess.check_call(['make', 'upload'], cwd=workDir) + + def currentGraph(self): + g = Graph() -class GraphPage(cyclone.web.RequestHandler): - def get(self): - g = StateGraph(ctx=ROOM['arduinosOn%s' % 'host']) - - for b in self.settings.boards: - for stmt in b.currentGraph(): - g.add(stmt) - if self.get_argument('config', 'no') == 'yes': - for stmt in self.settings.config.graph: + for dev in self._devs: + for stmt in dev.hostStatements(): g.add(stmt) - - if self.request.headers.get('accept') == 'application/ld+json': - self.set_header('Content-type', 'application/ld+json') - self.write(g.asJsonLd()) - return - - self.set_header('Content-type', 'application/x-trig') - self.write(g.asTrig()) + return g class Dot(cyclone.web.RequestHandler): def get(self): @@ -306,7 +310,7 @@ def currentSerialDevices(): log.info('find connected boards') return glob.glob('/dev/serial/by-id/*') - + def main(): arg = docopt(""" Usage: arduinoNode.py [options] @@ -319,20 +323,17 @@ twlog.startLogging(sys.stdout) log.setLevel(logging.DEBUG) - - config = Config() + + masterGraph = PatchableGraph() + config = Config(masterGraph) current = currentSerialDevices() - def onChange(): - # notify reasoning - pass - boards = [] for dev, board in config.serialDevices().items(): if str(dev) not in current: continue log.info("we have board %s connected at %s" % (board, dev)) - b = Board(dev, config.graph, board, onChange) + b = Board(dev, config.graph, masterGraph, board) boards.append(b) for b in boards: @@ -348,7 +349,8 @@ "path": "static", "default_filename": "index.html"}), (r'/static/(.*)', cyclone.web.StaticFileHandler, {"path": "static"}), (r'/boards', Boards), - (r"/graph", GraphPage), + (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}), + (r"/graph/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}), (r'/output', OutputPage), (r'/arduinoCode', ArduinoCode), (r'/dot', Dot),
--- a/service/arduinoNode/devices.py Thu Jan 28 00:21:31 2016 -0800 +++ b/service/arduinoNode/devices.py Thu Jan 28 02:24:32 2016 -0800 @@ -320,7 +320,11 @@ sensorUri = URIRef(os.path.join(self.uri, 'dev-%s' % hex(addr)[2:])) stmts.extend([ (self.uri, ROOM['connectedTo'], sensorUri), - (sensorUri, ROOM['temperatureF'], Literal(tempF))]) + # rounding may be working around a bug where the + # Literal gets two representations (at different + # roundings), and this makes an extra value linger on + # the client. + (sensorUri, ROOM['temperatureF'], Literal(round(tempF, 2)))]) self._knownTempSubjects.add(sensorUri) log.debug("read temp in %.1fms" % ((time.time() - t1) * 1000))
--- a/service/piNode/piNode.py Thu Jan 28 00:21:31 2016 -0800 +++ b/service/piNode/piNode.py Thu Jan 28 02:24:32 2016 -0800 @@ -106,17 +106,6 @@ def staticStmts(self): return [(HOST[socket.gethostname()], ROOM['connectedTo'], self.uri, self.ctx)] - def currentGraph(self): - g = Graph() - - for s in self.staticStmts(): - g.add(s[:3]) - - for si in self._statementsFromInputs.values(): - for s in si: - g.add(s) - return g - def description(self): """for web page""" return {