view bin/load_test_rdfdb @ 2366:b42cb1ea1d20

rm - we don't do a kernel module anymore; just usb api
author drewp@bigasterisk.com
date Mon, 05 Jun 2023 17:34:01 -0700
parents 4718ca6f812e
children
line wrap: on
line source

#!bin/python
from run_local import log
from twisted.internet import reactor, task, defer
from rdflib import URIRef, Literal
from twisted.internet.defer import ensureDeferred
from rdfdb.syncedgraph import SyncedGraph
import time, logging

from light9 import networking, showconfig
from light9.namespaces import L9


class BusyClient:

    def __init__(self, subj, rate):
        self.subj = subj
        self.rate = rate

        self.graph = SyncedGraph(networking.rdfdb.url, "collector")
        self.graph.initiallySynced.addCallback(self.go)

    def go(self, _):
        task.LoopingCall(self.loop).start(1 / self.rate)

    def loop(self):
        self.graph.patchObject(showconfig.showUri() + '/loadTestContext',
                               subject=self.subj,
                               predicate=L9['time'],
                               newObject=Literal(str(time.time())))


def main():
    log.setLevel(logging.INFO)

    clients = [BusyClient(L9['loadTest_%d' % i], 20) for i in range(10)]
    reactor.run()


if __name__ == "__main__":
    main()