summaryrefslogtreecommitdiffstats
path: root/freestyle_hid/tools/extract_chatter.py
diff options
context:
space:
mode:
Diffstat (limited to 'freestyle_hid/tools/extract_chatter.py')
-rwxr-xr-xfreestyle_hid/tools/extract_chatter.py68
1 files changed, 20 insertions, 48 deletions
diff --git a/freestyle_hid/tools/extract_chatter.py b/freestyle_hid/tools/extract_chatter.py
index a77a0ec..b2c16f6 100755
--- a/freestyle_hid/tools/extract_chatter.py
+++ b/freestyle_hid/tools/extract_chatter.py
@@ -16,6 +16,7 @@ import construct
import usbmon
import usbmon.chatter
import usbmon.pcapng
+import usbmon.support.hid
logger = logging.getLogger()
click_log.basic_config(logger)
@@ -49,7 +50,6 @@ _ABBOTT_VENDOR_ID = 0x1A61
_LIBRE2_PRODUCT_ID = 0x3950
_ENCRYPTED_MESSAGE = construct.Struct(
- message_type=construct.Byte,
encrypted_message=construct.Bytes(64 - 1 - 4 - 4),
sequence_number=construct.Int32ul,
mac=construct.Int32ul,
@@ -107,14 +107,15 @@ def main(
session = usbmon.pcapng.parse_stream(pcap_file, retag_urbs=False)
if not device_address:
- for descriptor in session.device_descriptors.values():
- if descriptor.vendor_id == _ABBOTT_VENDOR_ID:
- if device_address and device_address != descriptor.address:
- raise Exception(
- "Multiple Abbott device present in capture, please"
- " provide a --device-address flag."
- )
- device_address = descriptor.address
+ possible_addresses = list(session.find_devices_by_ids(_ABBOTT_VENDOR_ID, None))
+ if len(possible_addresses) > 1:
+ raise click.UsageError(
+ f"Multiple Abbott devices found, please select one of {','.join(possible_addresses)}"
+ )
+ elif len(possible_addresses) == 0:
+ raise click.UsageError("No Abbott devices found.")
+ else:
+ (device_address,) = possible_addresses
if device_address not in session.device_descriptors:
logging.warning(
@@ -128,38 +129,9 @@ def main(
if descriptor.product_id == _LIBRE2_PRODUCT_ID:
encrypted_protocol = True
- for first, second in session.in_pairs():
- # Ignore stray callbacks/errors.
- if not first.type == usbmon.constants.PacketType.SUBMISSION:
- continue
-
- if not first.address.startswith(f"{device_address}."):
- # No need to check second, they will be linked.
- continue
-
- if first.xfer_type == usbmon.constants.XferType.INTERRUPT:
- pass
- elif (
- first.xfer_type == usbmon.constants.XferType.CONTROL
- and not first.setup_packet
- or first.setup_packet.type == usbmon.setup.Type.CLASS # type: ignore
- ):
- pass
- else:
- continue
-
- if first.direction == usbmon.constants.Direction.OUT:
- packet = first
- else:
- assert second is not None
- packet = second
-
- if not packet.payload:
- continue
-
- assert len(packet.payload) >= 2
-
- message_type = packet.payload[0]
+ for packet in usbmon.support.hid.select(session, device_address=device_address):
+ assert packet.report_content
+ message_type = packet.report_id
if message_type == _KEEPALIVE_TYPE and not print_keepalive:
continue
@@ -170,7 +142,7 @@ def main(
# With encrypted communication, the length of the message is also encrypted,
# and all the packets use the full 64 bytes. So instead, we extract what
# metadata we can.
- parsed = _ENCRYPTED_MESSAGE.parse(packet.payload)
+ parsed = _ENCRYPTED_MESSAGE.parse(packet.report_content)
message_metadata.extend(
[f"SEQUENCE_NUMBER={parsed.sequence_number}", f"MAC={parsed.mac:04x}"]
)
@@ -178,9 +150,9 @@ def main(
message_type_str = f"x{message_type:02x}"
message = parsed.encrypted_message
elif verbose_encryption_setup and message_type in _ENCRYPTION_SETUP_TYPES:
- message_length = packet.payload[1]
- message_end_idx = 2 + message_length
- message = packet.payload[2:message_end_idx]
+ message_length = packet.report_content[0]
+ message_end_idx = 1 + message_length
+ message = packet.report_content[1:message_end_idx]
if message[0] == _START_AUTHORIZE_CMD:
message_metadata.append("START_AUTHORIZE")
@@ -204,11 +176,11 @@ def main(
message_metadata.append(f"RAW_LENGTH={message_length}")
message_type_str = f" {message_type:02x}"
else:
- message_length = packet.payload[1]
+ message_length = packet.report_content[0]
message_metadata.append(f"LENGTH={message_length}")
- message_end_idx = 2 + message_length
+ message_end_idx = 1 + message_length
message_type_str = f" {message_type:02x}"
- message = packet.payload[2:message_end_idx]
+ message = packet.report_content[1:message_end_idx]
if message_metadata:
metadata_string = "\n".join(