view bin/attic/load_test_rdfdb @ 2381:3a2e58725037

make bin/* match
author drewp@bigasterisk.com
date Sun, 12 May 2024 19:56:25 -0700
parents 4556eebe5d73
children
line wrap: on
line source

#!bin/python
from run_local import log
from twisted.internet import reactor, task, defer
from rdflib import URIRef, Literal
from twisted.internet.defer import ensureDeferred
from rdfdb.syncedgraph import SyncedGraph
import time, logging

from light9 import networking, showconfig
from light9.namespaces import L9


class BusyClient:
    """Load-test client that repeatedly patches one statement through its own SyncedGraph.

    Each instance builds a SyncedGraph and, once the graph's `initiallySynced`
    deferred fires, calls `loop` `rate` times per second.
    """

    def __init__(self, subj, rate):
        """
        subj: subject of the statement that `loop` rewrites on every tick.
        rate: number of `loop` calls per second; must be greater than zero.

        Raises ValueError if `rate` is not positive.
        """
        # Fail fast, before the SyncedGraph is built. Otherwise a bad rate only
        # surfaces later as an error raised inside the initiallySynced callback.
        if rate <= 0:
            raise ValueError("rate must be > 0, got %r" % (rate,))
        self.subj = subj
        self.rate = rate

        self.graph = SyncedGraph(networking.rdfdb.url, "collector")
        self.graph.initiallySynced.addCallback(self.go)

    def go(self, _):
        """Start the periodic `loop` calls; used as the `initiallySynced` callback."""
        self._loopingCall = task.LoopingCall(self.loop)
        finished = self._loopingCall.start(1 / self.rate)
        # LoopingCall stops and errbacks this deferred if `loop` raises; report
        # it so a dead client is visible instead of silently producing no load.
        finished.addErrback(self._loopFailed)

    def _loopFailed(self, failure):
        """Log the failure that stopped this client's LoopingCall."""
        log.error("load test loop for %s stopped: %s", self.subj,
                  failure.getTraceback())

    def loop(self):
        """Patch the L9['time'] object of `self.subj` to the current time.

        The value is `time.time()` as a string Literal, written in the
        show URI + '/loadTestContext' context.
        """
        self.graph.patchObject(showconfig.showUri() + '/loadTestContext',
                               subject=self.subj,
                               predicate=L9['time'],
                               newObject=Literal(str(time.time())))


def main():
    """Create ten BusyClients at rate 20 and run the reactor until it stops."""
    log.setLevel(logging.INFO)

    # Kept in a local so the clients stay referenced while reactor.run() blocks.
    clients = []
    for i in range(10):
        subj = L9['loadTest_%d' % i]
        clients.append(BusyClient(subj, 20))
    reactor.run()


# Script entry point: start the load test only when this file is run directly.
if __name__ == "__main__":
    main()