# -*- coding: utf-8 -*- # # SPDX-FileCopyrightText: © 2014 The glucometerutils Authors # SPDX-License-Identifier: MIT """Driver for LifeScan OneTouch Ultra Easy devices. Also supports OneTouch Ultra Mini devices (different name, same device). Supported features: - get readings; - use the glucose unit preset on the device by default; - get and set date and time; - get serial number and software version; - memory reset (caution!) Expected device path: /dev/ttyUSB0 or similar serial port device. """ import binascii import datetime import logging from typing import Any, Dict, Generator, Optional import construct from glucometerutils import common, driver from glucometerutils.support import ( construct_extras, lifescan, lifescan_binary_protocol, serial, ) _PACKET = lifescan_binary_protocol.LifeScanPacket(True) _INVALID_RECORD = 501 _COMMAND_SUCCESS = construct.Const(b"\x05\x06") _VERSION_REQUEST = construct.Const(b"\x05\x0d\x02") _VERSION_RESPONSE = construct.Struct( _COMMAND_SUCCESS, "version" / construct.PascalString(construct.Byte, encoding="ascii"), ) _SERIAL_NUMBER_REQUEST = construct.Const( b"\x05\x0B\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00" ) _SERIAL_NUMBER_RESPONSE = construct.Struct( _COMMAND_SUCCESS, "serial_number" / construct.GreedyString(encoding="ascii"), ) _DATETIME_REQUEST = construct.Struct( construct.Const(b"\x05\x20"), # 0x20 is the datetime "request_type" / construct.Enum(construct.Byte, write=0x01, read=0x02), "timestamp" / construct.Default( construct_extras.Timestamp(construct.Int32ul), # type: ignore datetime.datetime(1970, 1, 1, 0, 0), ), ) _DATETIME_RESPONSE = construct.Struct( _COMMAND_SUCCESS, "timestamp" / construct_extras.Timestamp(construct.Int32ul), # type: ignore ) _GLUCOSE_UNIT_REQUEST = construct.Const(b"\x05\x09\x02\x09\x00\x00\x00\x00") _GLUCOSE_UNIT_RESPONSE = construct.Struct( _COMMAND_SUCCESS, "unit" / lifescan_binary_protocol.GLUCOSE_UNIT, construct.Padding(3), ) _MEMORY_ERASE_REQUEST = construct.Const(b"\x05\x1A") 
# Reply to a read-record request for _INVALID_RECORD: a one-byte marker
# followed by the number of stored readings.
_READING_COUNT_RESPONSE = construct.Struct(
    construct.Const(b"\x0f"),
    "count" / construct.Int16ul,
)

# Request for a single stored record, addressed by its index.
_READ_RECORD_REQUEST = construct.Struct(
    construct.Const(b"\x05\x1f"),
    "record_id" / construct.Int16ul,
)

# Reply carrying one stored reading: when it was taken and its raw value.
_READING_RESPONSE = construct.Struct(
    _COMMAND_SUCCESS,
    "timestamp" / construct_extras.Timestamp(construct.Int32ul),  # type: ignore
    "value" / construct.Int32ul,
)


def _make_packet(
    message: bytes,
    sequence_number: int,
    expect_receive: bool,
    acknowledge: bool,
    disconnect: bool,
) -> bytes:
    """Serialize a message and its link-control flags into a wire packet.

    Args:
        message: Payload bytes; empty for pure acknowledge/disconnect packets.
        sequence_number: Value for the link-control ``sequence_number`` field.
        expect_receive: Value for the link-control ``expect_receive`` field.
        acknowledge: Whether to set the ``acknowledge`` flag.
        disconnect: Whether to set the ``disconnect`` flag.

    Returns:
        The packet as built by ``_PACKET``.
    """
    link_control = {
        "sequence_number": sequence_number,
        "expect_receive": expect_receive,
        "acknowledge": acknowledge,
        "disconnect": disconnect,
    }
    return _PACKET.build(
        {"data": {"value": {"message": message, "link_control": link_control}}}
    )


