comparison repo_local_status.py @ 20:b59912649fc4

rewrite local hg scanner
author drewp@bigasterisk.com
date Sun, 09 Jan 2022 20:47:57 -0800
parents 6f38aa08408d
children cb71722bb75c
comparison
equal deleted inserted replaced
19:5751ef191454 20:b59912649fc4
5 import json 5 import json
6 import time 6 import time
7 import traceback 7 import traceback
8 from dataclasses import dataclass, field 8 from dataclasses import dataclass, field
9 from pathlib import Path 9 from pathlib import Path
10 from typing import Dict, Optional, Tuple 10 from typing import Dict, Optional, Set, Tuple
11 11
12 import cyclone.httpserver 12 import cyclone.httpserver
13 import cyclone.sse 13 import cyclone.sse
14 import cyclone.web 14 import cyclone.web
15 import docopt 15 import docopt
16 import treq 16 import treq
17 import tzlocal 17 from background_loop import loop_forever_async
18 from cycloneerr import PrettyErrorHandler
19 from dateutil.parser import parse 18 from dateutil.parser import parse
20 from dateutil.tz import tzlocal 19 from dateutil.tz import tzlocal
20 from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph)
21 from prometheus_client import Counter, Gauge
21 from prometheus_client.exposition import generate_latest 22 from prometheus_client.exposition import generate_latest
22 from prometheus_client.registry import REGISTRY 23 from prometheus_client.registry import REGISTRY
24 from rdfdb.currentstategraphapi import CurrentStateGraphApi
25 from rdfdb.patch import Patch
26 from rdflib import RDF, Literal, Namespace, URIRef
27 from rdflib.term import Identifier
23 from ruamel.yaml import YAML 28 from ruamel.yaml import YAML
24 from standardservice.logsetup import log, verboseLogging 29 from standardservice.logsetup import log, verboseLogging
25 from twisted.internet import reactor 30 from twisted.internet import reactor
26 from twisted.internet.defer import inlineCallbacks, returnValue 31 from twisted.internet.defer import inlineCallbacks, returnValue
27 from twisted.internet.utils import _UnexpectedErrorOutput, getProcessOutput 32 from twisted.internet.utils import _UnexpectedErrorOutput, getProcessOutput
28 33
29 from patch_cyclone_sse import patchCycloneSse 34 from patch_cyclone_sse import patchCycloneSse
35
patchCycloneSse()

# An RDF statement with its context term, and one without.
Quad = Tuple[Identifier, Identifier, Identifier, Identifier]
Triple = Tuple[Identifier, Identifier, Identifier]

# Placeholder namespace for every predicate/class this file emits.
EX = Namespace('http://example.com/')  # todo

# Gauge handed to loop_forever_async() in main().
HG_SYNC = Gauge('hg_sync', 'sync passes to hg')
33 43
34 44
@inlineCallbacks
def runHg(cwd, args):
    """Run /usr/local/bin/hg in `cwd` and fire with its parsed JSON output.

    `args` is the hg command line, subcommand first. Every subcommand except
    'push' gets '-T json' appended so the output can be decoded.

    Fires with the decoded JSON value, or None when hg printed nothing.
    """
    # Work on a copy: extending `args` in place would leak '-T json' into any
    # list the caller keeps and reuses.
    cmd = list(args)
    if cmd[0] not in ['push']:
        cmd.extend(['-T', 'json'])
    out = yield getProcessOutput('/usr/local/bin/hg', cmd, path=cwd)
    returnValue(json.loads(out) if out else None)
41 51
42 52
# merge this into setToGraph
def replaceContext(pg: PatchableGraph, ctx: URIRef, newTriples: Set[Triple]):
    """Make context `ctx` of `pg` contain exactly `newTriples`.

    The quads currently stored under `ctx` are compared with the wanted ones
    and only the difference is sent to `pg` as a single Patch.
    """
    existing = set((s, p, o, g.identifier) for s, p, o, g in pg._graph.quads((None, None, None, ctx)))
    wanted: Set[Quad] = {triple + (ctx,) for triple in newTriples}

    pg.patch(Patch(delQuads=existing - wanted, addQuads=wanted - existing))
64
65
class Metrics(cyclone.web.RequestHandler):
    """Serve the default Prometheus registry in text exposition format."""

    def get(self):
        # set_header replaces the handler's default Content-Type; add_header
        # appends, which would send a second Content-Type alongside it.
        self.set_header('Content-Type', 'text/plain')
        self.write(generate_latest(REGISTRY))
