Mercurial > code > home > repos > doorbell
changeset 4:6182841fb92e
use serial_asyncio; rm dead code
author | drewp@bigasterisk.com |
---|---|
date | Sun, 05 Feb 2023 14:14:01 -0800 |
parents | 67402d8b4e0d |
children | 696a46a1b239 |
files | telemetrix_local_serial.py |
diffstat | 1 files changed, 16 insertions(+), 177 deletions(-) [+] |
line wrap: on
line diff
--- a/telemetrix_local_serial.py Sun Feb 05 14:06:45 2023 -0800 +++ b/telemetrix_local_serial.py Sun Feb 05 14:14:01 2023 -0800 @@ -17,11 +17,11 @@ """ import asyncio -import sys -import serial -import time +import logging -LF = 0x0a +import serial_asyncio + +log = logging.getLogger('serial') # noinspection PyStatementEffect,PyUnresolvedReferences,PyUnresolvedReferences @@ -32,183 +32,22 @@ It provides a 'futures' interface to make Pyserial compatible with asyncio """ - def __init__(self, com_port='/dev/ttyACM0', baud_rate=115200, sleep_tune=.0001, - telemetrix_aio_instance=None, close_loop_on_error=True): - - """ - This is the constructor for the aio serial handler + reader: asyncio.StreamReader + writer: asyncio.StreamWriter - :param com_port: Com port designator - - :param baud_rate: UART baud rate - - :param telemetrix_aio_instance: reference to caller - - :return: None - """ - # print('Initializing Arduino - Please wait...', end=" ") - sys.stdout.flush() - self.my_serial = serial.Serial(com_port, baud_rate, timeout=1, - writeTimeout=1) + def __init__(self, com_port='/dev/ttyACM0', baud_rate=115200): + self.conn = serial_asyncio.open_serial_connection( + url=com_port, baudrate=baud_rate, timeout=1, + writeTimeout=1) self.com_port = com_port - self.sleep_tune = sleep_tune - self.telemetrix_aio_instance = telemetrix_aio_instance - self.close_loop_on_error = close_loop_on_error - # used by read_until - self.start_time = None - - async def get_serial(self): - """ - This method returns a reference to the serial port in case the - user wants to call pyserial methods directly - - :return: pyserial instance - """ - return self.my_serial - - async def write(self, data): - """ - This is an asyncio adapted version of pyserial write. It provides a - non-blocking write and returns the number of bytes written upon - completion + async def open(self): + self.reader, self.writer = await self.conn - :param data: Data to be written - :return: Number of bytes written - """ - # the secret sauce - it is in your future - future = asyncio.Future() - result = None - try: - # result = self.my_serial.write(bytes([ord(data)])) - result = self.my_serial.write(bytes(data)) - - except serial.SerialException: - # noinspection PyBroadException - loop = None - await self.close() - future.cancel() - if self.close_loop_on_error: - loop = asyncio.get_event_loop() - loop.stop() - - if self.telemetrix_aio_instance.the_task: - self.telemetrix_aio_instance.the_task.cancel() - await asyncio.sleep(1) - if self.close_loop_on_error: - loop.close() - - if result: - future.set_result(result) - while True: - if not future.done(): - # spin our asyncio wheels until future completes - await asyncio.sleep(self.sleep_tune) - - else: - return future.result() + async def write(self, data: bytes): + self.writer.write(data) + await self.writer.drain() async def read(self, size=1): - """ - This is an asyncio adapted version of pyserial read - that provides non-blocking read. - - :return: One character - """ - - # create an asyncio Future - future = asyncio.Future() - - # create a flag to indicate when data becomes available - data_available = False - - # wait for a character to become available and read from - # the serial port - while True: - if not data_available: - # test to see if a character is waiting to be read. - # if not, relinquish control back to the event loop through the - # short sleep - if not self.my_serial.in_waiting: - await asyncio.sleep(self.sleep_tune*2) - - # data is available. - # set the flag to true so that the future can "wait" until the - # read is completed. - else: - data_available = True - data = self.my_serial.read(size) - # set future result to make the character available - if size == 1: - future.set_result(ord(data)) - else: - future.set_result(list(data)) - else: - # wait for the future to complete - if not future.done(): - await asyncio.sleep(self.sleep_tune) - else: - # future is done, so return the character - return future.result() - - async def read_until(self, expected=LF, size=None, timeout=1): - """ - This is an asyncio adapted version of pyserial read - that provides non-blocking read. - - :return: Data delimited by expected - """ - - expected = str(expected).encode() - # create an asyncio Future - future = asyncio.Future() - - # create a flag to indicate when data becomes available - data_available = False - - if timeout: - self.start_time = time.time() - - # wait for a character to become available and read from - # the serial port - while True: - if not data_available: - # test to see if a character is waiting to be read. - # if not, relinquish control back to the event loop through the - # short sleep - if not self.my_serial.in_waiting: - if timeout: - elapsed_time = time.time() - self.start_time - if elapsed_time > timeout: - return None - await asyncio.sleep(self.sleep_tune) - # data is available. - # set the flag to true so that the future can "wait" until the - # read is completed. - else: - data_available = True - data = self.my_serial.read_until(expected, size) - # set future result to make the character available - return_value = list(data) - future.set_result(return_value) - else: - # wait for the future to complete - if not future.done(): - await asyncio.sleep(self.sleep_tune) - else: - # future is done, so return the character - return future.result() - - async def reset_input_buffer(self): - """ - Reset the input buffer - """ - self.my_serial.reset_input_buffer() - - async def close(self): - """ - Close the serial port - """ - if self.my_serial: - self.my_serial.close() + return await self.reader.readexactly(size)