# -*- coding: utf-8 -*- """Driver for LifeScan OneTouch Verio 2015 devices. Further information on the device protocol can be found at https://github.com/Flameeyes/glucometer-protocols/blob/master/lifescan/onetouch-verio-2015.md """ __author__ = 'Diego Elio Pettenò' __email__ = 'flameeyes@flameeyes.eu' __copyright__ = 'Copyright © 2016, Diego Elio Pettenò' __license__ = 'MIT' import datetime import struct from pyscsi.pyscsi.scsi import SCSI from pyscsi.pyscsi.scsi_device import SCSIDevice from glucometerutils import common from glucometerutils import exceptions from glucometerutils.drivers import lifescan_common # Match the same values in the otultraeasy driver. _STX = 0x02 _ETX = 0x03 # This device uses SCSI blocks as registers. _REGISTER_SIZE = 512 _STRUCT_PREAMBLE = struct.Struct(' _REGISTER_SIZE: raise lifescan_common.MalformedCommand( 'invalid length: %d > REGISTER_SIZE' % length) # 2 is the length of the checksum, so it should be ignored. calculated_checksum = lifescan_common.crc_ccitt(register[:(length-2)]) coda_offset = length - _STRUCT_CODA.size etx, encoded_checksum = _STRUCT_CODA.unpack_from(register[coda_offset:]) if etx != _ETX: raise lifescan_common.MalformedCommand( 'invalid ETX byte: %02x' % etx) if encoded_checksum != calculated_checksum: raise exceptions.InvalidChecksum(encoded_checksum, calculated_checksum) response = register[_STRUCT_PREAMBLE.size:coda_offset] return response def _encode_message(cmd): """Add message preamble and calculate checksum, add padding.""" length = len(cmd) + _STRUCT_PREAMBLE.size + _STRUCT_CODA.size preamble = _STRUCT_PREAMBLE.pack(_STX, length) message = preamble + cmd + bytes((_ETX,)) checksum = _STRUCT_CHECKSUM.pack(lifescan_common.crc_ccitt(message)) # Pad the message to match the size of the register. return message + checksum + bytes( _REGISTER_SIZE - 2 - len(message)) def _convert_timestamp(timestamp): return datetime.datetime.utcfromtimestamp(timestamp + _EPOCH_BASE) class Device(object): def __init__(self, device): self.device_name_ = device self.scsi_device_ = SCSIDevice(device) self.scsi_ = SCSI(self.scsi_device_) self.scsi_.blocksize = _REGISTER_SIZE def _send_message(self, cmd, lba): """Send a request to the meter, and read its response. Args: cmd: (bytes) the raw command to send the device, without preamble or checksum. lba: (int) the address of the block register to use, known valid addresses are 3, 4 and 5. Returns: (bytes) The raw response from the meter. No preamble or coda is present, and the checksum has already been validated. """ self.scsi_.write10(lba, 1, _encode_message(cmd)) response = self.scsi_.read10(lba, 1) # TODO: validate that the response is valid. return _extract_message(response.datain) def connect(self): inq = self.scsi_.inquiry() vendor = inq.result['t10_vendor_identification'][:32] if vendor != b'LifeScan': raise exceptions.ConnectionFailed( 'Device %s is not a LifeScan glucometer.' % self.device_name_) def disconnect(self): return def get_information_string(self): return ('OneTouch %s glucometer\n' 'Serial number: %s\n' 'Software version: %s\n' 'Time: %s\n' 'Default unit: %s\n' % ( self._query_string(_QUERY_KEY_MODEL), self.get_serial_number(), self.get_version(), self.get_datetime(), self.get_glucose_unit())) def _query_string(self, query_key): response = self._send_message(_QUERY_REQUEST + query_key, 3) if response[0:2] != b'\x04\06': raise lifescan_common.MalformedCommand( 'invalid response, expected 04 06, received %02x %02x' % ( response[0], response[1])) # Strings are encoded in wide characters (LE), but they should # only contain ASCII characters. Note that the string is # null-terminated, so the last character should be dropped. return response[2:].decode('utf-16-le')[:-1] def _read_parameter(self, parameter_key): response = self._send_message( _READ_PARAMETER_REQUEST + parameter_key, 4) if response[0:2] != b'\x03\x06': raise lifescan_common.MalformedCommand( 'invalid response, expected 03 06, received %02x %02x' % ( response[0], response[1])) return response[2:] def get_serial_number(self): return self._query_string(_QUERY_KEY_SERIAL) def get_version(self): return self._query_string(_QUERY_KEY_SOFTWARE) def get_datetime(self): response = self._send_message(_READ_RTC_REQUEST, 3) if response[0:2] != b'\x04\06': raise lifescan_common.MalformedCommand( 'invalid response, expected 04 06, received %02x %02x' % ( response[0], response[1])) (timestamp,) = _STRUCT_TIMESTAMP.unpack(response[2:]) return _convert_timestamp(timestamp) def set_datetime(self, date=datetime.datetime.now()): epoch = datetime.datetime.utcfromtimestamp(_EPOCH_BASE) delta = date - epoch timestamp = int(delta.total_seconds()) timestamp_bytes = _STRUCT_TIMESTAMP.pack(timestamp) response = self._send_message(_WRITE_RTC_REQUEST + timestamp_bytes, 3) if response[0:2] != b'\x04\06': raise lifescan_common.MalformedCommand( 'invalid response, expected 04 06, received %02x %02x' % ( response[0], response[1])) # The device does not return the new datetime, so confirm by # calling READ RTC again. return self.get_datetime() def zero_log(self): response = self._send_message(_MEMORY_ERASE_REQUEST, 3) if response[0:2] != b'\x04\06': raise lifescan_common.MalformedCommand( 'invalid response, expected 04 06, received %02x %02x' % ( response[0], response[1])) def _get_reading_count(self): response = self._send_message(_READ_RECORD_COUNT_REQUEST, 3) if response[0:2] != b'\x04\06': raise lifescan_common.MalformedCommand( 'invalid response, expected 04 06, received %02x %02x' % ( response[0], response[1])) (record_count,) = _STRUCT_RECORDID.unpack(response[2:]) return record_count def get_glucose_unit(self): unit_value = self._read_parameter(_PARAMETER_KEY_UNIT) if unit_value == b'\x00\x00\x00\x00': return common.UNIT_MGDL elif unit_value == b'\x01\x00\x00\x00': return common.UNIT_MMOLL else: raise exceptions.InvalidGlucoseUnit('%r' % unit_value) def _get_reading(self, record_number): request = (_READ_RECORD_REQUEST_PREFIX + _STRUCT_RECORDID.pack(record_number) + _READ_RECORD_REQUEST_SUFFIX) response = self._send_message(request, 3) if response[0:2] != b'\x04\06': raise lifescan_common.MalformedCommand( 'invalid response, expected 04 06, received %02x %02x' % ( response[0], response[1])) (unused_const1, unused_const2, unused_counter, unused_const3, unused_counter2, timestamp, value, unused_flags, unused_const4, unused_const5) = _STRUCT_RECORD.unpack(response) return common.Reading(_convert_timestamp(timestamp), float(value)) def get_readings(self): record_count = self._get_reading_count() for record_number in range(record_count): yield self._get_reading(record_number)