view service/reasoning/reasoning.py @ 1083:c5b61f943061

try a 2-mode rule for redButton, but this can't work as-is Ignore-this: 391356b778a41a28bb99097af42d65a4 darcs-hash:aa3db688cfa98036b7527a92059ba9cc4703c636
author drewp <drewp@bigasterisk.com>
date Fri, 06 May 2016 17:31:20 -0700
parents 4d16fa39d54a
children c192d37b2bc8
line wrap: on
line source

#!bin/python
"""
Graph consists of:
  input/* (read at startup)
  webinput/* (new files are noticed in here)
  any number of remote graphs, specified in the other graph as objects of (:reasoning, :source, *), reread constantly

gather subgraphs from various services, run them through a rules
engine, and make http requests with the conclusions.

E.g. 'when drew's phone is near the house, and someone is awake,
unlock the door when the door's motion sensor is activated'

When do we gather? The services should be able to trigger us, perhaps
with PSHB, that their graph has changed.
"""


from twisted.internet import reactor, task
from twisted.internet.defer import inlineCallbacks, gatherResults
import time, traceback, sys, json, logging
from rdflib import Graph, ConjunctiveGraph
from rdflib import Namespace, URIRef, Literal, RDF
from rdflib.parser import StringInputSource

import cyclone.web, cyclone.websocket
from inference import infer
from docopt import docopt
from actions import Actions
from inputgraph import InputGraph
from FuXi.Rete.RuleStore import N3RuleStore

sys.path.append("../../lib")
from logsetup import log
log.setLevel(logging.WARN)
outlog = logging.getLogger('output')
outlog.setLevel(logging.WARN)
fetchlog = logging.getLogger('fetch')
fetchlog.setLevel(logging.WARN)

sys.path.append('../../../ffg/ffg')
import evtiming

ROOM = Namespace("http://projects.bigasterisk.com/room/")
DEV = Namespace("http://projects.bigasterisk.com/device/")

        
class Reasoning(object):
    def __init__(self):
        self.prevGraph = None
        self.lastPollTime = 0
        self.lastError = ""

        self.actions = Actions(sendToLiveClients)

        self.rulesN3 = "(not read yet)"
        self.inferred = Graph() # gets replaced in each graphChanged call

        self.inputGraph = InputGraph([], self.graphChanged)      
        self.inputGraph.updateFileData()

    @evtiming.serviceLevel.timed('readRules')
    def readRules(self):
        self.rulesN3 = open('rules.n3').read() # for web display
        self.ruleStore = N3RuleStore()
        self.ruleGraph = Graph(self.ruleStore)
        self.ruleGraph.parse('rules.n3', format='n3') # for inference

    @inlineCallbacks
    def poll(self):
        t1 = time.time()
        try:
            yield self.inputGraph.updateRemoteData()
            self.lastPollTime = time.time()
        except Exception, e:
            log.error(traceback.format_exc())
            self.lastError = str(e)
        evtiming.serviceLevel.addData('poll', time.time() - t1)

    def updateRules(self):
        try:
            t1 = time.time()
            self.readRules()
            ruleParseTime = time.time() - t1
        except ValueError:
            # this is so if you're just watching the inferred output,
            # you'll see the error too
            self.inferred = Graph()
            self.inferred.add((ROOM['reasoner'], ROOM['ruleParseError'],
                               Literal(traceback.format_exc())))
            raise
        return [(ROOM['reasoner'], ROOM['ruleParseTime'],
                               Literal(ruleParseTime))]

    evtiming.serviceLevel.timed('graphChanged')
    def graphChanged(self, inputGraph, oneShot=False, oneShotGraph=None):
        t1 = time.time()
        oldInferred = self.inferred
        try:
            ruleStmts = self.updateRules()
            
            g = inputGraph.getGraph()
            self.inferred = self._makeInferred(g)
            [self.inferred.add(s) for s in ruleStmts]

            if oneShot:
                # unclear where this should go, but the oneshot'd
                # statements should be just as usable as inferred
                # ones.
                for s in oneShotGraph:
                    self.inferred.add(s)

            t2 = time.time()
            self.actions.putResults(self.inputGraph.getGraph(), self.inferred)
            putResultsTime = time.time() - t2
        finally:
            if oneShot:
                self.inferred = oldInferred
        log.info("graphChanged %.1f ms (putResults %.1f ms)" %
                 ((time.time() - t1) * 1000,
                  putResultsTime * 1000))

    def _makeInferred(self, inputGraph):
        t1 = time.time()
        out = infer(inputGraph, self.ruleStore)
        inferenceTime = time.time() - t1

        out.add((ROOM['reasoner'], ROOM['inferenceTime'],
                 Literal(inferenceTime)))
        return out



class Index(cyclone.web.RequestHandler):
    def get(self):
        print evtiming.serviceLevel.serviceJsonReport()

        # make sure GET / fails if our poll loop died
        ago = time.time() - self.settings.reasoning.lastPollTime
        if ago > 2:
            self.set_status(500)
            self.finish("last poll was %s sec ago. last error: %s" %
                        (ago, self.settings.reasoning.lastError))
            return
        self.set_header("Content-Type", "text/html")
        self.write(open('index.html').read())

