changeset 10:13438795d896

rewrite with prefect flows and whoosh search, but it's in a nested pdm env
author drewp@bigasterisk.com
date Thu, 11 Jul 2024 17:35:31 -0700
parents d1b54241a731
children 6622bacb0b84
files flow/build_index.py flow/download.py flow/env flow/local_types.py flow/pyproject.toml flow/schema.py flow/search_index.py search/meeting_docs.py
diffstat 8 files changed, 180 insertions(+), 54 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flow/build_index.py	Thu Jul 11 17:35:31 2024 -0700
@@ -0,0 +1,81 @@
+from pathlib import Path
+from typing import Iterable, cast
+
+import search_index
+from download import getCityMutableJson, getCityPermanent
+from local_types import MeetingRow, Url
+from lxml.html import fromstring
+from prefect import flow, task
+from prefect.logging import get_run_logger
+from search_index import SearchIndex
+
+# Run logger for this module; stays None until buildIndex() assigns it.
+log = None
+
+
+@task()
+def meetingListUrls(year: int = 2024) -> Iterable[Url]:
+    """Return the PrimeGov API URLs that list Albany's meetings.
+
+    Args:
+        year: Which year's archived meetings to list. Defaults to 2024,
+            the value that used to be hard-coded in the URL.
+
+    Returns:
+        The archived-meetings URL for `year`, followed by the
+        upcoming-meetings URL.
+    """
+    api = "https://albanyca.primegov.com/api/v2/PublicPortal"
+    return [
+        f"{api}/ListArchivedMeetings?year={year}",
+        f"{api}/ListUpcomingMeetings",
+    ]
+
+
+def meetingAgendaUrl(mtg: MeetingRow) -> Url:
+    """Build the portal URL of a meeting's HTML agenda packet.
+
+    Scans `mtg['documentList']` (treated as empty when the key is absent)
+    and uses the first entry whose `templateName` is 'HTML Agenda Packet'.
+
+    Raises:
+        ValueError: if no entry in the document list is an HTML agenda packet.
+    """
+    for entry in mtg.get('documentList', []):
+        if doc_name_differs := entry['templateName'] != 'HTML Agenda Packet':
+            continue
+        tid = entry['templateId']
+        return f'https://albanyca.primegov.com/Portal/Meeting?meetingTemplateId={tid}'
+    # Fell off the end of the list without finding an agenda packet.
+    raise ValueError(f"no agenda doc found for {mtg['id']=}")
+
+
+def extractMeetingText(mhtml: str) -> list[str]:
+    """Pull the non-blank text chunks out of a meeting page.
+
+    Args:
+        mhtml: HTML source of the meeting page.
+
+    Returns:
+        Every text node under the first `div#meetingSection` that is not
+        purely whitespace, in the order the XPath query yields them. Chunks
+        are returned as found (not stripped). Text inside `<style>` elements
+        is excluded.
+
+    Raises:
+        IndexError: if the page has no `div#meetingSection` element.
+    """
+    el = fromstring(mhtml)
+    m = el.cssselect('div#meetingSection')[0]
+    for st in m.cssselect('style'):
+        # A plain clear() also wipes the element's tail, which would drop
+        # real page text that directly follows a </style> tag. Keep the tail
+        # and discard only the stylesheet text itself.
+        st.clear(keep_tail=True)
+    meetingText = [
+        chunk for chunk in m.xpath('.//text()', namespaces=el.nsmap)
+        if chunk.strip()
+    ]
+    return meetingText
+
+
+def addMeeting(index: SearchIndex, mtg: MeetingRow):
+    """Add one meeting's agenda text to the search index.
+
+    Meetings without an HTML agenda packet are skipped silently. Otherwise
+    the agenda page is fetched and its text is indexed under a title built
+    from the meeting's `date` and `title`.
+
+    Args:
+        index: Index that receives the document.
+        mtg: Meeting record to process.
+    """
+    try:
+        agendaUrl = meetingAgendaUrl(mtg)
+    except ValueError:
+        # No agenda packet for this meeting; nothing to index.
+        agendaUrl = None
+
+    if agendaUrl is not None:
+        phrases = extractMeetingText(getCityPermanent(agendaUrl))
+        # todo group phrases
+        # NOTE(review): `content` is passed as a list of strings; confirm
+        # Whoosh analyzes it as intended -- it may treat each list item as a
+        # single pre-tokenized term.
+        index.addDoc(title=f'{mtg["date"]} {mtg["title"]}', content=phrases)
+
+    if mtg.get('videoUrl'):
+        # todo: transcribe and index video
+        pass
+
+
+@flow(persist_result=True)
+def buildIndex():
+    """Flow entry point: fetch every meeting list and index each meeting.
+
+    Installs the run logger on this module and on `search_index`, creates
+    the index under /tmp/scoindex, adds every meeting from every list URL,
+    and commits once at the end.
+    """
+    global log
+    # Both modules log through the same run logger.
+    log = search_index.log = get_run_logger()
+
+    index = SearchIndex(Path('/tmp/scoindex'))
+    for listUrl in meetingListUrls():
+        mtgs = cast(list[MeetingRow], getCityMutableJson(listUrl))
+        log.info(f'got {len(mtgs)=}')
+        for row in mtgs:
+            addMeeting(index, row)
+    index.commit()
+
+
+# When run as a script, serve the flow so runs can be triggered against it.
+if __name__ == '__main__':
+    buildIndex.serve()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flow/download.py	Thu Jul 11 17:35:31 2024 -0700
@@ -0,0 +1,36 @@
+import datetime
+import time
+from local_types import Url
+
+import httpx
+from prefect import task
+from prefect.artifacts import create_link_artifact
+
+
+@task(
+    task_run_name=lambda: f'getHttp-{int(time.time())}',
+    cache_key_fn=lambda _, args: f'getHttp-{args["url"]}',
+    cache_expiration=datetime.timedelta(seconds=86400),
+    tags=['city'],  # todo ratelimit based on tag
+)
+def getCityMutableJson(url: Url):
+    """Fetch `url` and return its decoded JSON body.
+
+    The result is cached per URL and the cache entry expires after one day
+    (86400 seconds), since this content can change.
+
+    Args:
+        url: Address to GET.
+
+    Returns:
+        The parsed JSON response.
+
+    Raises:
+        httpx.HTTPStatusError: if the response status is 4xx or 5xx.
+    """
+    # The first argument is the link target and the second its label; the
+    # earlier call had them swapped, producing an artifact that linked to "get".
+    create_link_artifact(url, link_text="get")
+    response = httpx.get(url)  # todo async
+    response.raise_for_status()
+    return response.json()
+
+
+@task(task_run_name=lambda: f'getHttp-{int(time.time())}',
+      cache_key_fn=lambda _, args: f'getHttp-{args["url"]}',
+      tags=['city'])
+def getCityPermanent(url: Url) -> str:
+    """Fetch `url` and return the response body as text.
+
+    The result is cached per URL with no expiration set on the task.
+
+    NOTE(review): the cache key format is the same one getCityMutableJson
+    uses; confirm the two tasks can never be asked for the same URL.
+
+    Args:
+        url: Address to GET.
+
+    Returns:
+        The response body decoded as text.
+
+    Raises:
+        httpx.HTTPStatusError: if the response status is 4xx or 5xx.
+    """
+    # The first argument is the link target and the second its label; the
+    # earlier call had them swapped, producing an artifact that linked to "get".
+    create_link_artifact(url, link_text="get")
+    response = httpx.get(url)
+    response.raise_for_status()
+    return response.text
+
+
+@task
+def getYoutubePermanent(url: str):
+    """Placeholder: wait five seconds, then return dummy text.
+
+    `url` is not used yet.
+    """
+    time.sleep(5)
+    dummy = 'video' * 10000
+    return dummy
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flow/env	Thu Jul 11 17:35:31 2024 -0700
@@ -0,0 +1,2 @@
+PREFECT_API_URL=http://127.0.0.1:4200/api
+PREFECT_HOME=./prefect
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flow/local_types.py	Thu Jul 11 17:35:31 2024 -0700
@@ -0,0 +1,5 @@
+from typing import NewType
+
+
+# Distinct static type for URL strings; at runtime a Url is a plain str.
+Url = NewType('Url', str)
+# Distinct static type for one meeting record; at runtime it is a plain dict.
+MeetingRow = NewType('MeetingRow', dict)
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flow/pyproject.toml	Thu Jul 11 17:35:31 2024 -0700
@@ -0,0 +1,28 @@
+[project]
+name = "flow"
+version = "0.1.0"
+description = "Default template for PDM package"
+authors = [
+    {name = "", email = ""},
+]
+dependencies = [
+    "prefect>=2.19.7",
+    "lxml>=5.2.2",
+    "httpx>=0.27.0",
+    "cssselect>=1.2.0",
+    "whoosh>=2.7.4",
+    "ipython>=8.26.0",
+]
+requires-python = "==3.11.*"
+readme = "README.md"
+license = {text = "MIT"}
+
+
+[tool.pdm]
+distribution = false
+
+[tool.pdm.scripts]
+_.env_file = "env"
+run_prefect_server = "prefect server start"
+run_build_flow = "python build_index.py"
+start_build = "prefect deployment run buildIndex/buildIndex"
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flow/schema.py	Thu Jul 11 17:35:31 2024 -0700
@@ -0,0 +1,4 @@
+from whoosh.fields import TEXT, Schema
+
+
+# Index schema: `title` and `content` are both full-text fields whose
+# original values are stored alongside the index.
+schema = Schema(title=TEXT(stored=True), content=TEXT(stored=True))
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flow/search_index.py	Thu Jul 11 17:35:31 2024 -0700
@@ -0,0 +1,24 @@
+from pathlib import Path
+
+from whoosh.fields import ID
+from whoosh.index import create_in
+
+from schema import schema
+
+log = None  # set by flow
+
+
+class SearchIndex:
+    """Small wrapper holding one Whoosh index and a single writer on it."""
+
+    def __init__(self, indexDir: Path):
+        """Make sure `indexDir` exists, create an index there, open a writer."""
+        indexDir.mkdir(parents=True, exist_ok=True)
+        self.ix = create_in(indexDir, schema)
+        self.writer = self.ix.writer()
+
+    def addDoc(self, **kw):
+        """Queue one document; `kw` is passed straight to the writer."""
+        self.writer.add_document(**kw)
+
+    def commit(self):
+        """Commit the queued documents and log the index's document count."""
+        import logging
+
+        self.writer.commit()
+        # The module-level `log` is only assigned by the flow. Fall back to a
+        # stdlib logger so committing outside a flow run does not fail with
+        # AttributeError on None after the data is already written.
+        logger = log if log is not None else logging.getLogger(__name__)
+        with self.ix.searcher() as searcher:
+            logger.info(f'index doc count = {searcher.doc_count()}')
--- a/search/meeting_docs.py	Wed Jul 10 12:25:06 2024 -0700
+++ b/search/meeting_docs.py	Thu Jul 11 17:35:31 2024 -0700
@@ -1,54 +0,0 @@
-import json
-from pathlib import Path
-from pprint import pprint
-import time
-import requests
-from lxml.html import fromstring
-
-
-def getMeetingText(meetingUrl) -> list[str]:
-    mhtml = requests.get(meetingUrl).text
-    el = fromstring(mhtml)
-    m = el.cssselect('div#meetingSection')[0]
-    for st in m.cssselect('style'):
-        st.clear()
-    meetingText = [
-        chunk for chunk in m.xpath('.//text()', namespaces=el.nsmap)
-        if chunk.strip()
-    ]
-    return meetingText
-
-
-def gatherMtgs(mtg):
-    outDir = Path(f'data/albany/meetingId/{mtg["id"]}')
-    outDir.mkdir(parents=True, exist_ok=True)
-    outFile = outDir / 'agenda.json'
-    if outFile.exists():
-        return
-    meetingUrl = None
-    for doc in mtg.get('documentList', []):
-        if doc['templateName'] == 'HTML Agenda Packet':
-            tid = doc['templateId']
-            meetingUrl = f'https://albanyca.primegov.com/Portal/Meeting?meetingTemplateId={tid}'
-
-    row = {
-        'created': time.time(),
-        'mtg': mtg,
-        'videoUrl': mtg['videoUrl'],
-        'meetingUrl': meetingUrl,
-        'phrases': getMeetingText(meetingUrl) if meetingUrl else [],
-    }
-    outFile.write_text(json.dumps(row, indent=2))
-    print(f'wrote {outFile}')
-
-
-for mtg in (requests.get(
-        "https://albanyca.primegov.com/api/v2/PublicPortal/ListArchivedMeetings?year=2024"
-).json()):
-    gatherMtgs(mtg)
-
-for mtg in (requests.get(
-        "https://albanyca.primegov.com/api/v2/PublicPortal/ListUpcomingMeetings"
-).json()):
-    gatherMtgs(mtg)
-    break