changeset 1092:54de5144900d

switch from evtiming to greplin.scales. Optimize rules reader to reuse previous data (400ms -> 0.6ms) Ignore-this: a655f4c56db51b09b3f14d7f09e354cb darcs-hash:4ffd7012f404392375434243104eba065ffb8086
author drewp <drewp@bigasterisk.com>
date Mon, 09 May 2016 00:32:08 -0700
parents 352ecf3c9aea
children e3ae71fbd3bd
files service/reasoning/inference.py service/reasoning/reasoning.py service/reasoning/requirements.txt
diffstat 3 files changed, 68 insertions(+), 44 deletions(-) [+]
line wrap: on
line diff
--- a/service/reasoning/inference.py	Sun May 08 03:05:27 2016 -0700
+++ b/service/reasoning/inference.py	Mon May 09 00:32:08 2016 -0700
@@ -2,7 +2,7 @@
 see ./reasoning for usage
 """
 
-import sys, os
+import sys, os, contextlib
 try:
     from rdflib.Graph import Graph
 except ImportError:
@@ -18,62 +18,80 @@
 from rdflib import plugin, Namespace
 from rdflib.store import Store
 
-sys.path.append('../../../ffg/ffg')
-import evtiming
+from greplin import scales 
+STATS = scales.collection('/web',
+                          scales.PmfStat('readRules'))
 
 from escapeoutputstatements import escapeOutputStatements
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 
+def _loadAndEscape(ruleStore, n3, outputPatterns):
+    ruleGraph = Graph(ruleStore)
+
+    # Can't escapeOutputStatements in the ruleStore since it
+    # doesn't support removals. Can't copy plainGraph into
+    # ruleGraph since something went wrong with traversing the
+    # triples inside quoted graphs, and I lose all the bodies
+    # of my rules. This serialize/parse version is very slow (400ms),
+    # but it only runs when the file changes.
+    plainGraph = Graph()
+    plainGraph.parse(StringInputSource(n3), format='n3') # for inference
+    escapeOutputStatements(plainGraph, outputPatterns=outputPatterns)
+    expandedN3 = plainGraph.serialize(format='n3')
+
+    ruleGraph.parse(StringInputSource(expandedN3), format='n3')
+
 _rulesCache = (None, None, None, None)
-@evtiming.serviceLevel.timed('readRules')
 def readRules(rulesPath, outputPatterns):
     """
-    returns (rulesN3, ruleGraph)
+    returns (rulesN3, ruleStore)
 
     This includes escaping certain statements in the output
     (implied) subgraaphs so they're not confused with input
     statements.
     """
     global _rulesCache
-    mtime = os.path.getmtime(rulesPath)
-    key = (rulesPath, mtime)
-    if _rulesCache[:2] == key:
-        _, _, rulesN3, expandedN3 = _rulesCache
-    else:
-        rulesN3 = open(rulesPath).read() # for web display
 
-        plainGraph = Graph()
-        plainGraph.parse(StringInputSource(rulesN3),
-                         format='n3') # for inference
-        escapeOutputStatements(plainGraph, outputPatterns=outputPatterns)
-        expandedN3 = plainGraph.serialize(format='n3')
-        _rulesCache = key + (rulesN3, expandedN3)
+    with STATS.readRules.time():
+        mtime = os.path.getmtime(rulesPath)
+        key = (rulesPath, mtime)
+        if _rulesCache[:2] == key:
+            _, _, rulesN3, ruleStore = _rulesCache
+        else:
+            rulesN3 = open(rulesPath).read() # for web display
 
-    # the rest needs to happen each time since inference is
-    # consuming the ruleGraph somehow
-    ruleStore = N3RuleStore()
-    ruleGraph = Graph(ruleStore)
-
-    ruleGraph.parse(StringInputSource(expandedN3), format='n3')
-    log.debug('%s rules' % len(ruleStore.rules))
-    return rulesN3, ruleGraph
+            ruleStore = N3RuleStore()
+            _loadAndEscape(ruleStore, rulesN3, outputPatterns)
+            log.debug('%s rules' % len(ruleStore.rules))
+            
+            _rulesCache = key + (rulesN3, ruleStore)
+        return rulesN3, ruleStore
 
 def infer(graph, rules):
     """
