Mercurial > code > home > repos > sco-bot
changeset 10:13438795d896
rewrite with prefect flows and whoosh search, but it's in a nested pdm env
author | drewp@bigasterisk.com |
---|---|
date | Thu, 11 Jul 2024 17:35:31 -0700 |
parents | d1b54241a731 |
children | 6622bacb0b84 |
files | flow/build_index.py flow/download.py flow/env flow/local_types.py flow/pyproject.toml flow/schema.py flow/search_index.py search/meeting_docs.py |
diffstat | 8 files changed, 180 insertions(+), 54 deletions(-) [+] |
line wrap: on
line diff
# flow/build_index.py  (new file in this changeset: --- /dev/null  +++ b/flow/build_index.py  @@ -0,0 +1,81 @@)
"""Prefect flow that downloads city meeting agendas and adds them to the search index."""
from pathlib import Path
from typing import Iterable, cast

import search_index
from download import getCityMutableJson, getCityPermanent
from local_types import MeetingRow, Url
from lxml.html import fromstring
from prefect import flow, task
from prefect.logging import get_run_logger
from search_index import SearchIndex

# Replaced with the Prefect run logger at the start of buildIndex().
log = None


@task()
def meetingListUrls() -> Iterable[Url]:
    """Return the primegov endpoints whose JSON responses list the meetings to index."""
    return [
        "https://albanyca.primegov.com/api/v2/PublicPortal/ListArchivedMeetings?year=2024",
        "https://albanyca.primegov.com/api/v2/PublicPortal/ListUpcomingMeetings",
    ]


def meetingAgendaUrl(mtg: MeetingRow) -> Url:
    """Return the portal URL of the meeting's 'HTML Agenda Packet' document.

    Raises:
        ValueError: if the meeting row lists no such document.
    """
    for doc in mtg.get('documentList', []):
        if doc['templateName'] == 'HTML Agenda Packet':
            tid = doc['templateId']
            return f'https://albanyca.primegov.com/Portal/Meeting?meetingTemplateId={tid}'
    raise ValueError(f"no agenda doc found for {mtg['id']=}")


def extractMeetingText(mhtml: str) -> list[str]:
    """Return the non-blank text chunks found inside div#meetingSection.

    Raises:
        IndexError: if the page has no div#meetingSection element.
    """
    el = fromstring(mhtml)
    m = el.cssselect('div#meetingSection')[0]
    for st in m.cssselect('style'):
        # Blank out the CSS source so it is not indexed. keep_tail=True matters:
        # a plain clear() also resets the element's tail, which would drop any
        # agenda text that directly follows the <style> tag.
        st.clear(keep_tail=True)
    return [
        chunk for chunk in m.xpath('.//text()', namespaces=el.nsmap)
        if chunk.strip()
    ]


def addMeeting(index: SearchIndex, mtg: MeetingRow):
    """Fetch one meeting's agenda (when it has one) and add it to the index.

    Meetings without an HTML agenda packet are skipped rather than treated as
    errors.
    """
    try:
        agendaUrl = meetingAgendaUrl(mtg)
    except ValueError:
        # Best-effort: not every meeting has an agenda; leave a trace and move on.
        if log is not None:
            log.debug(f'no agenda to index for meeting id={mtg.get("id")}')
    else:
        html = getCityPermanent(agendaUrl)
        text = extractMeetingText(html)
        # todo group phrases
        # NOTE(review): `text` is a list of strings while the schema declares
        # `content` as a whoosh TEXT field; confirm how whoosh treats a list
        # value here (it may index each chunk as one unanalyzed term).
        index.addDoc(title=f'{mtg["date"]} {mtg["title"]}', content=text)

    if mtg.get('videoUrl'):
        pass  # todo: transcribe and index video


@flow(persist_result=True)
def buildIndex():
    """Build the search index under /tmp/scoindex from every listed meeting."""
    global log
    log = get_run_logger()
    search_index.log = log  # search_index logs through the same run logger

    index = SearchIndex(Path('/tmp/scoindex'))
    for url in meetingListUrls():
        mtgs = cast(list[MeetingRow], getCityMutableJson(url))
        log.info(f'got {len(mtgs)=}')

        for mtg in mtgs:
            addMeeting(index, mtg)
    # One commit after all meetings: SearchIndex holds a single writer that is
    # created in its constructor.
    index.commit()


if __name__ == '__main__':
    buildIndex.serve()
# flow/download.py  (new file in this changeset: --- /dev/null  +++ b/flow/download.py  @@ -0,0 +1,36 @@)
"""Prefect tasks that download (and cache) resources from the city web site."""
import datetime
import time
from local_types import Url

import httpx
from prefect import task
from prefect.artifacts import create_link_artifact


def _fetchCity(url: Url) -> httpx.Response:
    """Record a link artifact for `url`, GET it, and raise on an error status."""
    # create_link_artifact takes (link, link_text); passing them by keyword keeps
    # the URL as the link target and 'get' as its label.
    create_link_artifact(link=url, link_text='get')
    resp = httpx.get(url)  # todo async
    resp.raise_for_status()
    return resp


@task(
    task_run_name=lambda: f'getHttp-{int(time.time())}',
    cache_key_fn=lambda _, args: f'getHttp-{args["url"]}',
    cache_expiration=datetime.timedelta(seconds=86400),
    tags=['city'],  # todo ratelimit based on tag
)
def getCityMutableJson(url: Url):
    """Fetch `url` and return its decoded JSON body; cached for one day."""
    return _fetchCity(url).json()


