view service/piNode/piNode.py @ 1131:f2e1f28c2efb

piNode pkg upgrades Ignore-this: adac149aff17f7551ce3a71b6d5b30b1 darcs-hash:3ab7402aacb90985cb1bcd4b548a075ff715c300
author drewp <drewp@bigasterisk.com>
date Mon, 15 Jan 2018 03:01:39 -0800
parents 2a87739c243d
children a94f2a522d41
line wrap: on
line source

from __future__ import division
import sys, logging, socket, json, time, os
import cyclone.web
from cyclone.httpclient import fetch
from rdflib import Namespace, URIRef, Literal, Graph, RDF, ConjunctiveGraph
from rdflib.parser import StringInputSource
from twisted.internet import reactor, task
from docopt import docopt

# Configure root logging at DEBUG before the project imports below run.
# main() later resets the root logger's level based on the -v flag.
logging.basicConfig(level=logging.DEBUG)
# Extend the import path; presumably the project modules imported next are
# found here -- verify on the target host.
sys.path.append("/opt/homeauto_lib")
from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler
from light9.rdfdb.rdflibpatch import inContext
from light9.rdfdb.patch import Patch
# Extend the import path for the pigpio import attempted right after this.
sys.path.append('/opt/pigpio')
try:
    import pigpio
except ImportError:
    # pigpio isn't importable here; provide a stand-in with the same
    # `pigpio.pi()` entry point so the rest of the module still loads.
    class pigpio(object):
        """Stub for the pigpio module; pi() always gives None."""
        pi = staticmethod(lambda: None)

import devices
from export_to_influxdb import InfluxExporter

# Root logger; main() sets its level from the -v flag.
log = logging.getLogger()
# Only let warnings and above through from the 'serial' logger.
logging.getLogger('serial').setLevel(logging.WARN)

# RDF namespaces used to build the uris in this module.
ROOM = Namespace('http://projects.bigasterisk.com/room/')
HOST = Namespace('http://bigasterisk.com/ruler/host/')

# This machine's hostname. Used for the HOST uri, the CTX uri, the /boards
# response, and the board lookup in main().
hostname = socket.gethostname()

# Context uri attached to the quad returned by Board.staticStmts.
CTX = ROOM['pi/%s' % hostname]

def patchRandid():
    """
    Replace rdflib.plugins.memory.randid with a plain incrementing
    counter (first value handed out is 1001).

    The worry is that urandom is slow on a raspberry pi, and this
    process adds to graphs a lot. It's unclear what the ordered return
    values might do to the balancing of the graph.
    """
    import rdflib.plugins.memory as memoryStore

    # Held in a dict rather than a bare int so the nested function can
    # rebind the value without needing `nonlocal`.
    state = {'last': 1000}

    def randid():
        nextId = state['last'] + 1
        state['last'] = nextId
        return nextId

    memoryStore.randid = randid
patchRandid()

class Config(object):
    """Reads the n3 files in ./config into one ConjunctiveGraph (self.graph)."""
    def __init__(self, masterGraph):
        """
        Parse every non-hidden *.n3 file found in the 'config' directory
        (relative to the current working directory) and bind the '' and
        'rdf' prefixes on the result.

        masterGraph is accepted but not used at the moment; see the note
        at the end of this method.
        """
        self.graph = ConjunctiveGraph()
        log.info('read config')
        for name in os.listdir('config'):
            wanted = name.endswith('.n3') and not name.startswith('.')
            if wanted:
                self.graph.parse('config/' + name, format='n3')
                log.info('  parsed %s', name)
        for prefix, namespace in (('', ROOM), ('rdf', RDF)):
            self.graph.bind(prefix, namespace)
        # config graph is too noisy; maybe make it a separate resource
        #masterGraph.patch(Patch(addGraph=self.graph))

