comparison lib/patchablegraph/patchsource.py @ 592:32939cddf360

more state vars Ignore-this: 5c5064141b82e35eecc82b0210c7db86
author drewp@bigasterisk.com
date Sat, 06 Jul 2019 13:54:52 -0700
parents 2d13254fdd44
children
comparison
equal deleted inserted replaced
591:1398f4ec01a5 592:32939cddf360
1 import logging 1 import logging, time
2 import traceback 2 import traceback
3 from rdflib import ConjunctiveGraph 3 from rdflib import ConjunctiveGraph
4 from rdflib.parser import StringInputSource 4 from rdflib.parser import StringInputSource
5 from twisted.internet import reactor, defer 5 from twisted.internet import reactor, defer
6 6
21 self.connectionFailed = defer.Deferred() 21 self.connectionFailed = defer.Deferred()
22 self.connectionLost = defer.Deferred() 22 self.connectionLost = defer.Deferred()
23 23
24 self._listeners = set() 24 self._listeners = set()
25 log.info('start read from %s', url) 25 log.info('start read from %s', url)
26 self._startReadTime = time.time()
27 self._patchesReceived = 0 # including fullgraph
26 # note: fullGraphReceived isn't guaranteed- the stream could 28 # note: fullGraphReceived isn't guaranteed- the stream could
27 # start with patches 29 # start with patches
28 self._fullGraphReceived = False 30 self._fullGraphReceived = False
29 self._eventSource = EventSource(url.toPython().encode('utf8'), 31 self._eventSource = EventSource(url.toPython().encode('utf8'),
30 userAgent=agent) 32 userAgent=agent)
36 38
37 def state(self): 39 def state(self):
38 return { 40 return {
39 'url': self.url, 41 'url': self.url,
40 'fullGraphReceived': self._fullGraphReceived, 42 'fullGraphReceived': self._fullGraphReceived,
43 'patchesReceived': self._patchesReceived,
44 'time': {
45 'open': getattr(self, '_startReadTime', None),
46 'fullGraph': getattr(self, '_fullGraphTime', None),
47 'latestPatch': getattr(self, '_latestPatchTime', None),
48 },
49 'closed': self._eventSource is None,
41 } 50 }
42 51
43 def addPatchListener(self, func): 52 def addPatchListener(self, func):
44 """ 53 """
45 func(patch, fullGraph=[true if the patch is the initial fullgraph]) 54 func(patch, fullGraph=[true if the patch is the initial fullgraph])
74 self._sendPatch(p, fullGraph=True) 83 self._sendPatch(p, fullGraph=True)
75 except Exception: 84 except Exception:
76 log.error(traceback.format_exc()) 85 log.error(traceback.format_exc())
77 raise 86 raise
78 self._fullGraphReceived = True 87 self._fullGraphReceived = True
88 self._fullGraphTime = time.time()
89 self._patchesReceived += 1
79 90
80 def _onPatch(self, message): 91 def _onPatch(self, message):
81 try: 92 try:
82 p = patchFromJson(message) 93 p = patchFromJson(message)
83 self._sendPatch(p, fullGraph=False) 94 self._sendPatch(p, fullGraph=False)
84 except: 95 except:
85 log.error(traceback.format_exc()) 96 log.error(traceback.format_exc())
86 raise 97 raise
98 self._latestPatchTime = time.time()
99 self._patchesReceived += 1
87 100
88 def _sendPatch(self, p, fullGraph): 101 def _sendPatch(self, p, fullGraph):
89 log.debug('PatchSource %s received patch %s (fullGraph=%s)', 102 log.debug('PatchSource %s received patch %s (fullGraph=%s)',
90 self.url, p.shortSummary(), fullGraph) 103 self.url, p.shortSummary(), fullGraph)
91 for lis in self._listeners: 104 for lis in self._listeners: