Mercurial > code > home > repos > light9
view bin/load_test_rdfdb @ 1917:713e56e8e2b9
rdfdb load tester
Ignore-this: ca714a9f84b3c0f0aa05b3805cd7344b
author | Drew Perttula <drewp@bigasterisk.com> |
---|---|
date | Sat, 01 Jun 2019 20:07:27 +0000 |
parents | |
children | 4718ca6f812e |
line wrap: on
line source
#!bin/python
"""Load tester for rdfdb.

Starts several clients that each open their own SyncedGraph connection to
rdfdb and then repeatedly replace the object of one (subject, L9['time'])
statement with the current time.
"""
from run_local import log
from twisted.internet import reactor, task, defer
from rdflib import URIRef, Literal
from twisted.internet.defer import ensureDeferred
from rdfdb.syncedgraph import SyncedGraph
import time
import logging
from light9 import networking, showconfig
from light9.namespaces import L9


class BusyClient:
    """One rdfdb client that rewrites a single timestamp statement at a fixed rate."""

    def __init__(self, subj, rate):
        """Connect to rdfdb and arrange to start patching once synced.

        subj: subject of the statement this client keeps replacing.
        rate: patches per second; must be greater than 0.

        Raises ValueError if rate is not positive.
        """
        # Fail at construction time instead of later inside a Deferred
        # callback (1 / 0) or inside LoopingCall.start (negative interval).
        if rate <= 0:
            raise ValueError("rate must be positive, got %r" % (rate,))
        self.subj = subj
        self.rate = rate
        # NOTE(review): the second argument is presumably the client label
        # shown by rdfdb; "collector" is kept as-is -- confirm it is intended.
        self.graph = SyncedGraph(networking.rdfdb.url, "collector")
        self.graph.initiallySynced.addCallback(self.go)

    def go(self, _):
        """Start the periodic patching; used as the initiallySynced callback.

        Returns None on purpose so the initiallySynced callback chain is not
        made to wait on the (normally never-firing) LoopingCall Deferred.
        """
        running = task.LoopingCall(self.loop).start(1 / self.rate)
        # LoopingCall stops and errbacks this Deferred if loop() raises;
        # without a handler the client would just go quiet.
        running.addErrback(self._loopFailed)

    def _loopFailed(self, failure):
        """Log why this client's patch loop stopped."""
        log.error("load test client %s stopped: %s", self.subj,
                  failure.getTraceback())

    def loop(self):
        """Replace this client's L9['time'] object with the current time."""
        self.graph.patchObject(showconfig.showUri() + '/loadTestContext',
                               subject=self.subj,
                               predicate=L9['time'],
                               newObject=Literal(str(time.time())))


def main(clientCount=10, rate=20):
    """Run clientCount BusyClients at `rate` patches/sec each until the reactor stops."""
    log.setLevel(logging.INFO)
    # Held in a local so the clients stay referenced while reactor.run() blocks.
    clients = [BusyClient(L9['loadTest_%d' % i], rate) for i in range(clientCount)]
    reactor.run()


if __name__ == "__main__":
    main()