From f745ce94f71c16927b7ddb91986d6c026c21e7ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Elio=20Petten=C3=B2?= Date: Sun, 4 Oct 2020 15:08:43 +0100 Subject: Initial import of freestyle-hid. This library is a factor-out of https://github.com/glucometers-tech/glucometerutils to only include the FreeStyle implementation, to make it easier to use outside of glucometerutils, and in particular to make it easier to build better reverse engineering tooling around it. Note that since the code was a mix of MIT and Apache-2.0 license, the overall license of the library is written down as Apache-2.0, as that would be a super-set of the requirements from MIT. --- freestyle_hid/_session.py | 264 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 264 insertions(+) create mode 100644 freestyle_hid/_session.py (limited to 'freestyle_hid/_session.py') diff --git a/freestyle_hid/_session.py b/freestyle_hid/_session.py new file mode 100644 index 0000000..7529ef3 --- /dev/null +++ b/freestyle_hid/_session.py @@ -0,0 +1,264 @@ +# SPDX-FileCopyrightText: © 2013 The freestyle-hid Authors +# SPDX-License-Identifier: Apache-2.0 + +import csv +import logging +import pathlib +import re +from typing import AnyStr, Callable, Iterator, Optional, Sequence, Tuple + +import construct + +from ._exceptions import ChecksumError, CommandError +from ._hidwrapper import HidWrapper + +ABBOTT_VENDOR_ID = 0x1A61 + +_INIT_COMMAND = 0x01 +_INIT_RESPONSE = 0x71 + +_KEEPALIVE_RESPONSE = 0x22 +_UNKNOWN_MESSAGE_RESPONSE = 0x30 + +_ENCRYPTION_SETUP_COMMAND = 0x14 +_ENCRYPTION_SETUP_RESPONSE = 0x33 + +_ALWAYS_UNENCRYPTED_MESSAGES = ( + _INIT_COMMAND, + 0x04, + 0x05, + 0x06, + 0x0C, + 0x0D, + _ENCRYPTION_SETUP_COMMAND, + 0x15, + _ENCRYPTION_SETUP_RESPONSE, + 0x34, + 0x35, + _INIT_RESPONSE, + _KEEPALIVE_RESPONSE, +) + + +def _create_matcher( + message_type: int, content: Optional[bytes] +) -> Callable[[Tuple[int, bytes]], bool]: + def _matcher(message: Tuple[int, bytes]) -> bool: + return message[0] == message_type and (content is None or content == message[1]) + + return _matcher + + +_is_init_reply = _create_matcher(_INIT_RESPONSE, b"\x01") +_is_keepalive_response = _create_matcher(_KEEPALIVE_RESPONSE, None) +_is_unknown_message_error = _create_matcher(_UNKNOWN_MESSAGE_RESPONSE, b"\x85") +_is_encryption_missing_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x15") +_is_encryption_setup_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x14") + +_FREESTYLE_MESSAGE = construct.Struct( + hid_report=construct.Const(0, construct.Byte), + message_type=construct.Byte, + command=construct.Padded( + 63, # command can only be up to 62 bytes, but one is used for length. + construct.Prefixed(construct.Byte, construct.GreedyBytes), + ), +) + +_FREESTYLE_ENCRYPTED_MESSAGE = construct.Struct( + hid_report=construct.Const(0, construct.Byte), + message_type=construct.Byte, + command=construct.Padded( + 63, # command can only be up to 62 bytes, but one is used for length. + construct.GreedyBytes, + ), +) + +_TEXT_COMPLETION_RE = re.compile(b"CMD (?:OK|Fail!)") +_TEXT_REPLY_FORMAT = re.compile( + b"^(?P.*)CKSM:(?P[0-9A-F]{8})\r\n" + b"CMD (?POK|Fail!)\r\n$", + re.DOTALL, +) + +_MULTIRECORDS_FORMAT = re.compile( + "^(?P.+\r\n)(?P[0-9]+),(?P[0-9A-F]{8})\r\n$", re.DOTALL +) + + +def _verify_checksum(message: AnyStr, expected_checksum_hex: AnyStr) -> None: + """Calculate the simple checksum of the message and compare with expected. + + Args: + message: (str) message to calculate the checksum of. + expected_checksum_hex: hexadecimal string representing the checksum + expected to match the message. + + Raises: + InvalidChecksum: if the message checksum calculated does not match the one + received. + """ + expected_checksum = int(expected_checksum_hex, 16) + if isinstance(message, bytes): + all_bytes = (c for c in message) + else: + all_bytes = (ord(c) for c in message) + + calculated_checksum = sum(all_bytes) + + if expected_checksum != calculated_checksum: + raise ChecksumError( + f"Invalid checksum, expected {expected_checksum}, calculated {calculated_checksum}" + ) + + +class Session: + def __init__( + self, + product_id: Optional[int], + device_path: Optional[pathlib.Path], + text_message_type: int, + text_reply_message_type: int, + ) -> None: + self._handle = HidWrapper.open(device_path, ABBOTT_VENDOR_ID, product_id) + + self._text_message_type = text_message_type + self._text_reply_message_type = text_reply_message_type + + def connect(self): + """Open connection to the device, starting the knocking sequence.""" + self.send_command(_INIT_COMMAND, b"") + response = self.read_response() + if not _is_init_reply(response): + raise ConnectionError( + f"Connection error: unexpected message %{response[0]:02x}:{response[1].hex()}" + ) + + def send_command(self, message_type: int, command: bytes, encrypted: bool = False): + """Send a raw command to the device. + + Args: + message_type: The first byte sent with the report to the device. + command: The command to send out the device. + """ + if encrypted: + assert message_type not in _ALWAYS_UNENCRYPTED_MESSAGES + meta_construct = _FREESTYLE_ENCRYPTED_MESSAGE + else: + meta_construct = _FREESTYLE_MESSAGE + + usb_packet = meta_construct.build( + {"message_type": message_type, "command": command} + ) + + logging.debug(f"Sending packet: {usb_packet!r}") + self._handle.write(usb_packet) + + def read_response(self, encrypted: bool = False) -> Tuple[int, bytes]: + """Read the response from the device and extracts it.""" + usb_packet = self._handle.read() + + logging.debug(f"Read packet: {usb_packet!r}") + + assert usb_packet + message_type = usb_packet[0] + + if not encrypted or message_type in _ALWAYS_UNENCRYPTED_MESSAGES: + message_length = usb_packet[1] + message_end_idx = 2 + message_length + message_content = usb_packet[2:message_end_idx] + else: + message_content = usb_packet[1:] + + # hidapi module returns a list of bytes rather than a bytes object. + message = (message_type, bytes(message_content)) + + # There appears to be a stray number of 22 01 xx messages being returned + # by some devices after commands are sent. These do not appear to have + # meaning, so ignore them and proceed to the next. These are always sent + # unencrypted, so we need to inspect them before we decide what the + # message content is. + if _is_keepalive_response(message): + return self.read_response(encrypted=encrypted) + + if _is_unknown_message_error(message): + raise CommandError("Invalid command") + + if _is_encryption_missing_error(message): + raise CommandError("Device encryption not initialized.") + + if _is_encryption_setup_error(message): + raise CommandError("Device encryption initialization failed.") + + return message + + def send_text_command(self, command: bytes) -> str: + """Send a command to the device that expects a text reply.""" + self.send_command(self._text_message_type, command) + + # Reply can stretch multiple buffers + full_content = b"" + while True: + message_type, content = self.read_response() + + logging.debug( + f"Received message: type {message_type:02x} content {content.hex()}" + ) + + if message_type != self._text_reply_message_type: + raise CommandError( + f"Message type {message_type:02x}: content does not match expectations: {content!r}" + ) + + full_content += content + + if _TEXT_COMPLETION_RE.search(full_content): + break + + match = _TEXT_REPLY_FORMAT.search(full_content) + if not match: + raise CommandError(repr(full_content)) + + message = match.group("message") + _verify_checksum(message, match.group("checksum")) + + if match.group("status") != b"OK": + raise CommandError(repr(message) or "Command failed") + + # If there is anything in the response that is not ASCII-safe, this is + # probably in the patient name. The Windows utility does not seem to + # validate those, so just replace anything non-ASCII with the correct + # unknown codepoint. + return message.decode("ascii", "replace") + + def query_multirecord(self, command: bytes) -> Iterator[Sequence[str]]: + """Queries for, and returns, "multirecords" results. + + Multirecords are used for querying events, readings, history and similar + other data out of a FreeStyle device. These are comma-separated values, + variable-length. + + The validation includes the general HID framing parsing, as well as + validation of the record count, and of the embedded records checksum. + + Args: + command: The text command to send to the device for the query. + + Returns: + A CSV reader object that returns a record for each line in the + reply buffer. + """ + message = self.send_text_command(command) + logging.debug(f"Received multirecord message:\n{message}") + if message == "Log Empty\r\n": + return iter(()) + + match = _MULTIRECORDS_FORMAT.search(message) + if not match: + raise CommandError(message) + + records_str = match.group("message") + _verify_checksum(records_str, match.group("checksum")) + + logging.debug(f"Received multi-record string: {records_str}") + + return csv.reader(records_str.split("\r\n")) -- cgit v1.2.3