Mercurial > code > home > repos > environment
view environment.py @ 1:0f532eb91364
switch from slow pipenv to fast pdm
author | drewp@bigasterisk.com |
---|---|
date | Thu, 31 Mar 2022 23:49:08 -0700 |
parents | 3c1bc3bc5a6c |
children | e7f33fa31883 |
line wrap: on
line source
#!/usr/bin/python
"""
Return some RDF about the environment, e.g. the current time, daytime/night,
overall modes like 'maintenance mode', etc.
"""
import datetime

import cyclone.web
from dateutil.relativedelta import FR, relativedelta
from dateutil.tz import tzlocal
from docopt import docopt
from rdflib import Literal, Namespace
from twisted.internet import defer, reactor, task

from standardservice.logsetup import log, verboseLogging
from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph)
from patch_cyclone import patch_sse
from rdfdoc import Doc
from twilight import isWithinTwilight

patch_sse()

ROOM = Namespace("http://projects.bigasterisk.com/room/")
DEV = Namespace("http://projects.bigasterisk.com/device/")

# STATS = scales.collection(
#     '/root',
#     scales.PmfStat('update'),
# )


# see pending fix in patchablegraph.py
class CycloneGraphEventsHandlerWithCors(CycloneGraphEventsHandler):
    """Graph events handler that adds CORS headers for allow-listed origins."""

    def __init__(self, application, request, masterGraph, allowOrigins=None):
        """
        application, request, masterGraph: passed through to
            CycloneGraphEventsHandler.
        allowOrigins: iterable of Origin header values that should receive
            CORS headers; None means no origin is allowed.
        """
        super().__init__(application, request, masterGraph)
        self.allowOrigins = allowOrigins or []

    def flush(self):
        """Set CORS headers when the request's origin is allowed, then flush."""
        origin = self.request.headers.get('origin', None)
        if origin and origin in self.allowOrigins:
            # Echo back the exact origin (not '*') since credentials are allowed.
            self.set_header(b"Access-Control-Allow-Origin", origin.encode('utf8'))
            self.set_header(b"Access-Control-Allow-Credentials", b"true")
        return super().flush()


# @STATS.update.time()
def update(masterGraph):
    """
    Patch the current local time/calendar statements into masterGraph.

    All statements are written with context ROOM.environment through
    masterGraph.patchObject. 'now' is the local time from tzlocal().
    """

    def stmt(s, p, o):
        masterGraph.patchObject(ROOM.environment, s, p, o)

    now = datetime.datetime.now(tzlocal())

    stmt(DEV.environment, ROOM.localHour, Literal(now.hour))
    stmt(DEV.environment, ROOM.localTimeToMinute, Literal(now.strftime("%H:%M")))
    stmt(DEV.environment, ROOM.localTimeToSecond, Literal(now.strftime("%H:%M:%S")))
    stmt(DEV.environment, ROOM.localDayOfWeek, Literal(now.strftime("%A")))
    # Was strftime("%B %e"). %e (space-padded day of month) is not one of the
    # portable C89 directives, so the day is formatted in Python instead; the
    # resulting text is the same, e.g. "April  1" / "March 31".
    stmt(DEV.environment, ROOM.localMonthDay, Literal("%s %2d" % (now.strftime("%B"), now.day)))
    stmt(DEV.environment, ROOM.localDate, Literal(now.strftime("%Y-%m-%d")))

    # Look from 12 days back to 6 days ahead for a date that is the last Friday
    # of its month, and publish its offset (in days) from today.
    # NOTE(review): when no date in this window is a last Friday, nothing is
    # patched here, so a previously written value is presumably left in the
    # graph -- confirm whether it should be cleared instead.
    today = now.date()
    for offset in range(-12, 7):
        d = today + datetime.timedelta(days=offset)
        # relativedelta(day=31, weekday=FR(-1)) moves to the end of d's month
        # and then back to the nearest Friday on or before it.
        if d == d + relativedelta(day=31, weekday=FR(-1)):
            stmt(DEV.calendar, ROOM.daysToLastFridayOfMonth, Literal(offset))

    stmt(DEV.calendar, ROOM.twilight, ROOM['withinTwilight'] if isWithinTwilight(now) else ROOM['daytime'])


def main():
    """Parse options, start the once-per-second update loop, and serve on port 8000."""
    arg = docopt("""
    Usage: environment.py [options]

    -v   Verbose
    """)
    verboseLogging(arg['-v'])

    masterGraph = PatchableGraph()

    class Application(cyclone.web.Application):

        def __init__(self):
            handlers = [
                # "/" serves index.html from the current directory
                (r"/()", cyclone.web.StaticFileHandler, {
                    "path": ".",
                    "default_filename": "index.html"
                }),
                (r'/graph/environment', CycloneGraphHandler, {
                    'masterGraph': masterGraph
                }),
                (r'/graph/environment/events', CycloneGraphEventsHandlerWithCors, {
                    'masterGraph': masterGraph,
                    'allowOrigins': ['http://localhost:8001'],
                }),
                (r'/doc', Doc),  # to be shared
                # (r'/stats/(.*)', StatsHandler, {
                #     'serverName': 'environment'
                # }),
            ]
            cyclone.web.Application.__init__(self, handlers, masterGraph=masterGraph)

    def logLoopFailure(failure):
        # LoopingCall stops calling update() after it raises; report that in
        # the service log instead of dropping the returned Deferred.
        # Assumes `log` is a standard logging.Logger -- TODO confirm.
        log.error("environment update loop stopped: %s", failure.getTraceback())

    task.LoopingCall(update, masterGraph).start(1).addErrback(logLoopFailure)
    reactor.listenTCP(8000, Application())
    reactor.run()


if __name__ == '__main__':
    main()