Changeset - c0742e710eeb
[Not reviewed]
default
0 1 0
drewp@bigasterisk.com - 9 years ago 2016-06-13 19:25:00
drewp@bigasterisk.com
refactor collector json parsing
Ignore-this: ae540a86d86ff9a45e5b2b66ee5f82d4
1 file changed with 9 insertions and 8 deletions:
0 comments (0 inline, 0 general)
bin/collector
Show inline comments
 
#!bin/python
 
from __future__ import division
 
from rdflib import Graph, URIRef, Literal
 
from twisted.internet import reactor
 
from twisted.web.server import Site
 
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection
 
import json
 
import logging
 
import klein
 
import optparse
 
from greplin import scales
 
from greplin.scales.twistedweb import StatsResource
 

	
 
from run_local import log
 
from light9.collector.output import EnttecDmx, Udmx
 
from light9.collector.collector import Collector
 
from light9.namespaces import L9
 
from light9 import networking
 
from light9.rdfdb.syncedgraph import SyncedGraph
 
from light9.rdfdb import clientsession
 

	
 
def parseJsonMessage(msg):
 
    body = json.load(msg)
 
    settings = []
 
    for device, attr, value in body['settings']:
 
        settings.append((URIRef(device), URIRef(attr), Literal(value)))
 
    return body['client'], body['clientSession'], settings, body['sendTime']
 

	
 
class WebServer(object):
 
    stats = scales.collection('/webServer',
 
                              scales.PmfStat('setAttr'))
 
    app = klein.Klein()
 
    def __init__(self, collector):
 
        self.collector = collector
 
        
 
    @app.route('/attrs', methods=['PUT'])
 
    def putAttrs(self, request):
 
        with WebServer.stats.setAttr.time():
 
            body = json.load(request.content)
 
            settings = []
 
            for device, attr, value in body['settings']:
 
                settings.append((URIRef(device), URIRef(attr), Literal(value)))
 
            self.collector.setAttrs(body['client'],
 
                                    body['clientSession'],
 
                                    settings,
 
                                    body['sendTime'])
 
            client, clientSession, settings, sendTime = parseJsonMessage(request.content)
 
            self.collector.setAttrs(client, clientSession, settings, sendTime)
 
            request.setResponseCode(202)
 

	
 
    @app.route('/stats', methods=['GET'])
 
    def getStats(self, request):
 
        return StatsResource('collector')
 
        
 
def startZmq(port, collector):
 
    stats = scales.collection('/zmqServer',
 
                              scales.PmfStat('setAttr'))
 
    
 
    zf = ZmqFactory()
 
    e = ZmqEndpoint('bind', 'tcp://*:%s' % port)
 
    s = ZmqPullConnection(zf, e)
 
    def onPull(message):
 
        with stats.setAttrZmq.time():
 
            # todo: new compressed protocol where you send all URIs up
 
            # front and then use small ints to refer to devices and
 
            # attributes in subsequent requests.
 
            message[0]
 
            collector.setAttrs()
 
    s.onPull = onPull
 

	
 
def launch(graph, doLoadTest=False):
 

	
0 comments (0 inline, 0 general)