view patch_cyclone.py @ 10:742e950b0de0 0.15.0

release 0.15.0
author drewp@bigasterisk.com
date Wed, 24 Nov 2021 19:54:25 -0800
parents 1b6718a54c00
children
line wrap: on
line source



def patch_sse():
    """Monkey-patch ``cyclone.sse.SSEHandler.sendEvent`` with a bytes-safe version.

    The replacement converts the message and every optional field to
    ``bytes`` before handing them to ``self.transport.write``.  Calling this
    function rebinds ``SSEHandler.sendEvent`` for the whole process.
    """
    from cyclone import escape
    import cyclone.sse

    def _to_bytes(value):
        """Return *value* as bytes: bytes-like passes through, else UTF-8 of ``str(value)``."""
        if isinstance(value, (bytes, bytearray)):
            return bytes(value)
        # str() first so non-string values (e.g. an int retry interval or a
        # numeric event id) are accepted instead of failing on ``.encode``.
        return str(value).encode("utf-8")

    def new_sendEvent(self, message, event=None, eid=None, retry=None):
        """Write one server-sent event to ``self.transport``.

        Parameters:

        message: the event payload; a ``dict`` is passed through
            ``escape.json_encode``, a ``str`` is UTF-8 encoded, ``bytes``
            are sent unchanged.
        event: optional value for the ``event:`` field (``str`` or ``bytes``).
        eid: optional value for the ``id:`` field.
        retry: optional value for the ``retry:`` field (``str``, ``bytes``
            or a number).

        Raises:

        TypeError: if ``message`` is not ``bytes`` after the conversions above.

        Falsy ``event``/``eid``/``retry`` values (``None``, ``""``, ``0``)
        are omitted from the output.
        """
        if isinstance(message, dict):
            message = escape.json_encode(message)
        if isinstance(message, str):
            message = message.encode("utf-8")
        if not isinstance(message, bytes):
            # Explicit raise instead of ``assert`` so the check survives -O.
            raise TypeError(
                "message must be dict, str or bytes, not %s"
                % type(message).__name__
            )

        if eid:
            self.transport.write(b"id: " + _to_bytes(eid) + b"\n")
        if event:
            # Previously ``event`` was concatenated without encoding, so a
            # str event name raised TypeError; treat it like eid and retry.
            self.transport.write(b"event: " + _to_bytes(event) + b"\n")
        if retry:
            self.transport.write(b"retry: " + _to_bytes(retry) + b"\n")
        # NOTE: the payload is written as a single ``data:`` line; newlines
        # embedded in ``message`` are sent as-is and are not split up.
        self.transport.write(b"data: " + message + b"\n\n")

    cyclone.sse.SSEHandler.sendEvent = new_sendEvent