view scobot/index/build_index_flow.py @ 18:a527228aa353 default tip

prefect use postgres
author drewp@bigasterisk.com
date Fri, 19 Jul 2024 21:01:09 -0700
parents 7a87ba2f00d9
children
line wrap: on
line source

import json
import re
from pathlib import Path
from typing import Iterable, cast

import lxml.html
import nltk
from prefect import flow, task
from prefect.logging import get_run_logger

import scobot.index.access
from scobot.index.access import SearchIndex
from scobot.index.download_tasks import getCityMutableJson, getCityPermanent
from scobot.local_types import MeetingRow, Url

# Module-level logger handle. It stays None until buildIndex() runs and
# rebinds it to the logger returned by prefect's get_run_logger().
log = None


@task()
def meetingListUrls() -> Iterable[Url]:
    """Return the meeting-list API endpoints to fetch, archived list first.

    NOTE: the archived-meetings endpoint is requested with a hard-coded
    year=2024 query parameter.
    """
    archived = Url(
        "https://albanyca.primegov.com/api/v2/PublicPortal/ListArchivedMeetings?year=2024"
    )
    upcoming = Url(
        "https://albanyca.primegov.com/api/v2/PublicPortal/ListUpcomingMeetings"
    )
    return [archived, upcoming]


def meetingAgendaUrl(mtg: MeetingRow) -> Url:
    """Return the portal URL of the meeting's 'HTML Agenda Packet' document.

    The first entry of mtg['documentList'] whose templateName is
    'HTML Agenda Packet' is used; a missing documentList is treated as empty.

    Raises:
        ValueError: if the meeting has no such document.
    """
    agendaDocs = (doc for doc in mtg.get('documentList', [])
                  if doc['templateName'] == 'HTML Agenda Packet')
    firstAgenda = next(agendaDocs, None)
    if firstAgenda is None:
        raise ValueError(f"no agenda doc found for {mtg['id']=}")

    tid = firstAgenda['templateId']
    return Url(
        f'https://albanyca.primegov.com/Portal/Meeting?meetingTemplateId={tid}'
    )


def extractMeetingText(mhtml: str) -> list[str]:
    """Return the non-blank text chunks found inside div#meetingSection.

    Text that is the content of a <style> element is excluded. Chunks are
    returned in document order and are not stripped; only chunks that are
    entirely whitespace are dropped.

    Args:
        mhtml: HTML source of a meeting page.

    Raises:
        IndexError: if the page has no div#meetingSection element.
    """
    el = lxml.html.fromstring(mhtml)
    sections = el.cssselect('div#meetingSection')
    if not sections:
        raise IndexError('no div#meetingSection found in meeting html')
    m = sections[0]

    # Exclude CSS source in the XPath itself instead of calling clear() on
    # each <style> element: clear() also resets the element's tail, which
    # silently dropped any visible text directly following a </style> tag.
    # A style element's tail is a child of the style's *parent* in XPath
    # terms, so it is kept by this predicate. The expression uses no
    # namespace prefixes, so no namespace map is passed.
    return [
        chunk for chunk in m.xpath('.//text()[not(parent::style)]')
        if chunk.strip()
    ]


def phrasesFromFile(p: Path) -> Iterable[dict]:
    """Yield one index document per usable sentence of a meeting JSON file.

    The file is read as JSON with a 'phrases' list of strings and a 'mtg'
    mapping providing 'id', 'date' and 'title'. The phrases are joined with
    spaces and split into sentences with nltk.sent_tokenize. Whitespace runs
    in each sentence are collapsed; sentences shorter than 5 characters, or
    without a run of 5 consecutive word characters, are skipped.

    Each yielded dict has the keys id, title, sourceFile, posJson and phrase.
    Ids are '<meeting id>_sentence<N>', where N counts the sentences actually
    yielded, starting at 0, so ids are unique within one file.

    Args:
        p: path of the meeting JSON file.
    """
    mtg = json.loads(p.read_text())
    print(f'  has {len(mtg["phrases"])} phrases')
    text = ' '.join(mtg['phrases'])

    # Compiled once here rather than re-resolved for every sentence.
    whitespaceRun = re.compile(r'\s+')
    fiveWordChars = re.compile(r'\w\w\w\w\w')

    i = 0
    for sentence in nltk.sent_tokenize(text):
        sentence = whitespaceRun.sub(' ', sentence).strip()
        if len(sentence) < 5:
            continue
        if not fiveWordChars.search(sentence):
            continue

        yield dict(id=f"{mtg['mtg']['id']}_sentence{i}",
                   title=f"{mtg['mtg']['date']} {mtg['mtg']['title']}",
                   sourceFile=str(p),
                   posJson="[]",
                   phrase=sentence)
        # Advance the counter so each yielded sentence gets a distinct id;
        # without this every document was '<id>_sentence0'.
        i += 1


async def addMeeting(index: SearchIndex, mtg: MeetingRow):
    """Add one meeting's agenda sentences to the search index.

    If the meeting has an agenda URL, the page is fetched with
    getCityPermanent, its text extracted and split into sentences, and each
    sentence is added via index.addDoc titled '<date> <title>'. A meeting for
    which meetingAgendaUrl raises ValueError contributes no agenda documents.

    Video handling is a placeholder: a non-empty mtg['videoUrl'] is detected
    but nothing is done with it yet.
    """
    try:
        agendaUrl = meetingAgendaUrl(mtg)
    except ValueError:
        # No agenda document for this meeting; skip the agenda step.
        agendaUrl = None

    if agendaUrl is not None:
        page = await getCityPermanent(agendaUrl)
        agendaText = ' '.join(extractMeetingText(page))
        for phrase in nltk.sent_tokenize(agendaText):
            index.addDoc(sourceTitle=f'{mtg["date"]} {mtg["title"]}',
                         phrase=phrase)

    if mtg.get('videoUrl'):
        # transcribe and index video...
        pass


@flow(persist_result=True)
async def buildIndex():
    """Flow that rebuilds the search index at data/build/index0.

    Installs the run logger into this module and scobot.index.access, then
    fetches every meeting list from meetingListUrls(), adds each meeting via
    addMeeting, and commits the index once at the end.
    """
    global log
    # One logger object shared by this module and the index access module.
    log = scobot.index.access.log = get_run_logger()

    index = SearchIndex(Path('data/build/index0'))
    for listUrl in meetingListUrls():
        fetched = await getCityMutableJson(listUrl)
        mtgs = cast(list[MeetingRow], fetched)
        log.info(f'got {len(mtgs)=}')

        for row in mtgs:
            await addMeeting(index, row)

    index.commit()
    # todo: kill search to restart it
    # todo: kill search to restart it


if __name__ == '__main__':
    # When run as a script, serve the buildIndex flow under the name
    # 'buildIndex'.
    buildIndex.serve(name='buildIndex')