comparison service/reasoning/reasoning.py @ 287:3b61c0dfaaef

switch from evtiming to greplin.scales. Optimize rules reader to reuse previous data (400ms -> 0.6ms) Ignore-this: a655f4c56db51b09b3f14d7f09e354cb
author drewp@bigasterisk.com
date Mon, 09 May 2016 00:32:08 -0700
parents da0b3a1394a3
children e03696277b32
comparison
equal deleted inserted replaced
286:da0b3a1394a3 287:3b61c0dfaaef
24 from rdflib import Namespace, Literal, RDF, Graph 24 from rdflib import Namespace, Literal, RDF, Graph
25 from twisted.internet import reactor, task 25 from twisted.internet import reactor, task
26 from twisted.internet.defer import inlineCallbacks 26 from twisted.internet.defer import inlineCallbacks
27 import cyclone.web, cyclone.websocket 27 import cyclone.web, cyclone.websocket
28 28
29 sys.path.append("scales/src")
30 from greplin import scales
31 from greplin.scales.cyclonehandler import StatsHandler
32
29 from inference import infer, readRules 33 from inference import infer, readRules
30 from actions import Actions 34 from actions import Actions
31 from inputgraph import InputGraph 35 from inputgraph import InputGraph
32 from escapeoutputstatements import unquoteOutputStatements 36 from escapeoutputstatements import unquoteOutputStatements
33 37
34 sys.path.append("../../lib") 38 sys.path.append("../../lib")
35 from logsetup import log 39 from logsetup import log
36 40
37 41
38 sys.path.append('../../../ffg/ffg')
39 import evtiming
40
41 ROOM = Namespace("http://projects.bigasterisk.com/room/") 42 ROOM = Namespace("http://projects.bigasterisk.com/room/")
42 DEV = Namespace("http://projects.bigasterisk.com/device/") 43 DEV = Namespace("http://projects.bigasterisk.com/device/")
43 44
44 NS = {'': ROOM, 'dev': DEV} 45 NS = {'': ROOM, 'dev': DEV}
46
47 STATS = scales.collection('/web',
48 scales.PmfStat('poll'),
49 scales.PmfStat('graphChanged'))
45 50
46 class Reasoning(object): 51 class Reasoning(object):
47 def __init__(self): 52 def __init__(self):
48 self.prevGraph = None 53 self.prevGraph = None
49 self.lastPollTime = 0 54 self.lastPollTime = 0
56 61
57 self.inputGraph = InputGraph([], self.graphChanged) 62 self.inputGraph = InputGraph([], self.graphChanged)
58 self.inputGraph.updateFileData() 63 self.inputGraph.updateFileData()
59 64
60 @inlineCallbacks 65 @inlineCallbacks
66 @STATS.poll.time()
61 def poll(self): 67 def poll(self):
62 t1 = time.time()
63 try: 68 try:
64 yield self.inputGraph.updateRemoteData() 69 yield self.inputGraph.updateRemoteData()
65 self.lastPollTime = time.time() 70 self.lastPollTime = time.time()
66 except Exception, e: 71 except Exception, e:
67 log.error(traceback.format_exc()) 72 log.error(traceback.format_exc())
68 self.lastError = str(e) 73 self.lastError = str(e)
69 evtiming.serviceLevel.addData('poll', time.time() - t1) 74
70 75
71 def updateRules(self): 76 def updateRules(self):
72 rulesPath = 'rules.n3' 77 rulesPath = 'rules.n3'
73 try: 78 try:
74 t1 = time.time() 79 t1 = time.time()
75 self.rulesN3, self.ruleGraph = readRules( 80 self.rulesN3, self.ruleStore = readRules(
76 rulesPath, outputPatterns=[ 81 rulesPath, outputPatterns=[
77 # Incomplete. See escapeoutputstatements.py for 82 # Incomplete. See escapeoutputstatements.py for
78 # explanation. 83 # explanation.
79 (None, ROOM['brightness'], None), 84 (None, ROOM['brightness'], None),
80 (None, ROOM['playState'], None), 85 (None, ROOM['playState'], None),
81 (None, ROOM['powerState'], None), 86 (None, ROOM['powerState'], None),
82 (None, ROOM['state'], None), 87 (None, ROOM['state'], None),
83 ]) 88 ])
84 self._readRules(rulesPath)
85 ruleParseTime = time.time() - t1 89 ruleParseTime = time.time() - t1
86 except ValueError: 90 except ValueError:
87 # this is so if you're just watching the inferred output, 91 # this is so if you're just watching the inferred output,
88 # you'll see the error too 92 # you'll see the error too
89 self.inferred = Graph() 93 self.inferred = Graph()
91 Literal(traceback.format_exc()))) 95 Literal(traceback.format_exc())))
92 raise 96 raise
93 return [(ROOM['reasoner'], ROOM['ruleParseTime'], 97 return [(ROOM['reasoner'], ROOM['ruleParseTime'],
94 Literal(ruleParseTime))], ruleParseTime 98 Literal(ruleParseTime))], ruleParseTime
95 99
96 evtiming.serviceLevel.timed('graphChanged') 100 @STATS.graphChanged.time()
97 def graphChanged(self, inputGraph, oneShot=False, oneShotGraph=None): 101 def graphChanged(self, inputGraph, oneShot=False, oneShotGraph=None):
98 """ 102 """
99 If we're getting called for a oneShot event, the oneShotGraph 103 If we're getting called for a oneShot event, the oneShotGraph
100 statements are already in inputGraph.getGraph(). 104 statements are already in inputGraph.getGraph().
101 """ 105 """
106 log.debug(" oneshot stmt %r", s) 110 log.debug(" oneshot stmt %r", s)
107 t1 = time.time() 111 t1 = time.time()
108 oldInferred = self.inferred 112 oldInferred = self.inferred
109 try: 113 try:
110 ruleStatStmts, ruleParseSec = self.updateRules() 114 ruleStatStmts, ruleParseSec = self.updateRules()
111 115
112 self.inferred = self._makeInferred(inputGraph.getGraph()) 116 self.inferred = self._makeInferred(inputGraph.getGraph())
113 117
114 self.inferred += unquoteOutputStatements(self.inferred) 118 self.inferred += unquoteOutputStatements(self.inferred)
115 119
116 self.inferred += ruleStatStmts 120 self.inferred += ruleStatStmts
117 121
118 if oneShot: 122 if oneShot:
119 # It's possible a oneShotGraph statement didn't 123 # It's possible a oneShotGraph statement didn't
120 # trigger a rule to do something, but was itself the 124 # trigger a rule to do something, but was itself the
144 out.add((ROOM['reasoner'], ROOM['inferenceTime'], 148 out.add((ROOM['reasoner'], ROOM['inferenceTime'],
145 Literal(inferenceTime))) 149 Literal(inferenceTime)))
146 return out 150 return out
147 151
148 152
153
149 class Index(cyclone.web.RequestHandler): 154 class Index(cyclone.web.RequestHandler):
150 def get(self): 155 def get(self):
151 print evtiming.serviceLevel.serviceJsonReport()
152 156
153 # make sure GET / fails if our poll loop died 157 # make sure GET / fails if our poll loop died
154 ago = time.time() - self.settings.reasoning.lastPollTime 158 ago = time.time() - self.settings.reasoning.lastPollTime
155 if ago > 15: 159 if ago > 15:
156 self.set_status(500) 160 self.set_status(500)
274 (r'/(lastInput|lastOutput)Graph', GraphResource), 278 (r'/(lastInput|lastOutput)Graph', GraphResource),
275 (r'/ntGraphs', NtGraphs), 279 (r'/ntGraphs', NtGraphs),
276 (r'/rules', Rules), 280 (r'/rules', Rules),
277 (r'/status', Status), 281 (r'/status', Status),
278 (r'/events', Events), 282 (r'/events', Events),
283 (r'/stats/(.*)', StatsHandler, {'serverName': 'reasoning'}),
279 ] 284 ]
280 cyclone.web.Application.__init__(self, handlers, reasoning=reasoning) 285 cyclone.web.Application.__init__(self, handlers, reasoning=reasoning)
281 286
282 def configLogging(arg): 287 def configLogging(arg):
283 log.setLevel(WARN) 288 log.setLevel(WARN)