view service/piNode/piNode.py @ 1035:f01d9892ed79

update arduinoNode to support streamed graph output Ignore-this: fa35d1fae5b0e411b167650550c3e77d darcs-hash:c22c59acf2d4bbabfc467085f146a5de72b19f03
author drewp <drewp@bigasterisk.com>
date Thu, 28 Jan 2016 02:24:32 -0800
parents 9328df09f679
children 4ebb5cc30002
line wrap: on
line source

from __future__ import division
import sys, logging, socket, json, time
import cyclone.web
from rdflib import Namespace, URIRef, Literal, Graph, RDF, ConjunctiveGraph
from rdflib.parser import StringInputSource
from twisted.internet import reactor, task
from docopt import docopt
logging.basicConfig(level=logging.DEBUG)
sys.path.append("/opt/homeauto_lib")
from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler
from light9.rdfdb.rdflibpatch import inContext
from light9.rdfdb.patch import Patch
sys.path.append('/opt/pigpio')
try:
    import pigpio
except ImportError:
    # pigpio is not importable here; provide a stand-in with the same
    # pigpio.pi() entry point so Board can still be constructed.
    class pigpio(object):
        """Placeholder for the pigpio module; pi() hands back None."""
        pi = staticmethod(lambda: None)

import devices

# from /my/proj/room
from carbondata import CarbonClient

# Root logger, shared by everything in this module.
log = logging.getLogger()
# Cap the 'serial' logger at WARN.
logging.getLogger('serial').setLevel(logging.WARN)
# RDF namespaces used to build the URIs in this module.
ROOM = Namespace('http://projects.bigasterisk.com/room/')
HOST = Namespace('http://bigasterisk.com/ruler/host/')

# Hostname of this machine, looked up once when the module loads.
hostname = socket.gethostname()

class Config(object):
    """Reads config.n3 into self.graph and adds that graph to masterGraph."""
    def __init__(self, masterGraph):
        configGraph = ConjunctiveGraph()
        self.graph = configGraph
        log.info('read config')
        configGraph.parse('config.n3', format='n3')
        prefixes = (
            ('', ROOM), # maybe working
            ('rdf', RDF),
        )
        for prefix, namespace in prefixes:
            configGraph.bind(prefix, namespace)
        # publish the whole config as one patch
        masterGraph.patch(Patch(addGraph=configGraph))

class Board(object):
    """similar to arduinoNode.Board but without the communications stuff

    Holds the devices that devices.makeDevices() builds for one board uri,
    polls them on a timer, mirrors the statements they return into
    masterGraph under self.ctx, and routes incoming output statements to
    the devices whose output patterns match.
    """
    def __init__(self, graph, masterGraph, uri):
        """
        graph: config graph handed to devices.makeDevices and searched
          for :graphiteName statements
        masterGraph: object with a patch() method that receives our
          static and polled statements
        uri: uri of this board
        """
        self.graph, self.uri = graph, uri
        self.ctx = ROOM['pi/%s' % hostname]
        self.masterGraph = masterGraph
        self.masterGraph.patch(Patch(addQuads=self.staticStmts()))
        self.pi = pigpio.pi()
        self._devs = devices.makeDevices(graph, self.uri, self.pi)
        log.debug('found %s devices', len(self._devs))
        self._statementsFromInputs = {} # input device uri: latest statements
        self._carbon = CarbonClient(serverHost='bang')
        # time.time() of the last graphite export; 0 lets the first
        # poll export right away
        self.lastGraphiteExport = 0

    def startPolling(self):
        """Run _poll every half second on the twisted reactor."""
        task.LoopingCall(self._poll).start(.5)

    def _poll(self):
        """Poll every device, then maybe export to graphite.

        A LoopingCall stops for good when its function raises, so each
        failure is logged here and the remaining devices (and later
        polls) still run.
        """
        for dev in self._devs:
            try:
                self._pollDevice(dev)
            except Exception:
                log.exception('poll failed for device %s', dev.uri)
        try:
            self._exportToGraphite()
        except Exception:
            log.exception('graphite export failed')

    def _pollDevice(self, dev):
        """Poll one device and patch masterGraph with what changed."""
        prev = inContext(self._statementsFromInputs.get(dev.uri, []), self.ctx)
        latest = dev.poll()
        self.masterGraph.patch(Patch.fromDiff(prev, inContext(latest, self.ctx)))
        # remember the new statements only once the patch went through,
        # so a failed patch gets retried on the next poll
        self._statementsFromInputs[dev.uri] = latest

    def outputStatements(self, stmts):
        """Send each statement to the devices whose output pattern matches.

        stmts: iterable of (s, p, o) statements. A pattern matches a
          statement when their first two terms are equal.

        Raises NotImplementedError if a device reports a pattern that is
        not of the form (s, p, None). Statements no device matched are
        logged as warnings.
        """
        # stmts is walked once per pattern, so a one-shot iterable
        # has to be materialized first
        stmts = list(stmts)
        unused = set(stmts)
        for dev in self._devs:
            stmtsForDev = []
            for pat in dev.outputPatterns():
                if [term is None for term in pat] != [False, False, True]:
                    raise NotImplementedError
                for stmt in stmts:
                    if stmt[:2] == pat[:2]:
                        stmtsForDev.append(stmt)
                        unused.discard(stmt)
            if stmtsForDev:
                log.info("output goes to action handler for %s", dev.uri)
                dev.sendOutput(stmtsForDev)
                log.info("success")
        if unused:
            log.warning("No devices cared about these statements:")
            for s in unused:
                log.warning(repr(s))

    # needs merge with arduinoNode.py
    def _exportToGraphite(self):
        """Send the latest temperatureF readings to graphite.

        Does nothing if the previous export was less than 20 seconds ago.
        """
        # note this is writing way too often- graphite is storing at a lower res
        now = time.time()
        # 20 sec is not precise; just trying to reduce wifi traffic
        if self.lastGraphiteExport + 20 > now:
            return
        self.lastGraphiteExport = now
        log.debug('graphite export:')
        # objects of these statements are suitable as graphite values.
        graphitePredicates = {ROOM['temperatureF']}
        # bug: one sensor can have temp and humid- this will be ambiguous
        for s, graphiteName in self.graph.subject_objects(ROOM['graphiteName']):
            for group in self._statementsFromInputs.values():
                for stmt in group:
                    if stmt[0] == s and stmt[1] in graphitePredicates:
                        log.debug('  sending %s -> %s', stmt[0], graphiteName)
                        self._carbon.send(graphiteName, stmt[2].toPython(), now)

    def staticStmts(self):
        """Quads that hold for the life of the process: host connectedTo board."""
        return [(HOST[hostname], ROOM['connectedTo'], self.uri, self.ctx)]

    def description(self):
        """for web page"""
        return {
            'uri': self.uri,
            'devices': [d.description() for d in self._devs],
            'graph': 'http://sticker:9059/graph', #todo
            }
        
