diff options
Diffstat (limited to 'glucometerutils/support')
-rw-r--r-- | glucometerutils/support/construct_extras.py | 2 | ||||
-rw-r--r-- | glucometerutils/support/contourusb.py | 163 | ||||
-rw-r--r-- | glucometerutils/support/driver_base.py | 1 | ||||
-rw-r--r-- | glucometerutils/support/freestyle.py | 147 | ||||
-rw-r--r-- | glucometerutils/support/hiddevice.py | 26 | ||||
-rw-r--r-- | glucometerutils/support/lifescan.py | 23 | ||||
-rw-r--r-- | glucometerutils/support/lifescan_binary_protocol.py | 45 | ||||
-rw-r--r-- | glucometerutils/support/serial.py | 17 |
8 files changed, 228 insertions, 196 deletions
diff --git a/glucometerutils/support/construct_extras.py b/glucometerutils/support/construct_extras.py index 7abcd9e..b44ee84 100644 --- a/glucometerutils/support/construct_extras.py +++ b/glucometerutils/support/construct_extras.py @@ -7,6 +7,7 @@ import datetime import construct + class Timestamp(construct.Adapter): """Adapter for converting datetime object into timestamps. @@ -14,6 +15,7 @@ class Timestamp(construct.Adapter): and an optional epoch offset to the UNIX Epoch. """ + __slots__ = ["epoch"] def __init__(self, subcon, epoch=0): diff --git a/glucometerutils/support/contourusb.py b/glucometerutils/support/contourusb.py index b4db4eb..3b0dc80 100644 --- a/glucometerutils/support/contourusb.py +++ b/glucometerutils/support/contourusb.py @@ -43,25 +43,31 @@ _HEADER_RECORD_RE = re.compile( "(?P<low_low_target>[0-9]{3})(?P<upp_hi_target>[0-9]{3})" "(?P<low_hi_target>[0-9]{3})\\^Z=(?P<trends>[0-2])\\|" "(?P<total>[0-9]*)\\|\\|\\|\\|\\|\\|" - "(?P<spec_ver>[0-9]+)\\|(?P<datetime>[0-9]+)") + "(?P<spec_ver>[0-9]+)\\|(?P<datetime>[0-9]+)" +) _RESULT_RECORD_RE = re.compile( "^(?P<record_type>[a-zA-Z])\\|(?P<seq_num>[0-9]+)\\|\\w*\\^\\w*\\^\\w*\\" "^(?P<test_id>\\w+)\\|(?P<value>[0-9]+)\\|(?P<unit>\\w+\\/\\w+)\\^" "(?P<ref_method>[BPD])\\|\\|(?P<markers>[><BADISXCZ\\/1-12]*)\\|\\|" - "(?P<datetime>[0-9]+)") + "(?P<datetime>[0-9]+)" +) _RECORD_FORMAT = re.compile( - '\x02(?P<check>(?P<recno>[0-7])(?P<text>[^\x0d]*)' - '\x0d(?P<end>[\x03\x17]))' - '(?P<checksum>[0-9A-F][0-9A-F])\x0d\x0a') + "\x02(?P<check>(?P<recno>[0-7])(?P<text>[^\x0d]*)" + "\x0d(?P<end>[\x03\x17]))" + "(?P<checksum>[0-9A-F][0-9A-F])\x0d\x0a" +) + class FrameError(Exception): pass + class ContourHidDevice(hiddevice.HidDevice): """Base class implementing the ContourUSB HID common protocol. """ + blocksize = 64 # Operation modes @@ -77,85 +83,84 @@ class ContourHidDevice(hiddevice.HidDevice): while True: data = self._read() dstr = data - result.append(dstr[4:data[3]+4]) - if data[3] != self.blocksize-4: + result.append(dstr[4 : data[3] + 4]) + if data[3] != self.blocksize - 4: break - return (b"".join(result)) + return b"".join(result) def write(self, data): - data = b'ABC' + chr(len(data)).encode() + data.encode() + data = b"ABC" + chr(len(data)).encode() + data.encode() pad_length = self.blocksize - len(data) - data += pad_length * b'\x00' + data += pad_length * b"\x00" self._write(data) - USB_VENDOR_ID = 0x1a79 # type: int # Bayer Health Care LLC Contour + USB_VENDOR_ID = 0x1A79 # type: int # Bayer Health Care LLC Contour USB_PRODUCT_ID = 0x6002 # type: int def parse_header_record(self, text): header = _HEADER_RECORD_RE.search(text) - self.field_del = header.group('field_del') - self.repeat_del = header.group('repeat_del') - self.component_del = header.group('component_del') - self.escape_del = header.group('escape_del') - - self.product_code = header.group('product_code') - self.dig_ver = header.group('dig_ver') - self.anlg_ver = header.group('anlg_ver') - self.agp_ver = header.group('agp_ver') - - self.serial_num = header.group('serial_num') - self.sku_id = header.group('sku_id') - self.res_marking = header.group('res_marking') - self.config_bits = header.group('config_bits') - self.lang = header.group('lang') - self.interv = header.group('interv') - self.ref_method = header.group('ref_method') - self.internal = header.group('internal') + self.field_del = header.group("field_del") + self.repeat_del = header.group("repeat_del") + self.component_del = header.group("component_del") + self.escape_del = header.group("escape_del") + + self.product_code = header.group("product_code") + self.dig_ver = header.group("dig_ver") + self.anlg_ver = header.group("anlg_ver") + self.agp_ver = header.group("agp_ver") + + self.serial_num = header.group("serial_num") + self.sku_id = header.group("sku_id") + self.res_marking = header.group("res_marking") + self.config_bits = header.group("config_bits") + self.lang = header.group("lang") + self.interv = header.group("interv") + self.ref_method = header.group("ref_method") + self.internal = header.group("internal") # U limit - self.unit = header.group('unit') - self.lo_bound = header.group('lo_bound') - self.hi_bound = header.group('hi_bound') + self.unit = header.group("unit") + self.lo_bound = header.group("lo_bound") + self.hi_bound = header.group("hi_bound") # X field - self.hypo_limit = header.group('hypo_limit') - self.overall_low = header.group('overall_low') - self.pre_food_low = header.group('pre_food_low') - self.post_food_low = header.group('post_food_low') - self.overall_high = header.group('overall_high') - self.pre_food_high = header.group('pre_food_high') - self.post_food_high = header.group('post_food_high') - self.hyper_limit = header.group('hyper_limit') + self.hypo_limit = header.group("hypo_limit") + self.overall_low = header.group("overall_low") + self.pre_food_low = header.group("pre_food_low") + self.post_food_low = header.group("post_food_low") + self.overall_high = header.group("overall_high") + self.pre_food_high = header.group("pre_food_high") + self.post_food_high = header.group("post_food_high") + self.hyper_limit = header.group("hyper_limit") # Y field - self.upp_hyper = header.group('upp_hyper') - self.low_hyper = header.group('low_hyper') - self.upp_hypo = header.group('upp_hypo') - self.low_hypo = header.group('low_hypo') - self.upp_low_target = header.group('upp_low_target') - self.low_low_target = header.group('low_low_target') - self.upp_hi_target = header.group('upp_hi_target') - self.low_hi_target = header.group('low_hi_target') + self.upp_hyper = header.group("upp_hyper") + self.low_hyper = header.group("low_hyper") + self.upp_hypo = header.group("upp_hypo") + self.low_hypo = header.group("low_hypo") + self.upp_low_target = header.group("upp_low_target") + self.low_low_target = header.group("low_low_target") + self.upp_hi_target = header.group("upp_hi_target") + self.low_hi_target = header.group("low_hi_target") # Z field - self.trends = header.group('trends') + self.trends = header.group("trends") - self.total = header.group('total') - self.spec_ver = header.group('spec_ver') + self.total = header.group("total") + self.spec_ver = header.group("spec_ver") # Datetime string in YYYYMMDDHHMM format - self.datetime = header.group('datetime') - + self.datetime = header.group("datetime") def checksum(self, text): """ Implemented by Anders Hammarquist for glucodump project More info: https://bitbucket.org/iko/glucodump/src/default/ """ - checksum = hex(sum(ord(c) for c in text) % 256).upper().split('X')[1] - return ('00' + checksum)[-2:] + checksum = hex(sum(ord(c) for c in text) % 256).upper().split("X")[1] + return ("00" + checksum)[-2:] def checkframe(self, frame): """ @@ -166,7 +171,7 @@ class ContourHidDevice(hiddevice.HidDevice): if not match: raise FrameError("Couldn't parse frame", frame) - recno = int(match.group('recno')) + recno = int(match.group("recno")) if self.currecno is None: self.currecno = recno @@ -174,18 +179,20 @@ class ContourHidDevice(hiddevice.HidDevice): return None if recno != self.currecno: - raise FrameError("Bad recno, got %r expected %r" % - (recno, self.currecno), - frame) - - checksum = self.checksum(match.group('check')) - if checksum != match.group('checksum'): - raise FrameError("Checksum error: got %s expected %s" % - (match.group('checksum'), checksum), - frame) + raise FrameError( + "Bad recno, got %r expected %r" % (recno, self.currecno), frame + ) + + checksum = self.checksum(match.group("check")) + if checksum != match.group("checksum"): + raise FrameError( + "Checksum error: got %s expected %s" + % (match.group("checksum"), checksum), + frame, + ) self.currecno = (self.currecno + 1) % 8 - return match.group('text') + return match.group("text") def connect(self): """Connecting the device, nothing to be done. @@ -198,15 +205,14 @@ class ContourHidDevice(hiddevice.HidDevice): self.state = self.mode_establish try: while True: - self.write('\x04') + self.write("\x04") res = self.read() if res[0] == 4 and res[-1] == 5: # we are connected and just got a header header_record = res.decode() - stx = header_record.find('\x02') + stx = header_record.find("\x02") if stx != -1: - result = _RECORD_FORMAT.match( - header_record[stx:]).group('text') + result = _RECORD_FORMAT.match(header_record[stx:]).group("text") self.parse_header_record(result) break else: @@ -251,7 +257,8 @@ class ContourHidDevice(hiddevice.HidDevice): int(datetime_str[6:8]), # day int(datetime_str[8:10]), # hour int(datetime_str[10:12]), # minute - 0) + 0, + ) def sync(self): """ @@ -261,7 +268,7 @@ class ContourHidDevice(hiddevice.HidDevice): """ self.state = self.mode_establish try: - tometer = '\x04' + tometer = "\x04" result = None foo = 0 while True: @@ -281,7 +288,7 @@ class ContourHidDevice(hiddevice.HidDevice): continue if data_bytes[-1] == 5: # got an <ENQ>, send <ACK> - tometer = '\x06' + tometer = "\x06" self.currecno = None continue if self.state == self.mode_data: @@ -289,18 +296,18 @@ class ContourHidDevice(hiddevice.HidDevice): # got an <EOT>, done self.state = self.mode_precommand break - stx = data.find('\x02') + stx = data.find("\x02") if stx != -1: # got <STX>, parse frame try: result = self.checkframe(data[stx:]) - tometer = '\x06' + tometer = "\x06" self.state = self.mode_data except FrameError as e: - tometer = '\x15' # Couldn't parse, <NAK> + tometer = "\x15" # Couldn't parse, <NAK> else: # Got something we don't understand, <NAK> it - tometer = '\x15' + tometer = "\x15" except Exception as e: raise e @@ -321,7 +328,7 @@ class ContourHidDevice(hiddevice.HidDevice): """ records_arr = [] for rec in self.sync(): - if rec[0] == 'R': + if rec[0] == "R": # parse using result record regular expression rec_text = self.parse_result_record(rec) # get dictionary to use in main driver module without import re diff --git a/glucometerutils/support/driver_base.py b/glucometerutils/support/driver_base.py index b7b3d0f..1caa960 100644 --- a/glucometerutils/support/driver_base.py +++ b/glucometerutils/support/driver_base.py @@ -3,7 +3,6 @@ from datetime import datetime class GlucometerDriver(ABC): - def connect(self): pass diff --git a/glucometerutils/support/freestyle.py b/glucometerutils/support/freestyle.py index d48ac04..341e978 100644 --- a/glucometerutils/support/freestyle.py +++ b/glucometerutils/support/freestyle.py @@ -18,7 +18,7 @@ from typing import AnyStr, Callable, Iterator, List, Optional, Text, Tuple import construct from glucometerutils import exceptions -from glucometerutils.support import hiddevice, driver_base +from glucometerutils.support import driver_base, hiddevice _INIT_COMMAND = 0x01 _INIT_RESPONSE = 0x71 @@ -30,54 +30,66 @@ _ENCRYPTION_SETUP_COMMAND = 0x14 _ENCRYPTION_SETUP_RESPONSE = 0x33 _ALWAYS_UNENCRYPTED_MESSAGES = ( - _INIT_COMMAND, 0x04, 0x05, 0x06, 0x0c, 0x0d, - _ENCRYPTION_SETUP_COMMAND, 0x15, - _ENCRYPTION_SETUP_RESPONSE, 0x34, 0x35, + _INIT_COMMAND, + 0x04, + 0x05, + 0x06, + 0x0C, + 0x0D, + _ENCRYPTION_SETUP_COMMAND, + 0x15, + _ENCRYPTION_SETUP_RESPONSE, + 0x34, + 0x35, _INIT_RESPONSE, _KEEPALIVE_RESPONSE, ) + def _create_matcher(message_type, content): # type: (int, Optional[bytes]) -> Callable[[Tuple[int, bytes]], bool] def _matcher(message): - return ( - message[0] == message_type and - (content is None or content == message[1])) + return message[0] == message_type and (content is None or content == message[1]) return _matcher -_is_init_reply = _create_matcher(_INIT_RESPONSE, b'\x01') + +_is_init_reply = _create_matcher(_INIT_RESPONSE, b"\x01") _is_keepalive_response = _create_matcher(_KEEPALIVE_RESPONSE, None) -_is_unknown_message_error = _create_matcher(_UNKNOWN_MESSAGE_RESPONSE, b'\x85') -_is_encryption_missing_error = _create_matcher( - _ENCRYPTION_SETUP_RESPONSE, b'\x15') -_is_encryption_setup_error = _create_matcher( - _ENCRYPTION_SETUP_RESPONSE, b'\x14') +_is_unknown_message_error = _create_matcher(_UNKNOWN_MESSAGE_RESPONSE, b"\x85") +_is_encryption_missing_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x15") +_is_encryption_setup_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x14") _FREESTYLE_MESSAGE = construct.Struct( - 'hid_report' / construct.Const(0, construct.Byte), - 'message_type' / construct.Byte, - 'command' / construct.Padded( + "hid_report" / construct.Const(0, construct.Byte), + "message_type" / construct.Byte, + "command" + / construct.Padded( 63, # command can only be up to 62 bytes, but one is used for length. - construct.Prefixed(construct.Byte, construct.GreedyBytes)), + construct.Prefixed(construct.Byte, construct.GreedyBytes), + ), ) _FREESTYLE_ENCRYPTED_MESSAGE = construct.Struct( - 'hid_report' / construct.Const(0, construct.Byte), - 'message_type' / construct.Byte, - 'command' / construct.Padded( + "hid_report" / construct.Const(0, construct.Byte), + "message_type" / construct.Byte, + "command" + / construct.Padded( 63, # command can only be up to 62 bytes, but one is used for length. - construct.GreedyBytes), + construct.GreedyBytes, + ), ) -_TEXT_COMPLETION_RE = re.compile(b'CMD (?:OK|Fail!)') +_TEXT_COMPLETION_RE = re.compile(b"CMD (?:OK|Fail!)") _TEXT_REPLY_FORMAT = re.compile( - b'^(?P<message>.*)CKSM:(?P<checksum>[0-9A-F]{8})\r\n' - b'CMD (?P<status>OK|Fail!)\r\n$', re.DOTALL) + b"^(?P<message>.*)CKSM:(?P<checksum>[0-9A-F]{8})\r\n" + b"CMD (?P<status>OK|Fail!)\r\n$", + re.DOTALL, +) _MULTIRECORDS_FORMAT = re.compile( - '^(?P<message>.+\r\n)(?P<count>[0-9]+),(?P<checksum>[0-9A-F]{8})\r\n$', - re.DOTALL) + "^(?P<message>.+\r\n)(?P<count>[0-9]+),(?P<checksum>[0-9A-F]{8})\r\n$", re.DOTALL +) def _verify_checksum(message, expected_checksum_hex): @@ -131,17 +143,18 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) TEXT_CMD = 0x60 TEXT_REPLY_CMD = 0x60 - USB_VENDOR_ID = 0x1a61 # type: int # Abbott Diabetes Care + USB_VENDOR_ID = 0x1A61 # type: int # Abbott Diabetes Care USB_PRODUCT_ID = None # type: int def connect(self): """Open connection to the device, starting the knocking sequence.""" - self._send_command(_INIT_COMMAND, b'') + self._send_command(_INIT_COMMAND, b"") response = self._read_response() if not _is_init_reply(response): raise exceptions.ConnectionFailed( - 'Connection error: unexpected message %02x:%s' % ( - response[0], response[1].hex())) + "Connection error: unexpected message %02x:%s" + % (response[0], response[1].hex()) + ) def disconnect(self): """Disconnect the device, nothing to be done.""" @@ -162,9 +175,10 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) meta_construct = _FREESTYLE_MESSAGE usb_packet = meta_construct.build( - {'message_type': message_type, 'command': command}) + {"message_type": message_type, "command": command} + ) - logging.debug('Sending packet: %r', usb_packet) + logging.debug("Sending packet: %r", usb_packet) self._write(usb_packet) def _read_response(self, encrypted=False): @@ -172,14 +186,14 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) """Read the response from the device and extracts it.""" usb_packet = self._read() - logging.debug('Read packet: %r', usb_packet) + logging.debug("Read packet: %r", usb_packet) assert usb_packet message_type = usb_packet[0] if not encrypted or message_type in _ALWAYS_UNENCRYPTED_MESSAGES: message_length = usb_packet[1] - message_content = usb_packet[2:2+message_length] + message_content = usb_packet[2 : 2 + message_length] else: message_content = usb_packet[1:] @@ -195,15 +209,13 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) return self._read_response(encrypted=encrypted) if _is_unknown_message_error(message): - raise exceptions.CommandError('Invalid command') + raise exceptions.CommandError("Invalid command") if _is_encryption_missing_error(message): - raise exceptions.CommandError( - 'Device encryption not initialized.') + raise exceptions.CommandError("Device encryption not initialized.") if _is_encryption_setup_error(message): - raise exceptions.CommandError( - 'Device encryption initialization failed.') + raise exceptions.CommandError("Device encryption initialization failed.") return message @@ -213,18 +225,19 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) self._send_command(self.TEXT_CMD, command) # Reply can stretch multiple buffers - full_content = b'' + full_content = b"" while True: message_type, content = self._read_response() logging.debug( - 'Received message: type %02x content %s', - message_type, content.hex()) + "Received message: type %02x content %s", message_type, content.hex() + ) if message_type != self.TEXT_REPLY_CMD: raise exceptions.InvalidResponse( - 'Message type %02x does not match expectations: %r' % - (message_type, content)) + "Message type %02x does not match expectations: %r" + % (message_type, content) + ) full_content += content @@ -235,17 +248,17 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) if not match: raise exceptions.InvalidResponse(full_content) - message = match.group('message') - _verify_checksum(message, match.group('checksum')) + message = match.group("message") + _verify_checksum(message, match.group("checksum")) - if match.group('status') != b'OK': + if match.group("status") != b"OK": raise exceptions.InvalidResponse(message or "Command failed") # If there is anything in the response that is not ASCII-safe, this is # probably in the patient name. The Windows utility does not seem to # validate those, so just replace anything non-ASCII with the correct # unknown codepoint. - return message.decode('ascii', 'replace') + return message.decode("ascii", "replace") # Some of the commands are also shared across devices that use this HID # protocol, but not many. Only provide here those that do seep to change @@ -253,16 +266,16 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) def _get_version(self): # type: () -> Text """Return the software version of the device.""" - return self._send_text_command(b'$swver?').rstrip('\r\n') + return self._send_text_command(b"$swver?").rstrip("\r\n") def get_serial_number(self): # type: () -> Text """Returns the serial number of the device.""" - return self._send_text_command(b'$serlnum?').rstrip('\r\n') + return self._send_text_command(b"$serlnum?").rstrip("\r\n") def get_patient_name(self): # type: () -> Optional[Text] - patient_name = self._send_text_command(b'$ptname?').rstrip('\r\n') + patient_name = self._send_text_command(b"$ptname?").rstrip("\r\n") if not patient_name: return None return patient_name @@ -270,11 +283,11 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) def set_patient_name(self, name): # type: (Text) -> None try: - encoded_name = name.encode('ascii') + encoded_name = name.encode("ascii") except UnicodeDecodeError: - raise ValueError('Only ASCII-safe names are tested working') + raise ValueError("Only ASCII-safe names are tested working") - result = self._send_text_command(b'$ptname,' + encoded_name) + result = self._send_text_command(b"$ptname," + encoded_name) def get_datetime(self): # type: () -> datetime.datetime @@ -283,12 +296,12 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) This is one of the few commands that appear common to many of the FreeStyle devices that use the HID framing protocol. """ - date = self._send_text_command(b'$date?').rstrip('\r\n') - time = self._send_text_command(b'$time?').rstrip('\r\n') + date = self._send_text_command(b"$date?").rstrip("\r\n") + time = self._send_text_command(b"$time?").rstrip("\r\n") # Year is returned as an offset to 2000. - month, day, year = (int(x) for x in date.split(',')) - hour, minute = (int(x) for x in time.split(',')) + month, day, year = (int(x) for x in date.split(",")) + hour, minute = (int(x) for x in time.split(",")) # At least Precision Neo devices can have an invalid date (bad RTC?), # and report 255 for each field, which is not valid for @@ -304,10 +317,10 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) # The format used by the FreeStyle devices is not composable based on # standard strftime() (namely it includes no leading zeros), so we need # to build it manually. - date_cmd = '$date,{month},{day},{year}'.format( - month=date.month, day=date.day, year=(date.year-2000)) - time_cmd = '$time,{hour},{minute}'.format( - hour=date.hour, minute=date.minute) + date_cmd = "$date,{month},{day},{year}".format( + month=date.month, day=date.day, year=(date.year - 2000) + ) + time_cmd = "$time,{hour},{minute}".format(hour=date.hour, minute=date.minute) self._send_text_command(bytes(date_cmd, "ascii")) self._send_text_command(bytes(time_cmd, "ascii")) @@ -333,7 +346,7 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) in the record file. """ message = self._send_text_command(command) - logging.debug('Received multirecord message:\n%s', message) + logging.debug("Received multirecord message:\n%s", message) if message == "Log Empty\r\n": return iter(()) @@ -341,9 +354,9 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) if not match: raise exceptions.InvalidResponse(message) - records_str = match.group('message') - _verify_checksum(records_str, match.group('checksum')) + records_str = match.group("message") + _verify_checksum(records_str, match.group("checksum")) - logging.debug('Received multi-record string: %s', records_str) + logging.debug("Received multi-record string: %s", records_str) - return csv.reader(records_str.split('\r\n')) + return csv.reader(records_str.split("\r\n")) diff --git a/glucometerutils/support/hiddevice.py b/glucometerutils/support/hiddevice.py index b44e5c7..ad8f3a6 100644 --- a/glucometerutils/support/hiddevice.py +++ b/glucometerutils/support/hiddevice.py @@ -45,33 +45,36 @@ class HidDevice: # type: (Optional[Text]) -> None if None in (self.USB_VENDOR_ID, self.USB_PRODUCT_ID) and not device: raise exceptions.CommandLineError( - '--device parameter is required, should point to a /dev/hidraw ' - 'device node representing the meter.') + "--device parameter is required, should point to a /dev/hidraw " + "device node representing the meter." + ) # If the user passed a device path that does not exist, raise an # error. This is to avoid writing to a file instead of to a device node. if device and not os.path.exists(device): raise exceptions.ConnectionFailed( - message='Path %s does not exist.' % device) + message="Path %s does not exist." % device + ) # If the user passed a device, try opening it. if device: - self.handle_ = open(device, 'w+b') # type: Optional[BinaryIO] + self.handle_ = open(device, "w+b") # type: Optional[BinaryIO] else: self.handle_ = None - logging.info( - 'No --device parameter provided, using hidapi library.') + logging.info("No --device parameter provided, using hidapi library.") try: import hid + self.hidapi_handle_ = hid.device() - self.hidapi_handle_.open( - self.USB_VENDOR_ID, self.USB_PRODUCT_ID) + self.hidapi_handle_.open(self.USB_VENDOR_ID, self.USB_PRODUCT_ID) except ImportError: raise exceptions.ConnectionFailed( - message='Missing requied "hidapi" module.') + message='Missing requied "hidapi" module.' + ) except OSError as e: raise exceptions.ConnectionFailed( - message='Unable to connect to meter: %s.' % e) + message="Unable to connect to meter: %s." % e + ) def _write(self, report): # type: (bytes) -> None @@ -95,5 +98,4 @@ class HidDevice: if self.handle_: return bytes(self.handle_.read(size)) - return bytes(self.hidapi_handle_.read( - size, timeout_ms=self.TIMEOUT_MS)) + return bytes(self.hidapi_handle_.read(size, timeout_ms=self.TIMEOUT_MS)) diff --git a/glucometerutils/support/lifescan.py b/glucometerutils/support/lifescan.py index 19155d4..9340e49 100644 --- a/glucometerutils/support/lifescan.py +++ b/glucometerutils/support/lifescan.py @@ -8,22 +8,25 @@ from glucometerutils import exceptions class MissingChecksum(exceptions.InvalidResponse): """The response misses the expected 4-digits checksum.""" + def __init__(self, response): super(MissingChecksum, self).__init__( - 'Response is missing checksum: %s' % response) + "Response is missing checksum: %s" % response + ) class InvalidSerialNumber(exceptions.Error): """The serial number is not as expected.""" + def __init__(self, serial_number): super(InvalidSerialNumber, self).__init__( - 'Serial number %s is invalid.' % serial_number) + "Serial number %s is invalid." % serial_number + ) class MalformedCommand(exceptions.InvalidResponse): def __init__(self, message): - super(MalformedCommand, self).__init__( - 'Malformed command: %s' % message) + super(MalformedCommand, self).__init__("Malformed command: %s" % message) def crc_ccitt(data): @@ -39,13 +42,13 @@ def crc_ccitt(data): This function uses the non-default 0xFFFF seed as used by multiple LifeScan meters. """ - crc = 0xffff + crc = 0xFFFF for byte in data: - crc = (crc >> 8) & 0xffff | (crc << 8) & 0xffff + crc = (crc >> 8) & 0xFFFF | (crc << 8) & 0xFFFF crc ^= byte - crc ^= (crc & 0xff) >> 4 - crc ^= (((crc << 8) & 0xffff) << 4) & 0xffff - crc ^= (crc & 0xff) << 5 + crc ^= (crc & 0xFF) >> 4 + crc ^= (((crc << 8) & 0xFFFF) << 4) & 0xFFFF + crc ^= (crc & 0xFF) << 5 - return crc & 0xffff + return crc & 0xFFFF diff --git a/glucometerutils/support/lifescan_binary_protocol.py b/glucometerutils/support/lifescan_binary_protocol.py index 288da83..441226e 100644 --- a/glucometerutils/support/lifescan_binary_protocol.py +++ b/glucometerutils/support/lifescan_binary_protocol.py @@ -12,48 +12,51 @@ This module implements an interface to send and receive these messages. import construct from glucometerutils import common -from glucometerutils.support import construct_extras -from glucometerutils.support import lifescan +from glucometerutils.support import construct_extras, lifescan _LINK_CONTROL = construct.BitStruct( construct.Padding(3), - 'more' / construct.Default(construct.Flag, False), - 'disconnect' / construct.Default(construct.Flag, False), - 'acknowledge' / construct.Default(construct.Flag, False), - 'expect_receive' / construct.Default(construct.Flag, False), - 'sequence_number' / construct.Default(construct.Flag, False), + "more" / construct.Default(construct.Flag, False), + "disconnect" / construct.Default(construct.Flag, False), + "acknowledge" / construct.Default(construct.Flag, False), + "expect_receive" / construct.Default(construct.Flag, False), + "sequence_number" / construct.Default(construct.Flag, False), ) + def LifeScanPacket(include_link_control): # pylint: disable=invalid-name # type: (bool) -> construct.Struct if include_link_control: link_control_construct = _LINK_CONTROL else: - link_control_construct = construct.Const(b'\x00') + link_control_construct = construct.Const(b"\x00") return construct.Struct( - 'data' / construct.RawCopy( + "data" + / construct.RawCopy( construct.Struct( - construct.Const(b'\x02'), # stx - 'length' / construct.Rebuild( - construct.Byte, lambda this: len(this.message) + 6), - 'link_control' / link_control_construct, - 'message' / construct.Bytes( - lambda this: this.length - 6), - construct.Const(b'\x03'), # etx + construct.Const(b"\x02"), # stx + "length" + / construct.Rebuild(construct.Byte, lambda this: len(this.message) + 6), + "link_control" / link_control_construct, + "message" / construct.Bytes(lambda this: this.length - 6), + construct.Const(b"\x03"), # etx ), ), - 'checksum' / construct.Checksum( - construct.Int16ul, lifescan.crc_ccitt, construct.this.data.data), + "checksum" + / construct.Checksum( + construct.Int16ul, lifescan.crc_ccitt, construct.this.data.data + ), ) + VERIO_TIMESTAMP = construct_extras.Timestamp( - construct.Int32ul, epoch=946684800) # 2000-01-01 (not 2010) + construct.Int32ul, epoch=946684800 +) # 2000-01-01 (not 2010) _GLUCOSE_UNIT_MAPPING_TABLE = { common.Unit.MG_DL: 0x00, common.Unit.MMOL_L: 0x01, } -GLUCOSE_UNIT = construct.Mapping( - construct.Byte, _GLUCOSE_UNIT_MAPPING_TABLE) +GLUCOSE_UNIT = construct.Mapping(construct.Byte, _GLUCOSE_UNIT_MAPPING_TABLE) diff --git a/glucometerutils/support/serial.py b/glucometerutils/support/serial.py index 6067cf7..d9e80ea 100644 --- a/glucometerutils/support/serial.py +++ b/glucometerutils/support/serial.py @@ -8,7 +8,6 @@ import logging from typing import Optional, Text import serial - from glucometerutils import exceptions @@ -48,19 +47,23 @@ class SerialDevice: assert self.BAUDRATE is not None if not device and self.DEFAULT_CABLE_ID: - logging.info( - 'No --device parameter provided, looking for default cable.') - device = 'hwgrep://' + self.DEFAULT_CABLE_ID + logging.info("No --device parameter provided, looking for default cable.") + device = "hwgrep://" + self.DEFAULT_CABLE_ID if not device: raise exceptions.CommandLineError( - 'No --device parameter provided, and no default cable known.') + "No --device parameter provided, and no default cable known." + ) self.serial_ = serial.serial_for_url( device, baudrate=self.BAUDRATE, timeout=self.TIMEOUT, writeTimeout=None, - bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, - xonxoff=False, rtscts=False, dsrdtr=False) + xonxoff=False, + rtscts=False, + dsrdtr=False, + ) |