From d4322fc94a02261ebabbca2f24c03e87095720de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Elio=20Petten=C3=B2?= Date: Sun, 14 Feb 2016 17:14:07 +0000 Subject: otverio2015: new driver for OneTouch Verio (2015) and similar devices. This should be working for OneTouch Select Plus devices, too. It currently does not support reporting in the native unit of the device. As of today, you'll also need my forked repository of python-scsi, as the current upstream one does not have working SGIO for Linux. --- README | 5 + glucometerutils/drivers/otverio2015.py | 226 +++++++++++++++++++++++++++++++++ 2 files changed, 231 insertions(+) create mode 100644 glucometerutils/drivers/otverio2015.py diff --git a/README b/README index bde999f..0f51b4c 100644 --- a/README +++ b/README @@ -23,9 +23,14 @@ Supported devices * **LifeScan OneTouch Ultra Easy** (also known as **Ultra Mini**): get information, get and set time, dump of readings in native units, memory reset. + * **LifeScan OneTouch Verio** (USB version), **LifeScan OneTouch + Select Plus**: get information, get and set time, dump of + readings. Requires [python-scsi][1]. * **Abbott FreeStyle Optium**: get information, get and set time, dump of readings in native units. +[1] https://github.com/rosjat/python-scsi + Dump format ----------- diff --git a/glucometerutils/drivers/otverio2015.py b/glucometerutils/drivers/otverio2015.py new file mode 100644 index 0000000..44f3573 --- /dev/null +++ b/glucometerutils/drivers/otverio2015.py @@ -0,0 +1,226 @@ +# -*- coding: utf-8 -*- +"""Driver for LifeScan OneTouch Verio 2015 devices. + +Further information on the device protocol can be found at + +https://github.com/Flameeyes/glucometer-protocols/blob/master/lifescan/onetouch-verio-2015.md +""" + +__author__ = 'Diego Elio Pettenò' +__email__ = 'flameeyes@flameeyes.eu' +__copyright__ = 'Copyright © 2016, Diego Elio Pettenò' +__license__ = 'MIT' + +import datetime +import struct + +from pyscsi.pyscsi.scsi import SCSI +from pyscsi.pyscsi.scsi_device import SCSIDevice + +from glucometerutils import common +from glucometerutils import exceptions +from glucometerutils.drivers import lifescan_common + +# Match the same values in the otultraeasy driver. +_STX = 0x02 +_ETX = 0x03 + +# This device uses SCSI blocks as registers. +_REGISTER_SIZE = 512 + +_STRUCT_PREAMBLE = struct.Struct(' _REGISTER_SIZE: + raise lifescan_common.MalformedCommand( + 'invalid length: %d > REGISTER_SIZE' % length) + + # 2 is the length of the checksum, so it should be ignored. + calculated_checksum = lifescan_common.crc_ccitt(register[:(length-2)]) + + coda_offset = length - _STRUCT_CODA.size + etx, encoded_checksum = _STRUCT_CODA.unpack_from(register[coda_offset:]) + if etx != _ETX: + raise lifescan_common.MalformedCommand( + 'invalid ETX byte: %02x' % etx) + if encoded_checksum != calculated_checksum: + raise lifescan_common.InvalidChecksum( + encoded_checksum, calculated_checksum) + + response = register[_STRUCT_PREAMBLE.size:coda_offset] + return response + +def _encode_message(cmd): + """Add message preamble and calculate checksum, add padding.""" + length = len(cmd) + _STRUCT_PREAMBLE.size + _STRUCT_CODA.size + preamble = _STRUCT_PREAMBLE.pack(_STX, length) + message = preamble + cmd + bytes((_ETX,)) + checksum = _STRUCT_CHECKSUM.pack(lifescan_common.crc_ccitt(message)) + + # Pad the message to match the size of the register. + return message + checksum + bytes( + _REGISTER_SIZE - 2 - len(message)) + +def _convert_timestamp(timestamp): + return datetime.datetime.fromtimestamp(timestamp + _EPOCH_BASE) + +class Device(object): + def __init__(self, device): + self.device_name_ = device + self.scsi_device_ = SCSIDevice(device) + self.scsi_ = SCSI(self.scsi_device_) + self.scsi_.blocksize = _REGISTER_SIZE + + def _send_message(self, cmd, lba): + """Send a request to the meter, and read its response. + + Args: + cmd: (bytes) the raw command to send the device, without + preamble or checksum. + lba: (int) the address of the block register to use, known + valid addresses are 3, 4 and 5. + + Returns: + (bytes) The raw response from the meter. No preamble or coda is + present, and the checksum has already been validated. + """ + self.scsi_.write10(lba, 1, _encode_message(cmd)) + response = self.scsi_.read10(lba, 1) + # TODO: validate that the response is valid. + return _extract_message(response.datain) + + def connect(self): + inq = self.scsi_.inquiry() + vendor = inq.result['t10_vendor_identification'][:32] + if vendor != b'LifeScan': + raise exceptions.ConnectionFailed( + 'Device %s is not a LifeScan glucometer.' % self.device_name_) + + def disconnect(self): + return + + def get_information_string(self): + return ('OneTouch %s glucometer\n' + 'Serial number: %s\n' + 'Software version: %s\n' + 'Time: %s\n' + 'Default unit: unknown\n' % ( + self._query_string(_QUERY_KEY_MODEL), + self.get_serial_number(), + self.get_version(), + self.get_datetime())) + + def _query_string(self, query_key): + response = self._send_message(_QUERY_REQUEST + query_key, 3) + if response[0:2] != b'\x04\06': + raise lifescan_common.MalformedCommand( + 'invalid response, expected 04 06, received %02x %02x' % ( + response[0], response[1])) + # Strings are encoded in wide characters (LE), but they should + # only contain ASCII characters. Note that the string is + # null-terminated, so the last character should be dropped. + return response[2:].decode('utf-16-le')[:-1] + + def get_serial_number(self): + return self._query_string(_QUERY_KEY_SERIAL) + + def get_version(self): + return self._query_string(_QUERY_KEY_SOFTWARE) + + def get_datetime(self): + response = self._send_message(_READ_RTC_REQUEST, 3) + if response[0:2] != b'\x04\06': + raise lifescan_common.MalformedCommand( + 'invalid response, expected 04 06, received %02x %02x' % ( + response[0], response[1])) + (timestamp,) = _STRUCT_TIMESTAMP.unpack(response[2:]) + return _convert_timestamp(timestamp) + + def set_datetime(self, date=datetime.datetime.now()): + epoch = datetime.datetime.utcfromtimestamp(_EPOCH_BASE) + delta = date - epoch + timestamp = int(delta.total_seconds()) + + timestamp_bytes = _STRUCT_TIMESTAMP.pack(timestamp) + response = self._send_message(_WRITE_RTC_REQUEST + timestamp_bytes, 3) + + if response[0:2] != b'\x04\06': + raise lifescan_common.MalformedCommand( + 'invalid response, expected 04 06, received %02x %02x' % ( + response[0], response[1])) + + # The device does not return the new datetime, so confirm by + # calling READ RTC again. + return self.get_datetime() + + def zero_log(self): + response = self._send_message(_MEMORY_ERASE_REQUEST, 3) + if response[0:2] != b'\x04\06': + raise lifescan_common.MalformedCommand( + 'invalid response, expected 04 06, received %02x %02x' % ( + response[0], response[1])) + + def _get_reading_count(self): + response = self._send_message(_READ_RECORD_COUNT_REQUEST, 3) + if response[0:2] != b'\x04\06': + raise lifescan_common.MalformedCommand( + 'invalid response, expected 04 06, received %02x %02x' % ( + response[0], response[1])) + + (record_count,) = _STRUCT_RECORDID.unpack(response[2:]) + return record_count + + def get_glucose_unit(self): + return common.UNIT_MGDL + + def _get_reading(self, record_number): + request = (_READ_RECORD_REQUEST_PREFIX + + _STRUCT_RECORDID.pack(record_number) + + _READ_RECORD_REQUEST_SUFFIX) + response = self._send_message(request, 3) + if response[0:2] != b'\x04\06': + raise lifescan_common.MalformedCommand( + 'invalid response, expected 04 06, received %02x %02x' % ( + response[0], response[1])) + + (unused_const1, unused_const2, unused_counter, unused_const3, + unused_counter2, timestamp, value, unused_flags, unused_const4, + unused_const5) = _STRUCT_RECORD.unpack(response) + + return common.Reading(_convert_timestamp(timestamp), float(value)) + + def get_readings(self): + record_count = self._get_reading_count() + for record_number in range(record_count): + yield self._get_reading(record_number) -- cgit v1.2.3