class Board(object):
    """
    One Pi board: builds its devices, polls them on a timer, mirrors
    their statements into masterGraph, and routes output statements to
    the devices that accept them.

    Similar to arduinoNode.Board but without the communications stuff.
    """
    def __init__(self, graph, masterGraph, uri):
        """
        graph: config graph, handed to devices.makeDevices and InfluxExporter.
        masterGraph: graph object that receives this board's patches.
        uri: uri of this board.
        """
        self.graph, self.uri = graph, uri
        self.masterGraph = masterGraph
        self.masterGraph.patch(Patch(addQuads=self.staticStmts()))
        # This is None when the module-level pigpio fallback stub is in use.
        self.pi = pigpio.pi()
        self._devs = devices.makeDevices(graph, self.uri, self.pi)
        log.debug('found %s devices', len(self._devs))
        self._statementsFromInputs = {} # input device uri: latest statements
        self._lastPollTime = {} # input device uri: time()
        self._influx = InfluxExporter(self.graph)
        for d in self._devs:
            self.syncMasterGraphToHostStatements(d)

    def startPolling(self):
        """Start calling _poll every 0.05 seconds on the twisted reactor."""
        task.LoopingCall(self._poll).start(.05)

    def _poll(self):
        """Run one poll pass; log any exception instead of raising it."""
        try:
            self._pollMaybeError()
        except Exception:
            log.exception("During poll:")

    def _pollMaybeError(self):
        """
        Poll each device that is due, patch masterGraph with whatever
        changed, send any oneshot statements, then export the union of
        all latest input statements to influx.

        A device with a `pollPeriod` attribute is skipped until that many
        seconds have passed since its last poll. poll() may return either
        the latest statements directly, or a dict with 'latest' and
        'oneshot' keys.
        """
        pollTime = {} # uri: sec
        for i in self._devs:
            now = time.time()
            if (hasattr(i, 'pollPeriod') and
                self._lastPollTime.get(i.uri, 0) + i.pollPeriod > now):
                continue
            new = i.poll()
            pollTime[i.uri] = time.time() - now
            if isinstance(new, dict): # new style
                oneshot = new['oneshot']
                new = new['latest']
            else:
                oneshot = None
            prev = self._statementsFromInputs.get(i.uri, set())

            if new or prev:
                self._statementsFromInputs[i.uri] = new
                # it's important that quads from different devices
                # don't clash, since that can lead to inconsistent
                # patches (e.g.
                #   dev1 changes value from 1 to 2;
                #   dev2 changes value from 2 to 3;
                #   dev1 changes from 2 to 4 but this patch will
                #     fail since the '2' statement is gone)
                self.masterGraph.patch(Patch.fromDiff(inContext(prev, i.uri),
                                                      inContext(new, i.uri)))

            if oneshot:
                self._sendOneshot(oneshot)
            self._lastPollTime[i.uri] = now
        if log.isEnabledFor(logging.DEBUG):
            log.debug('poll times:')
            for u, s in sorted(pollTime.items()):
                log.debug("  %.4f ms %s", s * 1000, u)
            log.debug('total poll time: %f ms', sum(pollTime.values()) * 1000)
        # Start from an empty set so this also works before any device has
        # reported statements; the unbound set.union(*[]) form raises
        # TypeError when there is nothing to unpack.
        self._influx.exportToInflux(
            set().union(*self._statementsFromInputs.values()))

    def _sendOneshot(self, oneshot):
        """
        POST the (s, p, o) triples in `oneshot` as n3 text to the oneShot
        service. Failures are logged at info level and otherwise ignored.
        """
        body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3())
                         for s,p,o in oneshot)).encode('utf8')
        # NOTE(review): `bang6` is not defined or imported anywhere in the
        # visible file, so this line would raise NameError the first time a
        # oneshot is sent. Confirm where the address is meant to come from.
        url = 'http://[%s]:9071/oneShot' % bang6
        d = fetch(method='POST',
                  url=url,
                  headers={'Content-Type': ['text/n3']},
                  postdata=body,
                  timeout=5)
        def err(e):
            log.info('oneshot post to %r failed:  %s',
                     url, e.getErrorMessage())
        d.addErrback(err)

    def outputStatements(self, stmts):
        """
        Hand each statement in `stmts` to every device whose output
        patterns match its subject and predicate, then resync that
        device's host statements into masterGraph.

        Only (subject, predicate, None) patterns are supported; any other
        pattern shape raises NotImplementedError. Statements that no
        device matched are logged.
        """
        unused = set(stmts)
        for dev in self._devs:
            stmtsForDev = []
            for pat in dev.outputPatterns():
                if [term is None for term in pat] != [False, False, True]:
                    raise NotImplementedError
                for stmt in stmts:
                    if stmt[:2] == pat[:2]:
                        stmtsForDev.append(stmt)
                        unused.discard(stmt)
            if stmtsForDev:
                log.info("output goes to action handler for %s", dev.uri)
                dev.sendOutput(stmtsForDev)

                # Dev *could* change hostStatements at any time, and
                # we're not currently tracking that, but the usual is
                # to change them in response to sendOutput so this
                # should be good enough. The right answer is to give
                # each dev the masterGraph for it to write to.
                self.syncMasterGraphToHostStatements(dev)
                log.info("output and masterGraph sync complete")
        if unused:
            log.info("Board %s doesn't care about these statements:", self.uri)
            for s in unused:
                log.warning("%r", s)

    def syncMasterGraphToHostStatements(self, dev):
        """
        Replace the '<dev.uri>/host' subgraph of masterGraph with the
        device's current hostStatements().
        """
        hostStmtCtx = URIRef(dev.uri + '/host')
        newQuads = inContext(dev.hostStatements(), hostStmtCtx)
        p = self.masterGraph.patchSubgraph(hostStmtCtx, newQuads)
        log.debug("patch master with these host stmts %s", p)

    def staticStmts(self):
        """Quads that hold for the life of the process: host connectedTo board."""
        return [(HOST[hostname], ROOM['connectedTo'], self.uri, CTX)]

    def description(self):
        """for web page"""
        return {
            'uri': self.uri,
            'devices': [d.description() for d in self._devs],
            'graph': 'http://sticker:9059/graph', #todo
            }
        
