Mercurial > code > home > repos > homeauto
view service/reasoning/actions.py @ 1412:302063bfb8ff
reasoning web page uses rdf/browse/graphView for inputs and outputs now
Ignore-this: 64b7275ee149f631b606320444a3478b
darcs-hash:1ec4d67abed3d43997fae5f10d4bc23ee1a6b2d5
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Wed, 24 Jul 2019 00:36:16 -0700 |
parents | 21d0cd98ef7a |
children | e157afd642b5 |
line wrap: on
line source
from rdflib import URIRef, Namespace, RDF, Literal from twisted.internet import reactor import logging import urllib import json import time import treq log = logging.getLogger('output') ROOM = Namespace("http://projects.bigasterisk.com/room/") DEV = Namespace("http://projects.bigasterisk.com/device/") REASONING = Namespace("http://projects.bigasterisk.com/ns/reasoning/") class HttpPutOutput(object): def __init__(self, url, mockOutput=False): self.url = url self.mockOutput = mockOutput self.payload = None self.foafAgent = None self.nextCall = None self.lastErr = None self.numRequests = 0 def report(self): return { 'url': self.url, 'urlAbbrev': self.url .replace('http%3A%2F%2Fprojects.bigasterisk.com%2Froom%2F', ':') .replace('http://projects.bigasterisk.com/room/', ':') .replace('.vpn-home.bigasterisk.com', '.vpn-home'), 'payload': self.payload, 'numRequests': self.numRequests, 'lastChangeTime': round(self.lastChangeTime, 2), 'lastErr': str(self.lastErr) if self.lastErr is not None else None, } def setPayload(self, payload, foafAgent): if self.numRequests > 0 and (self.payload == payload and self.foafAgent == foafAgent): return self.payload = payload self.foafAgent = foafAgent self.lastChangeTime = time.time() self.makeRequest() def makeRequest(self): if self.payload is None: log.debug("PUT None to %s - waiting", self.url) return h = {} if self.foafAgent: h['x-foaf-agent'] = self.foafAgent if self.nextCall and self.nextCall.active(): self.nextCall.cancel() self.nextCall = None self.lastErr = None log.debug("PUT %s payload=%s agent=%s", self.url, self.payload, self.foafAgent) if not self.mockOutput: self.currentRequest = treq.put(self.url, data=self.payload, headers=h, timeout=3) self.currentRequest.addCallback(self.onResponse).addErrback(self.onError) else: reactor.callLater(.2, self.onResponse, None) self.numRequests += 1 def onResponse(self, resp): log.debug(" PUT %s ok", self.url) self.lastErr = None self.currentRequest = None self.nextCall = reactor.callLater(30, self.makeRequest) def onError(self, err): self.lastErr = err log.debug(' PUT %s failed: %s', self.url, err) self.currentRequest = None self.nextCall = reactor.callLater(50, self.makeRequest) class HttpPutOutputs(object): """these grow forever""" def __init__(self, mockOutput=False): self.mockOutput = mockOutput self.state = {} # url: HttpPutOutput def put(self, url, payload, foafAgent): if url not in self.state: self.state[url] = HttpPutOutput(url, mockOutput=self.mockOutput) self.state[url].setPayload(payload, foafAgent) class Actions(object): def __init__(self, sendToLiveClients, mockOutput=False): self.mockOutput = mockOutput self.putOutputs = HttpPutOutputs(mockOutput=mockOutput) self.sendToLiveClients = sendToLiveClients def putResults(self, deviceGraph, inferred): """ some conclusions in the inferred graph lead to PUT requests getting made if the graph contains (?d ?p ?o) and ?d and ?p are a device and predicate we support PUTs for, then we look up (?d :putUrl ?url) and (?o :putValue ?val) and call PUT ?url <- ?val If the graph doesn't contain any matches, we use (?d :zeroValue ?val) for the value and PUT that. """ activated = set() # (subj,pred) pairs for which we're currently putting some value activated.update(self._putDevices(deviceGraph, inferred)) self._oneShotPostActions(deviceGraph, inferred) for dev, pred in [ #(URIRef('http://bigasterisk.com/host/bang/monitor'), ROOM.powerState), (URIRef('http://bigasterisk.com/host/dash/monitor'), ROOM.powerState), (URIRef('http://bigasterisk.com/host/frontdoor/monitor'), ROOM.powerState), (ROOM['storageCeilingLedLong'], ROOM.brightness), (ROOM['storageCeilingLedCross'], ROOM.brightness), (ROOM['garageOverhead'], ROOM.brightness), (ROOM['headboardWhite'], ROOM.brightness), (ROOM['changingWhite'], ROOM.brightness), (ROOM['starTrekLight'], ROOM.brightness), (ROOM['kitchenLight'], ROOM.brightness), (ROOM['kitchenCounterLight'], ROOM.brightness), (ROOM['livingRoomLamp1'], ROOM.brightness), (ROOM['livingRoomLamp2'], ROOM.brightness), (ROOM['loftDeskStrip'], ROOM.x), (ROOM['bedLedStrip'], ROOM.color), ]: url = deviceGraph.value(dev, ROOM.putUrl) log.debug('inferredObjects of dev=%s pred=%s', deviceGraph.qname(dev), deviceGraph.qname(pred)) inferredObjects = list(inferred.objects(dev, pred)) if len(inferredObjects) == 0: # rm this- use activated instead self._putZero(deviceGraph, dev, pred, url) elif len(inferredObjects) == 1: log.debug(' inferredObject: %s %s %r', deviceGraph.qname(dev), deviceGraph.qname(pred), inferredObjects[0].toPython()) activated.add((dev, pred)) self._putInferred(deviceGraph, url, inferredObjects[0]) elif len(inferredObjects) > 1: log.info(" conflict, ignoring: %s has %s of %s" % (dev, pred, inferredObjects)) # write about it to the inferred graph? self.putDefaults(deviceGraph, activated) def putDefaults(self, deviceGraph, activated): """ If inferring (:a :b :c) would cause a PUT, you can say reasoning:defaultOutput reasoning:default [ :subject :a :predicate :b :defaultObject :c ] and we'll do that PUT if no rule has put anything else with (:a :b *). """ defaultStmts = set() for defaultDesc in deviceGraph.objects(REASONING['defaultOutput'], REASONING['default']): s = deviceGraph.value(defaultDesc, ROOM['subject']) p = deviceGraph.value(defaultDesc, ROOM['predicate']) if (s, p) not in activated: obj = deviceGraph.value(defaultDesc, ROOM['defaultObject']) defaultStmts.add((s, p, obj)) log.debug('defaultStmts %s %s %s', s, p, obj) self._putDevices(deviceGraph, defaultStmts) def _oneShotPostActions(self, deviceGraph, inferred): """ Inferred graph may contain some one-shot statements. We'll send statement objects to anyone on web sockets, and also generate POST requests as described in the graph. one-shot statement ?s ?p ?o with this in the graph: ?osp a :OneShotPost ?osp :subject ?s ?osp :predicate ?p this will cause a post to ?o """ # nothing in this actually makes them one-shot yet. they'll # just fire as often as we get in here, which is not desirable log.debug("_oneShotPostActions") def err(e): log.warn("post %s failed", postTarget) for osp in deviceGraph.subjects(RDF.type, ROOM['OneShotPost']): s = deviceGraph.value(osp, ROOM['subject']) p = deviceGraph.value(osp, ROOM['predicate']) if s is None or p is None: continue #log.info("checking for %s %s", s, p) for postTarget in inferred.objects(s, p): log.debug("post target %r", postTarget) # this packet ought to have 'oneShot' in it somewhere self.sendToLiveClients({"s":s, "p":p, "o":postTarget}) log.debug(" POST %s", postTarget) if not self.mockOutput: treq.post(postTarget, timeout=2).addErrback(err) def _putDevices(self, deviceGraph, inferred): activated = set() agentFor = {} for stmt in inferred: if stmt[1] == ROOM['putAgent']: agentFor[stmt[0]] = stmt[2] for stmt in inferred: log.debug('inferred stmt we might PUT: %s', stmt) putUrl = deviceGraph.value(stmt[0], ROOM['putUrl']) putPred = deviceGraph.value(stmt[0], ROOM['putPredicate']) matchPred = deviceGraph.value(stmt[0], ROOM['matchPredicate'], default=putPred) if putUrl and matchPred == stmt[1]: log.debug('putDevices: stmt %r %r %r leds to putting at %r', stmt[0], stmt[1], stmt[2], putUrl) self._put(putUrl + '?' + urllib.urlencode([ ('s', str(stmt[0])), ('p', str(putPred))]), str(stmt[2].toPython()), agent=agentFor.get(stmt[0], None)) activated.add((stmt[0], # didn't test that this should be # stmt[1] and not putPred stmt[1])) return activated def _putInferred(self, deviceGraph, putUrl, obj): """ HTTP PUT to putUrl, with a payload that's either obj's :putValue or obj itself. """ value = deviceGraph.value(obj, ROOM.putValue) if value is not None: self._put(putUrl, payload=str(value)) elif isinstance(obj, Literal): self._put(putUrl, payload=str(obj)) else: log.warn(" don't know what payload to put for %s. obj=%r", putUrl, obj) def _putZero(self, deviceGraph, dev, pred, putUrl): # zerovalue should be a function of pred as well. value = deviceGraph.value(dev, ROOM.zeroValue) if value is not None: log.debug(" put zero (%r) to %s", value.toPython(), putUrl) self._put(putUrl, payload=str(value)) # this should be written back into the inferred graph # for feedback def _put(self, url, payload, agent=None): assert isinstance(payload, bytes) self.putOutputs.put(url, payload, agent) import cyclone.sse class PutOutputsTable(cyclone.sse.SSEHandler): def __init__(self, application, request): cyclone.sse.SSEHandler.__init__(self, application, request) self.actions = self.settings.reasoning.actions def bind(self, *args, **kwargs): self.bound = True self.loop() def unbind(self): self.bound = False def loop(self): if not self.bound: return self.sendEvent(message=json.dumps({ 'puts': [row.report() for _, row in sorted(self.actions.putOutputs.state.items())], }), event='update') reactor.callLater(1, self.loop)