summaryrefslogtreecommitdiffstats
path: root/glucometerutils/support/freestyle.py
diff options
context:
space:
mode:
Diffstat (limited to 'glucometerutils/support/freestyle.py')
-rw-r--r--glucometerutils/support/freestyle.py275
1 files changed, 16 insertions, 259 deletions
diff --git a/glucometerutils/support/freestyle.py b/glucometerutils/support/freestyle.py
index 13e48eb..28aec6a 100644
--- a/glucometerutils/support/freestyle.py
+++ b/glucometerutils/support/freestyle.py
@@ -9,110 +9,13 @@ https://protocols.glucometers.tech/abbott/shared-hid-protocol
"""
-import csv
import datetime
-import logging
-import re
-from typing import AnyStr, Callable, Iterator, List, Optional, Tuple
+import pathlib
+from typing import Optional
-import construct
+import freestyle_hid
from glucometerutils import driver, exceptions
-from glucometerutils.support import hiddevice
-
-_INIT_COMMAND = 0x01
-_INIT_RESPONSE = 0x71
-
-_KEEPALIVE_RESPONSE = 0x22
-_UNKNOWN_MESSAGE_RESPONSE = 0x30
-
-_ENCRYPTION_SETUP_COMMAND = 0x14
-_ENCRYPTION_SETUP_RESPONSE = 0x33
-
-_ALWAYS_UNENCRYPTED_MESSAGES = (
- _INIT_COMMAND,
- 0x04,
- 0x05,
- 0x06,
- 0x0C,
- 0x0D,
- _ENCRYPTION_SETUP_COMMAND,
- 0x15,
- _ENCRYPTION_SETUP_RESPONSE,
- 0x34,
- 0x35,
- _INIT_RESPONSE,
- _KEEPALIVE_RESPONSE,
-)
-
-
-def _create_matcher(
- message_type: int, content: Optional[bytes]
-) -> Callable[[Tuple[int, bytes]], bool]:
- def _matcher(message: Tuple[int, bytes]) -> bool:
- return message[0] == message_type and (content is None or content == message[1])
-
- return _matcher
-
-
-_is_init_reply = _create_matcher(_INIT_RESPONSE, b"\x01")
-_is_keepalive_response = _create_matcher(_KEEPALIVE_RESPONSE, None)
-_is_unknown_message_error = _create_matcher(_UNKNOWN_MESSAGE_RESPONSE, b"\x85")
-_is_encryption_missing_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x15")
-_is_encryption_setup_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x14")
-
-_FREESTYLE_MESSAGE = construct.Struct(
- hid_report=construct.Const(0, construct.Byte),
- message_type=construct.Byte,
- command=construct.Padded(
- 63, # command can only be up to 62 bytes, but one is used for length.
- construct.Prefixed(construct.Byte, construct.GreedyBytes),
- ),
-)
-
-_FREESTYLE_ENCRYPTED_MESSAGE = construct.Struct(
- hid_report=construct.Const(0, construct.Byte),
- message_type=construct.Byte,
- command=construct.Padded(
- 63, # command can only be up to 62 bytes, but one is used for length.
- construct.GreedyBytes,
- ),
-)
-
-_TEXT_COMPLETION_RE = re.compile(b"CMD (?:OK|Fail!)")
-_TEXT_REPLY_FORMAT = re.compile(
- b"^(?P<message>.*)CKSM:(?P<checksum>[0-9A-F]{8})\r\n"
- b"CMD (?P<status>OK|Fail!)\r\n$",
- re.DOTALL,
-)
-
-_MULTIRECORDS_FORMAT = re.compile(
- "^(?P<message>.+\r\n)(?P<count>[0-9]+),(?P<checksum>[0-9A-F]{8})\r\n$", re.DOTALL
-)
-
-
-def _verify_checksum(message: AnyStr, expected_checksum_hex: AnyStr) -> None:
- """Calculate the simple checksum of the message and compare with expected.
-
- Args:
- message: (str) message to calculate the checksum of.
- expected_checksum_hex: hexadecimal string representing the checksum
- expected to match the message.
-
- Raises:
- InvalidChecksum: if the message checksum calculated does not match the one
- received.
- """
- expected_checksum = int(expected_checksum_hex, 16)
- if isinstance(message, bytes):
- all_bytes = (c for c in message)
- else:
- all_bytes = (ord(c) for c in message)
-
- calculated_checksum = sum(all_bytes)
-
- if expected_checksum != calculated_checksum:
- raise exceptions.InvalidChecksum(expected_checksum, calculated_checksum)
def convert_ketone_unit(raw_value: float) -> float:
@@ -129,161 +32,6 @@ def convert_ketone_unit(raw_value: float) -> float:
ABBOTT_VENDOR_ID = 0x1A61
-class FreeStyleHidSession:
- def __init__(
- self,
- product_id: int,
- device_path: Optional[str],
- text_message_type: int,
- text_reply_message_type: int,
- ) -> None:
-
- self._hid_session = hiddevice.HidSession(
- (ABBOTT_VENDOR_ID, product_id), device_path
- )
- self._text_message_type = text_message_type
- self._text_reply_message_type = text_reply_message_type
-
- def connect(self):
- """Open connection to the device, starting the knocking sequence."""
- self.send_command(_INIT_COMMAND, b"")
- response = self.read_response()
- if not _is_init_reply(response):
- raise exceptions.ConnectionFailed(
- f"Connection error: unexpected message %{response[0]:02x}:{response[1].hex()}"
- )
-
- def send_command(self, message_type: int, command: bytes, encrypted: bool = False):
- """Send a raw command to the device.
-
- Args:
- message_type: The first byte sent with the report to the device.
- command: The command to send out the device.
- """
- if encrypted:
- assert message_type not in _ALWAYS_UNENCRYPTED_MESSAGES
- meta_construct = _FREESTYLE_ENCRYPTED_MESSAGE
- else:
- meta_construct = _FREESTYLE_MESSAGE
-
- usb_packet = meta_construct.build(
- {"message_type": message_type, "command": command}
- )
-
- logging.debug("Sending packet: %r", usb_packet)
- self._hid_session.write(usb_packet)
-
- def read_response(self, encrypted: bool = False) -> Tuple[int, bytes]:
- """Read the response from the device and extracts it."""
- usb_packet = self._hid_session.read()
-
- logging.debug("Read packet: %r", usb_packet)
-
- assert usb_packet
- message_type = usb_packet[0]
-
- if not encrypted or message_type in _ALWAYS_UNENCRYPTED_MESSAGES:
- message_length = usb_packet[1]
- message_end_idx = 2 + message_length
- message_content = usb_packet[2:message_end_idx]
- else:
- message_content = usb_packet[1:]
-
- # hidapi module returns a list of bytes rather than a bytes object.
- message = (message_type, bytes(message_content))
-
- # There appears to be a stray number of 22 01 xx messages being returned
- # by some devices after commands are sent. These do not appear to have
- # meaning, so ignore them and proceed to the next. These are always sent
- # unencrypted, so we need to inspect them before we decide what the
- # message content is.
- if _is_keepalive_response(message):
- return self.read_response(encrypted=encrypted)
-
- if _is_unknown_message_error(message):
- raise exceptions.CommandError("Invalid command")
-
- if _is_encryption_missing_error(message):
- raise exceptions.CommandError("Device encryption not initialized.")
-
- if _is_encryption_setup_error(message):
- raise exceptions.CommandError("Device encryption initialization failed.")
-
- return message
-
- def send_text_command(self, command: bytes) -> str:
- """Send a command to the device that expects a text reply."""
- self.send_command(self._text_message_type, command)
-
- # Reply can stretch multiple buffers
- full_content = b""
- while True:
- message_type, content = self.read_response()
-
- logging.debug(
- "Received message: type %02x content %s", message_type, content.hex()
- )
-
- if message_type != self._text_reply_message_type:
- raise exceptions.InvalidResponse(
- f"Message type {message_type:02x}: content does not match expectations: {content!r}"
- )
-
- full_content += content
-
- if _TEXT_COMPLETION_RE.search(full_content):
- break
-
- match = _TEXT_REPLY_FORMAT.search(full_content)
- if not match:
- raise exceptions.InvalidResponse(repr(full_content))
-
- message = match.group("message")
- _verify_checksum(message, match.group("checksum"))
-
- if match.group("status") != b"OK":
- raise exceptions.InvalidResponse(repr(message) or "Command failed")
-
- # If there is anything in the response that is not ASCII-safe, this is
- # probably in the patient name. The Windows utility does not seem to
- # validate those, so just replace anything non-ASCII with the correct
- # unknown codepoint.
- return message.decode("ascii", "replace")
-
- def query_multirecord(self, command: bytes) -> Iterator[List[str]]:
- """Queries for, and returns, "multirecords" results.
-
- Multirecords are used for querying events, readings, history and similar
- other data out of a FreeStyle device. These are comma-separated values,
- variable-length.
-
- The validation includes the general HID framing parsing, as well as
- validation of the record count, and of the embedded records checksum.
-
- Args:
- command: The text command to send to the device for the query.
-
- Returns:
- A CSV reader object that returns a record for each line in the
- reply buffer.
- """
- message = self.send_text_command(command)
- logging.debug("Received multirecord message:\n%s", message)
- if message == "Log Empty\r\n":
- return iter(())
-
- match = _MULTIRECORDS_FORMAT.search(message)
- if not match:
- raise exceptions.InvalidResponse(message)
-
- records_str = match.group("message")
- _verify_checksum(records_str, match.group("checksum"))
-
- logging.debug("Received multi-record string: %s", records_str)
-
- return csv.reader(records_str.split("\r\n"))
-
-
class FreeStyleHidDevice(driver.GlucometerDevice):
"""Base class implementing the FreeStyle HID common protocol.
@@ -304,13 +52,22 @@ class FreeStyleHidDevice(driver.GlucometerDevice):
text_reply_cmd: int = 0x60,
) -> None:
super().__init__(device_path)
- self._session = FreeStyleHidSession(
- product_id, device_path, text_cmd, text_reply_cmd
- )
+ try:
+ self._session = freestyle_hid.Session(
+ product_id,
+ pathlib.Path(device_path) if device_path else None,
+ text_cmd,
+ text_reply_cmd,
+ )
+ except Exception as e:
+ raise exceptions.ConnectionFailed(str(e))
def connect(self) -> None:
"""Open connection to the device, starting the knocking sequence."""
- self._session.connect()
+ try:
+ self._session.connect()
+ except Exception as e:
+ raise exceptions.ConnectionFailed(str(e))
def disconnect(self) -> None:
"""Disconnect the device, nothing to be done."""