class Dot(cyclone.web.RequestHandler):
    """GET handler that writes a dotrender rendering of the config graph."""
    def get(self):
        # NOTE(review): `dotrender` is not imported in the visible file, and
        # the Application in main() is given `board`, not `boards`, so this
        # handler looks like it cannot currently succeed -- confirm.
        settings = self.settings
        configGraph = settings.config.graph
        self.write(dotrender.render(configGraph, settings.boards))
        
def rdfGraphBody(body, headers):
    """
    Parse `body` as N-Triples ('nt') into a new Graph and return it.

    `headers` is accepted but not consulted.
    """
    parsed = Graph()
    source = StringInputSource(body)
    parsed.parse(source, format='nt')
    return parsed

class OutputPage(cyclone.web.RequestHandler):
    """PUT handler that passes one statement to the board's outputs."""
    def put(self):
        """
        Build a single (subject, predicate, object) statement from the
        request and pass it to self.settings.board.outputStatements.

        Two request forms are accepted:
          - `s` and `p` query arguments (uris) with the object in the
            body. The body becomes a float Literal if float() accepts it,
            otherwise a Literal of the body text.
          - no `s`/`p` arguments: the body is parsed by rdfGraphBody and
            must hold exactly one triple.

        Raises cyclone.web.HTTPError(400) if the parsed body does not
        contain exactly one triple.
        """
        arg = self.request.arguments
        if arg.get('s') and arg.get('p'):
            # When an argument is repeated, the last value wins.
            subj = URIRef(arg['s'][-1])
            pred = URIRef(arg['p'][-1])
            turtleLiteral = self.request.body
            try:
                obj = Literal(float(turtleLiteral))
            except ValueError:
                obj = Literal(turtleLiteral)
            stmt = (subj, pred, obj)
        else:
            g = rdfGraphBody(self.request.body, self.request.headers)
            # Explicit check rather than `assert`: asserts are stripped
            # under -O, and a malformed request is a client error.
            if len(g) != 1:
                raise cyclone.web.HTTPError(
                    400,
                    "expected exactly 1 statement in body, got %d" % len(g))
            stmt = next(g.triples((None, None, None)))

        self.settings.board.outputStatements([stmt])

class Boards(cyclone.web.RequestHandler):
    """GET handler returning this host's name and board description as JSON."""
    def get(self):
        self.set_header('Content-type', 'application/json')
        # Always a one-element list: this service is set up with one board.
        payload = {
            'host': hostname,
            'boards': [self.settings.board.description()],
        }
        self.write(json.dumps(payload, indent=2))
        
def main():
    """
    Command-line entry point.

    Parses options, loads the config graph, finds the :PiBoard whose
    :hostname matches this machine, starts polling it, and serves the
    web/graph endpoints on port 9059 until the reactor stops.

    Raises ValueError if the config has no board for this hostname.
    """
    opts = docopt("""
    Usage: piNode.py [options]

    -v   Verbose
    """)
    verbose = opts['-v']

    log.setLevel(logging.WARN)
    if verbose:
        from twisted.python import log as twlog
        twlog.startLogging(sys.stdout)
        log.setLevel(logging.DEBUG)

    masterGraph = PatchableGraph()
    config = Config(masterGraph)

    # Take the first board in the config whose :hostname is this host.
    thisHost = Literal(hostname)
    boardRows = config.graph.query(
        'SELECT ?board WHERE { ?board a :PiBoard; :hostname ?h }',
        initBindings=dict(h=thisHost))
    noMatch = object()
    firstRow = next(iter(boardRows), noMatch)
    if firstRow is noMatch:
        raise ValueError("config had no board for :hostname %r" % thisHost)
    thisBoard = firstRow.board

    log.info("found config for board %r", thisBoard)
    board = Board(config.graph, masterGraph, thisBoard)
    board.startPolling()

    # The static pages are shared with the arduinoNode service.
    staticDir = "../arduinoNode/static"
    routes = [
        (r"/()", cyclone.web.StaticFileHandler,
         {"path": staticDir, "default_filename": "index.html"}),
        (r'/static/(.*)', cyclone.web.StaticFileHandler, {"path": staticDir}),
        (r'/boards', Boards),
        (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}),
        (r"/graph/events", CycloneGraphEventsHandler,
         {'masterGraph': masterGraph}),
        (r'/output', OutputPage),
        (r'/dot', Dot),
    ]
    app = cyclone.web.Application(routes, config=config, board=board,
                                  debug=verbose)
    reactor.listenTCP(9059, app, interface='::')
    # Logged at warning level so it shows even without -v.
    log.warning('serving on 9059')
    reactor.run()

# Only start the service when run as a script, so importing this module
# (e.g. from a test or a REPL) doesn't bind the port and block in the reactor.
if __name__ == '__main__':
    main()