Mercurial > code > home > repos > doorbell
view telemetrix_local_serial.py @ 5:696a46a1b239
logging and minor refactors
author | drewp@bigasterisk.com |
---|---|
date | Sun, 05 Feb 2023 14:26:04 -0800 |
parents | 6182841fb92e |
children | 7e19dffb767b |
line wrap: on
line source
# -*- coding: utf-8 -*-
"""
 Copyright (c) 2015-2020 Alan Yorinks All rights reserved.

 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
 Version 3 as published by the Free Software Foundation; either
 or (at your option) any later version.
 This library is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 General Public License for more details.

 You should have received a copy of the GNU AFFERO GENERAL PUBLIC LICENSE
 along with this library; if not, write to the Free Software
 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
"""

import asyncio
import logging

import serial_asyncio

log = logging.getLogger('serial')


# noinspection PyStatementEffect,PyUnresolvedReferences,PyUnresolvedReferences
class TelemetrixAioSerial:
    """
    Manage the serial port used to talk to the Arduino.

    The port is exposed as an asyncio stream pair (``reader`` / ``writer``)
    obtained from ``serial_asyncio.open_serial_connection``. Construct the
    object, ``await open()``, then use ``write()`` and ``read()``.
    """

    # Both are assigned by open(); using write()/read() before open() has
    # completed raises AttributeError.
    reader: asyncio.StreamReader
    writer: asyncio.StreamWriter

    def __init__(self, com_port: str = '/dev/ttyACM0',
                 baud_rate: int = 115200) -> None:
        """
        Remember the connection parameters; no I/O happens here.

        :param com_port: serial device path/URL handed to serial_asyncio
        :param baud_rate: line speed handed to serial_asyncio
        """
        # Only the parameters are stored. The connection coroutine is built
        # in open(): a coroutine object can be awaited just once, so creating
        # it here would make open() fail with RuntimeError on any retry and
        # would trigger a "never awaited" warning if open() was never called.
        self.com_port = com_port
        self.baud_rate = baud_rate

    async def open(self) -> None:
        """
        Open the serial connection and bind ``reader`` and ``writer``.

        May be awaited again (e.g. to retry after a failed attempt); each
        call starts a fresh connection attempt and rebinds the streams.
        Any exception raised by serial_asyncio while opening propagates.
        """
        self.reader, self.writer = await serial_asyncio.open_serial_connection(
            url=self.com_port,
            baudrate=self.baud_rate,
            timeout=1,
            writeTimeout=1)

    async def write(self, data: bytes) -> None:
        """
        Queue ``data`` on the port and wait for the write buffer to drain.

        :param data: raw bytes to send
        """
        self.writer.write(data)
        # drain() applies flow control: it returns once the transport's
        # buffer has fallen back below its high-water mark.
        await self.writer.drain()

    async def read(self, size: int = 1) -> bytes:
        """
        Read exactly ``size`` bytes from the port.

        :param size: number of bytes to wait for
        :return: the bytes read
        :raises asyncio.IncompleteReadError: if the stream reaches EOF
            before ``size`` bytes have arrived
        """
        return await self.reader.readexactly(size)