changeset 473:388769b5f8ff

stats support and maybe a no-op filtering logic change snuck in there Ignore-this: d54125308243159b28ef11e2d09014f4
author drewp@bigasterisk.com
date Sat, 20 Apr 2019 23:51:02 -0700
parents a63549a50b3f
children 1d2817cb9a6f
files lib/patchablegraph.py
diffstat 1 files changed, 49 insertions(+), 24 deletions(-) [+]
line wrap: on
line diff
--- a/lib/patchablegraph.py	Sat Apr 20 23:49:16 2019 -0700
+++ b/lib/patchablegraph.py	Sat Apr 20 23:51:02 2019 -0700
@@ -20,7 +20,7 @@
 differences between RDF graphs
 
 """
-import sys, json, logging
+import sys, json, logging, itertools
 import cyclone.sse
 sys.path.append("/my/proj/rdfdb")
 from rdfdb.grapheditapi import GraphEditApi
@@ -30,6 +30,7 @@
 from rdflib_jsonld.serializer import from_rdf
 from rdflib.parser import StringInputSource
 from cycloneerr import PrettyErrorHandler
+from greplin import scales
 
 log = logging.getLogger('patchablegraph')
 
@@ -73,7 +74,8 @@
     # This is not the same as g.serialize(format='json-ld')! That
     # version omits literal datatypes.
     return json.dumps(from_rdf(g))
-    
+
+_graphsInProcess = itertools.count()
 class PatchableGraph(GraphEditApi):
     """
     Master graph that you modify with self.patch, and we get the
@@ -82,44 +84,65 @@
     def __init__(self):
         self._graph = ConjunctiveGraph()
         self._observers = []
+        scales.init(self, '/patchableGraph%s' % next(_graphsInProcess))
 
+    _serialize = scales.PmfStat('serialize')
     def serialize(self, to, **kw):
-        return self._graph.serialize(to, **kw)
-        
+        with self._serialize.time():
+            return self._graph.serialize(to, **kw)
+
+    _patch = scales.PmfStat('patch')
+    _len = scales.IntStat('statementCount')
     def patch(self, p):
-        if p.isNoop():
-            return
-        patchQuads(self._graph,
-                   deleteQuads=p.delQuads,
-                   addQuads=p.addQuads,
-                   perfect=False) # true?
-        for ob in self._observers:
-            ob(patchAsJson(p))
+        with self._patch.time():
+            # assuming no stmt is both in p.addQuads and p.delQuads.
+            dels = set([q for q in p.delQuads if inGraph(q, self._graph)])
+            adds = set([q for q in p.addQuads if not inGraph(q, self._graph)])
+            minimizedP = Patch(addQuads=adds, delQuads=dels)
+            if minimizedP.isNoop():
+                return
+            patchQuads(self._graph,
+                       deleteQuads=dels,
+                       addQuads=adds,
+                       perfect=False) # true?
+            for ob in self._observers:
+                ob(patchAsJson(p))
+            self._len = len(self._graph)
 
     def asJsonLd(self):
         return graphAsJson(self._graph)
-            
+
+    _currentObservers = scales.IntStat('observers/current')
+    _observersAdded = scales.IntStat('observers/added')
     def addObserver(self, onPatch):
         self._observers.append(onPatch)
+        self._currentObservers = len(self._observers)
+        self._observersAdded += 1
         
     def removeObserver(self, onPatch):
         try:
             self._observers.remove(onPatch)
         except ValueError:
             pass
+        self._currentObservers = len(self._observers)
 
     def setToGraph(self, newGraph):
         self.patch(Patch.fromDiff(self._graph, newGraph))
         
+    _sendSimpleGraph = scales.PmfStat('serve/simpleGraph')
+    _sendFullGraph = scales.PmfStat('serve/events/sendFull')
+    _sendPatch = scales.PmfStat('serve/events/sendPatch')
 
 class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler):
     def initialize(self, masterGraph):
         self.masterGraph = masterGraph
         
     def get(self):
-        writeGraphResponse(self, self.masterGraph,
-                           self.request.headers.get('accept'))
-        
+        with self.masterGraph._sendSimpleGraph.time():
+            writeGraphResponse(self, self.masterGraph,
+                               self.request.headers.get('accept'))
+
+
 class CycloneGraphEventsHandler(cyclone.sse.SSEHandler):
     """
     One session with one client.
@@ -134,17 +157,19 @@
     def __init__(self, application, request, masterGraph):
         cyclone.sse.SSEHandler.__init__(self, application, request)
         self.masterGraph = masterGraph
-        
+
     def bind(self):
-        graphJson = self.masterGraph.asJsonLd()
-        log.debug("send fullGraph event: %s", graphJson)
-        self.sendEvent(message=graphJson, event=b'fullGraph')
-        self.masterGraph.addObserver(self.onPatch)
+        with self.masterGraph._sendFullGraph.time():
+            graphJson = self.masterGraph.asJsonLd()
+            log.debug("send fullGraph event: %s", graphJson)
+            self.sendEvent(message=graphJson, event=b'fullGraph')
+            self.masterGraph.addObserver(self.onPatch)
 
     def onPatch(self, patchJson):
-        # throttle and combine patches here- ideally we could see how
-        # long the latency to the client is to make a better rate choice
-        self.sendEvent(message=patchJson, event=b'patch')
+        with self.masterGraph._sendPatch.time():
+            # throttle and combine patches here- ideally we could see how
+            # long the latency to the client is to make a better rate choice
+            self.sendEvent(message=patchJson, event=b'patch')
                
     def unbind(self):
         self.masterGraph.removeObserver(self.onPatch)