# -*- coding: utf-8 -*-
#
# SPDX-FileCopyrightText: © 2016 The glucometerutils Authors
# SPDX-License-Identifier: MIT
"""Driver for LifeScan OneTouch Verio (2015) and Select Plus devices.
Verio 2015 devices can be recognized by microUSB connectors.
Supported features:
- get readings, including pre-/post-meal notes †;
- use the glucose unit preset on the device by default;
- get and set date and time;
- get serial number and software version;
- memory reset (caution!)
Expected device path: /dev/sdb or similar USB block device.
† Pre-/post-meal notes are only supported on Select Plus devices.
Further information on the device protocol can be found at
https://protocols.glucometers.tech/lifescan/onetouch-verio-2015
"""
import binascii
import datetime
import logging
from typing import Any, Dict, Generator, Optional
import construct
from pyscsi.pyscsi.scsi import SCSI
from pyscsi.pyscsi.scsi_device import SCSIDevice
from glucometerutils import common, driver, exceptions
from glucometerutils.support import lifescan, lifescan_binary_protocol
# This device uses SCSI blocks as registers.
_REGISTER_SIZE = 512
_PACKET = construct.Padded(
_REGISTER_SIZE, lifescan_binary_protocol.LifeScanPacket(False)
)
_COMMAND_SUCCESS = construct.Const(b"\x03\x06")
_QUERY_REQUEST = construct.Struct(
const=construct.Const(b"\x03\xe6\x02"),
selector=construct.Enum(construct.Byte, serial=0x00, model=0x01, software=0x02),
)
_QUERY_RESPONSE = construct.Struct(
const=construct.Const(b"\x03\x06"), value=construct.CString(encoding="utf-16-le"),
)
_READ_PARAMETER_REQUEST = construct.Struct(
const=construct.Const(b"\x03"), selector=construct.Enum(construct.Byte, unit=0x04),
)
_READ_UNIT_RESPONSE = construct.Struct(
success=_COMMAND_SUCCESS,
unit=lifescan_binary_protocol.GLUCOSE_UNIT,
padding=construct.Padding(3),
)
_READ_RTC_REQUEST = construct.Const(b"\x03\x20\x02")
_READ_RTC_RESPONSE = construct.Struct(
success=_COMMAND_SUCCESS,
timestamp=lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
)
_WRITE_RTC_REQUEST = construct.Struct(
const=construct.Const(b"\x03\x20\x01"),
timestamp=lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
)
_MEMORY_ERASE_REQUEST = construct.Const(b"\x03\x1a")
_READ_RECORD_COUNT_REQUEST = construct.Const(b"\x03\x27\x00")
_READ_RECORD_COUNT_RESPONSE = construct.Struct(
success=_COMMAND_SUCCESS, count=construct.Int16ul,
)
_READ_RECORD_REQUEST = construct.Struct(
const_1=construct.Const(b"\x03\x31\x02"),
record_id=construct.Int16ul,
const_2=construct.Const(b"\x00"),
)
_MEAL_FLAG = {
common.Meal.NONE: 0x00,
common.Meal.BEFORE: 0x01,
common.Meal.AFTER: 0x02,
}
_READ_RECORD_RESPONSE = construct.Struct(
success=_COMMAND_SUCCESS,
inverse_counter=construct.Int16ul,
padding_1=construct.Padding(1),
lifetime_counter=construct.Int16ul,
timestamp=lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
value=construct.Int16ul,
meal=construct.Mapping(construct.Byte, _MEAL_FLAG),
padding_2=construct.Padding(4),
)
class Device(driver.GlucometerDevice):
def __init__(self, device: Optional[str]) -> None:
if not device:
raise exceptions.CommandLineError(
"--device parameter is required, should point to the disk "
"device representing the meter."
)
super().__init__(device)
self.device_name_ = device
self.scsi_device_ = SCSIDevice(device, readwrite=True)
self.scsi_ = SCSI(self.scsi_device_)
self.scsi_.blocksize = _REGISTER_SIZE
def connect(self) -> None:
inq = self.scsi_.inquiry()
logging.debug("Device connected: %r", inq.result)
vendor = inq.result["t10_vendor_identification"][:32]
if vendor != b"LifeScan":
raise exceptions.ConnectionFailed(
f"Device {self.device_name_} is not a LifeScan glucometer."
)
def disconnect(self) -> None: # pylint: disable=no-self-use
return
def _send_request(
self,
lba: int,
request_format: construct.Struct,
request_obj: Optional[Dict[str, Any]],
response_format: construct.Struct,
) -> construct.Container:
"""Send a request to the meter, and read its response.
Args:
lba: the address of the block register to use, known
valid addresses are 3, 4 and 5.
request_format: a construct format identifier of the request to send
request_obj: the object to format with the provided identifier
response_format: a construct format identifier to parse the returned
message with.
Returns:
The Container object parsed from the response received by the meter.
Raises:
lifescan.MalformedCommand if Construct fails to build the request or
parse the response.
"""
try:
request = request_format.build(request_obj)
request_raw = _PACKET.build({"data": {"value": {"message": request}}})
logging.debug("Request sent: %s", binascii.hexlify(request_raw))
self.scsi_.write10(lba, 1, request_raw)
response_raw = self.scsi_.read10(lba, 1)
logging.debug(
"Response received: %s", binascii.hexlify(response_raw.datain)
)
response_pkt = _PACKET.parse(response_raw.datain).data
logging.debug("Response packet: %r", response_pkt)
response = response_format.parse(response_pkt.value.message)
logging.debug("Response parsed: %r", response)
return response
except construct.ConstructError as e:
raise lifescan.MalformedCommand(str(e))
def _query_string(self, selector: str) -> str:
response = self._send_request(
3, _QUERY_REQUEST, {"selector": selector}, _QUERY_RESPONSE
)
return response.value
def get_meter_info(self) -> common.MeterInfo:
model = self._query_string("model")
return common.MeterInfo(
f"OneTouch {model} glucometer",
serial_number=self.get_serial_number(),
version_info=(f"Software version: {self.get_version()}",),
native_unit=self.get_glucose_unit(),
)
def get_serial_number(self) -> str:
return self._query_string("serial")
def get_version(self) -> str:
return self._query_string("software")
def get_datetime(self) -> datetime.datetime:
response = self._send_request(3, _READ_RTC_REQUEST, None, _READ_RTC_RESPONSE)
return response.timestamp
def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime:
self._send_request(3, _WRITE_RTC_REQUEST, {"timestamp": date}, _COMMAND_SUCCESS)
# The device does not return the new datetime, so confirm by calling
# READ RTC again.
return self.get_datetime()
def zero_log(self) -> None:
self._send_request(3, _MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS)
def get_glucose_unit(self) -> common.Unit:
response = self._send_request(
4, _READ_PARAMETER_REQUEST, {"selector": "unit"}, _READ_UNIT_RESPONSE
)
return response.unit
def _get_reading_count(self) -> int:
response = self._send_request(
3, _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE
)
return response.count
def _get_reading(self, record_id: int) -> common.GlucoseReading:
response = self._send_request(
3, _READ_RECORD_REQUEST, {"record_id": record_id}, _READ_RECORD_RESPONSE
)
return common.GlucoseReading(
response.timestamp, float(response.value), meal=response.meal
)
def get_readings(self) -> Generator[common.AnyReading, None, None]:
record_count = self._get_reading_count()
for record_id in range(record_count):
yield self._get_reading(record_id)