71
72
class Index(cyclone.web.RequestHandler):
    """Minimal landing page that links to the localRepos graph."""

    def get(self, *args):
        # *args absorbs whatever groups the URL pattern captured; none are used.
        # set_header replaces the handler's default Content-Type; add_header
        # appends, which would send a second Content-Type alongside it.
        self.set_header('Content-Type', 'text/html')
        self.write('''<!DOCTYPE html>
<html>
<head>
<title>repo_local_status</title>
</head>
<body>
<a href="graph/localRepos">graph</a>
</body>
</html>''')
86
87
# share with other files
@dataclass
class Repo:
    """One hg checkout, as listed under 'hg_repos' in the config."""
    path: Path  # checkout directory; its last component names the repo
    github: bool  # the config row's 'github' value

    def uri(self):
        """URI of the repo itself, keyed by the directory name."""
        return URIRef('http://bigasterisk.com/repo/' + self.path.name)

    def localInstance(self):
        """URI of the local checkout of this repo."""
        return URIRef('http://bigasterisk.com/repo/' + self.path.name + '/local')
53 if now > self._cache[group][0] + 86400: 99
54 return True 100
@inlineCallbacks
def updateOne(graph, repo: Repo):
    """Refresh every graph context that describes `repo`.

    Any failure is logged and swallowed so one bad repo does not abort the
    scan of the others.
    """
    try:
        writeLocalRepo(graph, repo)
        yield writeStatus(graph, repo)
        # writeLatestHgCommit is an inlineCallbacks function: it must be
        # yielded so its failures land in the except below and so this
        # update finishes before the caller moves on to the next repo.
        yield writeLatestHgCommit(graph, repo)
    except Exception:
        log.warning(f'While updating {repo}:')
        log.warning(traceback.format_exc())
64 return self._cache[group][1] 110
65 111
def uriForHgNode(n: str) -> URIRef:
    """URI for the hg changeset whose node id is `n`."""
    return URIRef('http://bigasterisk.com/hg/node/' + n)
68 if self._isStale('status'): 114
def uriForHgPhase(phase: str) -> URIRef:
    """URI for the hg phase named `phase`."""
    return URIRef('http://bigasterisk.com/hg/phase/' + phase)
71 except Exception as e: 117
@inlineCallbacks
def writeLatestHgCommit(graph, repo):
    """Describe `repo`'s newest hg commit in its '<localInstance>/log' context.

    Runs `hg log --limit 1` and replaces the context with statements about
    that commit: author, message, timestamp, phase, tags and parents. When
    hg reports no commit, the context is emptied.
    """
    ctx = URIRef(repo.localInstance() + '/log')
    rows = yield runHg(repo.path, ['log', '--limit', '1'])
    if not rows:
        # runHg fires with None for empty output, and an empty list has no
        # rows[0]; either way there is nothing to describe.
        replaceContext(graph, ctx, set())
        return

    commit = rows[0]
    # Only the first element of 'date' is used, as a unix timestamp.
    created = datetime.datetime.fromtimestamp(commit['date'][0], tzlocal())
    latest = uriForHgNode(commit['node'])

    new: Set[Triple] = {
        (repo.localInstance(), EX['latestCommit'], latest),
        (latest, RDF.type, EX['HgCommit']),
        (latest, EX['email'], Literal(commit['user'])),
        (latest, EX['commitMessage'], Literal(commit['desc'])),
        (latest, EX['created'], Literal(created)),
        # A phase is not a changeset: use the phase URI space, not the node one.
        (latest, EX['phase'], uriForHgPhase(commit['phase'])),
    }
    new.update((latest, EX['tag'], Literal(tag)) for tag in commit['tags'])
    new.update((latest, EX['parent'], uriForHgNode(parent)) for parent in commit['parents'])

    replaceContext(graph, ctx, new)
