Mercurial > code > home > repos > homeauto
changeset 287:3b61c0dfaaef
switch from evtiming to greplin.scales. Optimize rules reader to reuse previous data (400ms -> 0.6ms)
Ignore-this: a655f4c56db51b09b3f14d7f09e354cb
author | drewp@bigasterisk.com |
---|---|
date | Mon, 09 May 2016 00:32:08 -0700 |
parents | da0b3a1394a3 |
children | e03696277b32 |
files | service/reasoning/inference.py service/reasoning/reasoning.py service/reasoning/requirements.txt |
diffstat | 3 files changed, 68 insertions(+), 44 deletions(-) [+] |
line wrap: on
line diff
--- a/service/reasoning/inference.py Sun May 08 03:05:27 2016 -0700 +++ b/service/reasoning/inference.py Mon May 09 00:32:08 2016 -0700 @@ -2,7 +2,7 @@ see ./reasoning for usage """ -import sys, os +import sys, os, contextlib try: from rdflib.Graph import Graph except ImportError: @@ -18,62 +18,80 @@ from rdflib import plugin, Namespace from rdflib.store import Store -sys.path.append('../../../ffg/ffg') -import evtiming +from greplin import scales +STATS = scales.collection('/web', + scales.PmfStat('readRules')) from escapeoutputstatements import escapeOutputStatements ROOM = Namespace("http://projects.bigasterisk.com/room/") +def _loadAndEscape(ruleStore, n3, outputPatterns): + ruleGraph = Graph(ruleStore) + + # Can't escapeOutputStatements in the ruleStore since it + # doesn't support removals. Can't copy plainGraph into + # ruleGraph since something went wrong with traversing the + # triples inside quoted graphs, and I lose all the bodies + # of my rules. This serialize/parse version is very slow (400ms), + # but it only runs when the file changes. + plainGraph = Graph() + plainGraph.parse(StringInputSource(n3), format='n3') # for inference + escapeOutputStatements(plainGraph, outputPatterns=outputPatterns) + expandedN3 = plainGraph.serialize(format='n3') + + ruleGraph.parse(StringInputSource(expandedN3), format='n3') + _rulesCache = (None, None, None, None) -@evtiming.serviceLevel.timed('readRules') def readRules(rulesPath, outputPatterns): """ - returns (rulesN3, ruleGraph) + returns (rulesN3, ruleStore) This includes escaping certain statements in the output (implied) subgraaphs so they're not confused with input statements. 
""" global _rulesCache - mtime = os.path.getmtime(rulesPath) - key = (rulesPath, mtime) - if _rulesCache[:2] == key: - _, _, rulesN3, expandedN3 = _rulesCache - else: - rulesN3 = open(rulesPath).read() # for web display - plainGraph = Graph() - plainGraph.parse(StringInputSource(rulesN3), - format='n3') # for inference - escapeOutputStatements(plainGraph, outputPatterns=outputPatterns) - expandedN3 = plainGraph.serialize(format='n3') - _rulesCache = key + (rulesN3, expandedN3) + with STATS.readRules.time(): + mtime = os.path.getmtime(rulesPath) + key = (rulesPath, mtime) + if _rulesCache[:2] == key: + _, _, rulesN3, ruleStore = _rulesCache + else: + rulesN3 = open(rulesPath).read() # for web display - # the rest needs to happen each time since inference is - # consuming the ruleGraph somehow - ruleStore = N3RuleStore() - ruleGraph = Graph(ruleStore) - - ruleGraph.parse(StringInputSource(expandedN3), format='n3') - log.debug('%s rules' % len(ruleStore.rules)) - return rulesN3, ruleGraph + ruleStore = N3RuleStore() + _loadAndEscape(ruleStore, rulesN3, outputPatterns) + log.debug('%s rules' % len(ruleStore.rules)) + + _rulesCache = key + (rulesN3, ruleStore) + return rulesN3, ruleStore def infer(graph, rules): """ - returns new graph of inferred statements + returns new graph of inferred statements. Plain rete api seems to + alter rules.formulae and rules.rules, but this function does not + alter the incoming rules object, so you can cache it. 
""" # based on fuxi/tools/rdfpipe.py - store = plugin.get('IOMemory',Store)() - store.open('') - target = Graph() tokenSet = generateTokenSet(graph) - network = ReteNetwork(rules, inferredTarget=target) - network.feedFactsToAdd(tokenSet) - - store.rollback() + with _dontChangeRulesStore(rules): + network = ReteNetwork(rules, inferredTarget=target) + network.feedFactsToAdd(tokenSet) + return target +@contextlib.contextmanager +def _dontChangeRulesStore(rules): + if not hasattr(rules, '_stashOriginalRules'): + rules._stashOriginalRules = rules.rules[:] + yield + for k in rules.formulae.keys(): + if not k.startswith('_:Formula'): + del rules.formulae[k] + rules.rules = rules._stashOriginalRules[:] + import time, logging log = logging.getLogger() def logTime(func):
--- a/service/reasoning/reasoning.py Sun May 08 03:05:27 2016 -0700 +++ b/service/reasoning/reasoning.py Mon May 09 00:32:08 2016 -0700 @@ -26,6 +26,10 @@ from twisted.internet.defer import inlineCallbacks import cyclone.web, cyclone.websocket +sys.path.append("scales/src") +from greplin import scales +from greplin.scales.cyclonehandler import StatsHandler + from inference import infer, readRules from actions import Actions from inputgraph import InputGraph @@ -35,14 +39,15 @@ from logsetup import log -sys.path.append('../../../ffg/ffg') -import evtiming - ROOM = Namespace("http://projects.bigasterisk.com/room/") DEV = Namespace("http://projects.bigasterisk.com/device/") NS = {'': ROOM, 'dev': DEV} +STATS = scales.collection('/web', + scales.PmfStat('poll'), + scales.PmfStat('graphChanged')) + class Reasoning(object): def __init__(self): self.prevGraph = None @@ -58,21 +63,21 @@ self.inputGraph.updateFileData() @inlineCallbacks + @STATS.poll.time() def poll(self): - t1 = time.time() try: yield self.inputGraph.updateRemoteData() self.lastPollTime = time.time() except Exception, e: log.error(traceback.format_exc()) self.lastError = str(e) - evtiming.serviceLevel.addData('poll', time.time() - t1) + def updateRules(self): rulesPath = 'rules.n3' try: t1 = time.time() - self.rulesN3, self.ruleGraph = readRules( + self.rulesN3, self.ruleStore = readRules( rulesPath, outputPatterns=[ # Incomplete. See escapeoutputstatements.py for # explanation. 
@@ -81,7 +86,6 @@ (None, ROOM['powerState'], None), (None, ROOM['state'], None), ]) - self._readRules(rulesPath) ruleParseTime = time.time() - t1 except ValueError: # this is so if you're just watching the inferred output, @@ -93,7 +97,7 @@ return [(ROOM['reasoner'], ROOM['ruleParseTime'], Literal(ruleParseTime))], ruleParseTime - evtiming.serviceLevel.timed('graphChanged') + @STATS.graphChanged.time() def graphChanged(self, inputGraph, oneShot=False, oneShotGraph=None): """ If we're getting called for a oneShot event, the oneShotGraph @@ -108,11 +112,11 @@ oldInferred = self.inferred try: ruleStatStmts, ruleParseSec = self.updateRules() - + self.inferred = self._makeInferred(inputGraph.getGraph()) self.inferred += unquoteOutputStatements(self.inferred) - + self.inferred += ruleStatStmts if oneShot: @@ -146,9 +150,9 @@ return out + class Index(cyclone.web.RequestHandler): def get(self): - print evtiming.serviceLevel.serviceJsonReport() # make sure GET / fails if our poll loop died ago = time.time() - self.settings.reasoning.lastPollTime @@ -276,6 +280,7 @@ (r'/rules', Rules), (r'/status', Status), (r'/events', Events), + (r'/stats/(.*)', StatsHandler, {'serverName': 'reasoning'}), ] cyclone.web.Application.__init__(self, handlers, reasoning=reasoning)
--- a/service/reasoning/requirements.txt Sun May 08 03:05:27 2016 -0700
+++ b/service/reasoning/requirements.txt Mon May 09 00:32:08 2016 -0700
@@ -3,3 +3,4 @@
 docopt==0.6.2
 rdflib==4.2.1
 git+http://github.com/drewp/FuXi.git@22263b0751a29839013ce43646dd18302c7b9bb1#egg=FuXi
+git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93#egg=scales