From f745ce94f71c16927b7ddb91986d6c026c21e7ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Elio=20Petten=C3=B2?= Date: Sun, 4 Oct 2020 15:08:43 +0100 Subject: Initial import of freestyle-hid. This library is a factor-out of https://github.com/glucometers-tech/glucometerutils to only include the FreeStyle implementation, to make it easier to use outside of glucometerutils, and in particular to make it easier to build better reverse engineering tooling around it. Note that since the code was a mix of MIT and Apache-2.0 license, the overall license of the library is written down as Apache-2.0, as that would be a super-set of the requirements from MIT. --- freestyle_hid/tools/__init__.py | 3 + freestyle_hid/tools/encrypted_setup_extractor.py | 162 ++++++++++++++++ freestyle_hid/tools/extract_chatter.py | 233 +++++++++++++++++++++++ freestyle_hid/tools/hid_console.py | 80 ++++++++ freestyle_hid/tools/py.typed | 2 + 5 files changed, 480 insertions(+) create mode 100644 freestyle_hid/tools/__init__.py create mode 100644 freestyle_hid/tools/encrypted_setup_extractor.py create mode 100755 freestyle_hid/tools/extract_chatter.py create mode 100755 freestyle_hid/tools/hid_console.py create mode 100644 freestyle_hid/tools/py.typed (limited to 'freestyle_hid/tools') diff --git a/freestyle_hid/tools/__init__.py b/freestyle_hid/tools/__init__.py new file mode 100644 index 0000000..3e0558f --- /dev/null +++ b/freestyle_hid/tools/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: 2013 The freestyle-hid Authors +# +# SPDX-License-Identifier: 0BSD diff --git a/freestyle_hid/tools/encrypted_setup_extractor.py b/freestyle_hid/tools/encrypted_setup_extractor.py new file mode 100644 index 0000000..dfe8229 --- /dev/null +++ b/freestyle_hid/tools/encrypted_setup_extractor.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python3 +# +# SPDX-FileCopyrightText: © 2019 The usbmon-tools Authors +# SPDX-FileCopyrightText: © 2019 The freestyle-hid Authors +# +# SPDX-License-Identifier: Apache-2.0 + +import logging +import sys +from typing import BinaryIO, Sequence + +import click +import click_log +import construct +import usbmon +import usbmon.pcapng + +logger = logging.getLogger() +click_log.basic_config(logger) + + +_SERIAL_NUMBER_RESPONSE_TYPE = 0x06 +_ENCRYPTION_SETUP_REQ_TYPE = 0x14 +_ENCRYPTION_SETUP_RESP_TYPE = 0x33 + + +_START_AUTHORIZE_CMD = 0x11 +_CHALLENGE_CMD = 0x16 +_CHALLENGE_RESPONSE_CMD = 0x17 + + +_ABBOTT_VENDOR_ID = 0x1A61 +_LIBRE2_PRODUCT_ID = 0x3950 + +_SERIAL_NO = construct.Struct( + message_type=construct.Const(_SERIAL_NUMBER_RESPONSE_TYPE, construct.Byte), + length=construct.Const(14, construct.Byte), + serial_number=construct.PaddedString(13, "ascii"), + termination=construct.Const(0, construct.Byte), +) + +_CHALLENGE = construct.Struct( + message_type=construct.Const(_ENCRYPTION_SETUP_RESP_TYPE, construct.Byte), + length=construct.Const(16, construct.Byte), + subcmd=construct.Const(_CHALLENGE_CMD, construct.Byte), + challenge=construct.Bytes(8), + iv=construct.Bytes(7), +) + +_CHALLENGE_RESPONSE = construct.Struct( + message_type=construct.Const(_ENCRYPTION_SETUP_REQ_TYPE, construct.Byte), + length=construct.Const(26, construct.Byte), + subcmd=construct.Const(_CHALLENGE_RESPONSE_CMD, construct.Byte), + challenge_response_encrypted=construct.Bytes(16), + const=construct.Const(1, construct.Byte), + mac=construct.Bytes(8), +) + + +@click.command() +@click_log.simple_verbosity_option(logger, "--vlog") +@click.option( + "--device-address", + help=( + "Device address (busnum.devnum) of the device to extract capture" + " of. If none provided, device descriptors will be relied on." + ), +) +@click.argument( + "pcap-files", + type=click.File(mode="rb"), + nargs=None, +) +def main(*, device_address: str, pcap_files: Sequence[BinaryIO]): + if sys.version_info < (3, 7): + raise Exception("Unsupported Python version, please use at least Python 3.7.") + + for pcap_file in pcap_files: + session = usbmon.pcapng.parse_stream(pcap_file, retag_urbs=False) + + if not device_address: + for descriptor in session.device_descriptors.values(): + if ( + descriptor.vendor_id == _ABBOTT_VENDOR_ID + and descriptor.product_id == _LIBRE2_PRODUCT_ID + ): + if device_address and device_address != descriptor.address: + raise Exception( + "Multiple Libre2 devices present in capture, please" + " provide a --device-address flag." + ) + device_address = descriptor.address + else: + device_address = descriptor.address + + if device_address in session.device_descriptors: + descriptor = session.device_descriptors[device_address] + assert descriptor.vendor_id == _ABBOTT_VENDOR_ID + assert descriptor.product_id == _LIBRE2_PRODUCT_ID + + serial_number = "UNKNOWN" + challenge = "UNKNOWN" + iv = "UNKNOWN" + encrypted_challenge = "UNKNOWN" + mac = "UNKNOWN" + + for first, second in session.in_pairs(): + # Ignore stray callbacks/errors. + if not first.type == usbmon.constants.PacketType.SUBMISSION: + continue + + if not first.address.startswith(f"{device_address}."): + # No need to check second, they will be linked. + continue + + if first.xfer_type == usbmon.constants.XferType.INTERRUPT: + pass + elif ( + first.xfer_type == usbmon.constants.XferType.CONTROL + and not first.setup_packet + or first.setup_packet.type == usbmon.setup.Type.CLASS # type: ignore + ): + pass + else: + continue + + if first.direction == usbmon.constants.Direction.OUT: + packet = first + else: + assert second is not None + packet = second + + if not packet.payload: + continue + + assert len(packet.payload) >= 2 + + message_type = packet.payload[0] + + if message_type == _SERIAL_NUMBER_RESPONSE_TYPE: + obj = _SERIAL_NO.parse(packet.payload) + serial_number = obj.serial_number + elif ( + message_type == _ENCRYPTION_SETUP_RESP_TYPE + and packet.payload[2] == _CHALLENGE_CMD + ): + obj = _CHALLENGE.parse(packet.payload) + challenge = obj.challenge.hex() + iv = obj.iv.hex() + elif ( + message_type == _ENCRYPTION_SETUP_REQ_TYPE + and packet.payload[2] == _CHALLENGE_RESPONSE_CMD + ): + obj = _CHALLENGE_RESPONSE.parse(packet.payload) + encrypted_challenge = obj.challenge_response_encrypted.hex() + mac = obj.mac.hex() + + print(f"{serial_number},{challenge},{iv},{encrypted_challenge},{mac}") + + +if __name__ == "__main__": + main() diff --git a/freestyle_hid/tools/extract_chatter.py b/freestyle_hid/tools/extract_chatter.py new file mode 100755 index 0000000..a77a0ec --- /dev/null +++ b/freestyle_hid/tools/extract_chatter.py @@ -0,0 +1,233 @@ +#!/usr/bin/env python3 +# +# SPDX-FileCopyrightText: © 2019 The usbmon-tools Authors +# SPDX-FileCopyrightText: © 2020 The freestyle-hid Authors +# +# SPDX-License-Identifier: Apache-2.0 + +import logging +import sys +import textwrap +from typing import BinaryIO + +import click +import click_log +import construct +import usbmon +import usbmon.chatter +import usbmon.pcapng + +logger = logging.getLogger() +click_log.basic_config(logger) + +_KEEPALIVE_TYPE = 0x22 + +_UNENCRYPTED_TYPES = ( + 0x01, + 0x04, + 0x05, + 0x06, + 0x0C, + 0x0D, + 0x14, + 0x15, + 0x33, + 0x34, + 0x35, + 0x71, + _KEEPALIVE_TYPE, +) + +_ENCRYPTION_SETUP_TYPES = (0x14, 0x33) + +_START_AUTHORIZE_CMD = 0x11 +_CHALLENGE_CMD = 0x16 +_CHALLENGE_RESPONSE_CMD = 0x17 +_CHALLENGE_ACCEPTED_CMD = 0x18 + +_ABBOTT_VENDOR_ID = 0x1A61 +_LIBRE2_PRODUCT_ID = 0x3950 + +_ENCRYPTED_MESSAGE = construct.Struct( + message_type=construct.Byte, + encrypted_message=construct.Bytes(64 - 1 - 4 - 4), + sequence_number=construct.Int32ul, + mac=construct.Int32ul, +) + + +@click.command() +@click_log.simple_verbosity_option(logger, "--vlog") +@click.option( + "--device-address", + help=( + "Device address (busnum.devnum) of the device to extract capture" + " of. If none provided, device descriptors will be relied on." + ), +) +@click.option( + "--encrypted-protocol / --no-encrypted-protocol", + default=False, + help=( + "Whether to expect encrypted protocol in the capture." + " Ignored if the device descriptors are present in the capture." + ), +) +@click.option( + "--verbose-encryption-setup / --no-verbose-encryption-setup", + default=False, + help=( + "Whether to parse encryption setup commands and printing their component" + " together with the raw messsage." + ), +) +@click.option( + "--print-keepalive / --no-print-keepalive", + default=False, + help=( + "Whether to print the keepalive messages sent by the device. " + "Keepalive messages are usually safely ignored." + ), +) +@click.argument( + "pcap-file", + type=click.File(mode="rb"), +) +def main( + *, + device_address: str, + encrypted_protocol: bool, + verbose_encryption_setup: bool, + print_keepalive: bool, + pcap_file: BinaryIO, +) -> None: + if sys.version_info < (3, 7): + raise Exception("Unsupported Python version, please use at least Python 3.7.") + + session = usbmon.pcapng.parse_stream(pcap_file, retag_urbs=False) + + if not device_address: + for descriptor in session.device_descriptors.values(): + if descriptor.vendor_id == _ABBOTT_VENDOR_ID: + if device_address and device_address != descriptor.address: + raise Exception( + "Multiple Abbott device present in capture, please" + " provide a --device-address flag." + ) + device_address = descriptor.address + + if device_address not in session.device_descriptors: + logging.warning( + f"Unable to find device {device_address} in the capture's descriptors." + " Assuming non-encrypted protocol.", + ) + else: + descriptor = session.device_descriptors[device_address] + assert descriptor.vendor_id == _ABBOTT_VENDOR_ID + + if descriptor.product_id == _LIBRE2_PRODUCT_ID: + encrypted_protocol = True + + for first, second in session.in_pairs(): + # Ignore stray callbacks/errors. + if not first.type == usbmon.constants.PacketType.SUBMISSION: + continue + + if not first.address.startswith(f"{device_address}."): + # No need to check second, they will be linked. + continue + + if first.xfer_type == usbmon.constants.XferType.INTERRUPT: + pass + elif ( + first.xfer_type == usbmon.constants.XferType.CONTROL + and not first.setup_packet + or first.setup_packet.type == usbmon.setup.Type.CLASS # type: ignore + ): + pass + else: + continue + + if first.direction == usbmon.constants.Direction.OUT: + packet = first + else: + assert second is not None + packet = second + + if not packet.payload: + continue + + assert len(packet.payload) >= 2 + + message_type = packet.payload[0] + + if message_type == _KEEPALIVE_TYPE and not print_keepalive: + continue + + message_metadata = [] + + if encrypted_protocol and message_type not in _UNENCRYPTED_TYPES: + # With encrypted communication, the length of the message is also encrypted, + # and all the packets use the full 64 bytes. So instead, we extract what + # metadata we can. + parsed = _ENCRYPTED_MESSAGE.parse(packet.payload) + message_metadata.extend( + [f"SEQUENCE_NUMBER={parsed.sequence_number}", f"MAC={parsed.mac:04x}"] + ) + + message_type_str = f"x{message_type:02x}" + message = parsed.encrypted_message + elif verbose_encryption_setup and message_type in _ENCRYPTION_SETUP_TYPES: + message_length = packet.payload[1] + message_end_idx = 2 + message_length + message = packet.payload[2:message_end_idx] + + if message[0] == _START_AUTHORIZE_CMD: + message_metadata.append("START_AUTHORIZE") + elif message[0] == _CHALLENGE_CMD: + message_metadata.append("CHALLENGE") + challenge = message[1:9] + iv = message[9:16] + message_metadata.append(f"CHALLENGE={challenge.hex()}") + message_metadata.append(f"IV={iv.hex()}") + elif message[0] == _CHALLENGE_RESPONSE_CMD: + message_metadata.append("CHALLENGE_RESPONSE") + encrypted_challenge = message[1:17] + challenge_mac = message[18:26] + message_metadata.append( + f"ENCRYPTED_CHALLENGE={encrypted_challenge.hex()}" + ) + message_metadata.append(f"MAC={challenge_mac.hex()}") + elif message[0] == _CHALLENGE_ACCEPTED_CMD: + message_metadata.append("CHALLENGE_ACCEPTED") + + message_metadata.append(f"RAW_LENGTH={message_length}") + message_type_str = f" {message_type:02x}" + else: + message_length = packet.payload[1] + message_metadata.append(f"LENGTH={message_length}") + message_end_idx = 2 + message_length + message_type_str = f" {message_type:02x}" + message = packet.payload[2:message_end_idx] + + if message_metadata: + metadata_string = "\n".join( + textwrap.wrap( + " ".join(message_metadata), width=80, break_long_words=False + ) + ) + print(metadata_string) + + print( + usbmon.chatter.dump_bytes( + packet.direction, + message, + prefix=f"[{message_type_str}]", + print_empty=True, + ), + "\n", + ) + + +if __name__ == "__main__": + main() diff --git a/freestyle_hid/tools/hid_console.py b/freestyle_hid/tools/hid_console.py new file mode 100755 index 0000000..b3b3fee --- /dev/null +++ b/freestyle_hid/tools/hid_console.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: © 2019 The freestyle-hid Authors +# SPDX-License-Identifier: Apache-2.0 +"""CLI tool to send messages through FreeStyle HID protocol.""" + +import logging +import pathlib +import sys +from typing import Optional + +import click +import click_log + +import freestyle_hid + +logger = logging.getLogger() +click_log.basic_config(logger) + + +@click.command() +@click_log.simple_verbosity_option(logger, "--vlog") +@click.option( + "--text-command-type", + "-c", + type=int, + default=0x60, + help="Message type for text commands sent to the device.", +) +@click.option( + "--text-reply-type", + "-r", + type=int, + default=0x60, + help="Message type for text replies received from the device.", +) +@click.option( + "--product-id", + "-p", + type=int, + help="Optional product ID (in alternative to the device path)", +) +@click.argument( + "device-path", + type=click.Path(exists=True, dir_okay=False, writable=True, allow_dash=False), + callback=lambda ctx, param, value: pathlib.Path(value) if value else None, + required=False, +) +def main( + *, + text_command_type: int, + text_reply_type: int, + product_id: Optional[int], + device_path: Optional[pathlib.Path], +): + if not product_id and not device_path: + raise click.UsageError( + "One of --product-id or DEVICE_PATH need to be provided." + ) + + session = freestyle_hid.Session( + product_id, device_path, text_command_type, text_reply_type + ) + + session.connect() + + while True: + if sys.stdin.isatty(): + command = input(">>> ") + else: + command = input() + print(f">>> {command}") + + try: + print(session.send_text_command(bytes(command, "ascii"))) + except freestyle_hid.CommandError as error: + print(f"! {error!r}") + + +if __name__ == "__main__": + main() diff --git a/freestyle_hid/tools/py.typed b/freestyle_hid/tools/py.typed new file mode 100644 index 0000000..311e481 --- /dev/null +++ b/freestyle_hid/tools/py.typed @@ -0,0 +1,2 @@ +# SPDX-FileCopyrightText: © 2020 The freestyle-hid Authors +# SPDX-License-Identifier: Apache-2.0 -- cgit v1.2.3