view light9/collector/collector.py @ 1412:4916b397b5b8

collector: don't fail to start just because some devs can't be mapped Ignore-this: f05e0d764e7a646f7cd8594b90f66b52
author drewp@bigasterisk.com
date Fri, 10 Jun 2016 03:26:27 +0000
parents f427801da9f6
children ab7b40d20af0
line wrap: on
line source

from __future__ import division
import time
import logging
from light9.namespaces import L9, RDF, DEV
from light9.collector.output import setListElem
from light9.collector.device import toOutputAttrs, resolve

log = logging.getLogger('collector')

def outputMap(graph, outputs):
    """From rdf config graph, compute a map of
       (device, attr) : (output, index)
    that explains which output index to set for any device update.
    """
    ret = {}

    outIndex = {} # port : (output, index)
    for out in outputs:
        for index, uri in out.allConnections():
            outIndex[uri] = (out, index)

    for dev in graph.subjects(RDF.type, L9['Device']):
        for attr, connectedTo in graph.predicate_objects(dev):
            if attr == RDF.type:
                continue
            outputPorts = list(graph.subjects(L9['connectedTo'], connectedTo))
            if len(outputPorts) == 0:
                raise ValueError('no output port :connectedTo %r' % connectedTo)
            elif len(outputPorts) > 1:
                raise ValueError('multiple output ports (%r) :connectedTo %r' %
                                 (outputPorts, connectedTo))
            else:
                try:
                    output, index = outIndex[outputPorts[0]]
                except KeyError:
                    log.warn('skipping %r', outputPorts[0])
                    continue
            ret[(dev, attr)] = output, index
            log.debug('outputMap (%r, %r) -> %r, %r', dev, attr, output, index)
    
    return ret
        
class Collector(object):
    def __init__(self, graph, outputs, clientTimeoutSec=10):
        self.graph = graph
        self.outputs = outputs
        self.clientTimeoutSec = clientTimeoutSec

        self.graph.addHandler(self.rebuildOutputMap)
        self.lastRequest = {} # client : (session, time, {(dev,attr): latestValue})

    def rebuildOutputMap(self):
        self.outputMap = outputMap(self.graph, self.outputs) # (device, attr) : (output, index)
        self.deviceType = {} # uri: type that's a subclass of Device
        for dev in self.graph.subjects(RDF.type, L9['Device']):
            for t in self.graph.objects(dev, RDF.type):
                if t != L9['Device']:
                    self.deviceType[dev] = t

    def _forgetStaleClients(self, now):
        staleClients = []
        for c, (_, t, _) in self.lastRequest.iteritems():
            if t < now - self.clientTimeoutSec:
                staleClients.append(c)
        for c in staleClients:
            del self.lastRequest[c]

    def resolvedSettingsDict(self, settingsList):
        out = {}
        for d, a, v in settingsList:
            if (d, a) in out:
                out[(d, a)] = resolve(d, a, [out[(d, a)], v])
            else:
                out[(d, a)] = v
        return out

    def setAttrs(self, client, clientSession, settings):
        """
        settings is a list of (device, attr, value). These attrs are
        device attrs. We resolve conflicting values, process them into
        output attrs, and call Output.update/Output.flush to send the
        new outputs.

        Call with settings=[] to ping us that your session isn't dead.
        """
        now = time.time()

        self._forgetStaleClients(now)
        row = self.lastRequest.get(client)
        if row is not None:
            sess, _, prevClientSettings = row
            if sess != clientSession:
                prevClientSettings = {}
        else:
            prevClientSettings = {}
        prevClientSettings.update(self.resolvedSettingsDict(settings))
        self.lastRequest[client] = (clientSession, now, prevClientSettings)


        deviceAttrs = {} # device: {attr: value}
        for _, _, lastSettings in self.lastRequest.itervalues():
            for (device, attr), value in lastSettings.iteritems():
                attrs = deviceAttrs.setdefault(device, {})
                if attr in attrs:
                    value = resolve(device, attr, [attrs[attr], value])
                attrs[attr] = value

        outputAttrs = {} # device: {attr: value}
        for d in deviceAttrs:
            outputAttrs[d] = toOutputAttrs(self.deviceType[d], deviceAttrs[d])
        
        pendingOut = {} # output : values
        for device, attrs in outputAttrs.iteritems():
            for attr, value in attrs.iteritems():
                self.setAttr(device, attr, value, pendingOut)

        self.flush(pendingOut)

    def setAttr(self, device, attr, value, pendingOut):
        output, index = self.outputMap[(device, attr)]
        outList = pendingOut.setdefault(output, [])
        setListElem(outList, index, value, combine=max)

    def flush(self, pendingOut):
        """write any changed outputs"""
        for out, vals in pendingOut.iteritems():
            out.update(vals)
            out.flush()