-    returns new graph of inferred statements
+    returns new graph of inferred statements. Plain rete api seems to
+    alter rules.formulae and rules.rules, so this function undoes those
+    changes afterwards, which makes the incoming rules object cacheable.
     """
     # based on fuxi/tools/rdfpipe.py
-    store = plugin.get('IOMemory',Store)()        
-    store.open('')
-
     target = Graph()
     tokenSet = generateTokenSet(graph)
-    network = ReteNetwork(rules, inferredTarget=target)
-    network.feedFactsToAdd(tokenSet)
-
-    store.rollback()
+    with _dontChangeRulesStore(rules):
+        network = ReteNetwork(rules, inferredTarget=target)
+        network.feedFactsToAdd(tokenSet)
+    
     return target
 
+@contextlib.contextmanager
+def _dontChangeRulesStore(rules):
+    if not hasattr(rules, '_stashOriginalRules'):
+        rules._stashOriginalRules = rules.rules[:]
+    yield
+    for k in rules.formulae.keys():
+        if not k.startswith('_:Formula'):
+            del rules.formulae[k]
+    rules.rules = rules._stashOriginalRules[:]
+    
 import time, logging
 log = logging.getLogger()
 def logTime(func):
--- a/service/reasoning/reasoning.py	Sun May 08 03:05:27 2016 -0700
+++ b/service/reasoning/reasoning.py	Mon May 09 00:32:08 2016 -0700
@@ -26,6 +26,10 @@
 from twisted.internet.defer import inlineCallbacks
 import cyclone.web, cyclone.websocket
 
+sys.path.append("scales/src")
+from greplin import scales
+from greplin.scales.cyclonehandler import StatsHandler
+
 from inference import infer, readRules
 from actions import Actions
 from inputgraph import InputGraph
@@ -35,14 +39,15 @@
 from logsetup import log
 
 
-sys.path.append('../../../ffg/ffg')
-import evtiming
-
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 DEV = Namespace("http://projects.bigasterisk.com/device/")
 
 NS = {'': ROOM, 'dev': DEV}
 
+STATS = scales.collection('/web',
+                          scales.PmfStat('poll'),
+                          scales.PmfStat('graphChanged'))
+
 class Reasoning(object):
     def __init__(self):
         self.prevGraph = None
@@ -58,21 +63,21 @@
         self.inputGraph.updateFileData()
 
     @inlineCallbacks
+    @STATS.poll.time()
     def poll(self):
-        t1 = time.time()
         try:
             yield self.inputGraph.updateRemoteData()
             self.lastPollTime = time.time()
         except Exception, e:
             log.error(traceback.format_exc())
             self.lastError = str(e)
-        evtiming.serviceLevel.addData('poll', time.time() - t1)
+
 
     def updateRules(self):
         rulesPath = 'rules.n3'
         try:
             t1 = time.time()
-            self.rulesN3, self.ruleGraph = readRules(
+            self.rulesN3, self.ruleStore = readRules(
                 rulesPath, outputPatterns=[
                     # Incomplete. See escapeoutputstatements.py for
                     # explanation.
@@ -81,7 +86,6 @@
                     (None, ROOM['powerState'], None),
                     (None, ROOM['state'], None),
                 ])
-            self._readRules(rulesPath)
             ruleParseTime = time.time() - t1
         except ValueError:
             # this is so if you're just watching the inferred output,
@@ -93,7 +97,7 @@
         return [(ROOM['reasoner'], ROOM['ruleParseTime'],
                  Literal(ruleParseTime))], ruleParseTime
 
-    evtiming.serviceLevel.timed('graphChanged')
+    @STATS.graphChanged.time()
     def graphChanged(self, inputGraph, oneShot=False, oneShotGraph=None):
         """
         If we're getting called for a oneShot event, the oneShotGraph
@@ -108,11 +112,11 @@
         oldInferred = self.inferred
         try:
             ruleStatStmts, ruleParseSec = self.updateRules()
-            
+
             self.inferred = self._makeInferred(inputGraph.getGraph())
 
             self.inferred += unquoteOutputStatements(self.inferred)
-            
+
             self.inferred += ruleStatStmts
 
             if oneShot:
@@ -146,9 +150,9 @@
         return out
 
 
+        
 class Index(cyclone.web.RequestHandler):
     def get(self):
-        print evtiming.serviceLevel.serviceJsonReport()
 
         # make sure GET / fails if our poll loop died
         ago = time.time() - self.settings.reasoning.lastPollTime
@@ -276,6 +280,7 @@
             (r'/rules', Rules),
             (r'/status', Status),
             (r'/events', Events),
+            (r'/stats/(.*)', StatsHandler, {'serverName': 'reasoning'}),
         ]
         cyclone.web.Application.__init__(self, handlers, reasoning=reasoning)
 
--- a/service/reasoning/requirements.txt	Sun May 08 03:05:27 2016 -0700
+++ b/service/reasoning/requirements.txt	Mon May 09 00:32:08 2016 -0700
@@ -3,3 +3,4 @@
 docopt==0.6.2
 rdflib==4.2.1
 git+http://github.com/drewp/FuXi.git@22263b0751a29839013ce43646dd18302c7b9bb1#egg=FuXi
+git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93#egg=scales