diff options
Diffstat (limited to 'glucometerutils/drivers/otultraeasy.py')
-rw-r--r-- | glucometerutils/drivers/otultraeasy.py | 54 |
1 files changed, 34 insertions, 20 deletions
diff --git a/glucometerutils/drivers/otultraeasy.py b/glucometerutils/drivers/otultraeasy.py index 41e2acd..d5eb3c9 100644 --- a/glucometerutils/drivers/otultraeasy.py +++ b/glucometerutils/drivers/otultraeasy.py @@ -18,6 +18,7 @@ Expected device path: /dev/ttyUSB0 or similar serial port device. import binascii import datetime import logging +from typing import Any, Dict, Generator, Optional import construct @@ -92,7 +93,13 @@ _READING_RESPONSE = construct.Struct( ) -def _make_packet(message, sequence_number, expect_receive, acknowledge, disconnect): +def _make_packet( + message: bytes, + sequence_number: int, + expect_receive: bool, + acknowledge: bool, + disconnect: bool, +): return _PACKET.build( { "data": { @@ -115,24 +122,26 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): DEFAULT_CABLE_ID = "067b:2303" # Generic PL2303 cable. TIMEOUT = 0.5 - def __init__(self, device): - super(Device, self).__init__(device) + def __init__(self, device: Optional[str]) -> None: + super().__init__(device) self.sent_counter_ = False self.expect_receive_ = False self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024) - def connect(self): + def connect(self) -> None: try: self._send_packet(b"", disconnect=True) self._read_ack() except construct.ConstructError as e: raise lifescan.MalformedCommand(str(e)) - def disconnect(self): + def disconnect(self) -> None: self.connect() - def _send_packet(self, message, acknowledge=False, disconnect=False): + def _send_packet( + self, message: bytes, acknowledge: bool = False, disconnect: bool = False + ) -> None: pkt = _make_packet( message, self.sent_counter_, self.expect_receive_, acknowledge, disconnect ) @@ -141,7 +150,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): self.serial_.write(pkt) self.serial_.flush() - def _read_packet(self): + def _read_packet(self) -> construct.Container: raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data logging.debug("received packet: %r", raw_pkt) @@ -157,14 +166,19 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): return pkt - def _send_ack(self): + def _send_ack(self) -> None: self._send_packet(b"", acknowledge=True, disconnect=False) - def _read_ack(self): + def _read_ack(self) -> None: pkt = self._read_packet() assert pkt.link_control.acknowledge - def _send_request(self, request_format, request_obj, response_format): + def _send_request( + self, + request_format: construct.Struct, + request_obj: Optional[Dict[str, Any]], + response_format: construct.Struct, + ) -> construct.Container: try: request = request_format.build(request_obj) self._send_packet(request, acknowledge=False, disconnect=False) @@ -182,7 +196,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): except construct.ConstructError as e: raise lifescan.MalformedCommand(str(e)) - def get_meter_info(self): + def get_meter_info(self) -> common.MeterInfo: return common.MeterInfo( "OneTouch Ultra Easy glucometer", serial_number=self.get_serial_number(), @@ -190,26 +204,26 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): native_unit=self.get_glucose_unit(), ) - def get_version(self): + def get_version(self) -> str: response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE) return response.version - def get_serial_number(self): + def get_serial_number(self) -> str: response = self._send_request( _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE ) return response.serial_number - def get_datetime(self): + def get_datetime(self) -> datetime.datetime: response = self._send_request( _DATETIME_REQUEST, {"request_type": "read"}, _DATETIME_RESPONSE ) return response.timestamp - def _set_device_datetime(self, date): + def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime: response = self._send_request( _DATETIME_REQUEST, {"request_type": "write", "timestamp": date}, @@ -217,17 +231,17 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): ) return response.timestamp - def zero_log(self): + def zero_log(self) -> None: self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS) - def get_glucose_unit(self): + def get_glucose_unit(self) -> common.Unit: response = self._send_request( _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE ) return response.unit - def _get_reading_count(self): + def _get_reading_count(self) -> int: response = self._send_request( _READ_RECORD_REQUEST, {"record_id": _INVALID_RECORD}, @@ -235,14 +249,14 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): ) return response.count - def _get_reading(self, record_id): + def _get_reading(self, record_id: int) -> common.GlucoseReading: response = self._send_request( _READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE ) return common.GlucoseReading(response.timestamp, float(response.value)) - def get_readings(self): + def get_readings(self) -> Generator[common.AnyReading, None, None]: record_count = self._get_reading_count() for record_id in range(record_count): yield self._get_reading(record_id) |