Mercurial > code > home > repos > homeauto
view service/piNode/piNode.py @ 1035:f01d9892ed79
update arduinoNode to support streamed graph output
Ignore-this: fa35d1fae5b0e411b167650550c3e77d
darcs-hash:c22c59acf2d4bbabfc467085f146a5de72b19f03
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Thu, 28 Jan 2016 02:24:32 -0800 |
parents | 9328df09f679 |
children | 4ebb5cc30002 |
line wrap: on
line source
from __future__ import division import sys, logging, socket, json, time import cyclone.web from rdflib import Namespace, URIRef, Literal, Graph, RDF, ConjunctiveGraph from rdflib.parser import StringInputSource from twisted.internet import reactor, task from docopt import docopt logging.basicConfig(level=logging.DEBUG) sys.path.append("/opt/homeauto_lib") from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler from light9.rdfdb.rdflibpatch import inContext from light9.rdfdb.patch import Patch sys.path.append('/opt/pigpio') try: import pigpio except ImportError: class pigpio(object): @staticmethod def pi(): return None import devices # from /my/proj/room from carbondata import CarbonClient log = logging.getLogger() logging.getLogger('serial').setLevel(logging.WARN) ROOM = Namespace('http://projects.bigasterisk.com/room/') HOST = Namespace('http://bigasterisk.com/ruler/host/') hostname = socket.gethostname() class Config(object): def __init__(self, masterGraph): self.graph = ConjunctiveGraph() log.info('read config') self.graph.parse('config.n3', format='n3') self.graph.bind('', ROOM) # maybe working self.graph.bind('rdf', RDF) masterGraph.patch(Patch(addGraph=self.graph)) class Board(object): """similar to arduinoNode.Board but without the communications stuff""" def __init__(self, graph, masterGraph, uri): self.graph, self.uri = graph, uri self.ctx = ROOM['pi/%s' % hostname] self.masterGraph = masterGraph self.masterGraph.patch(Patch(addQuads=self.staticStmts())) self.pi = pigpio.pi() self._devs = devices.makeDevices(graph, self.uri, self.pi) log.debug('found %s devices', len(self._devs)) self._statementsFromInputs = {} # input device uri: latest statements self._carbon = CarbonClient(serverHost='bang') def startPolling(self): task.LoopingCall(self._poll).start(.5) def _poll(self): for i in self._devs: prev = inContext(self._statementsFromInputs.get(i.uri, []), self.ctx) new = self._statementsFromInputs[i.uri] = i.poll() new = inContext(new, self.ctx) self.masterGraph.patch(Patch.fromDiff(prev, new)) self._exportToGraphite() def outputStatements(self, stmts): unused = set(stmts) for dev in self._devs: stmtsForDev = [] for pat in dev.outputPatterns(): if [term is None for term in pat] != [False, False, True]: raise NotImplementedError for stmt in stmts: if stmt[:2] == pat[:2]: stmtsForDev.append(stmt) unused.discard(stmt) if stmtsForDev: log.info("output goes to action handler for %s" % dev.uri) dev.sendOutput(stmtsForDev) log.info("success") if unused: log.warn("No devices cared about these statements:") for s in unused: log.warn(repr(s)) # needs merge with arduinoNode.py def _exportToGraphite(self): # note this is writing way too often- graphite is storing at a lower res now = time.time() # 20 sec is not precise; just trying to reduce wifi traffic if getattr(self, 'lastGraphiteExport', 0) + 20 > now: return self.lastGraphiteExport = now log.debug('graphite export:') # objects of these statements are suitable as graphite values. graphitePredicates = {ROOM['temperatureF']} # bug: one sensor can have temp and humid- this will be ambiguous for s, graphiteName in self.graph.subject_objects(ROOM['graphiteName']): for group in self._statementsFromInputs.values(): for stmt in group: if stmt[0] == s and stmt[1] in graphitePredicates: log.debug(' sending %s -> %s', stmt[0], graphiteName) self._carbon.send(graphiteName, stmt[2].toPython(), now) def staticStmts(self): return [(HOST[socket.gethostname()], ROOM['connectedTo'], self.uri, self.ctx)] def description(self): """for web page""" return { 'uri': self.uri, 'devices': [d.description() for d in self._devs], 'graph': 'http://sticker:9059/graph', #todo } def rdfGraphBody(body, headers): g = Graph() g.parse(StringInputSource(body), format='nt') return g class OutputPage(cyclone.web.RequestHandler): def put(self): arg = self.request.arguments if arg.get('s') and arg.get('p'): subj = URIRef(arg['s'][-1]) pred = URIRef(arg['p'][-1]) turtleLiteral = self.request.body try: obj = Literal(float(turtleLiteral)) except ValueError: obj = Literal(turtleLiteral) stmt = (subj, pred, obj) else: g = rdfGraphBody(self.request.body, self.request.headers) assert len(g) == 1, len(g) stmt = g.triples((None, None, None)).next() self.settings.board.outputStatements([stmt]) class Boards(cyclone.web.RequestHandler): def get(self): self.set_header('Content-type', 'application/json') self.write(json.dumps({ 'host': socket.gethostname(), 'boards': [self.settings.board.description()] }, indent=2)) def main(): arg = docopt(""" Usage: piNode.py [options] -v Verbose """) log.setLevel(logging.WARN) if arg['-v']: from twisted.python import log as twlog twlog.startLogging(sys.stdout) log.setLevel(logging.DEBUG) masterGraph = PatchableGraph() config = Config(masterGraph) thisHost = Literal(socket.gethostname()) for row in config.graph.query( 'SELECT ?board WHERE { ?board a :PiBoard; :hostname ?h }', initBindings=dict(h=thisHost)): thisBoard = row.board break else: raise ValueError("config had no board for :hostname %r" % thisHost) log.info("found config for board %r" % thisBoard) board = Board(config.graph, masterGraph, thisBoard) board.startPolling() reactor.listenTCP(9059, cyclone.web.Application([ (r"/()", cyclone.web.StaticFileHandler, { "path": "../arduinoNode/static", "default_filename": "index.html"}), (r'/static/(.*)', cyclone.web.StaticFileHandler, {"path": "../arduinoNode/static"}), (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}), (r"/graph/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}), (r'/output', OutputPage), (r'/boards', Boards), #(r'/dot', Dot), ], config=config, board=board, debug=arg['-v']), interface='::') reactor.run() main()