view service/environment/environment.py @ 413:5fc75de6b905

use the right py3 cyclone patch Ignore-this: b6d7f3fbe10fc777040d0db67cd61751
author drewp@bigasterisk.com
date Sat, 23 Mar 2019 04:37:47 -0700
parents 596c645a1fc5
children 6304b0370491
line wrap: on
line source

#!/usr/bin/python
"""
return some rdf about the environment, e.g. the current time,
daytime/night, overall modes like 'maintenance mode', etc

"""
import sys, datetime, cyclone.web
from twisted.internet import reactor, task
from dateutil.tz import tzlocal
from dateutil.relativedelta import relativedelta, FR
from rdflib import Namespace, Literal
sys.path.append("/my/proj/homeauto/lib")
from patchablegraph import PatchableGraph, CycloneGraphEventsHandler, CycloneGraphHandler
from twilight import isWithinTwilight

from rdfdoc import Doc

ROOM = Namespace("http://projects.bigasterisk.com/room/")
DEV = Namespace("http://projects.bigasterisk.com/device/")

def update(masterGraph):
    stmt = lambda s, p, o: masterGraph.patchObject(ROOM.environment, s, p, o)
    
    now = datetime.datetime.now(tzlocal())

    stmt(DEV.environment, ROOM.localHour, Literal(now.hour))
    stmt(DEV.environment, ROOM.localTimeToMinute,
         Literal(now.strftime("%H:%M")))

    stmt(DEV.environment, ROOM.localTimeToSecond,
         Literal(now.strftime("%H:%M:%S")))

    stmt(DEV.environment, ROOM.localDayOfWeek,
         Literal(now.strftime("%A")))
    stmt(DEV.environment, ROOM.localMonthDay,
         Literal(now.strftime("%B %e")))
    stmt(DEV.environment, ROOM.localDate,
         Literal(now.strftime("%Y-%m-%d")))

    for offset in range(-12, 7):
        d = now.date() + datetime.timedelta(days=offset)
        if d == d + relativedelta(day=31, weekday=FR(-1)):
            stmt(DEV.calendar, ROOM.daysToLastFridayOfMonth, Literal(offset))

    stmt(DEV.calendar, ROOM.twilight,
         ROOM['withinTwilight'] if isWithinTwilight(now) else ROOM['daytime'])

       
def main():
    from twisted.python import log as twlog
    twlog.startLogging(sys.stderr)
    masterGraph = PatchableGraph()

    class Application(cyclone.web.Application):
        def __init__(self):
            handlers = [
                (r"/()",
                 cyclone.web.StaticFileHandler,
                 {"path": ".", "default_filename": "index.html"}),
                (r'/graph',
                 CycloneGraphHandler, {'masterGraph': masterGraph}),
                (r'/graph/events',
                 CycloneGraphEventsHandler, {'masterGraph': masterGraph}),
                (r'/doc', Doc), # to be shared
            ]
            cyclone.web.Application.__init__(self, handlers,
                                             masterGraph=masterGraph)
    task.LoopingCall(update, masterGraph).start(1)
    reactor.listenTCP(9075, Application())
    reactor.run()

if __name__ == '__main__':
    main()