Mercurial > code > home > repos > sco-bot
view scobot/index/build_index_flow.py @ 15:6ed25bcaaf1f
add prefect and rebuild flow to k8s
author | drewp@bigasterisk.com |
---|---|
date | Fri, 19 Jul 2024 00:30:47 -0700 |
parents | b9c2b7fedbcd |
children | 7a87ba2f00d9 |
line wrap: on
line source
"""Prefect flow that rebuilds the meeting search index from the city's PrimeGov portal."""
import json
import re
from pathlib import Path
from typing import Iterable, cast

import lxml.html
import nltk
from prefect import flow, task
from prefect.logging import get_run_logger

import scobot.index.access
from scobot.index.access import SearchIndex
from scobot.index.download_tasks import getCityMutableJson, getCityPermanent
from scobot.local_types import MeetingRow, Url

# Replaced with the Prefect run logger when buildIndex() starts.
log = None


@task()
def meetingListUrls() -> Iterable[Url]:
    """Return the meeting-list API URLs to crawl.

    Covers the 2024 archived meetings and the upcoming meetings; the year is
    hard-coded in the first URL.
    """
    return [
        "https://albanyca.primegov.com/api/v2/PublicPortal/ListArchivedMeetings?year=2024",
        "https://albanyca.primegov.com/api/v2/PublicPortal/ListUpcomingMeetings",
    ]


def meetingAgendaUrl(mtg: MeetingRow) -> Url:
    """Return the portal URL of the meeting's 'HTML Agenda Packet' document.

    Scans mtg['documentList'] (treated as empty when absent) and uses the
    first document whose templateName matches.

    Raises:
        ValueError: if the meeting has no such document.
    """
    for doc in mtg.get('documentList', []):
        if doc['templateName'] == 'HTML Agenda Packet':
            tid = doc['templateId']
            return f'https://albanyca.primegov.com/Portal/Meeting?meetingTemplateId={tid}'
    raise ValueError(f"no agenda doc found for {mtg['id']=}")


def extractMeetingText(mhtml: str) -> list[str]:
    """Return the non-blank text nodes found under div#meetingSection.

    Contents of <style> elements are dropped. Chunks are returned as found
    (not stripped); only whitespace-only chunks are filtered out.

    Raises:
        IndexError: if the page has no div#meetingSection.
    """
    el = lxml.html.fromstring(mhtml)
    m = el.cssselect('div#meetingSection')[0]
    for st in m.cssselect('style'):
        # Plain clear() also resets the element's tail, which would discard
        # any page text that directly follows the </style> tag.
        st.clear(keep_tail=True)
    meetingText = [
        chunk for chunk in m.xpath('.//text()', namespaces=el.nsmap)
        if chunk.strip()
    ]
    return meetingText


def phrasesFromFile(p: Path) -> Iterable[dict]:
    """Yield one index document per usable sentence in a saved meeting file.

    The file is JSON with a 'phrases' list of strings and a 'mtg' object
    providing 'id', 'date' and 'title'. Phrases are joined, split into
    sentences with nltk, and whitespace-normalized. Sentences shorter than 5
    characters, or without a run of 5 word characters, are skipped.

    Each yielded dict has the keys id, title, sourceFile, posJson and phrase.
    """
    mtg = json.loads(p.read_text())
    print(f' has {len(mtg["phrases"])} phrases')
    text = ' '.join(mtg['phrases'])

    # The sentence's position goes into the id so every document from one
    # meeting gets a distinct id (skipped sentences leave gaps in the numbers).
    for i, sentence in enumerate(nltk.sent_tokenize(text)):
        sentence = re.sub(r'\s+', ' ', sentence).strip()
        if len(sentence) < 5:
            continue
        if not re.search(r'\w\w\w\w\w', sentence):
            continue
        yield dict(id=f"{mtg['mtg']['id']}_sentence{i}",
                   title=f"{mtg['mtg']['date']} {mtg['mtg']['title']}",
                   sourceFile=str(p),
                   posJson="[]",
                   phrase=sentence)


def addMeeting(index: SearchIndex, mtg: MeetingRow):
    """Add every sentence of the meeting's HTML agenda to the index.

    Meetings without an HTML agenda document are skipped without error.
    Video handling is not implemented yet.
    """
    try:
        agendaUrl = meetingAgendaUrl(mtg)
    except ValueError:
        # Deliberate: a meeting with no agenda simply contributes no text.
        pass
    else:
        html = getCityPermanent(agendaUrl)
        texts = extractMeetingText(html)
        sourceTitle = f'{mtg["date"]} {mtg["title"]}'
        for se in nltk.sent_tokenize(' '.join(texts)):
            index.addDoc(sourceTitle=sourceTitle, phrase=se)

    if mtg.get('videoUrl'):
        # TODO: transcribe and index video...
        pass


@flow(persist_result=True)
def buildIndex():
    """Fetch every meeting list and index each meeting into data/build/index0."""
    global log
    log = get_run_logger()
    # Share the run logger with the index module.
    scobot.index.access.log = log

    index = SearchIndex(Path('data/build/index0'))
    for url in meetingListUrls():
        mtgs = cast(list[MeetingRow], getCityMutableJson(url))
        log.info(f'got {len(mtgs)=}')

        for mtg in mtgs:
            addMeeting(index, mtg)
    # NOTE(review): confirm commit belongs here (once per run) rather than
    # once per meeting list.
    index.commit()


if __name__ == '__main__':
    # Serve the flow under the deployment name 'buildIndex'.
    buildIndex.serve(name='buildIndex')