Mercurial > code > home > repos > homeauto
view service/environment/environment.py @ 1134:39c8ddd79cf2
allow bigger eventsource messages
Ignore-this: 97608d5f5b1059c9b92856ec5551f41c
darcs-hash:7d656493078fe41e0add0f1e33824f78c278c35a
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Sat, 03 Feb 2018 14:39:10 -0800 |
parents | 4d36cae32a4c |
children | 6304b0370491 |
line wrap: on
line source
#!/usr/bin/python """ return some rdf about the environment, e.g. the current time, daytime/night, overall modes like 'maintenance mode', etc """ import sys, datetime, cyclone.web from twisted.internet import reactor, task from dateutil.tz import tzlocal from dateutil.relativedelta import relativedelta, FR from rdflib import Namespace, Literal sys.path.append("/my/proj/homeauto/lib") from patchablegraph import PatchableGraph, CycloneGraphEventsHandler, CycloneGraphHandler from twilight import isWithinTwilight from rdfdoc import Doc ROOM = Namespace("http://projects.bigasterisk.com/room/") DEV = Namespace("http://projects.bigasterisk.com/device/") def update(masterGraph): stmt = lambda s, p, o: masterGraph.patchObject(ROOM.environment, s, p, o) now = datetime.datetime.now(tzlocal()) stmt(DEV.environment, ROOM.localHour, Literal(now.hour)) stmt(DEV.environment, ROOM.localTimeToMinute, Literal(now.strftime("%H:%M"))) stmt(DEV.environment, ROOM.localTimeToSecond, Literal(now.strftime("%H:%M:%S"))) stmt(DEV.environment, ROOM.localDayOfWeek, Literal(now.strftime("%A"))) stmt(DEV.environment, ROOM.localMonthDay, Literal(now.strftime("%B %e"))) stmt(DEV.environment, ROOM.localDate, Literal(now.strftime("%Y-%m-%d"))) for offset in range(-12, 7): d = now.date() + datetime.timedelta(days=offset) if d == d + relativedelta(day=31, weekday=FR(-1)): stmt(DEV.calendar, ROOM.daysToLastFridayOfMonth, Literal(offset)) stmt(DEV.calendar, ROOM.twilight, ROOM['withinTwilight'] if isWithinTwilight(now) else ROOM['daytime']) def main(): from twisted.python import log as twlog twlog.startLogging(sys.stderr) masterGraph = PatchableGraph() class Application(cyclone.web.Application): def __init__(self): handlers = [ (r"/()", cyclone.web.StaticFileHandler, {"path": ".", "default_filename": "index.html"}), (r'/graph', CycloneGraphHandler, {'masterGraph': masterGraph}), (r'/graph/events', CycloneGraphEventsHandler, {'masterGraph': masterGraph}), (r'/doc', Doc), # to be shared ] cyclone.web.Application.__init__(self, handlers, masterGraph=masterGraph) task.LoopingCall(update, masterGraph).start(1) reactor.listenTCP(9075, Application()) reactor.run() if __name__ == '__main__': main()