Mercurial > code > home > repos > homeauto
changeset 473:388769b5f8ff
stats support and maybe a no-op filtering logic change snuck in there
Ignore-this: d54125308243159b28ef11e2d09014f4
author | drewp@bigasterisk.com |
---|---|
date | Sat, 20 Apr 2019 23:51:02 -0700 |
parents | a63549a50b3f |
children | 1d2817cb9a6f |
files | lib/patchablegraph.py |
diffstat | 1 files changed, 49 insertions(+), 24 deletions(-) [+] |
line wrap: on
line diff
--- a/lib/patchablegraph.py Sat Apr 20 23:49:16 2019 -0700 +++ b/lib/patchablegraph.py Sat Apr 20 23:51:02 2019 -0700 @@ -20,7 +20,7 @@ differences between RDF graphs """ -import sys, json, logging +import sys, json, logging, itertools import cyclone.sse sys.path.append("/my/proj/rdfdb") from rdfdb.grapheditapi import GraphEditApi @@ -30,6 +30,7 @@ from rdflib_jsonld.serializer import from_rdf from rdflib.parser import StringInputSource from cycloneerr import PrettyErrorHandler +from greplin import scales log = logging.getLogger('patchablegraph') @@ -73,7 +74,8 @@ # This is not the same as g.serialize(format='json-ld')! That # version omits literal datatypes. return json.dumps(from_rdf(g)) - + +_graphsInProcess = itertools.count() class PatchableGraph(GraphEditApi): """ Master graph that you modify with self.patch, and we get the @@ -82,44 +84,65 @@ def __init__(self): self._graph = ConjunctiveGraph() self._observers = [] + scales.init(self, '/patchableGraph%s' % next(_graphsInProcess)) + _serialize = scales.PmfStat('serialize') def serialize(self, to, **kw): - return self._graph.serialize(to, **kw) - + with self._serialize.time(): + return self._graph.serialize(to, **kw) + + _patch = scales.PmfStat('patch') + _len = scales.IntStat('statementCount') def patch(self, p): - if p.isNoop(): - return - patchQuads(self._graph, - deleteQuads=p.delQuads, - addQuads=p.addQuads, - perfect=False) # true? - for ob in self._observers: - ob(patchAsJson(p)) + with self._patch.time(): + # assuming no stmt is both in p.addQuads and p.delQuads. + dels = set([q for q in p.delQuads if inGraph(q, self._graph)]) + adds = set([q for q in p.addQuads if not inGraph(q, self._graph)]) + minimizedP = Patch(addQuads=adds, delQuads=dels) + if minimizedP.isNoop(): + return + patchQuads(self._graph, + deleteQuads=dels, + addQuads=adds, + perfect=False) # true? + for ob in self._observers: + ob(patchAsJson(p)) + self._len = len(self._graph) def asJsonLd(self): return graphAsJson(self._graph) - + + _currentObservers = scales.IntStat('observers/current') + _observersAdded = scales.IntStat('observers/added') def addObserver(self, onPatch): self._observers.append(onPatch) + self._currentObservers = len(self._observers) + self._observersAdded += 1 def removeObserver(self, onPatch): try: self._observers.remove(onPatch) except ValueError: pass + self._currentObservers = len(self._observers) def setToGraph(self, newGraph): self.patch(Patch.fromDiff(self._graph, newGraph)) + _sendSimpleGraph = scales.PmfStat('serve/simpleGraph') + _sendFullGraph = scales.PmfStat('serve/events/sendFull') + _sendPatch = scales.PmfStat('serve/events/sendPatch') class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler): def initialize(self, masterGraph): self.masterGraph = masterGraph def get(self): - writeGraphResponse(self, self.masterGraph, - self.request.headers.get('accept')) - + with self.masterGraph._sendSimpleGraph.time(): + writeGraphResponse(self, self.masterGraph, + self.request.headers.get('accept')) + + class CycloneGraphEventsHandler(cyclone.sse.SSEHandler): """ One session with one client. @@ -134,17 +157,19 @@ def __init__(self, application, request, masterGraph): cyclone.sse.SSEHandler.__init__(self, application, request) self.masterGraph = masterGraph - + def bind(self): - graphJson = self.masterGraph.asJsonLd() - log.debug("send fullGraph event: %s", graphJson) - self.sendEvent(message=graphJson, event=b'fullGraph') - self.masterGraph.addObserver(self.onPatch) + with self.masterGraph._sendFullGraph.time(): + graphJson = self.masterGraph.asJsonLd() + log.debug("send fullGraph event: %s", graphJson) + self.sendEvent(message=graphJson, event=b'fullGraph') + self.masterGraph.addObserver(self.onPatch) def onPatch(self, patchJson): - # throttle and combine patches here- ideally we could see how - # long the latency to the client is to make a better rate choice - self.sendEvent(message=patchJson, event=b'patch') + with self.masterGraph._sendPatch.time(): + # throttle and combine patches here- ideally we could see how + # long the latency to the client is to make a better rate choice + self.sendEvent(message=patchJson, event=b'patch') def unbind(self): self.masterGraph.removeObserver(self.onPatch)