diff options
Diffstat (limited to '')
-rw-r--r-- | README | 15 | ||||
-rw-r--r-- | glucometerutils/drivers/otultraeasy.py | 326 | ||||
-rw-r--r-- | test/test_otultraeasy.py | 60 |
3 files changed, 398 insertions, 3 deletions
@@ -4,9 +4,18 @@ Glucometer Utilities This utility I created for personal needs, as my glucometer's software can only print the software (to XPS) without a file export option. -While right now only supports my glucometer (LifeScan OneTouch Ultra -2), I've tried designing it so that it can be extended and used for -other models as well. +While right now only supports the glucometers I own and for which I have +the specification of the protocol,I've tried designing it so that it can +be extended and used for other models and other brands as well. + +Supported devices +----------------- + + * **LifeScan OneTouch Ultra 2**: get information, get and set time, + dump of readings (including comments and meal specification) in + native units, memory reset. + * **LifeScan OneTouch Ultra Easy**: get information, get and set time, + dump of readings in native units, memory reset. Dump format ----------- diff --git a/glucometerutils/drivers/otultraeasy.py b/glucometerutils/drivers/otultraeasy.py new file mode 100644 index 0000000..422a7c1 --- /dev/null +++ b/glucometerutils/drivers/otultraeasy.py @@ -0,0 +1,326 @@ +# -*- coding: utf-8 -*- +"""Driver for LifeScan OneTouch Ultra Easy devices""" + +__author__ = 'Diego Elio Pettenò' +__email__ = 'flameeyes@flameeyes.eu' +__copyright__ = 'Copyright © 2014, Diego Elio Pettenò' +__license__ = 'MIT' + +import array +import datetime +import re +import struct +import time + +import serial + +from glucometerutils import common +from glucometerutils import exceptions +from glucometerutils.drivers import lifescan_common + +_STX = 0x02 +_ETX = 0x03 + +_IDX_STX = 0 +_IDX_LENGTH = 1 +_IDX_CONTROL = 2 +_IDX_DATA = 3 +_IDX_ETX = -3 +_IDX_CHECKSUM = -2 + +_BIT_SENT_COUNTER = 0x01 +_BIT_EXPECT_RECEIVE = 0x02 +_BIT_ACK = 0x04 +_BIT_DISCONNECT = 0x08 +_BIT_MORE = 0x10 + +_READ_SERIAL_NUMBER = b'\x05\x0B\x02\x00\x00\x00\x00\x84\x6A\xE8\x73\x00' +_READ_VERSION = b'\x05\x0D\x02' +_READ_GLUCOSE_UNIT = b'\x05\x09\x02\x09\x00\x00\x00\x00' +_DELETE_RECORDS = b'\x05\x1A' +_READ_DATETIME = b'\x05\x20\x02\x00\x00\x00\x00' +_WRITE_DATETIME = b'\x05\x20\x01' +_READ_RECORD = b'\x05\x1F' + +_INVALID_RECORD = 501 + +_STRUCT_TIMESTAMP = struct.Struct('<I') +_STRUCT_RECORDID = struct.Struct('<H') + + +class UnsetPacketError(LookupError): + pass + + +class MalformedCommand(exceptions.InvalidResponse): + def __init__(self, position, expected, received): + exceptions.InvalidResponse.__init__( + self, 'Malformed command at position %s: expected %02x, received %02x' % ( + position, expected, received)) + + +def _convert_timestamp(timestamp_bytes): + timestamp, = _STRUCT_TIMESTAMP.unpack(timestamp_bytes) + + return datetime.datetime.fromtimestamp(timestamp) + + +class _Packet(object): + _STRUCT = struct.Struct('<H') + + @staticmethod + def _crc(cmd): + crc = 0xffff + + for byte in cmd: + crc = (crc >> 8) & 0xffff | (crc << 8) & 0xffff + crc ^= byte + crc ^= (crc & 0xff) >> 4 + crc ^= (((crc << 8) & 0xffff) << 4) & 0xffff + crc ^= (crc & 0xff) << 5 + + return (crc & 0xffff) + + def __init__(self): + self.cmd = array.array('B') + + def read_from(self, serial): + self.cmd.extend(serial.read(3)) + + if self.cmd[_IDX_STX] != _STX: + raise MalformedCommand(_IDX_STX, _STX, self.cmd[_IDX_STX]) + + # the length includes prelude and appendix, which are six bytes total. + if self.length > 6: + self.cmd.extend(serial.read(self.length - 6)) + + self.cmd.extend(serial.read(3)) + + if self.cmd[_IDX_ETX] != _ETX: + raise MalformedCommand(_IDX_ETX, _ETX, self.cmd[_IDX_ETX]) + + def build_command(self, cmd_bytes): + self.cmd.append(_STX) + self.cmd.append(6 + len(cmd_bytes)) + self.cmd.append(0x00) # link control + self.cmd.extend(cmd_bytes) + self.cmd.extend([_ETX, 0x00, 0x00]) + + @property + def length(self): + if not self.cmd: + return None + + return self.cmd[_IDX_LENGTH] + + def __is_in_control(self, bitmask): + if not self.cmd: + return None + + return bool(self.cmd[_IDX_CONTROL] & bitmask) + + def __set_in_control(self, bitmask, value): + if not self.cmd: + return None + + if value: + self.cmd[_IDX_CONTROL] |= bitmask + else: + self.cmd[_IDX_CONTROL] &= (~bitmask) & 0xFF + + return value + + @property + def sent_counter(self): + return self.__is_in_control(_BIT_SENT_COUNTER) + + @sent_counter.setter + def sent_counter(self, value): + self.__set_in_control(_BIT_SENT_COUNTER, value) + + @property + def expect_receive(self): + return self.__is_in_control(_BIT_EXPECT_RECEIVE) + + @expect_receive.setter + def expect_receive(self, value): + self.__set_in_control(_BIT_EXPECT_RECEIVE, value) + + @property + def checksum(self): + return self._crc(self.cmd[:_IDX_CHECKSUM].tobytes()) + + @property + def acknowledge(self): + return self.__is_in_control(_BIT_ACK) + + @acknowledge.setter + def acknowledge(self, value): + self.__set_in_control(_BIT_ACK, value) + + @property + def disconnect(self): + return self.__is_in_control(_BIT_DISCONNECT) + + @disconnect.setter + def disconnect(self, value): + self.__set_in_control(_BIT_DISCONNECT, value) + + @property + def more(self): + return self.__is_in_control(_BIT_MORE) + + @more.setter + def more(self, value): + self.__set_in_control(_BIT_MORE, value) + + def validate_checksum(self): + expected_checksum = self.checksum + received_checksum = self._STRUCT.unpack(self.cmd[_IDX_CHECKSUM:])[0] + if received_checksum != expected_checksum: + raise lifescan_common.InvalidChecksum(expected_checksum, received_checksum) + + def update_checksum(self): + self._STRUCT.pack_into(self.cmd, _IDX_CHECKSUM, self.checksum) + + def tobytes(self): + return self.cmd.tobytes() + + @property + def data(self): + return self.cmd[_IDX_DATA:_IDX_ETX] + + +class Device(object): + def __init__(self, device): + self.serial_ = serial.Serial( + port=device, baudrate=9600, bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, + timeout=1, xonxoff=False, rtscts=False, dsrdtr=False, writeTimeout=None) + + self.sent_counter_ = False + self.expect_receive_ = False + + def connect(self): + self._send_command('', disconnect=True) + + def disconnect(self): + self.connect() + + def _read_response(self): + response = _Packet() + + response.read_from(self.serial_) + + if not response.disconnect and response.sent_counter != self.expect_receive_: + raise MalformedCommand('2[0b]', self.expect_receive_, response.expect_receive) + + if not response.acknowledge: + self.expect_receive_ = not self.expect_receive_ + + response.validate_checksum() + + if not response.acknowledge: + self._send_command('', acknowledge=True) + + return response + + def _send_command(self, cmd_bytes, acknowledge=False, disconnect=False): + cmd = _Packet() + + # set the proper expectations + cmd.build_command(cmd_bytes) + cmd.sent_counter = self.sent_counter_ + cmd.expect_receive = self.expect_receive_ + cmd.acknowledge = acknowledge + cmd.disconnect = disconnect + + cmd.update_checksum() + + self.serial_.write(cmd.tobytes()) + self.serial_.flush() + + if not acknowledge: + self.sent_counter_ = not self.sent_counter_ + result = self._read_response() + return result + + def get_information_string(self): + return ('OneTouch Ultra Easy glucometer\n' + 'Serial number: %s\n' + 'Software version: %s\n' + 'Time: %s\n' + 'Default unit: %s' % ( + self.get_serial_number(), + self.get_version(), + self.get_datetime(), + self.get_glucose_unit())) + + def get_version(self): + result = self._send_command(_READ_VERSION) + + response = self._read_response() + + return response.data[3:].tobytes().decode('ascii') + + def get_serial_number(self): + result = self._send_command(_READ_SERIAL_NUMBER) + + response = self._read_response() + + return response.data[2:].tobytes().decode('ascii') + + def get_datetime(self): + result = self._send_command(_READ_DATETIME) + response = self._read_response() + + return _convert_timestamp(response.data[2:6]) + + def set_datetime(self, date=datetime.datetime.now()): + epoch = datetime.datetime.utcfromtimestamp(0) + delta = date - epoch + timestamp = int(delta.total_seconds()) + + timestamp_bytes = _STRUCT_TIMESTAMP.pack(timestamp) + + result = self._send_command(_WRITE_DATETIME + timestamp_bytes) + + response = self._read_response() + return _convert_timestamp(response.data[2:6]) + + def zero_log(self): + result = self._send_command(_DELETE_RECORDS) + response = self._read_response() + + if response.data.tobytes() != b'\x05\x06': + raise exceptions.InvalidResponse(response.data) + + def get_glucose_unit(self): + result = self._send_command(_READ_GLUCOSE_UNIT) + response = self._read_response() + + if response.data[2] == 0: + return common.UNIT_MGDL + elif response.data[2] == 1: + return common.UNIT_MMOLL + else: + raise MalformedCommand('PM1', response.data[2], 0) + + def _get_reading(self, record_id): + id_bytes = _STRUCT_RECORDID.pack(record_id) + + result = self._send_command(_READ_RECORD + id_bytes) + return self._read_response() + + def get_readings(self): + count_response = self._get_reading(_INVALID_RECORD) + + record_count, = _STRUCT_RECORDID.unpack_from(count_response.data, 2) + + for record_id in range(record_count): + record_response = self._get_reading(record_id) + + timestamp = _convert_timestamp(record_response.data[2:6]) + value, = _STRUCT_TIMESTAMP.unpack_from(record_response.data, 6) + + yield common.Reading(timestamp, float(value)) diff --git a/test/test_otultraeasy.py b/test/test_otultraeasy.py new file mode 100644 index 0000000..326aeea --- /dev/null +++ b/test/test_otultraeasy.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +"""Tests for the LifeScan OneTouch Ultra Mini driver.""" + +__author__ = 'Diego Elio Pettenò' +__email__ = 'flameeyes@flameeyes.eu' +__copyright__ = 'Copyright © 2013, Diego Elio Pettenò' +__license__ = 'MIT' + +import array +import os +import sys +import unittest + +import mock + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from glucometerutils.drivers import lifescan_common +from glucometerutils.drivers import otultraeasy +from glucometerutils import exceptions + +class TestOTUltraMini(unittest.TestCase): + def setUp(self): + self.addCleanup(mock.patch.stopall) + + mock_serial = mock.patch('serial.Serial').start() + self.mock_readline = mock_serial.return_value.readline + + self.device = otultraeasy.Device('mockdevice') + + def testCrc(self): + self.assertEqual( + 0x41cd, + otultraeasy._Packet._crc(b'\x02\x06\x06\x03')) + + cmd_array = array.array('B', b'\x02\x06\x08\x03') + self.assertEqual( + 0x62C2, + otultraeasy._Packet._crc(cmd_array)) + + def testPacketUpdateChecksum(self): + packet = otultraeasy._Packet() + + packet.build_command('') + packet.disconnect = True + + packet.update_checksum() + self.assertEqual( + b'\x02\x06\x08\x03\xC2\x62', + packet.tobytes()) + + packet.validate_checksum() + packet.disconnect = False + + with self.assertRaises(lifescan_common.InvalidChecksum): + packet.validate_checksum() + + +if __name__ == '__main__': + unittest.main() |