changeset 20:b59912649fc4

rewrite local hg scanner
author drewp@bigasterisk.com
date Sun, 09 Jan 2022 20:47:57 -0800
parents 5751ef191454
children cb71722bb75c
files deploy.yaml repo_github_status.py repo_local_status.py
diffstat 3 files changed, 146 insertions(+), 145 deletions(-) [+]
line wrap: on
line diff
--- a/deploy.yaml	Sun Jan 09 16:02:08 2022 -0800
+++ b/deploy.yaml	Sun Jan 09 20:47:57 2022 -0800
@@ -29,8 +29,16 @@
             - python3
             - repo_github_status.py
             - "-v"
-        #   volumeMounts:
-        #     - {name: my, mountPath: /my}
+        - name: hg-status
+          image: bang5:5000/reposync_image
+          ports:
+            - containerPort: 8001
+          command:
+            - python3
+            - repo_local_status.py
+            - "-v"
+          volumeMounts:
+            - { name: my, mountPath: /my }
 
       volumes:
         - { name: my, persistentVolumeClaim: { claimName: my } }
@@ -51,5 +59,6 @@
   ports:
     - { port: 80, targetPort: 3000, name: http }
     - { port: 8000, targetPort: 8000, name: localrepos }
+    - { port: 8001, targetPort: 8001, name: githubrepos }
   selector:
     app: reposync
