comparison lib/stategraph.py @ 1135:b9c6b8724d43

move stategraph from magma Ignore-this: 608e916953280812145c6f9ffc5313db darcs-hash:becf32439cc6d2a9a325b40df9b30f60311f7930
author drewp <drewp@bigasterisk.com>
date Sat, 17 Feb 2018 23:44:51 -0800
parents
children
comparison
equal deleted inserted replaced
1134:39c8ddd79cf2 1135:b9c6b8724d43
1 import datetime, os, inspect
2 from dateutil.tz import tzlocal
3 from rdflib import Graph, Namespace, Literal
4 DCTERMS = Namespace("http://purl.org/dc/terms/")
5
6 class StateGraph(object):
7 """
8 helper to create a graph with some of the current state of the world
9 """
10 def __init__(self, ctx):
11 """
12 note that we put the time of the __init__ call into the graph
13 as its dcterms:modified time.
14 """
15 self.g = Graph()
16 self.ctx = ctx
17
18 try:
19 requestingFile = inspect.stack()[1][1]
20 self.g.add((ctx, DCTERMS['creator'],
21 Literal(os.path.abspath(requestingFile))))
22 except IndexError:
23 pass
24 self.g.add((ctx, DCTERMS['modified'],
25 Literal(datetime.datetime.now(tzlocal()))))
26
27 def add(self, *args, **kw):
28 self.g.add(*args, **kw)
29
30 def ntLines(self):
31 nt = self.g.serialize(format='nt')
32 # this canonical order is just for debugging, so the lines are
33 # stable when you refresh the file repeatedly
34 return sorted(filter(None, nt.splitlines()))
35
36 def asTrig(self):
37 return "%s {\n%s\n}\n" % (self.ctx.n3(), '\n'.join(self.ntLines()))
38
39 def asJsonLd(self):
40 return self.g.serialize(format='json-ld')
41
42 def asAccepted(self, acceptHeader):
43 if acceptHeader == 'application/nquads':
44 return 'application/nquads', '\n'.join(
45 line.strip().rstrip('.') + '%s .' % self.ctx.n3()
46 for line in self.ntLines())
47 else:
48 return 'application/x-trig', self.asTrig()
49