94 headers={ 140
95 'User-agent': 'reposync by github.com/drewp', 141
@inlineCallbacks
def writeStatus(graph, repo):
    """Record `hg status` counts in `repo`'s '<localInstance>/status' context.

    Rows whose status is '?' are counted as unknown; every other row is
    counted as changed.
    """
    # runHg fires with None when hg prints nothing; treat that as no rows
    # instead of failing on iteration and len().
    rows = (yield runHg(repo.path, ['status'])) or []
    unknowns = sum(1 for row in rows if row['status'] == '?')
    local = repo.localInstance()
    replaceContext(
        graph, URIRef(local + '/status'), {
            (local, EX['unknownCount'], Literal(unknowns)),
            (local, EX['changed'], Literal(len(rows) - unknowns)),
        })
105 def clearGithubMaster(self): 151
106 '''bang(pts/13):/tmp/reset% git init 152
def writeLocalRepo(graph, repo: Repo):
    """Link `repo` to its local checkout in the '<repo uri>/config' context."""
    repoUri = repo.uri()
    local = repo.localInstance()
    stmts = {
        (repoUri, EX['localRepo'], local),
        (local, RDF.type, EX['HgRepo']),
    }
    replaceContext(graph, URIRef(repoUri + '/config'), stmts)
112 - [deleted] master 158
113 maybe --set-upstream origin 159
@inlineCallbacks
def update(graph, repos):
    """Run updateOne for every repo, one at a time, in the given order."""
    for repo in repos:
        yield updateOne(graph, repo)
118 then github delete clearing
119 '''
120
121 @inlineCallbacks
122 def pushToGithub(self):
123 if not self.github:
124 raise ValueError
125 yield runHg(self.path, ['bookmark', '--rev', 'default', 'master'])
126 out = yield runHg(self.path, ['push', f'git+ssh://git@github.com/{githubOwner}/{self.path.name}.git'])
127 print(f'out fompushh {out}')
128
129
130 class GithubSync(PrettyErrorHandler, cyclone.web.RequestHandler):
131
132 @inlineCallbacks
133 def post(self):
134 try:
135 path = self.get_argument('repo')
136 repo = [r for r in self.settings.repos if str(r.path) == path][0]
137 yield repo.pushToGithub()
138 except Exception:
139 traceback.print_exc()
140 raise
141
142
143 class Statuses(cyclone.sse.SSEHandler):
144
145 def update(self, key, data):
146 self.sendEvent(json.dumps({'key': key, 'update': data}).encode('utf8'))
147
148 def bind(self):
149 self.toProcess = self.settings.repos[:]
150 reactor.callLater(0, self.runOne)
151
152 @inlineCallbacks
153 def runOne(self):
154 if not self.toProcess:
155 print('done')
156 return
157 repo = self.toProcess.pop(0)
158
159 try:
160 update = {'path': str(repo.path), 'github': repo.github, 'status': (yield repo.getStatus()), 'hgLatest': (yield repo.getLatestHgCommit())}
161 if repo.github:
162 update['githubLatest'] = (yield repo.getLatestGithubCommit())
163 self.update(str(repo.path), update)
164 except Exception:
165 log.warn(f'not reporting on {repo}')
166 traceback.print_exc()
167 reactor.callLater(0, self.runOne)
168
169
170 class Metrics(cyclone.web.RequestHandler):
171
172 def get(self):
173 self.add_header('content-type', 'text/plain')
174 self.write(generate_latest(REGISTRY))
175 164
176 165
def main():
    """Parse options, start the periodic repo scan, and serve the graph.

    Reads the repo list from 'config.yaml' in the working directory, then
    listens on port 8001 and runs the reactor until it stops.
    """
    args = docopt.docopt('''
Usage:
  repo_local_status.py [options]

Options:
  -v, --verbose  more logging
''')
    verboseLogging(args['--verbose'])

    yaml = YAML(typ='safe')
    # Close the config file once parsed instead of leaking the handle.
    with open('config.yaml') as configFile:
        config = yaml.load(configFile)
    repos = [Repo(Path(row['dir']), row['github']) for row in config['hg_repos']]

    class PG2(PatchableGraph, CurrentStateGraphApi):
        pass

    graph = PG2()

    @inlineCallbacks
    def f(first):
        # `first` is supplied by loop_forever_async and is not needed here.
        yield update(graph, repos)

    # NOTE(review): 3600 is presumably the period in seconds - confirm
    # against background_loop.loop_forever_async.
    loop_forever_async(f, 3600, HG_SYNC)

    class Application(cyclone.web.Application):

        def __init__(self):
            handlers = [
                (r"/()", Index),
                (r'/graph/localRepos', CycloneGraphHandler, {
                    'masterGraph': graph
                }),
                (r'/graph/localRepos/events', CycloneGraphEventsHandler, {
                    'masterGraph': graph,
                }),
                (r'/metrics', Metrics),
            ]
            cyclone.web.Application.__init__(
                self,
                handlers,
                debug=args['--verbose'],
            )

    reactor.listenTCP(8001, Application(), interface='::')
    reactor.run()


if __name__ == '__main__':
    main()