# NOTE(review): this task uses the same 'getHttp-<url>' cache key format as
# getCityMutableJson; confirm the two are never called with the same url.
@task(task_run_name=lambda: f'getHttp-{int(time.time())}',
      cache_key_fn=lambda _, args: f'getHttp-{args["url"]}',
      tags=['city'])
def getCityPermanent(url: Url) -> str:
    """Fetch `url` and return its body text; cached with no expiration set."""
    return _fetchCity(url).text


@task
def getYoutubePermanent(url: str):
    """Placeholder: sleeps 5 seconds and returns dummy data instead of a download."""
    time.sleep(5)
    return 'video' * 10000
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/flow/env Thu Jul 11 17:35:31 2024 -0700 @@ -0,0 +1,2 @@ +PREFECT_API_URL=http://127.0.0.1:4200/api +PREFECT_HOME=./prefect
# flow/local_types.py  (new file in this changeset: --- /dev/null  +++ b/flow/local_types.py  @@ -0,0 +1,5 @@)
"""Type names shared by the flow modules."""
from typing import NewType

# A URL held as a str; a distinct type only for static type checkers.
Url = NewType('Url', str)

# One meeting record as a dict; build_index casts the meeting-list JSON to this.
MeetingRow = NewType('MeetingRow', dict)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/flow/pyproject.toml Thu Jul 11 17:35:31 2024 -0700 @@ -0,0 +1,28 @@ +[project] +name = "flow" +version = "0.1.0" +description = "Default template for PDM package" +authors = [ + {name = "", email = ""}, +] +dependencies = [ + "prefect>=2.19.7", + "lxml>=5.2.2", + "httpx>=0.27.0", + "cssselect>=1.2.0", + "whoosh>=2.7.4", + "ipython>=8.26.0", +] +requires-python = "==3.11.*" +readme = "README.md" +license = {text = "MIT"} + + +[tool.pdm] +distribution = false + +[tool.pdm.scripts] +_.env_file = "env" +run_prefect_server = "prefect server start" +run_build_flow = "python build_index.py" +start_build = "prefect deployment run buildIndex/buildIndex" \ No newline at end of file
# flow/schema.py  (new file in this changeset: --- /dev/null  +++ b/flow/schema.py  @@ -0,0 +1,4 @@)
"""Whoosh schema for indexed meeting documents."""
from whoosh.fields import TEXT, Schema

# Both fields are TEXT fields declared with stored=True.
schema = Schema(
    title=TEXT(stored=True),
    content=TEXT(stored=True),
)
# flow/search_index.py  (new file in this changeset: --- /dev/null  +++ b/flow/search_index.py  @@ -0,0 +1,24 @@)
"""Small wrapper around a whoosh index and its writer."""
import logging
from pathlib import Path

from whoosh.fields import ID
from whoosh.index import create_in

from schema import schema

log = None  # set by flow


class SearchIndex:
    """Creates a whoosh index in a directory and adds documents through one writer."""

    def __init__(self, indexDir: Path):
        """Create `indexDir` if needed, create the index there, and open a writer.

        NOTE(review): create_in appears to start a new index on every run
        rather than reopening an existing one; confirm that is intended.
        """
        indexDir.mkdir(parents=True, exist_ok=True)
        self.ix = create_in(indexDir, schema)
        self.writer = self.ix.writer()

    def addDoc(self, **kw):
        """Add one document; keyword names are passed through to the writer."""
        self.writer.add_document(**kw)

    def commit(self):
        """Commit the pending documents and log the resulting document count."""
        self.writer.commit()
        # `log` stays None unless the flow injected its run logger; fall back to
        # a stdlib logger so a finished commit is not followed by AttributeError.
        logger = log if log is not None else logging.getLogger(__name__)
        with self.ix.searcher() as searcher:
            logger.info(f'index doc count = {searcher.doc_count()}')
--- a/search/meeting_docs.py Wed Jul 10 12:25:06 2024 -0700 +++ b/search/meeting_docs.py Thu Jul 11 17:35:31 2024 -0700 @@ -1,54 +0,0 @@ -import json -from pathlib import Path -from pprint import pprint -import time -import requests -from lxml.html import fromstring - - -def getMeetingText(meetingUrl) -> list[str]: - mhtml = requests.get(meetingUrl).text - el = fromstring(mhtml) - m = el.cssselect('div#meetingSection')[0] - for st in m.cssselect('style'): - st.clear() - meetingText = [ - chunk for chunk in m.xpath('.//text()', namespaces=el.nsmap) - if chunk.strip() - ] - return meetingText - - -def gatherMtgs(mtg): - outDir = Path(f'data/albany/meetingId/{mtg["id"]}') - outDir.mkdir(parents=True, exist_ok=True) - outFile = outDir / 'agenda.json' - if outFile.exists(): - return - meetingUrl = None - for doc in mtg.get('documentList', []): - if doc['templateName'] == 'HTML Agenda Packet': - tid = doc['templateId'] - meetingUrl = f'https://albanyca.primegov.com/Portal/Meeting?meetingTemplateId={tid}' - - row = { - 'created': time.time(), - 'mtg': mtg, - 'videoUrl': mtg['videoUrl'], - 'meetingUrl': meetingUrl, - 'phrases': getMeetingText(meetingUrl) if meetingUrl else [], - } - outFile.write_text(json.dumps(row, indent=2)) - print(f'wrote {outFile}') - - -for mtg in (requests.get( - "https://albanyca.primegov.com/api/v2/PublicPortal/ListArchivedMeetings?year=2024" -).json()): - gatherMtgs(mtg) - -for mtg in (requests.get( - "https://albanyca.primegov.com/api/v2/PublicPortal/ListUpcomingMeetings" -).json()): - gatherMtgs(mtg) - break