From 4d3c59f2b5bf1e3afc620b12370be16db36a3688 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Elio=20Petten=C3=B2?= Date: Thu, 3 Aug 2023 00:38:53 +0100 Subject: Be more verbose on error messages and parsing with construct. Instead of manually combining and parsing integers (particularly IV that is a 56-bit integer), use construct formats to handle the parsing. --- freestyle_hid/_exceptions.py | 8 ++ freestyle_hid/_session.py | 151 ++++++++++++++++++++++++---------- freestyle_hid/tests/test_freestyle.py | 4 +- 3 files changed, 119 insertions(+), 44 deletions(-) diff --git a/freestyle_hid/_exceptions.py b/freestyle_hid/_exceptions.py index 2b803f8..aa71e7c 100644 --- a/freestyle_hid/_exceptions.py +++ b/freestyle_hid/_exceptions.py @@ -10,6 +10,10 @@ class ConnectionError(Exception): """Errors related to Session establishment.""" +class EncryptionHandshakeError(ConnectionError): + """Errors related to encryption handshake.""" + + class ChecksumError(Exception): """Errors related to the transmission checksums.""" @@ -18,6 +22,10 @@ class CommandError(Exception): """Errors related to the command stream.""" +class EncryptionNotInitialized(CommandError): + """Device needs encryption handshake.""" + + class MissingFreeStyleKeys(Exception): """The freestyle-hid-keys package is missing.""" diff --git a/freestyle_hid/_session.py b/freestyle_hid/_session.py index a2c9195..03e8c54 100644 --- a/freestyle_hid/_session.py +++ b/freestyle_hid/_session.py @@ -10,7 +10,13 @@ from typing import AnyStr, Callable, Iterator, Optional, Sequence, Tuple import construct -from ._exceptions import ChecksumError, CommandError, MissingFreeStyleKeys +from ._exceptions import ( + ChecksumError, + CommandError, + EncryptionHandshakeError, + EncryptionNotInitialized, + MissingFreeStyleKeys, +) from ._freestyle_encryption import SpeckCMAC, SpeckEncrypt from ._hidwrapper import HidWrapper @@ -64,13 +70,44 @@ _is_unknown_message_error = _create_matcher(_UNKNOWN_MESSAGE_RESPONSE, b"\x85") _is_encryption_missing_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x15") _is_encryption_setup_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x14") +_HID_REPORT = construct.Struct( + number=construct.Byte, content=construct.Padded(64, construct.GreedyBytes) +) + _FREESTYLE_MESSAGE = construct.Struct( - hid_report=construct.Const(0, construct.Byte), message_type=construct.Byte, command=construct.Padded( - 63, # command can only be up to 62 bytes, but one is used for length. + 55, # command can only be up to 54 bytes, but one is used for length. construct.Prefixed(construct.Byte, construct.GreedyBytes), ), + iv_counter=construct.Padding(4), + mac=construct.Int32ul, +) + +_CHALLENGE_MESSAGE = construct.Struct( + subcmd=construct.Const(0x16, construct.Byte), + reader_nonce=construct.Bytes(8), + iv=construct.BytesInteger(7, signed=False, swapped=False), +) + +_CHALLENGE_RESPONSE_NOMAC_RAW = construct.Struct( + message_type=construct.Const(_ENCRYPTION_SETUP_COMMAND, construct.Byte), + length=construct.Const(0x1A, construct.Byte), + response_subcmd=construct.Const(0x17, construct.Byte), + response=construct.Bytes(16), + const1=construct.Const(0x01, construct.Byte), +) + +_CHALLENGE_RESPONSE_RAW = construct.Struct( + response=_CHALLENGE_RESPONSE_NOMAC_RAW, + mac=construct.Int64ul, +) + +_CHALLENGE_ACCEPTED_MESSAGE = construct.Struct( + subcmd=construct.Const(0x18, construct.Byte), + encrypted_nonces=construct.Bytes(16), + iv=construct.BytesInteger(7, signed=False, swapped=False), + mac=construct.Int64ul, ) _TEXT_COMPLETION_RE = re.compile(b"CMD (?:OK|Fail!)") @@ -144,40 +181,66 @@ class Session: auth_mac = SpeckCMAC(auth_mac_key) self.send_command(_ENCRYPTION_SETUP_COMMAND, b"\x11") - response = self.read_response() - assert response[0] == _ENCRYPTION_SETUP_RESPONSE - assert response[1][0] == 0x16 - reader_rand = response[1][1:9] - iv = int.from_bytes(response[1][9:16], "big", signed=False) - driver_rand = random.randbytes(8) - resp_enc = auth_enc.encrypt(iv, reader_rand + driver_rand) - resp_mac = auth_mac.sign(b"\x14\x1a\x17" + resp_enc + b"\x01") - resp_mac = int.to_bytes(resp_mac, 8, byteorder="little", signed=False) - self.send_command( - _ENCRYPTION_SETUP_COMMAND, b"\x17" + resp_enc + b"\x01" + resp_mac + (response_type, response_bytes) = self.read_response() + + if response_type != _ENCRYPTION_SETUP_RESPONSE: + raise EncryptionHandshakeError( + f"Unexpected response type: {response_type:02x}" + ) + + challenge_response = _CHALLENGE_MESSAGE.parse(response_bytes) + host_nonce = random.randbytes(8) + + encrypted_challenge_response = auth_enc.encrypt( + challenge_response.iv, challenge_response.reader_nonce + host_nonce ) - response = self.read_response() - assert response[0] == _ENCRYPTION_SETUP_RESPONSE - assert response[1][0] == 0x18 - mac = auth_mac.sign(b"\x33\x22" + response[1][:24]) - mac = int.to_bytes(mac, 8, byteorder="little", signed=False) - assert mac == response[1][24:32] - iv = int.from_bytes(response[1][17:24], "big", signed=False) - resp_dec = auth_enc.decrypt(iv, response[1][1:17]) - assert resp_dec[:8] == driver_rand - assert resp_dec[8:] == reader_rand - crypt = SpeckCMAC(libre2_keys.SESSION_ENCRYPTION_KEY) - ses_enc_key = crypt.derive( - "SessnEnc".encode(), serial + reader_rand + driver_rand + raw_response_nomac = _CHALLENGE_RESPONSE_NOMAC_RAW.build( + {"response": encrypted_challenge_response} ) - crypt = SpeckCMAC(libre2_keys.SESSION_MAC_KEY) - ses_mac_key = crypt.derive( - "SessnMAC".encode(), serial + reader_rand + driver_rand + response_mac = auth_mac.sign(raw_response_nomac) + raw_response = _CHALLENGE_RESPONSE_RAW.build( + { + "response": {"response": encrypted_challenge_response}, + "mac": response_mac, + } ) + + self._write_hid(raw_response) + (response_type, response_bytes) = self.read_response() + + if response_type != _ENCRYPTION_SETUP_RESPONSE: + raise EncryptionHandshakeError( + f"Unexpected response type: {response_type:02x}" + ) + + acceptance_response = _CHALLENGE_ACCEPTED_MESSAGE.parse(response_bytes) + + # We need to reconstruct the raw message, so we include the expected type and size. + mac = auth_mac.sign(b"\x33\x22" + response_bytes[:24]) + + if mac != acceptance_response.mac: + raise EncryptionHandshakeError( + f"Challenge acceptance has incorrect MAC! Expected {mac:016x} received {acceptance_response.mac:016x}." + ) + + decoded_nonces = auth_enc.decrypt( + acceptance_response.iv, acceptance_response.encrypted_nonces + ) + + if decoded_nonces != host_nonce + challenge_response.reader_nonce: + raise EncryptionHandshakeError("Decrypted nonces do not match expectation.") + + context_key = serial + challenge_response.reader_nonce + host_nonce + + logging.debug(f"Context key established: {context_key.hex()}") + + crypt = SpeckCMAC(libre2_keys.SESSION_ENCRYPTION_KEY) + ses_enc_key = crypt.derive("SessnEnc".encode(), context_key) + crypt = SpeckCMAC(libre2_keys.SESSION_MAC_KEY) + ses_mac_key = crypt.derive("SessnMAC".encode(), context_key) self.crypt_enc = SpeckEncrypt(ses_enc_key) self.crypt_mac = SpeckCMAC(ses_mac_key) - # print("HANDSHAKE SUCCESSFUL!") def connect(self): """Open connection to the device, starting the knocking sequence.""" @@ -193,12 +256,12 @@ class Session: def encrypt_message(self, packet: bytes): output = bytearray(packet) # 0xFF IV is actually 0, because of some weird padding - encrypted = self.crypt_enc.encrypt(0xFF, packet[2:57]) - output[2:57] = encrypted + encrypted = self.crypt_enc.encrypt(0xFF, packet[1:56]) + output[1:56] = encrypted # Not giving a f**k about the IV counter for now - output[57:61] = bytes(4) - mac = self.crypt_mac.sign(output[1:61]) - output[61:65] = int.to_bytes(mac, 8, byteorder="little", signed=False)[4:] + output[56:60] = bytes(4) + mac = self.crypt_mac.sign(output[0:60]) + output[60:64] = int.to_bytes(mac, 8, byteorder="little", signed=False)[4:] return bytes(output) def decrypt_message(self, packet: bytes): @@ -210,6 +273,11 @@ class Session: output[1:56] = self.crypt_enc.decrypt(iv, packet[1:56]) return bytes(output) + def _write_hid(self, packet: bytes, hid_report: int = 0) -> None: + usb_packet = _HID_REPORT.build({"number": hid_report, "content": packet}) + logging.debug(f"Sending packet: {usb_packet!r}") + self._handle.write(usb_packet) + def send_command(self, message_type: int, command: bytes, encrypted: bool = False): """Send a raw command to the device. @@ -218,18 +286,17 @@ class Session: command: The command to send out the device. """ - usb_packet = _FREESTYLE_MESSAGE.build( - {"message_type": message_type, "command": command} + message = _FREESTYLE_MESSAGE.build( + {"message_type": message_type, "command": command, "mac": 0} ) if ( self._encrypted_protocol and message_type not in _ALWAYS_UNENCRYPTED_MESSAGES ): - usb_packet = self.encrypt_message(usb_packet) + message = self.encrypt_message(message) - logging.debug(f"Sending packet: {usb_packet!r}") - self._handle.write(usb_packet) + self._write_hid(message) def read_response(self, encrypted: bool = False) -> Tuple[int, bytes]: """Read the response from the device and extracts it.""" @@ -265,7 +332,7 @@ class Session: raise CommandError("Invalid command") if _is_encryption_missing_error(message): - raise CommandError("Device encryption not initialized.") + raise EncryptionNotInitialized("Device encryption not initialized.") if _is_encryption_setup_error(message): raise CommandError("Device encryption initialization failed.") diff --git a/freestyle_hid/tests/test_freestyle.py b/freestyle_hid/tests/test_freestyle.py index feb7c15..ebf6b80 100644 --- a/freestyle_hid/tests/test_freestyle.py +++ b/freestyle_hid/tests/test_freestyle.py @@ -14,9 +14,9 @@ class TestFreeStyle(unittest.TestCase): """Test the generation of a new outgoing message.""" self.assertEqual( - b"\0\x17\7command\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" + b"\x17\7command\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", _session._FREESTYLE_MESSAGE.build( - {"message_type": 23, "command": b"command"} + {"message_type": 23, "command": b"command", "mac": 0} ), ) -- cgit v1.2.3