Mercurial > code > home > repos > light9
annotate light9/collector/collector_client.py @ 1876:8da5b4edcb7e
type fixes, especially for collector_client
Ignore-this: c4ad6f85a477fc034ceb6e996e701de8
author | Drew Perttula <drewp@bigasterisk.com> |
---|---|
date | Mon, 27 May 2019 07:03:27 +0000 |
parents | f001d689b3e2 |
children | d01e21621975 |
rev | line source |
---|---|
1830 | 1 from light9 import networking |
2 from light9.effect.settings import DeviceSettings | |
3 from twisted.internet import defer | |
4 from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection | |
1858 | 5 import json, time, logging |
1830 | 6 import treq |
7 | |
8 log = logging.getLogger('coll_client') | |
9 | |
1858 | 10 _zmqClient = None |
11 | |
12 | |
class TwistedZmqClient:
    """Push-only ZeroMQ client that runs on the Twisted reactor via txzmq."""

    def __init__(self, service):
        """Open a push connection to `service`.

        `service` must expose `host` and `port` attributes; they are
        combined into a `tcp://host:port` endpoint address.
        """
        address = 'tcp://{}:{}'.format(service.host, service.port)
        endpoint = ZmqEndpoint('connect', address)
        factory = ZmqFactory()
        self.conn = ZmqPushConnection(factory, endpoint)

    def send(self, msg):
        """Hand `msg` to the push connection; nothing is returned."""
        self.conn.push(msg)
21 | |
22 | |
1876
8da5b4edcb7e
type fixes, especially for collector_client
Drew Perttula <drewp@bigasterisk.com>
parents:
1873
diff
changeset
|
def toCollectorJson(client, session, settings) -> str:
    """Serialize one collector update as a JSON string.

    The message carries the settings (as returned by `settings.asList()`),
    the client name, the client session, and the wall-clock time at which
    the message was built.
    """
    assert isinstance(settings, DeviceSettings)
    payload = {
        'settings': settings.asList(),
        'client': client,
        'clientSession': session,
        'sendTime': time.time(),
    }
    return json.dumps(payload)
31 | |
32 | |
def sendToCollectorZmq(msg):
    """Push `msg` to the collector over the shared zmq connection.

    The module-level client is created on first use and reused afterwards.
    Returns an already-fired deferred (value 0), since the push itself
    does not provide a result to wait on.
    """
    global _zmqClient
    zmqClient = _zmqClient
    if zmqClient is None:
        # First call: build the connection once and cache it for later sends.
        zmqClient = _zmqClient = TwistedZmqClient(networking.collectorZmq)
    zmqClient.send(msg)
    return defer.succeed(0)
1858 | 39 |
40 | |
def sendToCollector(client, session, settings, useZmq=False):
    """Send `settings` to the collector and report how long it took.

    The message is built with `toCollectorJson` and sent either over the
    shared zmq push connection (`useZmq=True`) or as an HTTP PUT to the
    collector's 'attrs' resource (the default).

    Returns a deferred that fires with the elapsed time in seconds. Notes:

    - With `useZmq=True` the underlying deferred has already fired, so the
      elapsed time covers only the local send, not a collector response.
    - On failure the error is logged and swallowed, so the deferred fires
      with None instead of a duration.
    """
    # Monotonic clock: wall-clock adjustments must not skew (or make
    # negative) the measured duration.
    startTime = time.monotonic()
    msg = toCollectorJson(client, session, settings).encode('utf8')

    if useZmq:
        d = sendToCollectorZmq(msg)
    else:
        d = treq.put(networking.collector.path('attrs'), data=msg, timeout=1)

    def onDone(result):
        dt = time.monotonic() - startTime
        if dt > .1:
            log.warning('sendToCollector request took %.1fms', dt * 1000)
        return dt

    d.addCallback(onDone)

    def onErr(err):
        # Best-effort send: report the failure but do not propagate it.
        log.warning('sendToCollector failed: %r', err)

    d.addErrback(onErr)
    return d