--- a/repo_github_status.py	Sun Jan 09 16:02:08 2022 -0800
+++ b/repo_github_status.py	Sun Jan 09 20:47:57 2022 -0800
@@ -183,10 +183,10 @@
         def __init__(self):
             handlers = [
                 (r"/()", Index),
-                (r'/graph/localRepos', CycloneGraphHandler, {
+                (r'/graph/githubRepos', CycloneGraphHandler, {
                     'masterGraph': graph
                 }),
-                (r'/graph/localRepos/events', CycloneGraphEventsHandler, {
+                (r'/graph/githubRepos/events', CycloneGraphEventsHandler, {
                     'masterGraph': graph,
                 }),
                 (r'/metrics', Metrics),
--- a/repo_local_status.py	Sun Jan 09 16:02:08 2022 -0800
+++ b/repo_local_status.py	Sun Jan 09 20:47:57 2022 -0800
@@ -7,19 +7,24 @@
 import traceback
 from dataclasses import dataclass, field
 from pathlib import Path
-from typing import Dict, Optional, Tuple
+from typing import Dict, Optional, Set, Tuple
 
 import cyclone.httpserver
 import cyclone.sse
 import cyclone.web
 import docopt
 import treq
-import tzlocal
-from cycloneerr import PrettyErrorHandler
+from background_loop import loop_forever_async
 from dateutil.parser import parse
 from dateutil.tz import tzlocal
+from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph)
+from prometheus_client import Counter, Gauge
 from prometheus_client.exposition import generate_latest
 from prometheus_client.registry import REGISTRY
+from rdfdb.currentstategraphapi import CurrentStateGraphApi
+from rdfdb.patch import Patch
+from rdflib import RDF, Literal, Namespace, URIRef
+from rdflib.term import Identifier
 from ruamel.yaml import YAML
 from standardservice.logsetup import log, verboseLogging
 from twisted.internet import reactor
@@ -27,9 +32,14 @@
 from twisted.internet.utils import _UnexpectedErrorOutput, getProcessOutput
 
 from patch_cyclone_sse import patchCycloneSse
+
 patchCycloneSse()
 
-githubOwner = 'drewp'
+Quad = Tuple[Identifier, Identifier, Identifier, Identifier]
+Triple = Tuple[Identifier, Identifier, Identifier]
+EX = Namespace('http://example.com/')  # todo
+
+HG_SYNC = Gauge('hg_sync', 'sync passes to hg')
 
 
 @inlineCallbacks
@@ -40,131 +50,17 @@
     returnValue(json.loads(j) if j else None)
 
 
-@dataclass
-class Repo:
-    path: Path
-    github: bool
-    _cache: Dict[str, Tuple[float, object]] = field(default_factory=dict)
-
-    def _isStale(self, group) -> Optional[object]:
-        now = time.time()
-        if group not in self._cache:
-            return True
-        if now > self._cache[group][0] + 86400:
-            return True
-        print('fresh')
-        return False
-
-    def _save(self, group, obj):
-        now = time.time()
-        self._cache[group] = (now, obj)
-
-    def _get(self, group):
-        print('get')
-        return self._cache[group][1]
-
-    @inlineCallbacks
-    def getStatus(self):
-        if self._isStale('status'):
-            try:
-                statusResp = yield runHg(self.path, ['status'])
-            except Exception as e:
-                status = {'error': repr(e)}
-            else:
-                unknowns = len([row for row in statusResp if row['status'] == '?'])
-                status = {'unknown': unknowns, 'changed': len(statusResp) - unknowns}
-            self._save('status', status)
-        returnValue(self._get('status'))
-
-    @inlineCallbacks
-    def getLatestHgCommit(self):
-        if self._isStale('log'):
-            rows = yield runHg(self.path, ['log', '--limit', '1'])
-            commit = rows[0]
-            sec = commit['date'][0]
-            t = datetime.datetime.fromtimestamp(sec, tzlocal())
-            self._save('log', {'email': commit['user'], 't': t.isoformat(), 'message': commit['desc']})
-        returnValue(self._get('log'))
-
-    @inlineCallbacks
-    def getLatestGithubCommit(self):
-        if self._isStale('github'):
-            resp = yield treq.get(f'https://api.github.com/repos/{githubOwner}/{self.path.name}/commits?per_page=1',
-                                  timeout=5,
-                                  headers={
-                                      'User-agent': 'reposync by github.com/drewp',
-                                      'Accept': 'application/vnd.github.v3+json'
-                                  })
-            ret = yield treq.json_content(resp)
-            commit = ret[0]['commit']
-            t = parse(commit['committer']['date']).astimezone(tzlocal()).isoformat()
-            self._save('github', {'email': commit['committer']['email'], 't': t, 'message': commit['message']})
-        returnValue(self._get('github'))
+# merge this into setToGraph
+def replaceContext(pg: PatchableGraph, ctx: URIRef, newTriples: Set[Triple]):
+    prevCtxStmts = set((s, p, o, g.identifier) for s, p, o, g in pg._graph.quads((None, None, None, ctx)))
 
-    @inlineCallbacks
-    def clearGithubMaster(self):
-        '''bang(pts/13):/tmp/reset% git init
-Initialized empty Git repository in /tmp/reset/.git/
-then github set current to a new branch called 'clearing' with https://developer.github.com/v3/repos/#update-a-repository
-bang(pts/13):/tmp/reset% git remote add origin git@github.com:drewp/href.git
-bang(pts/13):/tmp/reset% git push origin :master
-To github.com:drewp/href.git
- - [deleted]         master
-maybe --set-upstream origin
-bang(pts/13):/tmp/reset% git remote set-branches origin master
-?
-then push
-then github setdefault to master
-then github delete clearing
-'''
-
-    @inlineCallbacks
-    def pushToGithub(self):
-        if not self.github:
-            raise ValueError
-        yield runHg(self.path, ['bookmark', '--rev', 'default', 'master'])
-        out = yield runHg(self.path, ['push', f'git+ssh://git@github.com/{githubOwner}/{self.path.name}.git'])
-        print(f'out fompushh {out}')
-
-
-class GithubSync(PrettyErrorHandler, cyclone.web.RequestHandler):
+    currentStmts: Set[Quad] = set()
+    for tri in newTriples:
+        currentStmts.add(tri + (ctx,))
 
-    @inlineCallbacks
-    def post(self):
-        try:
-            path = self.get_argument('repo')
-            repo = [r for r in self.settings.repos if str(r.path) == path][0]
-            yield repo.pushToGithub()
-        except Exception:
-            traceback.print_exc()
-            raise
-
-
-class Statuses(cyclone.sse.SSEHandler):
-
-    def update(self, key, data):
-        self.sendEvent(json.dumps({'key': key, 'update': data}).encode('utf8'))
+    p = Patch(delQuads=prevCtxStmts.difference(currentStmts), addQuads=currentStmts.difference(prevCtxStmts))
 
-    def bind(self):
-        self.toProcess = self.settings.repos[:]
-        reactor.callLater(0, self.runOne)
-
-    @inlineCallbacks
-    def runOne(self):
-        if not self.toProcess:
-            print('done')
-            return
-        repo = self.toProcess.pop(0)
-
-        try:
-            update = {'path': str(repo.path), 'github': repo.github, 'status': (yield repo.getStatus()), 'hgLatest': (yield repo.getLatestHgCommit())}
-            if repo.github:
-                update['githubLatest'] = (yield repo.getLatestGithubCommit())
-            self.update(str(repo.path), update)
-        except Exception:
-            log.warn(f'not reporting on {repo}')
-            traceback.print_exc()
-        reactor.callLater(0, self.runOne)
+    pg.patch(p)
 
 
 class Metrics(cyclone.web.RequestHandler):
@@ -174,48 +70,144 @@
         self.write(generate_latest(REGISTRY))
 
 
+class Index(cyclone.web.RequestHandler):
+
+    def get(self, *args):
+        self.add_header('content-type', 'text/html')
+        self.write('''<!DOCTYPE html>
+        <html>
+          <head>
+            <title>repo_local_status</title>
+          </head>
+          <body>
+            <a href="graph/localRepos">graph</a>
+          </body>
+        </html>''')
+
+
+# share with other files
+@dataclass
+class Repo:
+    path: Path
+    github: bool
+
+    def uri(self):
+        return URIRef(f'http://bigasterisk.com/repo/{self.path.name}')
+
+    def localInstance(self):
+        return URIRef(f'http://bigasterisk.com/repo/{self.path.name}/local')
+
+
+@inlineCallbacks
+def updateOne(graph, repo: Repo):
+    try:
+        writeLocalRepo(graph, repo)
+        yield writeStatus(graph, repo)
+        writeLatestHgCommit(graph, repo)
+    except Exception:
+        log.warning(f'While updating {repo}:')
+        log.warning(traceback.format_exc())
+
+
+def uriForHgNode(n: str) -> URIRef:
+    return URIRef(f'http://bigasterisk.com/hg/node/{n}')
+
+def uriForHgPhase(phase: str) -> URIRef:
+    return URIRef(f'http://bigasterisk.com/hg/phase/{phase}')
+
+@inlineCallbacks
+def writeLatestHgCommit(graph, repo):
+    rows = yield runHg(repo.path, ['log', '--limit', '1'])
+    commit = rows[0]
+    sec = commit['date'][0]
+    t = datetime.datetime.fromtimestamp(sec, tzlocal())
+    latest = uriForHgNode(commit['node'])
+
+    new: Set[Triple] = set()
+    new.add((repo.localInstance(), EX['latestCommit'], latest))
+    new.add((latest, RDF.type, EX['HgCommit']))
+    new.add((latest, EX['email'], Literal(commit['user'])))
+    new.add((latest, EX['commitMessage'], Literal(commit['desc'])))
+    new.add((latest, EX['created'], Literal(t)))
+    new.add((latest, EX['phase'], uriForHgNode(commit['phase'])))
+
+    for t in commit['tags']:
+        new.add((latest, EX['tag'], Literal(t)))
+    for p in commit['parents']:
+        new.add((latest, EX['parent'], uriForHgNode(p)))
+
+    replaceContext(graph, URIRef(repo.localInstance() + '/log'), new)
+
+
+@inlineCallbacks
+def writeStatus(graph, repo):
+    statusResp = yield runHg(repo.path, ['status'])
+    unknowns = len([row for row in statusResp if row['status'] == '?'])
+    replaceContext(
+        graph, URIRef(repo.localInstance() + '/status'), {
+            (repo.localInstance(), EX['unknownCount'], Literal(unknowns)),
+            (repo.localInstance(), EX['changed'], Literal(len(statusResp) - unknowns)),
+        })
+
+
+def writeLocalRepo(graph, repo: Repo):
+    replaceContext(graph, URIRef(repo.uri() + '/config'), {
+        (repo.uri(), EX['localRepo'], repo.localInstance()),
+        (repo.localInstance(), RDF.type, EX['HgRepo']),
+    })
+
+
+@inlineCallbacks
+def update(graph, repos):
+    for r in repos:
+        yield updateOne(graph, r)
+
+
 def main():
     args = docopt.docopt('''
 Usage:
-  hg_status.py [options]
+  repo_local_status.py [options]
 
 Options:
   -v, --verbose  more logging
 ''')
     verboseLogging(args['--verbose'])
 
-    # import sys
-    # sys.path.append('/usr/lib/python3/dist-packages')
-    # import OpenSSL
-
     yaml = YAML(typ='safe')
     config = yaml.load(open('config.yaml'))
     repos = [Repo(Path(row['dir']), row['github']) for row in config['hg_repos']]
 
+    class PG2(PatchableGraph, CurrentStateGraphApi):
+        pass
+
+    graph = PG2()
+
+    @inlineCallbacks
+    def f(first):
+        yield update(graph, repos)
+
+    loop_forever_async(f, 3600, HG_SYNC)
+
     class Application(cyclone.web.Application):
 
         def __init__(self):
             handlers = [
-                (r"/()", cyclone.web.StaticFileHandler, {
-                    'path': '.',
-                    'default_filename': 'index.html'
+                (r"/()", Index),
+                (r'/graph/localRepos', CycloneGraphHandler, {
+                    'masterGraph': graph
                 }),
-                (r'/build/(bundle\.js)', cyclone.web.StaticFileHandler, {
-                    'path': './build/'
+                (r'/graph/localRepos/events', CycloneGraphEventsHandler, {
+                    'masterGraph': graph,
                 }),
-                (r'/status/events', Statuses),
-                (r'/githubSync', GithubSync),
                 (r'/metrics', Metrics),
             ]
             cyclone.web.Application.__init__(
                 self,
                 handlers,
-                repos=repos,
                 debug=args['--verbose'],
-                template_path='.',
             )
 
-    reactor.listenTCP(10001, Application())
+    reactor.listenTCP(8001, Application(), interface='::')
     reactor.run()