Mercurial > code > home > repos > homeauto
comparison lib/patchablegraph/patchsource.py @ 592:32939cddf360
more state vars
Ignore-this: 5c5064141b82e35eecc82b0210c7db86
author | drewp@bigasterisk.com |
---|---|
date | Sat, 06 Jul 2019 13:54:52 -0700 |
parents | 2d13254fdd44 |
children |
comparison
equal
deleted
inserted
replaced
591:1398f4ec01a5 | 592:32939cddf360 |
---|---|
1 import logging | 1 import logging, time |
2 import traceback | 2 import traceback |
3 from rdflib import ConjunctiveGraph | 3 from rdflib import ConjunctiveGraph |
4 from rdflib.parser import StringInputSource | 4 from rdflib.parser import StringInputSource |
5 from twisted.internet import reactor, defer | 5 from twisted.internet import reactor, defer |
6 | 6 |
21 self.connectionFailed = defer.Deferred() | 21 self.connectionFailed = defer.Deferred() |
22 self.connectionLost = defer.Deferred() | 22 self.connectionLost = defer.Deferred() |
23 | 23 |
24 self._listeners = set() | 24 self._listeners = set() |
25 log.info('start read from %s', url) | 25 log.info('start read from %s', url) |
26 self._startReadTime = time.time() | |
27 self._patchesReceived = 0 # including fullgraph | |
26 # note: fullGraphReceived isn't guaranteed- the stream could | 28 # note: fullGraphReceived isn't guaranteed- the stream could |
27 # start with patches | 29 # start with patches |
28 self._fullGraphReceived = False | 30 self._fullGraphReceived = False |
29 self._eventSource = EventSource(url.toPython().encode('utf8'), | 31 self._eventSource = EventSource(url.toPython().encode('utf8'), |
30 userAgent=agent) | 32 userAgent=agent) |
36 | 38 |
37 def state(self): | 39 def state(self): |
38 return { | 40 return { |
39 'url': self.url, | 41 'url': self.url, |
40 'fullGraphReceived': self._fullGraphReceived, | 42 'fullGraphReceived': self._fullGraphReceived, |
43 'patchesReceived': self._patchesReceived, | |
44 'time': { | |
45 'open': getattr(self, '_startReadTime', None), | |
46 'fullGraph': getattr(self, '_fullGraphTime', None), | |
47 'latestPatch': getattr(self, '_latestPatchTime', None), | |
48 }, | |
49 'closed': self._eventSource is None, | |
41 } | 50 } |
42 | 51 |
43 def addPatchListener(self, func): | 52 def addPatchListener(self, func): |
44 """ | 53 """ |
45 func(patch, fullGraph=[true if the patch is the initial fullgraph]) | 54 func(patch, fullGraph=[true if the patch is the initial fullgraph]) |
74 self._sendPatch(p, fullGraph=True) | 83 self._sendPatch(p, fullGraph=True) |
75 except Exception: | 84 except Exception: |
76 log.error(traceback.format_exc()) | 85 log.error(traceback.format_exc()) |
77 raise | 86 raise |
78 self._fullGraphReceived = True | 87 self._fullGraphReceived = True |
88 self._fullGraphTime = time.time() | |
89 self._patchesReceived += 1 | |
79 | 90 |
80 def _onPatch(self, message): | 91 def _onPatch(self, message): |
81 try: | 92 try: |
82 p = patchFromJson(message) | 93 p = patchFromJson(message) |
83 self._sendPatch(p, fullGraph=False) | 94 self._sendPatch(p, fullGraph=False) |
84 except: | 95 except: |
85 log.error(traceback.format_exc()) | 96 log.error(traceback.format_exc()) |
86 raise | 97 raise |
98 self._latestPatchTime = time.time() | |
99 self._patchesReceived += 1 | |
87 | 100 |
88 def _sendPatch(self, p, fullGraph): | 101 def _sendPatch(self, p, fullGraph): |
89 log.debug('PatchSource %s received patch %s (fullGraph=%s)', | 102 log.debug('PatchSource %s received patch %s (fullGraph=%s)', |
90 self.url, p.shortSummary(), fullGraph) | 103 self.url, p.shortSummary(), fullGraph) |
91 for lis in self._listeners: | 104 for lis in self._listeners: |