def rdfGraphBody(body, headers):
    """Parse body as 'nt' format and return the resulting Graph.

    headers is accepted but not consulted.
    """
    parsed = Graph()
    source = StringInputSource(body)
    parsed.parse(source, format='nt')
    return parsed

class OutputPage(cyclone.web.RequestHandler):
    """Accepts one output statement and hands it to the board."""
    def put(self):
        """Build a single statement from the request and output it.

        With 's' and 'p' request arguments, the body is the object: a
        float Literal if the body parses as a float, else a plain
        Literal. Otherwise the body is parsed by rdfGraphBody and must
        contain exactly one statement.

        Raises cyclone.web.HTTPError(400) when the parsed body does not
        hold exactly one statement.
        """
        arg = self.request.arguments
        if arg.get('s') and arg.get('p'):
            # when an argument is repeated, the last value wins
            subj = URIRef(arg['s'][-1])
            pred = URIRef(arg['p'][-1])
            turtleLiteral = self.request.body
            try:
                obj = Literal(float(turtleLiteral))
            except ValueError:
                obj = Literal(turtleLiteral)
            stmt = (subj, pred, obj)
        else:
            g = rdfGraphBody(self.request.body, self.request.headers)
            # reject bad client input explicitly; an assert would vanish
            # under python -O and otherwise surface as a 500
            if len(g) != 1:
                raise cyclone.web.HTTPError(
                    400,
                    "expected exactly 1 statement in body, got %d" % len(g))
            stmt = next(g.triples((None, None, None)))

        self.settings.board.outputStatements([stmt])

class Boards(cyclone.web.RequestHandler):
    """Serves a JSON description of this host and its board."""
    def get(self):
        """Write {'host': ..., 'boards': [...]} as indented JSON."""
        self.set_header('Content-type', 'application/json')
        payload = {
            'host': socket.gethostname(),
            'boards': [self.settings.board.description()],
        }
        body = json.dumps(payload, indent=2)
        self.write(body)
        
def main():
    """Parse the command line, load the config, and run the service.

    Looks up the :PiBoard whose :hostname equals this machine's hostname
    in the config graph, starts polling it, and serves the web handlers
    on port 9059 until the reactor stops.

    Raises ValueError if the config has no board for this hostname.
    """
    options = docopt("""
    Usage: piNode.py [options]

    -v   Verbose
    """)
    verbose = options['-v']
    log.setLevel(logging.WARN)
    if verbose:
        from twisted.python import log as twlog
        twlog.startLogging(sys.stdout)

        log.setLevel(logging.DEBUG)

    masterGraph = PatchableGraph()
    config = Config(masterGraph)

    thisHost = Literal(socket.gethostname())
    matches = list(config.graph.query(
        'SELECT ?board WHERE { ?board a :PiBoard; :hostname ?h }',
        initBindings=dict(h=thisHost)))
    if not matches:
        raise ValueError("config had no board for :hostname %r" % thisHost)
    # only the first matching board is used
    thisBoard = matches[0].board

    log.info("found config for board %r" % thisBoard)
    board = Board(config.graph, masterGraph, thisBoard)
    board.startPolling()

    # the web UI files are shared with arduinoNode
    staticDir = "../arduinoNode/static"
    graphArgs = {'masterGraph': masterGraph}
    routes = [
        (r"/()", cyclone.web.StaticFileHandler, {
            "path": staticDir, "default_filename": "index.html"}),
        (r'/static/(.*)', cyclone.web.StaticFileHandler, {"path": staticDir}),
        (r"/graph", CycloneGraphHandler, graphArgs),
        (r"/graph/events", CycloneGraphEventsHandler, graphArgs),
        (r'/output', OutputPage),
        (r'/boards', Boards),
        #(r'/dot', Dot),
        ]
    app = cyclone.web.Application(
        routes, config=config, board=board, debug=verbose)
    reactor.listenTCP(9059, app, interface='::')
    reactor.run()

# Start the service as soon as this module is loaded; there is no
# __name__ guard, so importing the file also runs it.
main()