Mercurial > code > home > repos > homeauto
view service/environment/environment.py @ 1139:db955e7943af
arduinonode reads config from etcd. use pushConfig.py to inform all nodes
Ignore-this: 3155d873bb30ca82b48ace7531a550e5
darcs-hash:0b2be13fbb4dc2571bb1bb8ac97f6a1350f36e77
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Sat, 03 Mar 2018 17:53:37 -0800 |
parents | 4d36cae32a4c |
children | 6304b0370491 |
line wrap: on
line source
#!/usr/bin/python """ return some rdf about the environment, e.g. the current time, daytime/night, overall modes like 'maintenance mode', etc """ import sys, datetime, cyclone.web from twisted.internet import reactor, task from dateutil.tz import tzlocal from dateutil.relativedelta import relativedelta, FR from rdflib import Namespace, Literal sys.path.append("/my/proj/homeauto/lib") from patchablegraph import PatchableGraph, CycloneGraphEventsHandler, CycloneGraphHandler from twilight import isWithinTwilight from rdfdoc import Doc ROOM = Namespace("http://projects.bigasterisk.com/room/") DEV = Namespace("http://projects.bigasterisk.com/device/") def update(masterGraph): stmt = lambda s, p, o: masterGraph.patchObject(ROOM.environment, s, p, o) now = datetime.datetime.now(tzlocal()) stmt(DEV.environment, ROOM.localHour, Literal(now.hour)) stmt(DEV.environment, ROOM.localTimeToMinute, Literal(now.strftime("%H:%M"))) stmt(DEV.environment, ROOM.localTimeToSecond, Literal(now.strftime("%H:%M:%S"))) stmt(DEV.environment, ROOM.localDayOfWeek, Literal(now.strftime("%A"))) stmt(DEV.environment, ROOM.localMonthDay, Literal(now.strftime("%B %e"))) stmt(DEV.environment, ROOM.localDate, Literal(now.strftime("%Y-%m-%d"))) for offset in range(-12, 7): d = now.date() + datetime.timedelta(days=offset) if d == d + relativedelta(day=31, weekday=FR(-1)): stmt(DEV.calendar, ROOM.daysToLastFridayOfMonth, Literal(offset)) stmt(DEV.calendar, ROOM.twilight, ROOM['withinTwilight'] if isWithinTwilight(now) else ROOM['daytime']) def main(): from twisted.python import log as twlog twlog.startLogging(sys.stderr) masterGraph = PatchableGraph() class Application(cyclone.web.Application): def __init__(self): handlers = [ (r"/()", cyclone.web.StaticFileHandler, {"path": ".", "default_filename": "index.html"}), (r'/graph', CycloneGraphHandler, {'masterGraph': masterGraph}), (r'/graph/events', CycloneGraphEventsHandler, {'masterGraph': masterGraph}), (r'/doc', Doc), # to be shared ] cyclone.web.Application.__init__(self, handlers, masterGraph=masterGraph) task.LoopingCall(update, masterGraph).start(1) reactor.listenTCP(9075, Application()) reactor.run() if __name__ == '__main__': main()