comparison front_door_lock.py @ 8:caea36c8289f

don't try to reconnect mqtt (was broken); just fail a k8s health check
author drewp@bigasterisk.com
date Thu, 30 Nov 2023 22:43:58 -0800
parents 18c0dbfab73a
children
comparison
equal deleted inserted replaced
7:18c0dbfab73a 8:caea36c8289f
26 from rdfdb.patch import Patch 26 from rdfdb.patch import Patch
27 from rdflib import Literal, Namespace, URIRef 27 from rdflib import Literal, Namespace, URIRef
28 from starlette.applications import Starlette 28 from starlette.applications import Starlette
29 from starlette.exceptions import HTTPException 29 from starlette.exceptions import HTTPException
30 from starlette.requests import Request 30 from starlette.requests import Request
31 from starlette.responses import JSONResponse 31 from starlette.responses import JSONResponse, PlainTextResponse
32 from starlette.routing import Route 32 from starlette.routing import Route
33 from starlette_exporter import PrometheusMiddleware, handle_metrics 33 from starlette_exporter import PrometheusMiddleware, handle_metrics
34 from prometheus_client import Gauge 34 from prometheus_client import Gauge
35 from get_agent import getFoafAgent 35 from get_agent import getFoafAgent
36 36
113 113
114 hw: LockHardware 114 hw: LockHardware
115 topicRoot: str = 'frontdoorlock' 115 topicRoot: str = 'frontdoorlock'
116 116
117 def startup(self): 117 def startup(self):
118 asyncio.create_task(self._go()) 118 self.task = asyncio.create_task(self._go())
119 119
120 async def _go(self): 120 async def _go(self):
121 self.client = aiomqtt.Client("mqtt1", 1883, client_id="lock-service-%s" % time.time(), keepalive=6) 121 self.client = aiomqtt.Client("mqtt1", 1883, client_id="lock-service-%s" % time.time(), keepalive=6)
122 while True: 122 try:
123 try: 123 async with self.client:
124 async with self.client: 124 MQTT_CONNECTED.set(1)
125 MQTT_CONNECTED.set(1) 125 await self._handleMessages()
126 await self._handleMessages() 126 except aiomqtt.MqttError:
127 except aiomqtt.MqttError: 127 MQTT_CONNECTED.set(0)
128 MQTT_CONNECTED.set(0) 128 log.error('mqtt down', exc_info=True)
129 log.error('mqtt reconnecting', exc_info=True)
130 await asyncio.sleep(5)
131 finally:
132 MQTT_CONNECTED.set(0)
133 129
134 async def _handleMessages(self): 130 async def _handleMessages(self):
135 async with self.client.messages() as messages: 131 async with self.client.messages() as messages:
136 await self.client.subscribe(self.topicRoot + '/#') 132 await self.client.subscribe(self.topicRoot + '/#')
137 async for message in messages: 133 async for message in messages:
184 await hw.unlock(foafAgent, autoLock=False) 180 await hw.unlock(foafAgent, autoLock=False)
185 case _: 181 case _:
186 raise NotImplementedError(command) 182 raise NotImplementedError(command)
187 return JSONResponse({'ok': True}) 183 return JSONResponse({'ok': True})
188 184
185 def health(mqtt: MqttConnection, req: Request) -> PlainTextResponse:
186 if mqtt.task.done():
187 return PlainTextResponse('no mqtt task', status_code=500)
188 return PlainTextResponse('ok')
189
189 190
190 def main(): 191 def main():
191 graph = PatchableGraph() 192 graph = PatchableGraph()
192 hw = LockHardware(graph) 193 hw = LockHardware(graph)
193 mqtt = MqttConnection(hw) 194 mqtt = MqttConnection(hw)
194 hw.mqtt = mqtt 195 hw.mqtt = mqtt
195 app = Starlette(debug=True, 196 app = Starlette(debug=True,
196 on_startup=[mqtt.startup], 197 on_startup=[mqtt.startup],
197 routes=[ 198 routes=[
199 Route('/health', partial(health, mqtt)),
198 Route('/api/status', partial(status, graph)), 200 Route('/api/status', partial(status, graph)),
199 Route('/api/output', partial(output, graph)), 201 Route('/api/output', partial(output, graph)),
200 Route('/api/graph', StaticGraph(graph)), 202 Route('/api/graph', StaticGraph(graph)),
201 Route('/api/graph/events', GraphEvents(graph)), 203 Route('/api/graph/events', GraphEvents(graph)),
202 Route('/api/simple/{command:str}', partial(simpleCommand, hw), methods=['PUT']), 204 Route('/api/simple/{command:str}', partial(simpleCommand, hw), methods=['PUT']),