view telemetrix_local_serial.py @ 9:7e19dffb767b default tip

rm more unused code and logging
author drewp@bigasterisk.com
date Mon, 06 Feb 2023 11:53:42 -0800
parents 6182841fb92e
children
line wrap: on
line source

# -*- coding: utf-8 -*-
"""
 Copyright (c) 2015-2020 Alan Yorinks All rights reserved.

 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
 Version 3 as published by the Free Software Foundation; either
 or (at your option) any later version.
 This library is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 General Public License for more details.

 You should have received a copy of the GNU AFFERO GENERAL PUBLIC LICENSE
 along with this library; if not, write to the Free Software
 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
"""

import asyncio
import logging

import serial_asyncio

log = logging.getLogger('serial')


# noinspection PyStatementEffect,PyUnresolvedReferences,PyUnresolvedReferences
class TelemetrixAioSerial:
    """Thin asyncio wrapper around a serial port opened via ``serial_asyncio``.

    Constructing an instance only records the port settings; no coroutine
    is created and no I/O happens until :meth:`open` is awaited.
    ``reader`` and ``writer`` are assigned by :meth:`open`, so
    :meth:`read` and :meth:`write` must not be used before it completes.
    """

    # Both attributes are set by open(); they do not exist before that.
    reader: asyncio.StreamReader
    writer: asyncio.StreamWriter

    def __init__(self, com_port: str = '/dev/ttyACM0',
                 baud_rate: int = 115200) -> None:
        """Remember which port to open and at what speed.

        :param com_port: serial device URL/path passed as ``url`` to
                         ``serial_asyncio.open_serial_connection``.
        :param baud_rate: passed as ``baudrate`` to the same call.
        """
        # Deliberately do NOT call open_serial_connection() here: it returns
        # a coroutine object, and a coroutine created in __init__ warns
        # "never awaited" if open() is skipped and cannot be awaited a
        # second time if open() is called again.
        self.com_port = com_port
        self.baud_rate = baud_rate

    async def open(self) -> None:
        """Open the serial connection and store its reader/writer pair.

        A fresh connection coroutine is created on every call, so this can
        be awaited again (for example to reconnect). A previously opened
        writer is not closed here.
        """
        self.reader, self.writer = await serial_asyncio.open_serial_connection(
            url=self.com_port, baudrate=self.baud_rate, timeout=1,
            writeTimeout=1)

    async def write(self, data: bytes) -> None:
        """Queue ``data`` on the writer and wait until it has drained.

        :param data: raw bytes to send.
        """
        self.writer.write(data)
        await self.writer.drain()

    async def read(self, size: int = 1) -> bytes:
        """Read exactly ``size`` bytes from the reader.

        :param size: number of bytes to wait for (default 1).
        :returns: the bytes read, as returned by
                  ``asyncio.StreamReader.readexactly``.
        """
        return await self.reader.readexactly(size)