class Device(serial.SerialDevice, driver.GlucometerDevice):
    """Driver for the OneTouch Ultra Easy glucometer over a serial port.

    Every request is a packet exchange: the request is sent, the meter
    acknowledges it, the meter sends the response, and the driver
    acknowledges the response. Two one-bit counters track the exchange:
    ``sent_counter_`` flips after each request sent and ``expect_receive_``
    flips after each response received.
    """

    BAUDRATE = 9600
    DEFAULT_CABLE_ID = "067b:2303"  # Generic PL2303 cable.
    TIMEOUT = 0.5

    def __init__(self, device: Optional[str]) -> None:
        """Open the serial device and reset the link-layer state."""
        super().__init__(device)
        self.sent_counter_ = False
        self.expect_receive_ = False
        self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024)

    def connect(self) -> None:
        """Send an empty disconnect packet and wait for its acknowledgement.

        Raises:
            lifescan.MalformedCommand: If the reply cannot be parsed or is
                not an acknowledgement.
        """
        try:
            self._send_packet(b"", disconnect=True)
            self._read_ack()
        except construct.ConstructError as e:
            raise lifescan.MalformedCommand(str(e)) from e

    def disconnect(self) -> None:
        """Close the session; performs the same exchange as ``connect``."""
        self.connect()

    def _send_packet(
        self, message: bytes, acknowledge: bool = False, disconnect: bool = False
    ) -> None:
        """Build a packet with the current counters and write it out."""
        pkt = _make_packet(
            message, self.sent_counter_, self.expect_receive_, acknowledge, disconnect
        )
        logging.debug("sending packet: %s", binascii.hexlify(pkt))

        self.serial_.write(pkt)
        self.serial_.flush()

    def _read_packet(self) -> construct.Container:
        """Read one packet from the serial port and check its sequence number.

        Returns:
            The parsed packet value (``message`` and ``link_control``).

        Raises:
            lifescan.MalformedCommand: If the packet is not a disconnect and
                its sequence number differs from ``expect_receive_``.
        """
        raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data
        logging.debug("received packet: %r", raw_pkt)

        # discard the checksum and copy
        pkt = raw_pkt.value

        if not pkt.link_control.disconnect and (
            pkt.link_control.sequence_number != self.expect_receive_
        ):
            # Report the field that was actually compared above; the previous
            # attribute path did not exist and raised AttributeError instead.
            raise lifescan.MalformedCommand(
                f"at position 2[0b] expected {self.expect_receive_:02x}, "
                f"received {pkt.link_control.sequence_number:02x}"
            )

        return pkt

    def _send_ack(self) -> None:
        """Acknowledge the packet just received."""
        self._send_packet(b"", acknowledge=True, disconnect=False)

    def _read_ack(self) -> None:
        """Read one packet and require it to be an acknowledgement.

        Raises:
            lifescan.MalformedCommand: If the packet has no acknowledge flag.
        """
        pkt = self._read_packet()
        # Explicit check rather than ``assert`` so it survives ``python -O``.
        if not pkt.link_control.acknowledge:
            raise lifescan.MalformedCommand(
                "expected an acknowledge packet, received a data packet"
            )

    def _send_request(
        self,
        request_format: construct.Struct,
        request_obj: Optional[Dict[str, Any]],
        response_format: construct.Struct,
    ) -> construct.Container:
        """Run one full request/response exchange with the meter.

        Args:
            request_format: Construct used to build the request message.
            request_obj: Values for ``request_format``; ``None`` for constant
                requests.
            response_format: Construct used to parse the response message.

        Returns:
            The response parsed with ``response_format``.

        Raises:
            lifescan.MalformedCommand: If building or parsing fails, or if the
                meter's packets do not follow the expected ack/data sequence.
        """
        try:
            request = request_format.build(request_obj)
            self._send_packet(request, acknowledge=False, disconnect=False)

            # The request is out: flip our send counter before reading.
            self.sent_counter_ = not self.sent_counter_
            self._read_ack()

            response_pkt = self._read_packet()
            # Explicit check rather than ``assert`` so it survives ``python -O``.
            if response_pkt.link_control.acknowledge:
                raise lifescan.MalformedCommand(
                    "expected a data packet, received an acknowledge packet"
                )

            # The response is in: flip the receive counter, then acknowledge.
            self.expect_receive_ = not self.expect_receive_
            self._send_ack()

            return response_format.parse(response_pkt.message)
        except construct.ConstructError as e:
            raise lifescan.MalformedCommand(str(e)) from e

    def get_meter_info(self) -> common.MeterInfo:
        """Return model name, serial number, software version and unit."""
        return common.MeterInfo(
            "OneTouch Ultra Easy glucometer",
            serial_number=self.get_serial_number(),
            version_info=("Software version: " + self.get_version(),),
            native_unit=self.get_glucose_unit(),
        )

    def get_version(self) -> str:
        """Return the software version string reported by the meter."""
        response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE)

        return response.version

    def get_serial_number(self) -> str:
        """Return the serial number reported by the meter."""
        response = self._send_request(
            _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE
        )

        return response.serial_number

    def get_datetime(self) -> datetime.datetime:
        """Return the date and time currently set on the meter."""
        response = self._send_request(
            _DATETIME_REQUEST, {"request_type": "read"}, _DATETIME_RESPONSE
        )

        return response.timestamp

    def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime:
        """Write ``date`` to the meter and return the timestamp it echoes."""
        response = self._send_request(
            _DATETIME_REQUEST,
            {"request_type": "write", "timestamp": date},
            _DATETIME_RESPONSE,
        )
        return response.timestamp

    def zero_log(self) -> None:
        """Send the memory-erase request and check for the success reply."""
        self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS)

    def get_glucose_unit(self) -> common.Unit:
        """Return the glucose unit configured on the meter."""
        response = self._send_request(
            _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE
        )

        return response.unit

    def _get_reading_count(self) -> int:
        """Return the number of readings stored on the meter.

        The count is obtained by requesting record ``_INVALID_RECORD`` and
        parsing the reply as ``_READING_COUNT_RESPONSE``.
        """
        response = self._send_request(
            _READ_RECORD_REQUEST,
            {"record_id": _INVALID_RECORD},
            _READING_COUNT_RESPONSE,
        )
        return response.count

    def _get_reading(self, record_id: int) -> common.GlucoseReading:
        """Fetch the stored reading with index ``record_id``."""
        response = self._send_request(
            _READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE
        )

        return common.GlucoseReading(response.timestamp, float(response.value))

    def get_readings(self) -> Generator[common.AnyReading, None, None]:
        """Yield every stored reading, in record-index order."""
        record_count = self._get_reading_count()
        for record_id in range(record_count):
            yield self._get_reading(record_id)