diff service/piNode/piNode.py @ 233:4ebb5cc30002

server/browser graph sync. cut dependency on the WS version. merge some changes between arduino/pi code. Ignore-this: cf7d20d54e134e8ff33a9ee405610846
author drewp@bigasterisk.com
date Sat, 30 Jan 2016 06:40:00 -0800
parents 0aa54404df19
children 141079644c45
line wrap: on
line diff
--- a/service/piNode/piNode.py	Thu Jan 28 02:48:54 2016 -0800
+++ b/service/piNode/piNode.py	Sat Jan 30 06:40:00 2016 -0800
@@ -1,10 +1,11 @@
 from __future__ import division
-import sys, logging, socket, json, time
+import sys, logging, socket, json, time, os
 import cyclone.web
 from rdflib import Namespace, URIRef, Literal, Graph, RDF, ConjunctiveGraph
 from rdflib.parser import StringInputSource
 from twisted.internet import reactor, task
 from docopt import docopt
+
 logging.basicConfig(level=logging.DEBUG)
 sys.path.append("/opt/homeauto_lib")
 from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler
@@ -26,25 +27,30 @@
 
 log = logging.getLogger()
 logging.getLogger('serial').setLevel(logging.WARN)
+
 ROOM = Namespace('http://projects.bigasterisk.com/room/')
 HOST = Namespace('http://bigasterisk.com/ruler/host/')
 
 hostname = socket.gethostname()
 
+CTX = ROOM['pi/%s' % hostname]
+
 class Config(object):
     def __init__(self, masterGraph):
         self.graph = ConjunctiveGraph()
         log.info('read config')
-        self.graph.parse('config.n3', format='n3')
-        self.graph.bind('', ROOM) # maybe working
+        for f in os.listdir('config'):
+            if f.startswith('.'): continue
+            self.graph.parse('config/%s' % f, format='n3')
+        self.graph.bind('', ROOM) # not working
         self.graph.bind('rdf', RDF)
-        masterGraph.patch(Patch(addGraph=self.graph))
+        # config graph is too noisy; maybe make it a separate resource
+        #masterGraph.patch(Patch(addGraph=self.graph))
 
 class Board(object):
     """similar to arduinoNode.Board but without the communications stuff"""
     def __init__(self, graph, masterGraph, uri):
         self.graph, self.uri = graph, uri
-        self.ctx = ROOM['pi/%s' % hostname]
         self.masterGraph = masterGraph
         self.masterGraph.patch(Patch(addQuads=self.staticStmts()))
         self.pi = pigpio.pi()
@@ -52,39 +58,19 @@
         log.debug('found %s devices', len(self._devs))
         self._statementsFromInputs = {} # input device uri: latest statements
         self._carbon = CarbonClient(serverHost='bang')
-
+        for d in self._devs:
+            self.syncMasterGraphToHostStatements(d)
     def startPolling(self):
         task.LoopingCall(self._poll).start(.5)
 
     def _poll(self):
         for i in self._devs:
-            prev = inContext(self._statementsFromInputs.get(i.uri, []), self.ctx)
+            prev = inContext(self._statementsFromInputs.get(i.uri, []), CTX)
             new = self._statementsFromInputs[i.uri] = i.poll()
-            new = inContext(new, self.ctx)
+            new = inContext(new, CTX)
             self.masterGraph.patch(Patch.fromDiff(prev, new))
         self._exportToGraphite()
 
-    def outputStatements(self, stmts):
-        unused = set(stmts)
-        for dev in self._devs:
-            stmtsForDev = []
-            for pat in dev.outputPatterns():
-                if [term is None for term in pat] != [False, False, True]:
-                    raise NotImplementedError
-                for stmt in stmts:
-                    if stmt[:2] == pat[:2]:
-                        stmtsForDev.append(stmt)
-                        unused.discard(stmt)
-            if stmtsForDev:
-                log.info("output goes to action handler for %s" % dev.uri)
-                dev.sendOutput(stmtsForDev)
-                log.info("success")
-        if unused:
-            log.warn("No devices cared about these statements:")
-            for s in unused:
-                log.warn(repr(s))
-                
-    # needs merge with arduinoNode.py
     def _exportToGraphite(self):
         # note this is writing way too often- graphite is storing at a lower res
         now = time.time()
