Files
@ f77dfa8e82b9
Branch filter:
Location: light9/bin/load_test_rdfdb - annotation
f77dfa8e82b9
1.0 KiB
text/plain
new 'image' effect for animating light colors
Ignore-this: 8df08111173bd96ff8d8093d4243c3ba
Ignore-this: 8df08111173bd96ff8d8093d4243c3ba
713e56e8e2b9 713e56e8e2b9 713e56e8e2b9 713e56e8e2b9 713e56e8e2b9 713e56e8e2b9 713e56e8e2b9 713e56e8e2b9 713e56e8e2b9 713e56e8e2b9 713e56e8e2b9 4718ca6f812e 713e56e8e2b9 4718ca6f812e 713e56e8e2b9 713e56e8e2b9 713e56e8e2b9 4718ca6f812e 713e56e8e2b9 713e56e8e2b9 4718ca6f812e 713e56e8e2b9 713e56e8e2b9 713e56e8e2b9 713e56e8e2b9 713e56e8e2b9 713e56e8e2b9 4718ca6f812e 4718ca6f812e 4718ca6f812e 713e56e8e2b9 713e56e8e2b9 713e56e8e2b9 713e56e8e2b9 713e56e8e2b9 713e56e8e2b9 4718ca6f812e 4718ca6f812e 713e56e8e2b9 713e56e8e2b9 | #!bin/python
from run_local import log
from twisted.internet import reactor, task, defer
from rdflib import URIRef, Literal
from twisted.internet.defer import ensureDeferred
from rdfdb.syncedgraph import SyncedGraph
import time, logging
from light9 import networking, showconfig
from light9.namespaces import L9
class BusyClient:
def __init__(self, subj, rate):
self.subj = subj
self.rate = rate
self.graph = SyncedGraph(networking.rdfdb.url, "collector")
self.graph.initiallySynced.addCallback(self.go)
def go(self, _):
task.LoopingCall(self.loop).start(1 / self.rate)
def loop(self):
self.graph.patchObject(showconfig.showUri() + '/loadTestContext',
subject=self.subj,
predicate=L9['time'],
newObject=Literal(str(time.time())))
def main():
log.setLevel(logging.INFO)
clients = [BusyClient(L9['loadTest_%d' % i], 20) for i in range(10)]
reactor.run()
if __name__ == "__main__":
main()
|