class ImmediateUpdate(cyclone.web.RequestHandler):
    @inlineCallbacks
    def put(self):
        """
        request an immediate load of the remote graphs; the thing we
        do in the background anyway. No payload.

        Using PUT because this is idempotent and retryable and
        everything.

        todo: this should do the right thing when many requests come
        in very quickly
        """
        print self.request.headers
        log.info("immediateUpdate from %s",
                 self.request.headers.get('User-Agent', '?'))
        yield r.poll()
        self.set_status(202)

def parseRdf(text, contentType):
    g = Graph()
    g.parse(StringInputSource(text), format={
        'text/n3': 'n3',
        }[contentType])
    return g

class OneShot(cyclone.web.RequestHandler):
    def post(self):
        """
        payload is an rdf graph. The statements are momentarily added
        to the input graph for exactly one update.

        todo: how do we go from a transition like doorclosed-to-open
        to a oneshot event? the upstream shouldn't have to do it. Do
        we make those oneshot events here? for every object change?
        there are probably special cases regarding startup time when
        everything appears to be a 'change'.
        """
        try:
            g = parseRdf(self.request.body, self.request.headers['content-type'])
            for s in g:
                log.debug("oneshot stmt %r", s)
            if not len(g):
                log.warn("incoming oneshot graph had no statements: %r", self.request.body)
                return
            t1 = time.time()
            self.settings.reasoning.inputGraph.addOneShot(g)
            self.set_header('x-graph-ms', str(1000 * (time.time() - t1)))
        except Exception as e:
            log.error(e)
            raise
            
# for reuse
class GraphResource(cyclone.web.RequestHandler):
    def get(self, which):
        self.set_header("Content-Type", "application/json")
        r = self.settings.reasoning
        g = {'lastInput': r.inputGraph.getGraph(),
             'lastOutput': r.inferred,
             }[which]
        self.write(self.jsonRdf(g))

    def jsonRdf(self, g):
        return json.dumps(sorted(list(g)))

class NtGraphs(cyclone.web.RequestHandler):
    """same as what gets posted above"""
    def get(self):
        r = self.settings.reasoning
        inputGraphNt = r.inputGraph.getGraph().serialize(format="nt")
        inferredNt = r.inferred.serialize(format="nt")
        self.set_header("Content-Type", "application/json")
        self.write(json.dumps({"input": inputGraphNt,
                               "inferred": inferredNt}))

class Rules(cyclone.web.RequestHandler):
    def get(self):
        self.set_header("Content-Type", "text/plain")
        self.write(self.settings.reasoning.rulesN3)

class Status(cyclone.web.RequestHandler):
    def get(self):
        self.set_header("Content-Type", "text/plain")
        g = self.settings.reasoning.inputGraph.getGraph()
        msg = ""
        for badSource in g.subjects(RDF.type, ROOM['FailedGraphLoad']):
            msg += "GET %s failed (%s). " % (
                badSource, g.value(badSource, ROOM['graphLoadError']))
        if not msg:
            self.finish("all inputs ok")
            return
        self.set_status(500)
        self.finish(msg)

class Static(cyclone.web.RequestHandler):
    def get(self, p):
        self.write(open(p).read())

liveClients = set()
def sendToLiveClients(d=None, asJson=None):
    j = asJson or json.dumps(d)
    for c in liveClients:
        c.sendMessage(j)

class Events(cyclone.websocket.WebSocketHandler):

    def connectionMade(self, *args, **kwargs):
        log.info("websocket opened")
        liveClients.add(self)

    def connectionLost(self, reason):
        log.info("websocket closed")
        liveClients.remove(self)

    def messageReceived(self, message):
        log.info("got message %s" % message)

class Application(cyclone.web.Application):
    def __init__(self, reasoning):
        handlers = [
            (r"/", Index),
            (r"/immediateUpdate", ImmediateUpdate),
            (r"/oneShot", OneShot),
            (r'/(jquery.min.js)', Static),
            (r'/(lastInput|lastOutput)Graph', GraphResource),
            (r'/ntGraphs', NtGraphs),
            (r'/rules', Rules),
            (r'/status', Status),
            (r'/events', Events),
        ]
        cyclone.web.Application.__init__(self, handlers, reasoning=reasoning)

if __name__ == '__main__':

    arg = docopt("""
    Usage: reasoning.py [options]

    -v                Verbose (and slow updates)
    -f                Verbose log for fetching
    --source=<substr>  Limit sources to those with this string.
    """)
    
    r = Reasoning()
    if arg['-v']:
        from colorlog import ColoredFormatter
        log.handlers[0].setFormatter(ColoredFormatter("%(log_color)s%(levelname)-8s%(reset)s %(white)s%(message)s",
        datefmt=None,
        reset=True,
        log_colors={
                'DEBUG':    'cyan',
                'INFO':     'green',
                'WARNING':  'yellow',
                'ERROR':    'red',
                'CRITICAL': 'red,bg_white',
        },
        secondary_log_colors={},
        style='%'
))

        import twisted.python.log
        twisted.python.log.startLogging(sys.stdout)
        log.setLevel(logging.DEBUG)
        outlog.setLevel(logging.DEBUG)

    if arg['-f']:
        fetchlog.setLevel(logging.DEBUG)
        
    task.LoopingCall(r.poll).start(1.0 if not arg['-v'] else 10)
    reactor.listenTCP(9071, Application(r), interface='::')
    reactor.run()