From a020489f97ac97f595825cdfce7e42d3327b49a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Elio=20Petten=C3=B2?= Date: Sat, 25 Jan 2020 12:35:25 +0000 Subject: freestyle: add some (rough) support for encrypted messages. While the Libre 2 protocol is still not usable, this makes it possible to at least send messages to it and get a translation of what the mistake was in the first place. --- glucometerutils/support/freestyle.py | 97 +++++++++++++++++++++++++++++------- 1 file changed, 80 insertions(+), 17 deletions(-) diff --git a/glucometerutils/support/freestyle.py b/glucometerutils/support/freestyle.py index c94a92b..54edc7c 100644 --- a/glucometerutils/support/freestyle.py +++ b/glucometerutils/support/freestyle.py @@ -12,7 +12,7 @@ import csv import datetime import logging import re -from typing import AnyStr, Iterator, List, Optional, Text, Tuple +from typing import AnyStr, Callable, Iterator, List, Optional, Text, Tuple import construct @@ -20,6 +20,38 @@ from glucometerutils import exceptions from glucometerutils.support import hiddevice _INIT_COMMAND = 0x01 +_INIT_RESPONSE = 0x71 + +_KEEPALIVE_RESPONSE = 0x22 +_UNKNOWN_MESSAGE_RESPONSE = 0x30 + +_ENCRYPTION_SETUP_COMMAND = 0x14 +_ENCRYPTION_SETUP_RESPONSE = 0x33 + +_ALWAYS_UNENCRYPTED_MESSAGES = ( + _INIT_COMMAND, 0x04, 0x05, 0x06, 0x0c, 0x0d, + _ENCRYPTION_SETUP_COMMAND, 0x15, + _ENCRYPTION_SETUP_RESPONSE, 0x34, 0x35, + _INIT_RESPONSE, + _KEEPALIVE_RESPONSE, +) + +def _create_matcher(message_type, content): + # type: (int, Optional[bytes]) -> Callable[[Tuple[int, bytes]], bool] + def _matcher(message): + return ( + message[0] == message_type and + (content is None or content == message[1])) + + return _matcher + +_is_init_reply = _create_matcher(_INIT_RESPONSE, b'\x01') +_is_keepalive_response = _create_matcher(_KEEPALIVE_RESPONSE, b'\x05') +_is_uknown_message_error = _create_matcher(_UNKNOWN_MESSAGE_RESPONSE, b'\x85') +_is_encryption_missing_error = _create_matcher( + _ENCRYPTION_SETUP_RESPONSE, b'\x15') +_is_encryption_setup_error = _create_matcher( + _ENCRYPTION_SETUP_RESPONSE, b'\x14') _FREESTYLE_MESSAGE = construct.Struct( 'hid_report' / construct.Const(0, construct.Byte), @@ -29,6 +61,14 @@ _FREESTYLE_MESSAGE = construct.Struct( construct.Prefixed(construct.Byte, construct.GreedyBytes)), ) +_FREESTYLE_ENCRYPTED_MESSAGE = construct.Struct( + 'hid_report' / construct.Const(0, construct.Byte), + 'message_type' / construct.Byte, + 'command' / construct.Padded( + 63, # command can only be up to 62 bytes, but one is used for length. + construct.GreedyBytes), +) + _TEXT_COMPLETION_RE = re.compile(b'CMD (?:OK|Fail!)') _TEXT_REPLY_FORMAT = re.compile( b'^(?P.*)CKSM:(?P[0-9A-F]{8})\r\n' @@ -94,28 +134,38 @@ class FreeStyleHidDevice(hiddevice.HidDevice): def connect(self): """Open connection to the device, starting the knocking sequence.""" self._send_command(_INIT_COMMAND, b'') - self._read_response() # Only expect non-error result. + response = self._read_response() + if not _is_init_reply(response): + raise exceptions.ConnectionFailed( + 'Connection error: unexpected message %02x:%s' % ( + response[0], response[1].hex())) def disconnect(self): """Disconnect the device, nothing to be done.""" pass - def _send_command(self, message_type, command): - # type: (int, bytes) -> None + def _send_command(self, message_type, command, encrypted=False): + # type: (int, bytes, bool) -> None """Send a raw command to the device. Args: message_type: (int) The first byte sent with the report to the device. command: (bytes) The command to send out the device. """ - usb_packet = _FREESTYLE_MESSAGE.build( + if encrypted: + assert message_type not in _ALWAYS_UNENCRYPTED_MESSAGES + meta_construct = _FREESTYLE_ENCRYPTED_MESSAGE + else: + meta_construct = _FREESTYLE_MESSAGE + + usb_packet = meta_construct.build( {'message_type': message_type, 'command': command}) logging.debug('Sending packet: %r', usb_packet) self._write(usb_packet) - def _read_response(self): - # type: () -> Tuple[int, bytes] + def _read_response(self, encrypted=False): + # type: (bool) -> Tuple[int, bytes] """Read the response from the device and extracts it.""" usb_packet = self._read() @@ -123,24 +173,36 @@ class FreeStyleHidDevice(hiddevice.HidDevice): assert usb_packet message_type = usb_packet[0] - message_length = usb_packet[1] - message_content = usb_packet[2:2+message_length] + + if not encrypted or message_type in _ALWAYS_UNENCRYPTED_MESSAGES: + message_length = usb_packet[1] + message_content = usb_packet[2:2+message_length] + else: + message_content = usb_packet[1:] + + # hidapi module returns a list of bytes rather than a bytes object. + message = (message_type, bytes(message_content)) # There appears to be a stray number of 22 01 xx messages being returned # by some devices after commands are sent. These do not appear to have - # meaning, so ignore them and proceed to the next. - if message_type == 0x22 and message_length == 1: - return self._read_response() + # meaning, so ignore them and proceed to the next. These are always sent + # unencrypted, so we need to inspect them before we decide what the + # message content is. + if _is_keepalive_response(message): + return self._read_response(encrypted=encrypted) - if message_type == 0x30 and message_content == b'\x85': + if _is_uknown_message_error(message): raise exceptions.CommandError('Invalid command') - if message_type == 0x33 and message_content == b'\x15': + if _is_encryption_missing_error(message): + raise exceptions.CommandError( + 'Device encryption not initialized.') + + if _is_encryption_setup_error(message): raise exceptions.CommandError( 'Device encryption initialization failed.') - # hidapi module returns a list of bytes rather than a bytes object. - return (message_type, bytes(message_content)) + return message def _send_text_command(self, command): # type: (bytes) -> Text @@ -153,7 +215,8 @@ class FreeStyleHidDevice(hiddevice.HidDevice): message_type, content = self._read_response() logging.debug( - 'Received message: type %02x content %r', message_type, content) + 'Received message: type %02x content %s', + message_type, content.hex()) if message_type != self.TEXT_REPLY_CMD: raise exceptions.InvalidResponse( -- cgit v1.2.3