Mercurial > code > home > repos > doorbell
view telemetrix_local_serial.py @ 3:67402d8b4e0d
copy from telemetrix repo
author | drewp@bigasterisk.com |
---|---|
date | Sun, 05 Feb 2023 14:06:45 -0800 |
parents | |
children | 6182841fb92e |
line wrap: on
line source
# -*- coding: utf-8 -*-
"""
 Copyright (c) 2015-2020 Alan Yorinks All rights reserved.

 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
 Version 3 as published by the Free Software Foundation; either
 or (at your option) any later version.
 This library is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 General Public License for more details.

 You should have received a copy of the GNU AFFERO GENERAL PUBLIC LICENSE
 along with this library; if not, write to the Free Software
 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""

import asyncio
import sys
import time

import serial

# Default line delimiter for read_until: the ASCII line-feed byte value.
LF = 0x0a


# noinspection PyStatementEffect,PyUnresolvedReferences,PyUnresolvedReferences
class TelemetrixAioSerial:
    """
    This class encapsulates management of the serial port that communicates
    with the Arduino Firmata.

    It exposes the pyserial calls as coroutines: while no input is pending
    they sleep briefly so that other asyncio tasks can run.
    """

    def __init__(self, com_port='/dev/ttyACM0', baud_rate=115200,
                 sleep_tune=.0001, telemetrix_aio_instance=None,
                 close_loop_on_error=True):
        """
        This is the constructor for the aio serial handler

        :param com_port: Com port designator

        :param baud_rate: UART baud rate

        :param sleep_tune: seconds to sleep between polls of the port

        :param telemetrix_aio_instance: reference to caller; if it has a
                                        truthy ``the_task`` attribute, that
                                        task is cancelled on a write error

        :param close_loop_on_error: stop the event loop when a write fails

        :return: None
        """
        sys.stdout.flush()
        # write_timeout is the pyserial 3.x name of the former writeTimeout.
        self.my_serial = serial.Serial(com_port, baud_rate, timeout=1,
                                       write_timeout=1)

        self.com_port = com_port
        self.sleep_tune = sleep_tune
        self.telemetrix_aio_instance = telemetrix_aio_instance
        self.close_loop_on_error = close_loop_on_error

        # wall-clock time at which the most recent timed read_until started
        self.start_time = None

    async def get_serial(self):
        """
        This method returns a reference to the serial port in case the
        user wants to call pyserial methods directly

        :return: pyserial instance
        """
        return self.my_serial

    async def write(self, data):
        """
        This is an asyncio adapted version of pyserial write.

        :param data: Data to be written (anything accepted by ``bytes()``)

        :return: Number of bytes written, as reported by pyserial. A
                 count of zero is returned as-is instead of being waited
                 on forever.

        :raises asyncio.CancelledError: if pyserial raised SerialException;
                                        the port is closed (and, when
                                        configured, the loop stopped) first
        """
        try:
            return self.my_serial.write(bytes(data))
        except serial.SerialException as err:
            await self._shutdown_after_write_error()
            # Callers have always seen a failed write as a cancellation;
            # keep that contract but chain the real cause for debugging.
            raise asyncio.CancelledError() from err

    async def _shutdown_after_write_error(self):
        """
        Close the port and tear down the caller's task/loop after a
        failed write.
        """
        await self.close()

        loop = None
        if self.close_loop_on_error:
            loop = asyncio.get_event_loop()
            loop.stop()

        # telemetrix_aio_instance defaults to None, so look the task up
        # defensively instead of assuming the attribute exists.
        task = getattr(self.telemetrix_aio_instance, 'the_task', None)
        if task:
            task.cancel()

        await asyncio.sleep(1)

        # A loop may only be closed once it is no longer running; calling
        # close() on a running loop raises RuntimeError.
        if loop is not None and not loop.is_running():
            loop.close()

    async def read(self, size=1):
        """
        This is an asyncio adapted version of pyserial read. It yields to
        the event loop until at least one byte is waiting.

        :param size: number of bytes to request from pyserial

        :return: One character as an int when size is 1, otherwise a
                 list of ints
        """
        # relinquish control to the event loop until input is pending
        while not self.my_serial.in_waiting:
            await asyncio.sleep(self.sleep_tune * 2)

        # NOTE: for size > 1 this call can block for up to the port's
        # read timeout if fewer than size bytes have arrived, and may
        # then return fewer than size values.
        data = self.my_serial.read(size)
        if size == 1:
            return ord(data)
        return list(data)

    async def read_until(self, expected=LF, size=None, timeout=1):
        """
        This is an asyncio adapted version of pyserial read_until. It
        yields to the event loop until at least one byte is waiting.

        :param expected: delimiter; an int byte value (such as the default
                         LF), a bytes-like object, or a str

        :param size: maximum number of bytes to read (None for no limit)

        :param timeout: seconds to wait for the first byte; a falsy value
                        waits indefinitely

        :return: Data delimited by expected as a list of ints, or None if
                 nothing arrived within timeout

        :raises ValueError: if an int delimiter is outside 0..255
        """
        terminator = self._as_terminator(expected)

        # Time the wait with a local monotonic start so that overlapping
        # calls cannot reset each other's timer; start_time is still
        # recorded because it is a public attribute of this class.
        wait_started = time.monotonic()
        if timeout:
            self.start_time = time.time()

        # relinquish control to the event loop until input is pending
        while not self.my_serial.in_waiting:
            if timeout and time.monotonic() - wait_started > timeout:
                return None
            await asyncio.sleep(self.sleep_tune)

        return list(self.my_serial.read_until(terminator, size))

    @staticmethod
    def _as_terminator(expected):
        """
        Convert a delimiter given as int, bytes-like or str into bytes.
        """
        if isinstance(expected, (bytes, bytearray)):
            return bytes(expected)
        if isinstance(expected, int):
            # str(0x0a).encode() would give b'10'; use the byte value itself
            return bytes([expected])
        return str(expected).encode()

    async def reset_input_buffer(self):
        """
        Reset the input buffer
        """
        self.my_serial.reset_input_buffer()

    async def close(self):
        """
        Close the serial port
        """
        if self.my_serial:
            self.my_serial.close()