changeset 230:0aa54404df19

update arduinoNode to support streamed graph output Ignore-this: fa35d1fae5b0e411b167650550c3e77d
author drewp@bigasterisk.com
date Thu, 28 Jan 2016 02:24:32 -0800
parents 07ee72a042db
children c89f88cbdcc6
files service/arduinoNode/arduinoNode.py service/arduinoNode/devices.py service/piNode/piNode.py
diffstat 3 files changed, 61 insertions(+), 66 deletions(-) [+]
line wrap: on
line diff
--- a/service/arduinoNode/arduinoNode.py	Thu Jan 28 00:21:31 2016 -0800
+++ b/service/arduinoNode/arduinoNode.py	Thu Jan 28 02:24:32 2016 -0800
@@ -8,7 +8,7 @@
 import shutil, json
 import serial
 import cyclone.web
-from rdflib import Graph, Namespace, URIRef, Literal, RDF
+from rdflib import Graph, Namespace, URIRef, Literal, RDF, ConjunctiveGraph
 from rdflib.parser import StringInputSource
 from twisted.internet import reactor, task
 from docopt import docopt
@@ -23,8 +23,12 @@
 
 from loggingserial import LoggingSerial
 
-sys.path.append("/my/site/magma")
-from stategraph import StateGraph
+sys.path.append("../../lib")
+from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler
+
+sys.path.append("/my/proj/light9")
+from light9.rdfdb.patch import Patch
+from light9.rdfdb.rdflibpatch import inContext
 
 sys.path.append("/my/proj/room")
 from carbondata import CarbonClient
@@ -37,15 +41,19 @@
 
 ACTION_BASE = 10 # higher than any of the fixed command numbers
 
+CTX = ROOM['arduinosOn%s' % socket.gethostname()]
+
 class Config(object):
-    def __init__(self):
-        self.graph = Graph()
+    def __init__(self, masterGraph):
+        self.graph = ConjunctiveGraph()
         log.info('read config')
         for f in os.listdir('config'):
             if f.startswith('.'): continue
             self.graph.parse('config/%s' % f, format='n3')
         self.graph.bind('', ROOM) # not working
         self.graph.bind('rdf', RDF)
