Mercurial > code > home > repos > homeauto
view service/reasoning/oneShot @ 1699:411f5e013201 0.12.0
release 0.12.0
author | drewp@bigasterisk.com |
---|---|
date | Tue, 12 Oct 2021 20:18:59 -0700 |
parents | c8562ace4917 |
children |
line wrap: on
line source
#!/usr/bin/python3
"""
Send a statement to the reasoning server for one update cycle.

Args are s/p/o in n3 notation, with many prefixes predefined here.

The server address is taken from the REASONING environment variable
(``host:port``) when it is set; otherwise the IP of the first pod matching
``app=reasoning`` is looked up with kubectl and port 9071 is used.
"""
import json
import os
import subprocess
import sys
import time

# Prefix -> namespace URI used by expand() to turn 'prefix:local' into a
# full <uri> term.
prefixes = {
    '': 'http://projects.bigasterisk.com/room/',
    'room': 'http://projects.bigasterisk.com/room/',
    'shuttle': 'http://bigasterisk.com/room/livingRoom/shuttlepro/',
    'sensor': 'http://bigasterisk.com/homeauto/sensor/',
}


def expand(term):
    """Return `term` with a known 'prefix:local' form expanded to '<uri>'.

    Terms without a colon, terms that already start with '<', '"' or "'"
    (full URIs and quoted literals), and terms whose prefix is not in
    `prefixes` are returned unchanged.
    """
    if ':' not in term or term.startswith(('<', '"', "'")):
        return term
    left, right = term.split(':', 1)
    if left in prefixes:
        return '<%s%s>' % (prefixes[left], right)
    return term


def _reasoning_host():
    """Return the 'host:port' of the reasoning server.

    Prefers the REASONING environment variable so the script also works
    where kubectl is unavailable; only falls back to a kubectl pod lookup
    when the variable is unset or empty. Exits with a message if no usable
    pod is found.
    """
    override = os.environ.get('REASONING')
    if override:
        return override

    pod = json.loads(
        subprocess.check_output([
            "kubectl", "get", "pod", "--selector=app=reasoning", "-o", "json"
        ]))
    items = pod.get('items') or []
    if not items:
        sys.exit("no pod found with selector app=reasoning "
                 "(set REASONING=host:port to override)")
    ip = items[0].get('status', {}).get('podIP')
    if not ip:
        sys.exit("reasoning pod has no podIP yet "
                 "(set REASONING=host:port to override)")
    return f'{ip}:9071'


def main(argv=None):
    """Send one s/p/o statement to the server and print timing.

    `argv` is the list of three n3 terms; it defaults to sys.argv[1:].
    Exits with a usage message when the argument count is wrong, and raises
    requests.HTTPError when the server answers with an error status.
    """
    args = sys.argv[1:] if argv is None else list(argv)
    if len(args) != 3:
        sys.exit("usage: oneShot SUBJECT PREDICATE OBJECT  (terms in n3 notation)")
    s, p, o = args

    # Imported here so the pure helpers above can be imported and tested
    # without the HTTP client installed.
    import requests

    host = _reasoning_host()
    stmt = '%s %s %s .' % (expand(s), expand(p), expand(o))
    print("Sending: %s" % stmt)

    t1 = time.time()
    ret = requests.post(
        'http://%s/oneShot' % host,
        headers={"content-type": "text/n3"},
        # utf-8 is a superset of ascii, so ascii-only statements produce the
        # same bytes as before while non-ascii literals no longer crash.
        data=stmt.encode('utf-8'))
    elapsed_ms = 1000 * (time.time() - t1)
    # Surface server-side failures directly instead of as a missing-header
    # KeyError below.
    ret.raise_for_status()

    graph_ms = ret.headers.get('x-graph-ms')
    if graph_ms is None:
        print("%.1f ms total; server sent no x-graph-ms header" % elapsed_ms)
        return
    g = float(graph_ms)
    print("%.1f ms for graph update; %.1f ms other overhead" % (g, elapsed_ms - g))


if __name__ == '__main__':
    main()