From 57d719974e9acb40146e41906b3184b2feac7091 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Elio=20Petten=C3=B2?= Date: Sat, 6 Jan 2018 21:37:30 +0000 Subject: otverioiq: add totally untested driver. This is the first commit to support OneTouch Verio IQ (Issue #30). It's untested despite me having the device because it needs a new kernel I have not planned to build yet. Most of the protocol has been reverse engineered from the Tidepool driver (https://github.com/tidepool-org/chrome-uploader/blob/master/lib/drivers/onetouch/oneTouchVerioIQ.js) with a few assumption brought in from the UltraEasy and Verio 2015. --- glucometerutils/drivers/otverioiq.py | 194 +++++++++++++++++++++++++++++++++++ 1 file changed, 194 insertions(+) create mode 100644 glucometerutils/drivers/otverioiq.py diff --git a/glucometerutils/drivers/otverioiq.py b/glucometerutils/drivers/otverioiq.py new file mode 100644 index 0000000..9cd9e16 --- /dev/null +++ b/glucometerutils/drivers/otverioiq.py @@ -0,0 +1,194 @@ +# -*- coding: utf-8 -*- +"""Driver for LifeScan OneTouch Verio IQ devices. + +Currently work in progress, untested. + +Expected device path: /dev/ttyUSB0 or similar serial port device. Device should +be auto-detected if not provided. +""" + +__author__ = 'Diego Elio Pettenò' +__email__ = 'flameeyes@flameeyes.eu' +__copyright__ = 'Copyright © 2018, Diego Elio Pettenò' +__license__ = 'MIT' + +import binascii +import datetime +import logging + +import construct + +from glucometerutils import common +from glucometerutils.support import construct_extras +from glucometerutils.support import lifescan +from glucometerutils.support import lifescan_binary_protocol +from glucometerutils.support import serial + +_COMMAND_SUCCESS = construct.Const(b'\x04\x06') + +_VERSION_REQUEST = construct.Const(b'\x04\x0d\x02') # Untested + +_VERSION_RESPONSE = construct.Struct( + _COMMAND_SUCCESS, + 'version' / construct.PascalString(construct.Byte, encoding='ascii'), +) + +_SERIAL_NUMBER_REQUEST = construct.Const( + b'\x04\x0b\x00\x02') + +_SERIAL_NUMBER_RESPONSE = construct.Struct( + _COMMAND_SUCCESS, + 'serial_number' / construct.CString(encoding='ascii'), +) + +_READ_RTC_REQUEST = construct.Const(b'\x04\x20\x02') + +_READ_RTC_RESPONSE = construct.Struct( + _COMMAND_SUCCESS, + 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, +) + +_WRITE_RTC_REQUEST = construct.Struct( + construct.Const(b'\x04\x20\x01'), + 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, +) + +_GLUCOSE_UNIT_REQUEST = construct.Const( + b'\x04\x09\x02\x02') + + +_GLUCOSE_UNIT_RESPONSE = construct.Struct( + _COMMAND_SUCCESS, + 'unit' / lifescan_binary_protocol.GLUCOSE_UNIT, + construct.Padding(3), +) + +_MEMORY_ERASE_REQUEST = construct.Const(b'\x04\x1a') # Untested + +_READ_RECORD_COUNT_REQUEST = construct.Const(b'\x04\x27\x00') + +_READ_RECORD_COUNT_RESPONSE = construct.Struct( + _COMMAND_SUCCESS, + 'count' / construct.Int16ul, +) + +_READ_RECORD_REQUEST = construct.Struct( + construct.Const(b'\x04\x21'), + 'record_id' / construct.Int16ul, +) + +_READING_RESPONSE = construct.Struct( + _COMMAND_SUCCESS, + 'timestamp' / construct_extras.Timestamp(construct.Int32ul), + 'value' / construct.Int32ul, + 'control' / construct.Byte, # Unknown value +) + + +class Device(serial.SerialDevice): + BAUDRATE = 9600 + DEFAULT_CABLE_ID = '10c4:85a7' # Specific ID for embedded cp210x + TIMEOUT = 0.5 + + def __init__(self, device): + super(Device, self).__init__(device) + self.buffered_reader_ = construct.Rebuffered( + lifescan_binary_protocol.PACKET, tailcutoff=1024) + + def connect(self): + pass + + def disconnect(self): + pass + + def _send_packet(self, message): + pkt = lifescan_binary_protocol.PACKET.build( + {'value': { + 'message': request, + 'link_control': {}, # Verio does not use link_control. + }}) + logging.debug('sending packet: %s', binascii.hexlify(pkt)) + + self.serial_.write(pkt) + self.serial_.flush() + + def _read_packet(self): + raw_pkt = self.buffered_reader_.parse_stream(self.serial_) + logging.debug('received packet: %r', raw_pkt) + + # discard the checksum and copy + pkt = raw_pkt.value + + return pkt + + def _send_request(self, request_format, request_obj, response_format): + try: + request = request_format.build(request_obj) + self._send_packet(request) + + response_pkt = self._read_packet() + + return response_format.parse(response_pkt.message) + except construct.ConstructError as e: + raise lifescan.MalformedCommand(str(e)) + + def get_meter_info(self): + return common.MeterInfo( + 'OneTouch Verio IQ glucometer', + serial_number=self.get_serial_number(), + version_info=( + 'Software version: ' + self.get_version(),), + native_unit=self.get_glucose_unit()) + + def get_version(self): + response = self._send_request( + _VERSION_REQUEST, None, _VERSION_RESPONSE) + + return response.version + + def get_serial_number(self): + response = self._send_request( + _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE) + + return response.serial_number + + def get_datetime(self): + response = self._send_request( + _READ_RTC_REQUEST, _READ_RTC_RESPONSE) + + return response.timestamp + + def set_datetime(self, date=datetime.datetime.now()): + response = self._send_request( + _WRITE_RTC_REQUEST, { + 'timestamp': date, + }, _READ_RTC_RESPONSE) + + return response.timestamp + + def zero_log(self): + self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS) + + def get_glucose_unit(self): + response = self._send_request( + _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE) + + return response.unit + + def _get_reading_count(self): + response = self._send_request( + _READ_RECORD_REQUEST, {'record_id': _INVALID_RECORD}, + _READING_COUNT_RESPONSE) + return response.count + + def _get_reading(self, record_id): + response = self._send_request( + _READ_RECORD_REQUEST, {'record_id': record_id}, _READING_RESPONSE) + + return common.GlucoseReading( + response.timestamp, float(response.value)) + + def get_readings(self): + record_count = self._get_reading_count() + for record_id in range(record_count): + yield self._get_reading(record_id) -- cgit v1.2.3