Mercurial > code > home > repos > reposync
view repo_local_status.py @ 22:b9fe6d26b3fa
minor changes, fixes, upgrades
author | drewp@bigasterisk.com |
---|---|
date | Tue, 29 Mar 2022 21:15:51 -0700 |
parents | cb71722bb75c |
children |
line wrap: on
line source
""" configured hg dirs and settings -> rdf graph """ import datetime import json import time import traceback from dataclasses import dataclass, field from pathlib import Path from typing import Dict, Optional, Set, Tuple import cyclone.httpserver import cyclone.sse import cyclone.web import docopt import treq from background_loop import loop_forever_async from dateutil.parser import parse from dateutil.tz import tzlocal from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph) from prometheus_client import Counter, Gauge from prometheus_client.exposition import generate_latest from prometheus_client.registry import REGISTRY from rdfdb.currentstategraphapi import CurrentStateGraphApi from rdfdb.patch import Patch from rdflib import RDF, Literal, Namespace, URIRef from rdflib.term import Identifier from ruamel.yaml import YAML from standardservice.logsetup import log, verboseLogging from twisted.internet import reactor from twisted.internet.defer import inlineCallbacks, returnValue from twisted.internet.utils import _UnexpectedErrorOutput, getProcessOutput from patch_cyclone_sse import patchCycloneSse patchCycloneSse() Quad = Tuple[Identifier, Identifier, Identifier, Identifier] Triple = Tuple[Identifier, Identifier, Identifier] EX = Namespace('http://example.com/') # todo HG_SYNC = Gauge('hg_sync', 'sync passes to hg') _venv = None @inlineCallbacks def runHg(cwd, args): global _venv if _venv is None: _venv = yield getProcessOutput('pipenv', ['--venv']) _venv = _venv.decode('ascii').strip() if args[0] not in ['push']: args.extend(['-T', 'json']) j = yield getProcessOutput(_venv + '/bin/hg', ['--cwd', str(cwd)] + args, env={'LANG': 'C.UTF-8'}) returnValue(json.loads(j) if j else None) # merge this into setToGraph def replaceContext(pg: PatchableGraph, ctx: URIRef, newTriples: Set[Triple]): prevCtxStmts = set((s, p, o, g.identifier) for s, p, o, g in pg._graph.quads((None, None, None, ctx))) currentStmts: Set[Quad] = set() for tri in newTriples: currentStmts.add(tri + (ctx,)) p = Patch(delQuads=prevCtxStmts.difference(currentStmts), addQuads=currentStmts.difference(prevCtxStmts)) pg.patch(p) class Metrics(cyclone.web.RequestHandler): def get(self): self.add_header('content-type', 'text/plain') self.write(generate_latest(REGISTRY)) class Index(cyclone.web.RequestHandler): def get(self, *args): self.add_header('content-type', 'text/html') self.write('''<!DOCTYPE html> <html> <head> <title>repo_local_status</title> </head> <body> <a href="graph/localRepos">graph</a> </body> </html>''') # share with other files @dataclass class Repo: path: Path github: bool def uri(self): return URIRef(f'http://bigasterisk.com/repo/{self.path.name}') def localInstance(self): return URIRef(f'http://bigasterisk.com/repo/{self.path.name}/local') @inlineCallbacks def updateOne(graph, repo: Repo): try: writeLocalRepo(graph, repo) yield writeStatus(graph, repo) writeLatestHgCommit(graph, repo) except Exception: log.warning(f'While updating {repo}:') log.warning(traceback.format_exc()) def uriForHgNode(n: str) -> URIRef: return URIRef(f'http://bigasterisk.com/hg/node/{n}') def uriForHgPhase(phase: str) -> URIRef: return URIRef(f'http://bigasterisk.com/hg/phase/{phase}') @inlineCallbacks def writeLatestHgCommit(graph, repo): rows = yield runHg(repo.path, ['log', '--limit', '1']) commit = rows[0] sec = commit['date'][0] t = datetime.datetime.fromtimestamp(sec, tzlocal()) latest = uriForHgNode(commit['node']) new: Set[Triple] = set() new.add((repo.localInstance(), EX['latestCommit'], latest)) new.add((latest, RDF.type, EX['HgCommit'])) new.add((latest, EX['email'], Literal(commit['user']))) new.add((latest, EX['commitMessage'], Literal(commit['desc']))) new.add((latest, EX['created'], Literal(t))) new.add((latest, EX['phase'], uriForHgNode(commit['phase']))) for t in commit['tags']: new.add((latest, EX['tag'], Literal(t))) for p in commit['parents']: new.add((latest, EX['parent'], uriForHgNode(p))) replaceContext(graph, URIRef(repo.localInstance() + '/log'), new) @inlineCallbacks def writeStatus(graph, repo): statusResp = yield runHg(repo.path, ['status']) unknowns = len([row for row in statusResp if row['status'] == '?']) replaceContext(graph, URIRef(repo.localInstance() + '/status'), { (repo.localInstance(), EX['unknownCount'], Literal(unknowns)), (repo.localInstance(), EX['changed'], Literal(len(statusResp) - unknowns)), }) def writeLocalRepo(graph, repo: Repo): replaceContext(graph, URIRef(repo.uri() + '/config'), { (repo.uri(), RDF.type, EX['Repo']), (repo.uri(), EX['localRepo'], repo.localInstance()), (repo.localInstance(), RDF.type, EX['HgRepo']), }) @inlineCallbacks def update(graph, repos): for r in repos: yield updateOne(graph, r) def main(): args = docopt.docopt(''' Usage: repo_local_status.py [options] Options: -v, --verbose more logging ''') verboseLogging(args['--verbose']) yaml = YAML(typ='safe') config = yaml.load(open('config.yaml')) repos = [Repo(Path(row['dir']), row['github']) for row in config['hg_repos']] class PG2(PatchableGraph, CurrentStateGraphApi): pass graph = PG2() @inlineCallbacks def f(first): yield update(graph, repos) loop_forever_async(f, 3600, HG_SYNC) class Application(cyclone.web.Application): def __init__(self): handlers = [ (r"/()", Index), (r'/graph/localRepos', CycloneGraphHandler, { 'masterGraph': graph }), (r'/graph/localRepos/events', CycloneGraphEventsHandler, { 'masterGraph': graph, }), (r'/metrics', Metrics), ] cyclone.web.Application.__init__( self, handlers, debug=args['--verbose'], ) reactor.listenTCP(8001, Application(), interface='::') reactor.run() if __name__ == '__main__': main()