Files
@ a287ed391ed5
Branch filter:
Location: light9/light9/rdfdb/graphfile.py
a287ed391ed5
7.2 KiB
text/x-python
rdfdb preserve n3 prefixes in the files you rewrite
Ignore-this: 5645efdb50760922a96f4f5163caccb7
Ignore-this: 5645efdb50760922a96f4f5163caccb7
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 | import logging, traceback, os, time
from twisted.python.filepath import FilePath
from twisted.internet import reactor
from twisted.internet.inotify import humanReadableMask
from rdflib import Graph
from light9.rdfdb.patch import Patch
from light9.rdfdb.rdflibpatch import inContext
# 'graphfile' carries watch/reread chatter; 'io' reports actual file creates and rewrites
log, iolog = logging.getLogger('graphfile'), logging.getLogger('io')
class GraphFile(object):
    """
    one rdf file that we read from, write to, and notice external changes to
    """
    def __init__(self, notifier, path, uri, patch, getSubgraph):
        """
        uri is the context for the triples in this file. We assume
        sometimes that we're the only ones with triples in this
        context.

        this does not include an initial reread() call

        notifier: object with a watch(FilePath, callbacks=[...]) method
          (see addWatch); presumably a twisted INotify -- confirm with caller
        path: filesystem path of the n3 file. If it doesn't exist it is
          created (parent dirs too) containing only a "#new" line.
        patch: called as patch(Patch, dueToFileChange=True) to apply
          changes that were found in the file
        getSubgraph: called as getSubgraph(uri); must return an iterable of
          (s, p, o) triples currently in our context
        """
        self.path, self.uri = path, uri
        self.patch, self.getSubgraph = patch, getSubgraph

        self.lastWriteTimestamp = 0 # mtime from the last time _we_ wrote
        # prefix -> namespace seen in the file; rebound on flush() so a
        # rewrite keeps the file's n3 prefixes
        self.namespaces = {}
        self.graphToWrite = None # set by dirty(), written by flush()

        if not os.path.exists(path):
            # can't start notify until file exists
            try:
                os.makedirs(os.path.dirname(path))
            except OSError:
                # e.g. the directory already exists; a real problem will
                # surface from the open() below
                pass
            with open(path, "w") as f:
                f.write("#new\n")
            iolog.info("%s created", path)
            # this was supposed to cut out some extra reads but it
            # didn't work:
            self.lastWriteTimestamp = os.path.getmtime(path)

        self.flushDelay = 2 # seconds until we have to call flush() when dirty
        self.writeCall = None # or DelayedCall

        self.notifier = notifier
        self.addWatch()

    def addWatch(self):
        """start watching self.path; every event is delivered to notify()"""
        # emacs save comes in as IN_MOVE_SELF, maybe

        # I was hoping not to watch IN_CHANGED and get lots of
        # half-written files, but emacs doesn't close its files after
        # a write, so there's no other event. I could try to sleep
        # until after all the writes are done, but I think the only
        # bug left is that we'll retry too aggressively on a file
        # that's being written

        log.info("add watch on %s", self.path)
        # no mask argument is passed, so the notifier's default mask applies
        self.notifier.watch(FilePath(self.path), callbacks=[self.notify])

    def notify(self, notifier, filepath, mask):
        """
        watch callback for our file.

        - delete_self with the file really gone: drop our statements
        - open/access/close_nowrite/attrib: ignored
        - file mtime equals the one from our own last write: ignored
        - anything else: reread() the file

        Any exception is printed with traceback.print_exc() instead of
        propagating out of this callback.
        """
        try:
            maskNames = humanReadableMask(mask)
            if maskNames[0] == 'delete_self':
                if not filepath.exists():
                    log.info("%s delete_self", filepath)
                    self.fileGone()
                    return
                log.warning("%s delete_self event but file is here. "
                            "probably a new version moved in",
                            filepath)

            # we could filter these out in the watch() call, but I want
            # the debugging
            if maskNames[0] in ('open', 'access', 'close_nowrite', 'attrib'):
                log.debug("%s %s event, ignoring", filepath, maskNames)
                return

            try:
                if filepath.getModificationTime() == self.lastWriteTimestamp:
                    log.debug("%s changed, but we did this write", filepath)
                    return
            except OSError as e:
                log.error("%s: %r", filepath, e)
                # getting OSError no such file, followed by no future reads
                reactor.callLater(.5, self.addWatch) # ?
                return

            log.info("reread %s because of %s event", filepath, maskNames)
            self.reread()
        except Exception:
            traceback.print_exc()

    def fileGone(self):
        """
        our file is gone; remove the statements from that context
        """
        myQuads = [(s, p, o, self.uri)
                   for s, p, o in self.getSubgraph(self.uri)]
        log.debug("dropping all statements from context %s", self.uri)
        if myQuads:
            self.patch(Patch(delQuads=myQuads), dueToFileChange=True)

    def reread(self):
        """update the graph with any diffs from this file

        Files whose contents start with "#new" (our own placeholder) are
        skipped. Syntax and IO errors are logged and leave the graph
        unchanged. Prefixes found in the file are remembered in
        self.namespaces.

        n3 parser fails on "1.e+0" even though rdflib was emitting that itself
        """
        new = Graph()
        try:
            with open(self.path) as f:
                contents = f.read()
            if contents.startswith("#new"):
                log.debug("%s ignoring empty contents of my new file",
                          self.path)
                # this is a new file we're starting, and we should not
                # patch our graph as if it had just been cleared. We
                # shouldn't even be here reading this, but
                # lastWriteTimestamp didn't work.
                return

            new.parse(location=self.path, format='n3')
            # keep the file's prefixes so flush() can write them back out
            self.namespaces.update(dict(new.namespaces()))
        except SyntaxError as e:
            traceback.print_exc()
            log.error("%s syntax error: %s", self.path, e)
            # todo: likely bug- if a file has this error upon first
            # read, I think we don't retry it right.
            return
        except IOError as e:
            log.error("%s rereading %s: %r", self.path, self.uri, e)
            return

        # only fetch our current triples once we know we'll diff against them
        old = inContext(self.getSubgraph(self.uri), self.uri)
        new = inContext(new, self.uri)

        p = Patch.fromDiff(old, new)
        if p:
            log.debug("%s applying patch for changes in file", self.path)
            self.patch(p, dueToFileChange=True)
        else:
            log.debug("old == new after reread of %s", self.path)

    def dirty(self, graph):
        """
        there are new contents to write to our file

        graph is the rdflib.Graph that contains the contents of the
        file. It is allowed to change. Note that dirty() will probably
        do the save later when the graph might be different.

        after a timer has passed, write it out. Any scheduling issues
        between files? i don't think so. the timer might be kind of
        huge, and then we might want to take a hint from a client that
        it's a good time to save the files that it was editing, like
        when the mouse moves out of the client's window and might be
        going towards a text file editor

        Each call restarts the flushDelay countdown, so a burst of
        dirty() calls results in one flush().
        """
        log.info("%s dirty, needs write", self.path)

        self.graphToWrite = graph

        if self.writeCall:
            self.writeCall.reset(self.flushDelay)
        else:
            self.writeCall = reactor.callLater(self.flushDelay, self.flush)

    def flush(self):
        """
        serialize graphToWrite as n3 into a temp file next to self.path,
        then rename it over self.path. Does nothing if dirty() was never
        called.
        """
        self.writeCall = None
        if self.graphToWrite is None:
            return

        tmpOut = self.path + ".rdfdb-temp"
        t1 = time.time()
        # put back the prefixes the file had when we last read it
        for prefix, ns in self.namespaces.items():
            self.graphToWrite.bind(prefix, ns)
        # binary mode: rdflib's serializer writes encoded bytes to a stream
        with open(tmpOut, 'wb') as f:
            self.graphToWrite.serialize(destination=f, format='n3')
        serializeTime = time.time() - t1
        # read mtime before the rename; notify() compares it to skip the
        # event caused by our own write
        self.lastWriteTimestamp = os.path.getmtime(tmpOut)
        os.rename(tmpOut, self.path)
        iolog.info("%s rewrote in %.1f ms",
                   self.path, serializeTime * 1000)

    def __repr__(self):
        return "%s(path=%r, uri=%r, ...)" % (
            self.__class__.__name__, self.path, self.uri)
|