Mercurial > code > home > repos > reposync
changeset 20:b59912649fc4
rewrite local hg scanner
author | drewp@bigasterisk.com |
---|---|
date | Sun, 09 Jan 2022 20:47:57 -0800 |
parents | 5751ef191454 |
children | cb71722bb75c |
files | deploy.yaml repo_github_status.py repo_local_status.py |
diffstat | 3 files changed, 146 insertions(+), 145 deletions(-) [+] |
line wrap: on
line diff
--- a/deploy.yaml Sun Jan 09 16:02:08 2022 -0800 +++ b/deploy.yaml Sun Jan 09 20:47:57 2022 -0800 @@ -29,8 +29,16 @@ - python3 - repo_github_status.py - "-v" - # volumeMounts: - # - {name: my, mountPath: /my} + - name: hg-status + image: bang5:5000/reposync_image + ports: + - containerPort: 8001 + command: + - python3 + - repo_local_status.py + - "-v" + volumeMounts: + - { name: my, mountPath: /my } volumes: - { name: my, persistentVolumeClaim: { claimName: my } } @@ -51,5 +59,6 @@ ports: - { port: 80, targetPort: 3000, name: http } - { port: 8000, targetPort: 8000, name: localrepos } + - { port: 8001, targetPort: 8001, name: githubrepos } selector: app: reposync
--- a/repo_github_status.py Sun Jan 09 16:02:08 2022 -0800 +++ b/repo_github_status.py Sun Jan 09 20:47:57 2022 -0800 @@ -183,10 +183,10 @@ def __init__(self): handlers = [ (r"/()", Index), - (r'/graph/localRepos', CycloneGraphHandler, { + (r'/graph/githubRepos', CycloneGraphHandler, { 'masterGraph': graph }), - (r'/graph/localRepos/events', CycloneGraphEventsHandler, { + (r'/graph/githubRepos/events', CycloneGraphEventsHandler, { 'masterGraph': graph, }), (r'/metrics', Metrics),
--- a/repo_local_status.py Sun Jan 09 16:02:08 2022 -0800 +++ b/repo_local_status.py Sun Jan 09 20:47:57 2022 -0800 @@ -7,19 +7,24 @@ import traceback from dataclasses import dataclass, field from pathlib import Path -from typing import Dict, Optional, Tuple +from typing import Dict, Optional, Set, Tuple import cyclone.httpserver import cyclone.sse import cyclone.web import docopt import treq -import tzlocal -from cycloneerr import PrettyErrorHandler +from background_loop import loop_forever_async from dateutil.parser import parse from dateutil.tz import tzlocal +from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph) +from prometheus_client import Counter, Gauge from prometheus_client.exposition import generate_latest from prometheus_client.registry import REGISTRY +from rdfdb.currentstategraphapi import CurrentStateGraphApi +from rdfdb.patch import Patch +from rdflib import RDF, Literal, Namespace, URIRef +from rdflib.term import Identifier from ruamel.yaml import YAML from standardservice.logsetup import log, verboseLogging from twisted.internet import reactor @@ -27,9 +32,14 @@ from twisted.internet.utils import _UnexpectedErrorOutput, getProcessOutput from patch_cyclone_sse import patchCycloneSse + patchCycloneSse() -githubOwner = 'drewp' +Quad = Tuple[Identifier, Identifier, Identifier, Identifier] +Triple = Tuple[Identifier, Identifier, Identifier] +EX = Namespace('http://example.com/') # todo + +HG_SYNC = Gauge('hg_sync', 'sync passes to hg') @inlineCallbacks @@ -40,131 +50,17 @@ returnValue(json.loads(j) if j else None) -@dataclass -class Repo: - path: Path - github: bool - _cache: Dict[str, Tuple[float, object]] = field(default_factory=dict) - - def _isStale(self, group) -> Optional[object]: - now = time.time() - if group not in self._cache: - return True - if now > self._cache[group][0] + 86400: - return True - print('fresh') - return False - - def _save(self, group, obj): - now = time.time() - self._cache[group] = (now, obj) - - def _get(self, group): - print('get') - return self._cache[group][1] - - @inlineCallbacks - def getStatus(self): - if self._isStale('status'): - try: - statusResp = yield runHg(self.path, ['status']) - except Exception as e: - status = {'error': repr(e)} - else: - unknowns = len([row for row in statusResp if row['status'] == '?']) - status = {'unknown': unknowns, 'changed': len(statusResp) - unknowns} - self._save('status', status) - returnValue(self._get('status')) - - @inlineCallbacks - def getLatestHgCommit(self): - if self._isStale('log'): - rows = yield runHg(self.path, ['log', '--limit', '1']) - commit = rows[0] - sec = commit['date'][0] - t = datetime.datetime.fromtimestamp(sec, tzlocal()) - self._save('log', {'email': commit['user'], 't': t.isoformat(), 'message': commit['desc']}) - returnValue(self._get('log')) - - @inlineCallbacks - def getLatestGithubCommit(self): - if self._isStale('github'): - resp = yield treq.get(f'https://api.github.com/repos/{githubOwner}/{self.path.name}/commits?per_page=1', - timeout=5, - headers={ - 'User-agent': 'reposync by github.com/drewp', - 'Accept': 'application/vnd.github.v3+json' - }) - ret = yield treq.json_content(resp) - commit = ret[0]['commit'] - t = parse(commit['committer']['date']).astimezone(tzlocal()).isoformat() - self._save('github', {'email': commit['committer']['email'], 't': t, 'message': commit['message']}) - returnValue(self._get('github')) +# merge this into setToGraph +def replaceContext(pg: PatchableGraph, ctx: URIRef, newTriples: Set[Triple]): + prevCtxStmts = set((s, p, o, g.identifier) for s, p, o, g in pg._graph.quads((None, None, None, ctx))) - @inlineCallbacks - def clearGithubMaster(self): - '''bang(pts/13):/tmp/reset% git init -Initialized empty Git repository in /tmp/reset/.git/ -then github set current to a new branch called 'clearing' with https://developer.github.com/v3/repos/#update-a-repository -bang(pts/13):/tmp/reset% git remote add origin git@github.com:drewp/href.git -bang(pts/13):/tmp/reset% git push origin :master -To github.com:drewp/href.git - - [deleted] master -maybe --set-upstream origin -bang(pts/13):/tmp/reset% git remote set-branches origin master -? -then push -then github setdefault to master -then github delete clearing -''' - - @inlineCallbacks - def pushToGithub(self): - if not self.github: - raise ValueError - yield runHg(self.path, ['bookmark', '--rev', 'default', 'master']) - out = yield runHg(self.path, ['push', f'git+ssh://git@github.com/{githubOwner}/{self.path.name}.git']) - print(f'out fompushh {out}') - - -class GithubSync(PrettyErrorHandler, cyclone.web.RequestHandler): + currentStmts: Set[Quad] = set() + for tri in newTriples: + currentStmts.add(tri + (ctx,)) - @inlineCallbacks - def post(self): - try: - path = self.get_argument('repo') - repo = [r for r in self.settings.repos if str(r.path) == path][0] - yield repo.pushToGithub() - except Exception: - traceback.print_exc() - raise - - -class Statuses(cyclone.sse.SSEHandler): - - def update(self, key, data): - self.sendEvent(json.dumps({'key': key, 'update': data}).encode('utf8')) + p = Patch(delQuads=prevCtxStmts.difference(currentStmts), addQuads=currentStmts.difference(prevCtxStmts)) - def bind(self): - self.toProcess = self.settings.repos[:] - reactor.callLater(0, self.runOne) - - @inlineCallbacks - def runOne(self): - if not self.toProcess: - print('done') - return - repo = self.toProcess.pop(0) - - try: - update = {'path': str(repo.path), 'github': repo.github, 'status': (yield repo.getStatus()), 'hgLatest': (yield repo.getLatestHgCommit())} - if repo.github: - update['githubLatest'] = (yield repo.getLatestGithubCommit()) - self.update(str(repo.path), update) - except Exception: - log.warn(f'not reporting on {repo}') - traceback.print_exc() - reactor.callLater(0, self.runOne) + pg.patch(p) class Metrics(cyclone.web.RequestHandler): @@ -174,48 +70,144 @@ self.write(generate_latest(REGISTRY)) +class Index(cyclone.web.RequestHandler): + + def get(self, *args): + self.add_header('content-type', 'text/html') + self.write('''<!DOCTYPE html> + <html> + <head> + <title>repo_local_status</title> + </head> + <body> + <a href="graph/localRepos">graph</a> + </body> + </html>''') + + +# share with other files +@dataclass +class Repo: + path: Path + github: bool + + def uri(self): + return URIRef(f'http://bigasterisk.com/repo/{self.path.name}') + + def localInstance(self): + return URIRef(f'http://bigasterisk.com/repo/{self.path.name}/local') + + +@inlineCallbacks +def updateOne(graph, repo: Repo): + try: + writeLocalRepo(graph, repo) + yield writeStatus(graph, repo) + writeLatestHgCommit(graph, repo) + except Exception: + log.warning(f'While updating {repo}:') + log.warning(traceback.format_exc()) + + +def uriForHgNode(n: str) -> URIRef: + return URIRef(f'http://bigasterisk.com/hg/node/{n}') + +def uriForHgPhase(phase: str) -> URIRef: + return URIRef(f'http://bigasterisk.com/hg/phase/{phase}') + +@inlineCallbacks +def writeLatestHgCommit(graph, repo): + rows = yield runHg(repo.path, ['log', '--limit', '1']) + commit = rows[0] + sec = commit['date'][0] + t = datetime.datetime.fromtimestamp(sec, tzlocal()) + latest = uriForHgNode(commit['node']) + + new: Set[Triple] = set() + new.add((repo.localInstance(), EX['latestCommit'], latest)) + new.add((latest, RDF.type, EX['HgCommit'])) + new.add((latest, EX['email'], Literal(commit['user']))) + new.add((latest, EX['commitMessage'], Literal(commit['desc']))) + new.add((latest, EX['created'], Literal(t))) + new.add((latest, EX['phase'], uriForHgNode(commit['phase']))) + + for t in commit['tags']: + new.add((latest, EX['tag'], Literal(t))) + for p in commit['parents']: + new.add((latest, EX['parent'], uriForHgNode(p))) + + replaceContext(graph, URIRef(repo.localInstance() + '/log'), new) + + +@inlineCallbacks +def writeStatus(graph, repo): + statusResp = yield runHg(repo.path, ['status']) + unknowns = len([row for row in statusResp if row['status'] == '?']) + replaceContext( + graph, URIRef(repo.localInstance() + '/status'), { + (repo.localInstance(), EX['unknownCount'], Literal(unknowns)), + (repo.localInstance(), EX['changed'], Literal(len(statusResp) - unknowns)), + }) + + +def writeLocalRepo(graph, repo: Repo): + replaceContext(graph, URIRef(repo.uri() + '/config'), { + (repo.uri(), EX['localRepo'], repo.localInstance()), + (repo.localInstance(), RDF.type, EX['HgRepo']), + }) + + +@inlineCallbacks +def update(graph, repos): + for r in repos: + yield updateOne(graph, r) + + def main(): args = docopt.docopt(''' Usage: - hg_status.py [options] + repo_local_status.py [options] Options: -v, --verbose more logging ''') verboseLogging(args['--verbose']) - # import sys - # sys.path.append('/usr/lib/python3/dist-packages') - # import OpenSSL - yaml = YAML(typ='safe') config = yaml.load(open('config.yaml')) repos = [Repo(Path(row['dir']), row['github']) for row in config['hg_repos']] + class PG2(PatchableGraph, CurrentStateGraphApi): + pass + + graph = PG2() + + @inlineCallbacks + def f(first): + yield update(graph, repos) + + loop_forever_async(f, 3600, HG_SYNC) + class Application(cyclone.web.Application): def __init__(self): handlers = [ - (r"/()", cyclone.web.StaticFileHandler, { - 'path': '.', - 'default_filename': 'index.html' + (r"/()", Index), + (r'/graph/localRepos', CycloneGraphHandler, { + 'masterGraph': graph }), - (r'/build/(bundle\.js)', cyclone.web.StaticFileHandler, { - 'path': './build/' + (r'/graph/localRepos/events', CycloneGraphEventsHandler, { + 'masterGraph': graph, }), - (r'/status/events', Statuses), - (r'/githubSync', GithubSync), (r'/metrics', Metrics), ] cyclone.web.Application.__init__( self, handlers, - repos=repos, debug=args['--verbose'], - template_path='.', ) - reactor.listenTCP(10001, Application()) + reactor.listenTCP(8001, Application(), interface='::') reactor.run()