Mercurial > code > home > repos > homeauto
comparison service/rfid_pn532_py/rfid.py @ 1524:13b7e4de3824
whitespace
Ignore-this: c727f388f197a6fae88595fe6d455c7d
darcs-hash:971e4474b4dbd35a722be670f5599298fb5ec83f
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Wed, 05 Feb 2020 00:29:13 -0800 |
parents | 0da337780f22 |
children |
comparison
equal
deleted
inserted
replaced
1523:0da337780f22 | 1524:13b7e4de3824 |
---|---|
41 try: | 41 try: |
42 obj = Literal(float(turtleLiteral)) | 42 obj = Literal(float(turtleLiteral)) |
43 except ValueError: | 43 except ValueError: |
44 obj = Literal(turtleLiteral) | 44 obj = Literal(turtleLiteral) |
45 self._onStatements([(subj, pred, obj)]) | 45 self._onStatements([(subj, pred, obj)]) |
46 | 46 |
47 def _onGraphBodyStatements(self, body, headers): | 47 def _onGraphBodyStatements(self, body, headers): |
48 g = Graph() | 48 g = Graph() |
49 g.parse(StringInputSource(body), format='nt') | 49 g.parse(StringInputSource(body), format='nt') |
50 if not g: | 50 if not g: |
51 raise ValueError("expected graph body") | 51 raise ValueError("expected graph body") |
52 self._onStatements(list(g.triples((None, None, None)))) | 52 self._onStatements(list(g.triples((None, None, None)))) |
53 post = put | 53 post = put |
54 | 54 |
55 def _onStatements(self, stmts): | 55 def _onStatements(self, stmts): |
56 # write rfid to new key, etc. | 56 # write rfid to new key, etc. |
57 if len(stmts) > 0 and stmts[0][1] == ROOM['keyContents']: | 57 if len(stmts) > 0 and stmts[0][1] == ROOM['keyContents']: |
58 return | 58 return |
59 log.warn("ignoring %s", stmts) | 59 log.warn("ignoring %s", stmts) |
65 def randomBody(): | 65 def randomBody(): |
66 return BODY_VERSION + '*' + ''.join(random.choice(string.ascii_uppercase) for n in range(16 - 2)) | 66 return BODY_VERSION + '*' + ''.join(random.choice(string.ascii_uppercase) for n in range(16 - 2)) |
67 | 67 |
68 def looksLikeBigasterisk(text): | 68 def looksLikeBigasterisk(text): |
69 return text.startswith(BODY_VERSION + "*") | 69 return text.startswith(BODY_VERSION + "*") |
70 | 70 |
71 class Rewrite(cyclone.web.RequestHandler): | 71 class Rewrite(cyclone.web.RequestHandler): |
72 def post(self): | 72 def post(self): |
73 agent = URIRef(self.request.headers['x-foaf-agent']) | 73 agent = URIRef(self.request.headers['x-foaf-agent']) |
74 body = json.loads(self.request.body) | 74 body = json.loads(self.request.body) |
75 | 75 |
77 log.info('current card id: %r %r', _, uid) | 77 log.info('current card id: %r %r', _, uid) |
78 if uid is None: | 78 if uid is None: |
79 self.set_status(404, "no card present") | 79 self.set_status(404, "no card present") |
80 # maybe retry a few more times since the card might be nearby | 80 # maybe retry a few more times since the card might be nearby |
81 return | 81 return |
82 | 82 |
83 text = randomBody() | 83 text = randomBody() |
84 log.info('%s rewrites %s to %s, to be owned by %s', | 84 log.info('%s rewrites %s to %s, to be owned by %s', |
85 agent, uid, text, body['user']) | 85 agent, uid, text, body['user']) |
86 | 86 |
87 #reader.KEY = private.rfid_key | 87 #reader.KEY = private.rfid_key |
88 reader.write(uid, text) | 88 reader.write(uid, text) |
89 log.info('done with write') | 89 log.info('done with write') |
90 | 90 |
91 | 91 |
92 sensor = ROOM['frontDoorWindowRfid'] | 92 sensor = ROOM['frontDoorWindowRfid'] |
93 | 93 |
94 class ReadLoop(object): | 94 class ReadLoop(object): |
95 def __init__(self, reader, masterGraph, overwrite_any_tag): | 95 def __init__(self, reader, masterGraph, overwrite_any_tag): |
96 self.reader = reader | 96 self.reader = reader |
164 (sensor, ROOM['reading'], cardUri)): | 164 (sensor, ROOM['reading'], cardUri)): |
165 delQuads.append(spo + (ctx,)) | 165 delQuads.append(spo + (ctx,)) |
166 for spo in self.masterGraph._graph.triples( | 166 for spo in self.masterGraph._graph.triples( |
167 (cardUri, ROOM['cardText'], None)): | 167 (cardUri, ROOM['cardText'], None)): |
168 delQuads.append(spo + (ctx,)) | 168 delQuads.append(spo + (ctx,)) |
169 | 169 |
170 self.masterGraph.patch(Patch(addQuads=[], delQuads=delQuads)) | 170 self.masterGraph.patch(Patch(addQuads=[], delQuads=delQuads)) |
171 | 171 |
172 def _sendOneshot(self, oneshot): | 172 def _sendOneshot(self, oneshot): |
173 body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3()) | 173 body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3()) |
174 for s,p,o in oneshot)).encode('utf8') | 174 for s,p,o in oneshot)).encode('utf8') |
175 url = b'http://bang:9071/oneShot' | 175 url = b'http://bang:9071/oneShot' |
176 d = fetch(method=b'POST', | 176 d = fetch(method=b'POST', |
181 def err(e): | 181 def err(e): |
182 log.info('oneshot post to %r failed: %s', | 182 log.info('oneshot post to %r failed: %s', |
183 url, e.getErrorMessage()) | 183 url, e.getErrorMessage()) |
184 d.addErrback(err) | 184 d.addErrback(err) |
185 | 185 |
186 | 186 |
187 | 187 |
188 if __name__ == '__main__': | 188 if __name__ == '__main__': |
189 arg = docopt(""" | 189 arg = docopt(""" |
190 Usage: rfid.py [options] | 190 Usage: rfid.py [options] |
191 | 191 |
192 -v Verbose | 192 -v Verbose |
197 if arg['-v']: | 197 if arg['-v']: |
198 enableTwistedLog() | 198 enableTwistedLog() |
199 log.setLevel(logging.DEBUG) | 199 log.setLevel(logging.DEBUG) |
200 log.info(f'cyclone {cyclone.__version__}') | 200 log.info(f'cyclone {cyclone.__version__}') |
201 defer.setDebugging(True) | 201 defer.setDebugging(True) |
202 | 202 |
203 masterGraph = PatchableGraph() | 203 masterGraph = PatchableGraph() |
204 reader = NfcDevice() if not arg['-n'] else FakeNfc() | 204 reader = NfcDevice() if not arg['-n'] else FakeNfc() |
205 | 205 |
206 ie=InfluxExporter(Graph()) | 206 ie=InfluxExporter(Graph()) |
207 ie.exportStats(STATS, ['root.cardReadPoll.count', | 207 ie.exportStats(STATS, ['root.cardReadPoll.count', |