+        # config graph is too noisy; maybe make it a separate resource
+        #masterGraph.patch(Patch(addGraph=self.graph))
 
     def serialDevices(self):
         return dict([(row.dev, row.board) for row in self.graph.query(
@@ -57,20 +65,22 @@
 class Board(object):
     """an arduino connected to this computer"""
     baudrate = 115200
-    def __init__(self, dev, graph, uri, onChange):
+    def __init__(self, dev, configGraph, masterGraph, uri):
         """
         each connected thing has some pins.
-
-        We'll call onChange when we know the currentGraph() has
-        changed (and not just in creation time).
         """
         self.uri = uri
-        self.graph = graph
+        self.configGraph = configGraph
+        self.masterGraph = masterGraph
         self.dev = dev
+
+        self.masterGraph.patch(Patch(addQuads=[
+            (HOST[socket.gethostname()], ROOM['connectedTo'], self.uri, CTX),
+        ]))
         
         # The order of this list needs to be consistent between the
         # deployToArduino call and the poll call.
-        self._devs = devices.makeDevices(graph, self.uri)
+        self._devs = devices.makeDevices(configGraph, self.uri)
         self._devCommandNum = dict((dev.uri, ACTION_BASE + devIndex)
                                    for devIndex, dev in enumerate(self._devs))
         self._polledDevs = [d for d in self._devs if d.generatePollCode()]
@@ -78,6 +88,8 @@
         self._statementsFromInputs = {} # input device uri: latest statements
         self._carbon = CarbonClient(serverHost='bang')
         self.open()
+        for d in self._devs:
+            self.syncMasterGraphToHostStatements(d)
 
     def description(self):
         """for web page"""
@@ -107,13 +119,18 @@
             reactor.crash()
             raise
         except Exception as e:
+            import traceback; traceback.print_exc()
             log.warn("poll: %r" % e)
             
     def _pollWork(self):
         t1 = time.time()
         self.ser.write("\x60\x00")
         for i in self._polledDevs:
-            self._statementsFromInputs[i.uri] = i.readFromPoll(self.ser.read)
+            prev = self._statementsFromInputs.get(i.uri, [])
+            new = self._statementsFromInputs[i.uri] = inContext(
+                i.readFromPoll(self.ser.read), i.uri)
+            self.masterGraph.patch(Patch.fromDiff(prev, new))
+            
         #plus statements about succeeding or erroring on the last poll
         byte = self.ser.read(1)
         if byte != 'x':
@@ -128,25 +145,12 @@
         now = time.time()
         # objects of these statements are suitable as graphite values.
         graphitePredicates = {ROOM['temperatureF']} 
-        for s, graphiteName in self.graph.subject_objects(ROOM['graphiteName']):
+        for s, graphiteName in self.configGraph.subject_objects(ROOM['graphiteName']):
             for group in self._statementsFromInputs.values():
                 for stmt in group:
                     if stmt[0] == s and stmt[1] in graphitePredicates:
                         self._carbon.send(graphiteName, stmt[2].toPython(), now)
         
-    def currentGraph(self):
-        g = Graph()
-        
-        g.add((HOST[socket.gethostname()], ROOM['connectedTo'], self.uri))
-
-        for si in self._statementsFromInputs.values():
-            for s in si:
-                g.add(s)
-        for dev in self._devs:
-            for stmt in dev.hostStatements():
-                g.add(stmt)
-        return g
-
     def outputStatements(self, stmts):
         unused = set(stmts)
         for dev in self._devs:
@@ -166,12 +170,23 @@
                     raise ValueError(
                         "%s sendOutput/generateActionCode didn't use "
                         "matching output bytes" % dev.__class__)
+                # Dev *could* change hostStatements at any time, and
+                # we're not currently tracking that, but the usual is
+                # to change them in response to sendOutput so this
+                # should be good enough. The right answer is to give
+                # each dev the masterGraph for it to write to.
+                self.syncMasterGraphToHostStatements(dev)
                 log.info("success")
         if unused:
             log.info("Board %s doesn't care about these statements:", self.uri)
             for s in unused:
                 log.info("%r", s)
-        
+
+    def syncMasterGraphToHostStatements(self, dev):
+        hostStmtCtx = URIRef(dev.uri + '/host')
+        newQuads = inContext(dev.hostStatements(), hostStmtCtx)
+        self.masterGraph.patchSubgraph(hostStmtCtx, newQuads)
+                
     def generateArduinoCode(self):
         code = write_arduino_code.writeCode(self.baudrate, self._devs, self._devCommandNum)
         code = write_arduino_code.indent(code)
@@ -224,7 +239,7 @@
         with open(workDir + '/makefile', 'w') as makefile:
             makefile.write(write_arduino_code.writeMakefile(
                 dev=self.dev,
-                tag=self.graph.value(self.uri, ROOM['boardTag']),
+                tag=self.configGraph.value(self.uri, ROOM['boardTag']),
                 allLibs=sum((d.generateArduinoLibs() for d in self._devs), [])))
 
         with open(workDir + '/main.ino', 'w') as main:
@@ -232,26 +247,15 @@
 
         subprocess.check_call(['make', 'upload'], cwd=workDir)
         
+
+    def currentGraph(self):
+        g = Graph()
         
-class GraphPage(cyclone.web.RequestHandler):
-    def get(self):
-        g = StateGraph(ctx=ROOM['arduinosOn%s' % 'host'])
-
-        for b in self.settings.boards:
-            for stmt in b.currentGraph():
-                g.add(stmt)
 
-        if self.get_argument('config', 'no') == 'yes':
-            for stmt in self.settings.config.graph:
+        for dev in self._devs:
+            for stmt in dev.hostStatements():
                 g.add(stmt)
-
-        if self.request.headers.get('accept') == 'application/ld+json':
-            self.set_header('Content-type', 'application/ld+json')
-            self.write(g.asJsonLd())
-            return
-            
-        self.set_header('Content-type', 'application/x-trig')
-        self.write(g.asTrig())
+        return g
 
 class Dot(cyclone.web.RequestHandler):
     def get(self):
@@ -306,7 +310,7 @@
 def currentSerialDevices():
     log.info('find connected boards')
     return glob.glob('/dev/serial/by-id/*')
-
+        
 def main():
     arg = docopt("""
     Usage: arduinoNode.py [options]
@@ -319,20 +323,17 @@
         twlog.startLogging(sys.stdout)
 
         log.setLevel(logging.DEBUG)
-    
-    config = Config()
+
+    masterGraph = PatchableGraph()
+    config = Config(masterGraph)
     current = currentSerialDevices()
 
-    def onChange():
-        # notify reasoning
-        pass
-    
     boards = []
     for dev, board in config.serialDevices().items():
         if str(dev) not in current:
             continue
         log.info("we have board %s connected at %s" % (board, dev))
-        b = Board(dev, config.graph, board, onChange)
+        b = Board(dev, config.graph, masterGraph, board)
         boards.append(b)
 
     for b in boards:
@@ -348,7 +349,8 @@
             "path": "static", "default_filename": "index.html"}),
         (r'/static/(.*)', cyclone.web.StaticFileHandler, {"path": "static"}),
         (r'/boards', Boards),
-        (r"/graph", GraphPage),
+        (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}),
+        (r"/graph/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}),
         (r'/output', OutputPage),
         (r'/arduinoCode', ArduinoCode),
         (r'/dot', Dot),
--- a/service/arduinoNode/devices.py	Thu Jan 28 00:21:31 2016 -0800
+++ b/service/arduinoNode/devices.py	Thu Jan 28 02:24:32 2016 -0800
@@ -320,7 +320,11 @@
             sensorUri = URIRef(os.path.join(self.uri, 'dev-%s' % hex(addr)[2:]))
             stmts.extend([
                 (self.uri, ROOM['connectedTo'], sensorUri),
-                (sensorUri, ROOM['temperatureF'], Literal(tempF))])
+                # rounding may be working around a bug where the
+                # Literal gets two representations (at different
+                # roundings), and this makes an extra value linger on
+                # the client.
+                (sensorUri, ROOM['temperatureF'], Literal(round(tempF, 2)))])
             self._knownTempSubjects.add(sensorUri)
 
         log.debug("read temp in %.1fms" % ((time.time() - t1) * 1000))
--- a/service/piNode/piNode.py	Thu Jan 28 00:21:31 2016 -0800
+++ b/service/piNode/piNode.py	Thu Jan 28 02:24:32 2016 -0800
@@ -106,17 +106,6 @@
     def staticStmts(self):
         return [(HOST[socket.gethostname()], ROOM['connectedTo'], self.uri, self.ctx)]
                         
-    def currentGraph(self):
-        g = Graph()
-
-        for s in self.staticStmts():
-            g.add(s[:3])
-
-        for si in self._statementsFromInputs.values():
-            for s in si:
-                g.add(s)
-        return g
-
     def description(self):
         """for web page"""
         return {