view bin/load_test_rdfdb @ 1949:88ca10c9e31a

hack up stubs to pass mypy. they're very weak right now Ignore-this: f4c854346bc206725a1065079d8d158f
author Drew Perttula <drewp@bigasterisk.com>
date Thu, 06 Jun 2019 00:06:00 +0000
parents 4718ca6f812e
children
line wrap: on
line source

#!bin/python
from run_local import log
from twisted.internet import reactor, task, defer
from rdflib import URIRef, Literal
from twisted.internet.defer import ensureDeferred
from rdfdb.syncedgraph import SyncedGraph
import time, logging

from light9 import networking, showconfig
from light9.namespaces import L9


class BusyClient:

    def __init__(self, subj, rate):
        self.subj = subj
        self.rate = rate

        self.graph = SyncedGraph(networking.rdfdb.url, "collector")
        self.graph.initiallySynced.addCallback(self.go)

    def go(self, _):
        task.LoopingCall(self.loop).start(1 / self.rate)

    def loop(self):
        self.graph.patchObject(showconfig.showUri() + '/loadTestContext',
                               subject=self.subj,
                               predicate=L9['time'],
                               newObject=Literal(str(time.time())))


def main():
    log.setLevel(logging.INFO)

    clients = [BusyClient(L9['loadTest_%d' % i], 20) for i in range(10)]
    reactor.run()


if __name__ == "__main__":
    main()