Mercurial > code > home > repos > light9
annotate light9/collector/collector_client.py @ 2187:ccdfdc8183ad
remove py2-style 'object' superclass
author | drewp@bigasterisk.com |
---|---|
date | Fri, 19 May 2023 21:14:01 -0700 |
parents | 9aa046cc9b33 |
children | 62b832e9aac5 |
rev | line source |
---|---|
1830 | 1 from light9 import networking |
2 from light9.effect.settings import DeviceSettings | |
2046
9aa046cc9b33
replace greplin with prometheus throughout (untested)
drewp@bigasterisk.com
parents:
2043
diff
changeset
|
3 from light9.metrics import metrics |
1830 | 4 from twisted.internet import defer |
5 from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection | |
1858 | 6 import json, time, logging |
1830 | 7 import treq |
8 | |
9 log = logging.getLogger('coll_client') | |
10 | |
1858 | 11 _zmqClient = None |
12 | |
13 | |
2187
ccdfdc8183ad
remove py2-style 'object' superclass
drewp@bigasterisk.com
parents:
2046
diff
changeset
|
class TwistedZmqClient:
    """Push-only txzmq connection to one service's tcp endpoint."""

    def __init__(self, service):
        """Open a push connection to tcp://<service.host>:<service.port>.

        `service` only needs `host` and `port` attributes.
        """
        factory = ZmqFactory()
        address = 'tcp://%s:%s' % (service.host, service.port)
        endpoint = ZmqEndpoint('connect', address)
        self.conn = ZmqPushConnection(factory, endpoint)

    def send(self, msg):
        """Hand `msg` to the push connection; nothing is returned."""
        push = self.conn.push
        push(msg)
23 | |
1927 | 24 |
1971
b26a1e7fcfbe
dmx out: lots of stats and more reconnection attempts after usb errors
drewp@bigasterisk.com
parents:
1960
diff
changeset
|
def toCollectorJson(client, session, settings: DeviceSettings) -> str:
    """Serialize one settings update as JSON text.

    The object has four keys: 'settings' (from settings.asList()),
    'client', 'clientSession', and 'sendTime' (time.time() at the moment
    of serialization).
    """
    assert isinstance(settings, DeviceSettings)
    payload = {
        'settings': settings.asList(),
        'client': client,
        'clientSession': session,
        'sendTime': time.time(),
    }
    return json.dumps(payload)
33 | |
34 | |
def sendToCollectorZmq(msg):
    """Push `msg` through the module-wide zmq client, creating it on first use.

    Returns an already-fired deferred whose result is 0.0.
    """
    global _zmqClient
    client = _zmqClient
    if client is None:
        # First call: build the shared client and remember it for later sends.
        client = _zmqClient = TwistedZmqClient(networking.collectorZmq)
    client.send(msg)
    return defer.succeed(0.0)
1858 | 41 |
42 | |
def sendToCollector(client, session, settings: DeviceSettings,
                    useZmq=False) -> defer.Deferred:
    """deferred to the time in seconds it took to get a response from collector

    The settings are serialized with toCollectorJson and sent either over
    the zmq push client (useZmq=True) or as an HTTP PUT to collector's
    'attrs' path with a 1 second timeout.

    On success the deferred fires with the elapsed seconds, which are also
    recorded in the 'send' metric; requests slower than 0.1s are logged.
    On failure the error is logged and swallowed, so the deferred fires
    with None instead of a float.
    """
    sendTime = time.time()
    msg = toCollectorJson(client, session, settings).encode('utf8')

    if useZmq:
        d = sendToCollectorZmq(msg)
    else:
        d = treq.put(networking.collector.path('attrs'), data=msg, timeout=1)

    def onDone(result):
        dt = time.time() - sendTime
        metrics('send').observe(dt)
        if dt > .1:
            # Logger.warn is a deprecated alias (removed in Python 3.13).
            log.warning('sendToCollector request took %.1fms', dt * 1000)
        return dt

    d.addCallback(onDone)

    def onErr(err):
        # Best-effort send: report the failure but do not propagate it.
        log.warning('sendToCollector failed: %r', err)

    d.addErrback(onErr)
    return d