view repo_local_status.py @ 24:03803832a087 default tip

add view definition for streamed-graph viewer
author drewp@bigasterisk.com
date Tue, 29 Mar 2022 21:17:56 -0700
parents b9fe6d26b3fa
children
line wrap: on
line source

"""
configured hg dirs and settings -> rdf graph
"""
import datetime
import json
import time
import traceback
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Optional, Set, Tuple

import cyclone.httpserver
import cyclone.sse
import cyclone.web
import docopt
import treq
from background_loop import loop_forever_async
from dateutil.parser import parse
from dateutil.tz import tzlocal
from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph)
from prometheus_client import Counter, Gauge
from prometheus_client.exposition import generate_latest
from prometheus_client.registry import REGISTRY
from rdfdb.currentstategraphapi import CurrentStateGraphApi
from rdfdb.patch import Patch
from rdflib import RDF, Literal, Namespace, URIRef
from rdflib.term import Identifier
from ruamel.yaml import YAML
from standardservice.logsetup import log, verboseLogging
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.utils import _UnexpectedErrorOutput, getProcessOutput

from patch_cyclone_sse import patchCycloneSse

patchCycloneSse()

Quad = Tuple[Identifier, Identifier, Identifier, Identifier]
Triple = Tuple[Identifier, Identifier, Identifier]
EX = Namespace('http://example.com/')  # todo

HG_SYNC = Gauge('hg_sync', 'sync passes to hg')

_venv = None


@inlineCallbacks
def runHg(cwd, args):
    global _venv
    if _venv is None:
        _venv = yield getProcessOutput('pipenv', ['--venv'])
        _venv = _venv.decode('ascii').strip()

    if args[0] not in ['push']:
        args.extend(['-T', 'json'])
    j = yield getProcessOutput(_venv + '/bin/hg', ['--cwd', str(cwd)] + args, env={'LANG': 'C.UTF-8'})
    returnValue(json.loads(j) if j else None)


# merge this into setToGraph
def replaceContext(pg: PatchableGraph, ctx: URIRef, newTriples: Set[Triple]):
    prevCtxStmts = set((s, p, o, g.identifier) for s, p, o, g in pg._graph.quads((None, None, None, ctx)))

    currentStmts: Set[Quad] = set()
    for tri in newTriples:
        currentStmts.add(tri + (ctx,))

    p = Patch(delQuads=prevCtxStmts.difference(currentStmts), addQuads=currentStmts.difference(prevCtxStmts))

    pg.patch(p)


class Metrics(cyclone.web.RequestHandler):

    def get(self):
        self.add_header('content-type', 'text/plain')
        self.write(generate_latest(REGISTRY))


class Index(cyclone.web.RequestHandler):

    def get(self, *args):
        self.add_header('content-type', 'text/html')
        self.write('''<!DOCTYPE html>
        <html>
          <head>
            <title>repo_local_status</title>
          </head>
          <body>
            <a href="graph/localRepos">graph</a>
          </body>
        </html>''')


# share with other files
@dataclass
class Repo:
    path: Path
    github: bool

    def uri(self):
        return URIRef(f'http://bigasterisk.com/repo/{self.path.name}')

    def localInstance(self):
        return URIRef(f'http://bigasterisk.com/repo/{self.path.name}/local')


@inlineCallbacks
def updateOne(graph, repo: Repo):
    try:
        writeLocalRepo(graph, repo)
        yield writeStatus(graph, repo)
        writeLatestHgCommit(graph, repo)
    except Exception:
        log.warning(f'While updating {repo}:')
        log.warning(traceback.format_exc())


def uriForHgNode(n: str) -> URIRef:
    return URIRef(f'http://bigasterisk.com/hg/node/{n}')


def uriForHgPhase(phase: str) -> URIRef:
    return URIRef(f'http://bigasterisk.com/hg/phase/{phase}')


@inlineCallbacks
def writeLatestHgCommit(graph, repo):
    rows = yield runHg(repo.path, ['log', '--limit', '1'])
    commit = rows[0]
    sec = commit['date'][0]
    t = datetime.datetime.fromtimestamp(sec, tzlocal())
    latest = uriForHgNode(commit['node'])

    new: Set[Triple] = set()
    new.add((repo.localInstance(), EX['latestCommit'], latest))
    new.add((latest, RDF.type, EX['HgCommit']))
    new.add((latest, EX['email'], Literal(commit['user'])))
    new.add((latest, EX['commitMessage'], Literal(commit['desc'])))
    new.add((latest, EX['created'], Literal(t)))
    new.add((latest, EX['phase'], uriForHgNode(commit['phase'])))

    for t in commit['tags']:
        new.add((latest, EX['tag'], Literal(t)))
    for p in commit['parents']:
        new.add((latest, EX['parent'], uriForHgNode(p)))

    replaceContext(graph, URIRef(repo.localInstance() + '/log'), new)


@inlineCallbacks
def writeStatus(graph, repo):
    statusResp = yield runHg(repo.path, ['status'])
    unknowns = len([row for row in statusResp if row['status'] == '?'])
    replaceContext(graph, URIRef(repo.localInstance() + '/status'), {
        (repo.localInstance(), EX['unknownCount'], Literal(unknowns)),
        (repo.localInstance(), EX['changed'], Literal(len(statusResp) - unknowns)),
    })


def writeLocalRepo(graph, repo: Repo):
    replaceContext(graph, URIRef(repo.uri() + '/config'), {
        (repo.uri(), RDF.type, EX['Repo']),
        (repo.uri(), EX['localRepo'], repo.localInstance()),
        (repo.localInstance(), RDF.type, EX['HgRepo']),
    })


@inlineCallbacks
def update(graph, repos):
    for r in repos:
        yield updateOne(graph, r)


def main():
    args = docopt.docopt('''
Usage:
  repo_local_status.py [options]

Options:
  -v, --verbose  more logging
''')
    verboseLogging(args['--verbose'])

    yaml = YAML(typ='safe')
    config = yaml.load(open('config.yaml'))
    repos = [Repo(Path(row['dir']), row['github']) for row in config['hg_repos']]

    class PG2(PatchableGraph, CurrentStateGraphApi):
        pass

    graph = PG2()

    @inlineCallbacks
    def f(first):
        yield update(graph, repos)

    loop_forever_async(f, 3600, HG_SYNC)

    class Application(cyclone.web.Application):

        def __init__(self):
            handlers = [
                (r"/()", Index),
                (r'/graph/localRepos', CycloneGraphHandler, {
                    'masterGraph': graph
                }),
                (r'/graph/localRepos/events', CycloneGraphEventsHandler, {
                    'masterGraph': graph,
                }),
                (r'/metrics', Metrics),
            ]
            cyclone.web.Application.__init__(
                self,
                handlers,
                debug=args['--verbose'],
            )

    reactor.listenTCP(8001, Application(), interface='::')
    reactor.run()


if __name__ == '__main__':
    main()