@@ -103,9 +89,42 @@
                         log.debug('  sending %s -> %s', stmt[0], graphiteName)
                         self._carbon.send(graphiteName, stmt[2].toPython(), now)
 
+    def outputStatements(self, stmts):
+        unused = set(stmts)
+        for dev in self._devs:
+            stmtsForDev = []
+            for pat in dev.outputPatterns():
+                if [term is None for term in pat] != [False, False, True]:
+                    raise NotImplementedError
+                for stmt in stmts:
+                    if stmt[:2] == pat[:2]:
+                        stmtsForDev.append(stmt)
+                        unused.discard(stmt)
+            if stmtsForDev:
+                log.info("output goes to action handler for %s" % dev.uri)
+                dev.sendOutput(stmtsForDev)
+
+                # Dev *could* change hostStatements at any time, and
+                # we're not currently tracking that, but the usual is
+                # to change them in response to sendOutput so this
+                # should be good enough. The right answer is to give
+                # each dev the masterGraph for it to write to.
+                self.syncMasterGraphToHostStatements(dev)
+                log.info("output and masterGraph sync complete")
+        if unused:
+            log.info("Board %s doesn't care about these statements:", self.uri)
+            for s in unused:
+                log.warn("%r", s)
+
+    def syncMasterGraphToHostStatements(self, dev):
+        hostStmtCtx = URIRef(dev.uri + '/host')
+        newQuads = inContext(dev.hostStatements(), hostStmtCtx)
+        p = self.masterGraph.patchSubgraph(hostStmtCtx, newQuads)
+        log.debug("patch master with these host stmts %s", p)
+
     def staticStmts(self):
-        return [(HOST[socket.gethostname()], ROOM['connectedTo'], self.uri, self.ctx)]
-                        
+        return [(HOST[hostname], ROOM['connectedTo'], self.uri, CTX)]
+
     def description(self):
         """for web page"""
         return {
@@ -114,6 +133,12 @@
             'graph': 'http://sticker:9059/graph', #todo
             }
         
+class Dot(cyclone.web.RequestHandler):
+    def get(self):
+        configGraph = self.settings.config.graph
+        dot = dotrender.render(configGraph, self.settings.boards)
+        self.write(dot)
+        
 def rdfGraphBody(body, headers):
     g = Graph()
     g.parse(StringInputSource(body), format='nt')
@@ -142,7 +167,7 @@
     def get(self):
         self.set_header('Content-type', 'application/json')
         self.write(json.dumps({
-            'host': socket.gethostname(),
+            'host': hostname,
             'boards': [self.settings.board.description()]
         }, indent=2))
         
@@ -158,11 +183,11 @@
         twlog.startLogging(sys.stdout)
 
         log.setLevel(logging.DEBUG)
-    
+
     masterGraph = PatchableGraph()
     config = Config(masterGraph)
 
-    thisHost = Literal(socket.gethostname())
+    thisHost = Literal(hostname)
     for row in config.graph.query(
             'SELECT ?board WHERE { ?board a :PiBoard; :hostname ?h }',
             initBindings=dict(h=thisHost)):
@@ -179,11 +204,11 @@
         (r"/()", cyclone.web.StaticFileHandler, {
             "path": "../arduinoNode/static", "default_filename": "index.html"}),
         (r'/static/(.*)', cyclone.web.StaticFileHandler, {"path": "../arduinoNode/static"}),
+        (r'/boards', Boards),
         (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}),
         (r"/graph/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}),
         (r'/output', OutputPage),
-        (r'/boards', Boards),
-        #(r'/dot', Dot),
+        (r'/dot', Dot),
         ], config=config, board=board, debug=arg['-v']), interface='::')
     reactor.run()