Mercurial > code > home > repos > reposync
comparison repo_local_status.py @ 20:b59912649fc4
rewrite local hg scanner
author | drewp@bigasterisk.com |
---|---|
date | Sun, 09 Jan 2022 20:47:57 -0800 |
parents | 6f38aa08408d |
children | cb71722bb75c |
comparison
legend: equal | deleted | inserted | replaced
19:5751ef191454 | 20:b59912649fc4 |
---|---|
5 import json | 5 import json |
6 import time | 6 import time |
7 import traceback | 7 import traceback |
8 from dataclasses import dataclass, field | 8 from dataclasses import dataclass, field |
9 from pathlib import Path | 9 from pathlib import Path |
10 from typing import Dict, Optional, Tuple | 10 from typing import Dict, Optional, Set, Tuple |
11 | 11 |
12 import cyclone.httpserver | 12 import cyclone.httpserver |
13 import cyclone.sse | 13 import cyclone.sse |
14 import cyclone.web | 14 import cyclone.web |
15 import docopt | 15 import docopt |
16 import treq | 16 import treq |
17 import tzlocal | 17 from background_loop import loop_forever_async |
18 from cycloneerr import PrettyErrorHandler | |
19 from dateutil.parser import parse | 18 from dateutil.parser import parse |
20 from dateutil.tz import tzlocal | 19 from dateutil.tz import tzlocal |
20 from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph) | |
21 from prometheus_client import Counter, Gauge | |
21 from prometheus_client.exposition import generate_latest | 22 from prometheus_client.exposition import generate_latest |
22 from prometheus_client.registry import REGISTRY | 23 from prometheus_client.registry import REGISTRY |
24 from rdfdb.currentstategraphapi import CurrentStateGraphApi | |
25 from rdfdb.patch import Patch | |
26 from rdflib import RDF, Literal, Namespace, URIRef | |
27 from rdflib.term import Identifier | |
23 from ruamel.yaml import YAML | 28 from ruamel.yaml import YAML |
24 from standardservice.logsetup import log, verboseLogging | 29 from standardservice.logsetup import log, verboseLogging |
25 from twisted.internet import reactor | 30 from twisted.internet import reactor |
26 from twisted.internet.defer import inlineCallbacks, returnValue | 31 from twisted.internet.defer import inlineCallbacks, returnValue |
27 from twisted.internet.utils import _UnexpectedErrorOutput, getProcessOutput | 32 from twisted.internet.utils import _UnexpectedErrorOutput, getProcessOutput |
28 | 33 |
29 from patch_cyclone_sse import patchCycloneSse | 34 from patch_cyclone_sse import patchCycloneSse |
35 | |
30 patchCycloneSse() | 36 patchCycloneSse() |
31 | 37 |
32 githubOwner = 'drewp' | 38 Quad = Tuple[Identifier, Identifier, Identifier, Identifier] |
39 Triple = Tuple[Identifier, Identifier, Identifier] | |
40 EX = Namespace('http://example.com/') # todo | |
41 | |
42 HG_SYNC = Gauge('hg_sync', 'sync passes to hg') | |
33 | 43 |
34 | 44 |
35 @inlineCallbacks | 45 @inlineCallbacks |
36 def runHg(cwd, args): | 46 def runHg(cwd, args): |
37 if args[0] not in ['push']: | 47 if args[0] not in ['push']: |
38 args.extend(['-T', 'json']) | 48 args.extend(['-T', 'json']) |
39 j = yield getProcessOutput('/usr/local/bin/hg', args, path=cwd) | 49 j = yield getProcessOutput('/usr/local/bin/hg', args, path=cwd) |
40 returnValue(json.loads(j) if j else None) | 50 returnValue(json.loads(j) if j else None) |
41 | 51 |
42 | 52 |
53 # merge this into setToGraph | |
54 def replaceContext(pg: PatchableGraph, ctx: URIRef, newTriples: Set[Triple]): | |
55 prevCtxStmts = set((s, p, o, g.identifier) for s, p, o, g in pg._graph.quads((None, None, None, ctx))) | |
56 | |
57 currentStmts: Set[Quad] = set() | |
58 for tri in newTriples: | |
59 currentStmts.add(tri + (ctx,)) | |
60 | |
61 p = Patch(delQuads=prevCtxStmts.difference(currentStmts), addQuads=currentStmts.difference(prevCtxStmts)) | |
62 | |
63 pg.patch(p) | |
64 | |
65 | |
66 class Metrics(cyclone.web.RequestHandler): | |
67 | |
68 def get(self): | |
69 self.add_header('content-type', 'text/plain') | |
70 self.write(generate_latest(REGISTRY)) | |
71 | |
72 | |
73 class Index(cyclone.web.RequestHandler): | |
74 | |
75 def get(self, *args): | |
76 self.add_header('content-type', 'text/html') | |
77 self.write('''<!DOCTYPE html> | |
78 <html> | |
79 <head> | |
80 <title>repo_local_status</title> | |
81 </head> | |
82 <body> | |
83 <a href="graph/localRepos">graph</a> | |
84 </body> | |
85 </html>''') | |
86 | |
87 | |
88 # share with other files | |
43 @dataclass | 89 @dataclass |
44 class Repo: | 90 class Repo: |
45 path: Path | 91 path: Path |
46 github: bool | 92 github: bool |
47 _cache: Dict[str, Tuple[float, object]] = field(default_factory=dict) | 93 |
48 | 94 def uri(self): |
49 def _isStale(self, group) -> Optional[object]: | 95 return URIRef(f'http://bigasterisk.com/repo/{self.path.name}') |
50 now = time.time() | 96 |
51 if group not in self._cache: | 97 def localInstance(self): |
52 return True | 98 return URIRef(f'http://bigasterisk.com/repo/{self.path.name}/local') |
53 if now > self._cache[group][0] + 86400: | 99 |
54 return True | 100 |
55 print('fresh') | 101 @inlineCallbacks |
56 return False | 102 def updateOne(graph, repo: Repo): |
57 | 103 try: |
58 def _save(self, group, obj): | 104 writeLocalRepo(graph, repo) |
59 now = time.time() | 105 yield writeStatus(graph, repo) |
60 self._cache[group] = (now, obj) | 106 writeLatestHgCommit(graph, repo) |
61 | 107 except Exception: |
62 def _get(self, group): | 108 log.warning(f'While updating {repo}:') |
63 print('get') | 109 log.warning(traceback.format_exc()) |
64 return self._cache[group][1] | 110 |
65 | 111 |
66 @inlineCallbacks | 112 def uriForHgNode(n: str) -> URIRef: |
67 def getStatus(self): | 113 return URIRef(f'http://bigasterisk.com/hg/node/{n}') |
68 if self._isStale('status'): | 114 |
69 try: | 115 def uriForHgPhase(phase: str) -> URIRef: |
70 statusResp = yield runHg(self.path, ['status']) | 116 return URIRef(f'http://bigasterisk.com/hg/phase/{phase}') |
71 except Exception as e: | 117 |
72 status = {'error': repr(e)} | 118 @inlineCallbacks |
73 else: | 119 def writeLatestHgCommit(graph, repo): |
74 unknowns = len([row for row in statusResp if row['status'] == '?']) | 120 rows = yield runHg(repo.path, ['log', '--limit', '1']) |
75 status = {'unknown': unknowns, 'changed': len(statusResp) - unknowns} | 121 commit = rows[0] |
76 self._save('status', status) | 122 sec = commit['date'][0] |
77 returnValue(self._get('status')) | 123 t = datetime.datetime.fromtimestamp(sec, tzlocal()) |
78 | 124 latest = uriForHgNode(commit['node']) |
79 @inlineCallbacks | 125 |
80 def getLatestHgCommit(self): | 126 new: Set[Triple] = set() |
81 if self._isStale('log'): | 127 new.add((repo.localInstance(), EX['latestCommit'], latest)) |
82 rows = yield runHg(self.path, ['log', '--limit', '1']) | 128 new.add((latest, RDF.type, EX['HgCommit'])) |
83 commit = rows[0] | 129 new.add((latest, EX['email'], Literal(commit['user']))) |
84 sec = commit['date'][0] | 130 new.add((latest, EX['commitMessage'], Literal(commit['desc']))) |
85 t = datetime.datetime.fromtimestamp(sec, tzlocal()) | 131 new.add((latest, EX['created'], Literal(t))) |
86 self._save('log', {'email': commit['user'], 't': t.isoformat(), 'message': commit['desc']}) | 132 new.add((latest, EX['phase'], uriForHgNode(commit['phase']))) |
87 returnValue(self._get('log')) | 133 |
88 | 134 for t in commit['tags']: |
89 @inlineCallbacks | 135 new.add((latest, EX['tag'], Literal(t))) |
90 def getLatestGithubCommit(self): | 136 for p in commit['parents']: |
91 if self._isStale('github'): | 137 new.add((latest, EX['parent'], uriForHgNode(p))) |
92 resp = yield treq.get(f'https://api.github.com/repos/{githubOwner}/{self.path.name}/commits?per_page=1', | 138 |
93 timeout=5, | 139 replaceContext(graph, URIRef(repo.localInstance() + '/log'), new) |
94 headers={ | 140 |
95 'User-agent': 'reposync by github.com/drewp', | 141 |
96 'Accept': 'application/vnd.github.v3+json' | 142 @inlineCallbacks |
97 }) | 143 def writeStatus(graph, repo): |
98 ret = yield treq.json_content(resp) | 144 statusResp = yield runHg(repo.path, ['status']) |
99 commit = ret[0]['commit'] | 145 unknowns = len([row for row in statusResp if row['status'] == '?']) |
100 t = parse(commit['committer']['date']).astimezone(tzlocal()).isoformat() | 146 replaceContext( |
101 self._save('github', {'email': commit['committer']['email'], 't': t, 'message': commit['message']}) | 147 graph, URIRef(repo.localInstance() + '/status'), { |
102 returnValue(self._get('github')) | 148 (repo.localInstance(), EX['unknownCount'], Literal(unknowns)), |
103 | 149 (repo.localInstance(), EX['changed'], Literal(len(statusResp) - unknowns)), |
104 @inlineCallbacks | 150 }) |
105 def clearGithubMaster(self): | 151 |
106 '''bang(pts/13):/tmp/reset% git init | 152 |
107 Initialized empty Git repository in /tmp/reset/.git/ | 153 def writeLocalRepo(graph, repo: Repo): |
108 then github set current to a new branch called 'clearing' with https://developer.github.com/v3/repos/#update-a-repository | 154 replaceContext(graph, URIRef(repo.uri() + '/config'), { |
109 bang(pts/13):/tmp/reset% git remote add origin git@github.com:drewp/href.git | 155 (repo.uri(), EX['localRepo'], repo.localInstance()), |
110 bang(pts/13):/tmp/reset% git push origin :master | 156 (repo.localInstance(), RDF.type, EX['HgRepo']), |
111 To github.com:drewp/href.git | 157 }) |
112 - [deleted] master | 158 |
113 maybe --set-upstream origin | 159 |
114 bang(pts/13):/tmp/reset% git remote set-branches origin master | 160 @inlineCallbacks |
115 ? | 161 def update(graph, repos): |
116 then push | 162 for r in repos: |
117 then github setdefault to master | 163 yield updateOne(graph, r) |
118 then github delete clearing | |
119 ''' | |
120 | |
121 @inlineCallbacks | |
122 def pushToGithub(self): | |
123 if not self.github: | |
124 raise ValueError | |
125 yield runHg(self.path, ['bookmark', '--rev', 'default', 'master']) | |
126 out = yield runHg(self.path, ['push', f'git+ssh://git@github.com/{githubOwner}/{self.path.name}.git']) | |
127 print(f'out fompushh {out}') | |
128 | |
129 | |
130 class GithubSync(PrettyErrorHandler, cyclone.web.RequestHandler): | |
131 | |
132 @inlineCallbacks | |
133 def post(self): | |
134 try: | |
135 path = self.get_argument('repo') | |
136 repo = [r for r in self.settings.repos if str(r.path) == path][0] | |
137 yield repo.pushToGithub() | |
138 except Exception: | |
139 traceback.print_exc() | |
140 raise | |
141 | |
142 | |
143 class Statuses(cyclone.sse.SSEHandler): | |
144 | |
145 def update(self, key, data): | |
146 self.sendEvent(json.dumps({'key': key, 'update': data}).encode('utf8')) | |
147 | |
148 def bind(self): | |
149 self.toProcess = self.settings.repos[:] | |
150 reactor.callLater(0, self.runOne) | |
151 | |
152 @inlineCallbacks | |
153 def runOne(self): | |
154 if not self.toProcess: | |
155 print('done') | |
156 return | |
157 repo = self.toProcess.pop(0) | |
158 | |
159 try: | |
160 update = {'path': str(repo.path), 'github': repo.github, 'status': (yield repo.getStatus()), 'hgLatest': (yield repo.getLatestHgCommit())} | |
161 if repo.github: | |
162 update['githubLatest'] = (yield repo.getLatestGithubCommit()) | |
163 self.update(str(repo.path), update) | |
164 except Exception: | |
165 log.warn(f'not reporting on {repo}') | |
166 traceback.print_exc() | |
167 reactor.callLater(0, self.runOne) | |
168 | |
169 | |
170 class Metrics(cyclone.web.RequestHandler): | |
171 | |
172 def get(self): | |
173 self.add_header('content-type', 'text/plain') | |
174 self.write(generate_latest(REGISTRY)) | |
175 | 164 |
176 | 165 |
177 def main(): | 166 def main(): |
178 args = docopt.docopt(''' | 167 args = docopt.docopt(''' |
179 Usage: | 168 Usage: |
180 hg_status.py [options] | 169 repo_local_status.py [options] |
181 | 170 |
182 Options: | 171 Options: |
183 -v, --verbose more logging | 172 -v, --verbose more logging |
184 ''') | 173 ''') |
185 verboseLogging(args['--verbose']) | 174 verboseLogging(args['--verbose']) |
186 | 175 |
187 # import sys | |
188 # sys.path.append('/usr/lib/python3/dist-packages') | |
189 # import OpenSSL | |
190 | |
191 yaml = YAML(typ='safe') | 176 yaml = YAML(typ='safe') |
192 config = yaml.load(open('config.yaml')) | 177 config = yaml.load(open('config.yaml')) |
193 repos = [Repo(Path(row['dir']), row['github']) for row in config['hg_repos']] | 178 repos = [Repo(Path(row['dir']), row['github']) for row in config['hg_repos']] |
194 | 179 |
180 class PG2(PatchableGraph, CurrentStateGraphApi): | |
181 pass | |
182 | |
183 graph = PG2() | |
184 | |
185 @inlineCallbacks | |
186 def f(first): | |
187 yield update(graph, repos) | |
188 | |
189 loop_forever_async(f, 3600, HG_SYNC) | |
190 | |
195 class Application(cyclone.web.Application): | 191 class Application(cyclone.web.Application): |
196 | 192 |
197 def __init__(self): | 193 def __init__(self): |
198 handlers = [ | 194 handlers = [ |
199 (r"/()", cyclone.web.StaticFileHandler, { | 195 (r"/()", Index), |
200 'path': '.', | 196 (r'/graph/localRepos', CycloneGraphHandler, { |
201 'default_filename': 'index.html' | 197 'masterGraph': graph |
202 }), | 198 }), |
203 (r'/build/(bundle\.js)', cyclone.web.StaticFileHandler, { | 199 (r'/graph/localRepos/events', CycloneGraphEventsHandler, { |
204 'path': './build/' | 200 'masterGraph': graph, |
205 }), | 201 }), |
206 (r'/status/events', Statuses), | |
207 (r'/githubSync', GithubSync), | |
208 (r'/metrics', Metrics), | 202 (r'/metrics', Metrics), |
209 ] | 203 ] |
210 cyclone.web.Application.__init__( | 204 cyclone.web.Application.__init__( |
211 self, | 205 self, |
212 handlers, | 206 handlers, |
213 repos=repos, | |
214 debug=args['--verbose'], | 207 debug=args['--verbose'], |
215 template_path='.', | |
216 ) | 208 ) |
217 | 209 |
218 reactor.listenTCP(10001, Application()) | 210 reactor.listenTCP(8001, Application(), interface='::') |
219 reactor.run() | 211 reactor.run() |
220 | 212 |
221 | 213 |
222 if __name__ == '__main__': | 214 if __name__ == '__main__': |
223 main() | 215 main() |