From d0a282b552922f31f80b2e979b79d5697fbbbcda Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Elio=20Petten=C3=B2?= Date: Sun, 22 Mar 2020 16:41:48 +0000 Subject: Make HID support a "session" class rather than a base class. This in turn allows wrapping the FreeStyle access in its own session class, which the freestyle_hid_console can use without dirty tricks, and without triggering the now-abstract class. --- glucometerutils/drivers/contourusb.py | 8 +- glucometerutils/drivers/fsinsulinx.py | 5 +- glucometerutils/drivers/fslibre.py | 11 +- glucometerutils/drivers/fsprecisionneo.py | 5 +- glucometerutils/support/contourusb.py | 15 ++- glucometerutils/support/driver_base.py | 5 + glucometerutils/support/freestyle.py | 163 +++++++++++++----------- glucometerutils/support/hiddevice.py | 58 ++++----- reversing_tools/abbott/freestyle_hid_console.py | 10 +- 9 files changed, 153 insertions(+), 127 deletions(-) diff --git a/glucometerutils/drivers/contourusb.py b/glucometerutils/drivers/contourusb.py index 5c9ed11..095f920 100644 --- a/glucometerutils/drivers/contourusb.py +++ b/glucometerutils/drivers/contourusb.py @@ -21,7 +21,7 @@ http://protocols.ascensia.com/Programming-Guide.aspx import datetime from glucometerutils import common -from glucometerutils.support import contourusb, driver_base +from glucometerutils.support import contourusb def _extract_timestamp(parsed_record, prefix=""): @@ -41,11 +41,11 @@ def _extract_timestamp(parsed_record, prefix=""): ) -class Device(contourusb.ContourHidDevice, driver_base.GlucometerDriver): +class Device(contourusb.ContourHidDevice): """Glucometer driver for FreeStyle Libre devices.""" - USB_VENDOR_ID = 0x1A79 # type: int # Bayer Health Care LLC Contour - USB_PRODUCT_ID = 0x6002 # type: int + def __init__(self, device): + self._hid_session = contourusb.ContourHidSession((0x1A79, 0x6002), device) def get_meter_info(self): self._get_info_record() diff --git a/glucometerutils/drivers/fsinsulinx.py b/glucometerutils/drivers/fsinsulinx.py index 5465b3a..a3e54fb 100644 --- a/glucometerutils/drivers/fsinsulinx.py +++ b/glucometerutils/drivers/fsinsulinx.py @@ -51,7 +51,8 @@ _InsulinxReading = collections.namedtuple( class Device(freestyle.FreeStyleHidDevice): """Glucometer driver for FreeStyle InsuLinux devices.""" - USB_PRODUCT_ID = 0x3460 + def __init__(self, device_path): + super().__init__(0x3460, device_path) def get_meter_info(self): """Return the device information in structured form.""" @@ -68,7 +69,7 @@ class Device(freestyle.FreeStyleHidDevice): def get_readings(self): """Iterate through the reading records in the device.""" - for record in self._get_multirecord(b"$result?"): + for record in self._session.query_multirecord(b"$result?"): if not record or record[0] != _TYPE_GLUCOSE_READING: continue diff --git a/glucometerutils/drivers/fslibre.py b/glucometerutils/drivers/fslibre.py index 4802673..2e37ec1 100644 --- a/glucometerutils/drivers/fslibre.py +++ b/glucometerutils/drivers/fslibre.py @@ -195,7 +195,8 @@ def _parse_arresult(record): class Device(freestyle.FreeStyleHidDevice): """Glucometer driver for FreeStyle Libre devices.""" - USB_PRODUCT_ID = 0x3650 + def __init__(self, device_path): + super().__init__(0x3650, device_path) def get_meter_info(self): """Return the device information in structured form.""" @@ -209,7 +210,7 @@ class Device(freestyle.FreeStyleHidDevice): def get_serial_number(self): """Overridden function as the command is not compatible.""" - return self._send_text_command(b"$sn?").rstrip("\r\n") + return self._session.send_text_command(b"$sn?").rstrip("\r\n") def get_glucose_unit(self): # pylint: disable=no-self-use """Returns the glucose unit of the device.""" @@ -221,7 +222,7 @@ class Device(freestyle.FreeStyleHidDevice): # First of all get the usually longer list of sensor readings, and # convert them to Readings objects. - for record in self._get_multirecord(b"$history?"): + for record in self._session.query_multirecord(b"$history?"): parsed_record = _parse_record(record, _HISTORY_ENTRY_MAP) if not parsed_record or parsed_record["errors"] != 0: @@ -238,10 +239,10 @@ class Device(freestyle.FreeStyleHidDevice): # Then get the results of explicit scans and blood tests (and other # events). - for record in self._get_multirecord(b"$arresult?"): + for record in self._session.query_multirecord(b"$arresult?"): reading = _parse_arresult(record) if reading: yield reading def zero_log(self): - self._send_text_command(b"$resetpatient") + self._session.send_text_command(b"$resetpatient") diff --git a/glucometerutils/drivers/fsprecisionneo.py b/glucometerutils/drivers/fsprecisionneo.py index 909fed8..379bdf4 100644 --- a/glucometerutils/drivers/fsprecisionneo.py +++ b/glucometerutils/drivers/fsprecisionneo.py @@ -56,7 +56,8 @@ _NeoReading = collections.namedtuple( class Device(freestyle.FreeStyleHidDevice): """Glucometer driver for FreeStyle Precision Neo devices.""" - USB_PRODUCT_ID = 0x3850 + def __init__(self, device_path): + super().__init__(0x3850, device_path) def get_meter_info(self): """Return the device information in structured form.""" @@ -74,7 +75,7 @@ class Device(freestyle.FreeStyleHidDevice): def get_readings(self): """Iterate through the reading records in the device.""" - for record in self._get_multirecord(b"$result?"): + for record in self._session.query_multirecord(b"$result?"): cls = None if record and record[0] == _TYPE_GLUCOSE_READING: cls = common.GlucoseReading diff --git a/glucometerutils/support/contourusb.py b/glucometerutils/support/contourusb.py index 9afb79d..6cd9442 100644 --- a/glucometerutils/support/contourusb.py +++ b/glucometerutils/support/contourusb.py @@ -15,13 +15,13 @@ import csv import datetime import logging import re -from typing import Dict, Iterator, List, Text +from typing import Dict, Iterator, List, Optional, Text, Tuple import construct from glucometerutils import exceptions from glucometerutils.exceptions import InvalidResponse -from glucometerutils.support import hiddevice +from glucometerutils.support import driver_base, hiddevice # regexr.com/4k6jb _HEADER_RECORD_RE = re.compile( @@ -64,7 +64,7 @@ class FrameError(Exception): pass -class ContourHidDevice(hiddevice.HidDevice): +class ContourHidDevice(driver_base.GlucometerDriver): """Base class implementing the ContourUSB HID common protocol. """ @@ -77,11 +77,16 @@ class ContourHidDevice(hiddevice.HidDevice): mode_command = object() state = None + def __init__(self, usb_ids, device_path): + # type: (Tuple[int, int], Optional[Text]) -> None + super().__init__(device_path) + self._hid_session = hiddevice.HidSession(usb_ids, device_path) + def read(self, r_size=blocksize): result = [] while True: - data = self._read() + data = self._hid_session.read() dstr = data result.append(dstr[4 : data[3] + 4]) if data[3] != self.blocksize - 4: @@ -94,7 +99,7 @@ class ContourHidDevice(hiddevice.HidDevice): pad_length = self.blocksize - len(data) data += pad_length * b"\x00" - self._write(data) + self._hid_session.write(data) USB_VENDOR_ID = 0x1A79 # type: int # Bayer Health Care LLC Contour USB_PRODUCT_ID = 0x6002 # type: int diff --git a/glucometerutils/support/driver_base.py b/glucometerutils/support/driver_base.py index 1caa960..d3e45b7 100644 --- a/glucometerutils/support/driver_base.py +++ b/glucometerutils/support/driver_base.py @@ -1,8 +1,13 @@ from abc import ABC, abstractmethod from datetime import datetime +from typing import Optional, Text class GlucometerDriver(ABC): + def __init__(self, device_path): + # type: (Optional[Text]) -> None + pass + def connect(self): pass diff --git a/glucometerutils/support/freestyle.py b/glucometerutils/support/freestyle.py index 86c53d0..b1eb2d2 100644 --- a/glucometerutils/support/freestyle.py +++ b/glucometerutils/support/freestyle.py @@ -128,38 +128,31 @@ def convert_ketone_unit(raw_value): return raw_value / 18.0 -class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC): - """Base class implementing the FreeStyle HID common protocol. - - This class implements opening, initializing the connection and sending - commands to the device, reading the response and confirming the checksums. +ABBOTT_VENDOR_ID = 0x1A61 - Commands sent to the devices over this protocol have a "message type" - prefixed to the command itself. Text command are usually sent with message - type 0x60, and the replied received with the same. Some devices may diverge - though. - """ - TEXT_CMD = 0x60 - TEXT_REPLY_CMD = 0x60 +class FreeStyleHidSession: + def __init__( + self, product_id, device_path, text_message_type, text_reply_message_type + ): + # type: (int, Optional[Text], int, int) -> None - USB_VENDOR_ID = 0x1A61 # type: int # Abbott Diabetes Care - USB_PRODUCT_ID = None # type: int + self._hid_session = hiddevice.HidSession( + (ABBOTT_VENDOR_ID, product_id), device_path + ) + self._text_message_type = text_message_type + self._text_reply_message_type = text_reply_message_type def connect(self): """Open connection to the device, starting the knocking sequence.""" - self._send_command(_INIT_COMMAND, b"") - response = self._read_response() + self.send_command(_INIT_COMMAND, b"") + response = self.read_response() if not _is_init_reply(response): raise exceptions.ConnectionFailed( f"Connection error: unexpected message %{response[0]:02x}:{response[1].hex()}" ) - def disconnect(self): - """Disconnect the device, nothing to be done.""" - pass - - def _send_command(self, message_type, command, encrypted=False): + def send_command(self, message_type, command, encrypted=False): # type: (int, bytes, bool) -> None """Send a raw command to the device. @@ -178,12 +171,12 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) ) logging.debug("Sending packet: %r", usb_packet) - self._write(usb_packet) + self._hid_session.write(usb_packet) - def _read_response(self, encrypted=False): + def read_response(self, encrypted=False): # type: (bool) -> Tuple[int, bytes] """Read the response from the device and extracts it.""" - usb_packet = self._read() + usb_packet = self._hid_session.read() logging.debug("Read packet: %r", usb_packet) @@ -205,7 +198,7 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) # unencrypted, so we need to inspect them before we decide what the # message content is. if _is_keepalive_response(message): - return self._read_response(encrypted=encrypted) + return self.read_response(encrypted=encrypted) if _is_unknown_message_error(message): raise exceptions.CommandError("Invalid command") @@ -218,21 +211,21 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) return message - def _send_text_command(self, command): + def send_text_command(self, command): # type: (bytes) -> Text """Send a command to the device that expects a text reply.""" - self._send_command(self.TEXT_CMD, command) + self.send_command(self._text_message_type, command) # Reply can stretch multiple buffers full_content = b"" while True: - message_type, content = self._read_response() + message_type, content = self.read_response() logging.debug( "Received message: type %02x content %s", message_type, content.hex() ) - if message_type != self.TEXT_REPLY_CMD: + if message_type != self._text_reply_message_type: raise exceptions.InvalidResponse( f"Message type {message_type:02x}: content does not match expectations: {content!r}" ) @@ -258,22 +251,84 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) # unknown codepoint. return message.decode("ascii", "replace") + def query_multirecord(self, command): + # type: (bytes) -> Iterator[List[Text]] + """Queries for, and returns, "multirecords" results. + + Multirecords are used for querying events, readings, history and similar + other data out of a FreeStyle device. These are comma-separated values, + variable-length. + + The validation includes the general HID framing parsing, as well as + validation of the record count, and of the embedded records checksum. + + Args: + command: The text command to send to the device for the query. + + Returns: + A CSV reader object that returns a record for each line in the + reply buffer. + """ + message = self.send_text_command(command) + logging.debug("Received multirecord message:\n%s", message) + if message == "Log Empty\r\n": + return iter(()) + + match = _MULTIRECORDS_FORMAT.search(message) + if not match: + raise exceptions.InvalidResponse(message) + + records_str = match.group("message") + _verify_checksum(records_str, match.group("checksum")) + + logging.debug("Received multi-record string: %s", records_str) + + return csv.reader(records_str.split("\r\n")) + + +class FreeStyleHidDevice(driver_base.GlucometerDriver): + """Base class implementing the FreeStyle HID common protocol. + + This class implements opening, initializing the connection and sending + commands to the device, reading the response and confirming the checksums. + + Commands sent to the devices over this protocol have a "message type" + prefixed to the command itself. Text command are usually sent with message + type 0x60, and the replied received with the same. Some devices may diverge + though. + """ + + def __init__(self, product_id, device_path, text_cmd=0x60, text_reply_cmd=0x60): + # type: (int, Optional[Text], int, int) -> None + super().__init__(device_path) + self._session = FreeStyleHidSession( + product_id, device_path, text_cmd, text_reply_cmd + ) + + def connect(self): + """Open connection to the device, starting the knocking sequence.""" + self._session.connect() + + def disconnect(self): + """Disconnect the device, nothing to be done.""" + pass + # Some of the commands are also shared across devices that use this HID # protocol, but not many. Only provide here those that do seep to change # between them. def _get_version(self): # type: () -> Text """Return the software version of the device.""" - return self._send_text_command(b"$swver?").rstrip("\r\n") + return self._session.send_text_command(b"$swver?").rstrip("\r\n") def get_serial_number(self): # type: () -> Text """Returns the serial number of the device.""" - return self._send_text_command(b"$serlnum?").rstrip("\r\n") + return self._session.send_text_command(b"$serlnum?").rstrip("\r\n") def get_patient_name(self): # type: () -> Optional[Text] - patient_name = self._send_text_command(b"$ptname?").rstrip("\r\n") + patient_name = self._session.send_text_command(b"$ptname?").rstrip("\r\n") if not patient_name: return None return patient_name @@ -285,7 +340,7 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) except UnicodeDecodeError: raise ValueError("Only ASCII-safe names are tested working") - result = self._send_text_command(b"$ptname," + encoded_name) + result = self._session.send_text_command(b"$ptname," + encoded_name) def get_datetime(self): # type: () -> datetime.datetime @@ -294,8 +349,8 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) This is one of the few commands that appear common to many of the FreeStyle devices that use the HID framing protocol. """ - date = self._send_text_command(b"$date?").rstrip("\r\n") - time = self._send_text_command(b"$time?").rstrip("\r\n") + date = self._session.send_text_command(b"$date?").rstrip("\r\n") + time = self._session.send_text_command(b"$time?").rstrip("\r\n") # Year is returned as an offset to 2000. month, day, year = (int(x) for x in date.split(",")) @@ -318,41 +373,7 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC) date_cmd = f"$date,{date.month},{date.day},{date.year - 2000}" time_cmd = f"$time,{date.hour},{date.minute}" - self._send_text_command(bytes(date_cmd, "ascii")) - self._send_text_command(bytes(time_cmd, "ascii")) + self._session.send_text_command(bytes(date_cmd, "ascii")) + self._session.send_text_command(bytes(time_cmd, "ascii")) return self.get_datetime() - - def _get_multirecord(self, command): - # type: (bytes) -> Iterator[List[Text]] - """Queries for, and returns, "multirecords" results. - - Multirecords are used for querying events, readings, history and similar - other data out of a FreeStyle device. These are comma-separated values, - variable-length. - - The validation includes the general HID framing parsing, as well as - validation of the record count, and of the embedded records checksum. - - Args: - command: (bytes) the text command to send to the device for the query. - - Returns: - (csv.reader): a CSV reader object that returns a record for each line - in the record file. - """ - message = self._send_text_command(command) - logging.debug("Received multirecord message:\n%s", message) - if message == "Log Empty\r\n": - return iter(()) - - match = _MULTIRECORDS_FORMAT.search(message) - if not match: - raise exceptions.InvalidResponse(message) - - records_str = match.group("message") - _verify_checksum(records_str, match.group("checksum")) - - logging.debug("Received multi-record string: %s", records_str) - - return csv.reader(records_str.split("\r\n")) diff --git a/glucometerutils/support/hiddevice.py b/glucometerutils/support/hiddevice.py index 43461e7..41ad17c 100644 --- a/glucometerutils/support/hiddevice.py +++ b/glucometerutils/support/hiddevice.py @@ -6,44 +6,34 @@ import logging import os -from typing import BinaryIO, Optional, Text +from typing import BinaryIO, Optional, Text, Tuple from glucometerutils import exceptions -class HidDevice: - """A device speaking USB HID protocol driver base. +class HidSession: + """An access class to speak to USB HID based devices. - This class does not implement an actual driver by itself, but provides an - easier access to the boilerplate code required for speaking USB HID. - - This helper wraps around an optional dependency on hidapi library: if - present the driver will auto-detect the device, if not the device path needs - to be provided and should point to a device implementing Linux's hidraw - interface. - - The following constants can be set by the actual drivers: - - USB_VENDOR_ID: (int) USB vendor ID for the device. - USB_PRODUCT_ID: (int) USB product ID for the device. - - If the VID/PID pair is not provided, the driver will require a device path - to be used. - - Optional parameters available: - - TIMEOUT_MS: (int, default: 0) the read timeout in milliseconds, used - for hidapi reads only. If < 1, hidapi will be provided no timeout. + This class does not implement a full driver, but rather provide simpler read/write + methods abstracting the HID library. """ - USB_VENDOR_ID = None # type: int - USB_PRODUCT_ID = None # type: int + def __init__(self, usb_id, device, timeout_ms=0): + # type: (Optional[Tuple[int, int]], Optional[Text], int) -> None + """Construct a new session object. + + Args: + usb_id: Optional pair of vendor_id and product_id for the session. + This is required to use the hidapi library. + device: Optional path to Linux hidraw-style device path. If not provided, + usb_id needs to be provided instead. + timeout_ms: Timeout in milliseconds for read operations. Only relevant when + using hidapi library. + """ - TIMEOUT_MS = 0 # type: int + self._timeout_ms = timeout_ms - def __init__(self, device): - # type: (Optional[Text]) -> None - if None in (self.USB_VENDOR_ID, self.USB_PRODUCT_ID) and not device: + if not usb_id and not device: raise exceptions.CommandLineError( "--device parameter is required, should point to a /dev/hidraw " "device node representing the meter." @@ -63,8 +53,10 @@ class HidDevice: try: import hid + assert usb_id + vendor_id, product_id = usb_id self.hidapi_handle_ = hid.device() - self.hidapi_handle_.open(self.USB_VENDOR_ID, self.USB_PRODUCT_ID) + self.hidapi_handle_.open(vendor_id, product_id) except ImportError: raise exceptions.ConnectionFailed( message='Missing requied "hidapi" module.' @@ -74,7 +66,7 @@ class HidDevice: message=f"Unable to connect to meter: {e}." ) - def _write(self, report): + def write(self, report): # type: (bytes) -> None """Writes a report to the HID handle.""" @@ -86,7 +78,7 @@ class HidDevice: if written < 0: raise exceptions.CommandError() - def _read(self, size=64): + def read(self, size=64): # type: (int) -> bytes """Read a report from the HID handle. @@ -96,4 +88,4 @@ class HidDevice: if self.handle_: return bytes(self.handle_.read(size)) - return bytes(self.hidapi_handle_.read(size, timeout_ms=self.TIMEOUT_MS)) + return bytes(self.hidapi_handle_.read(size, timeout_ms=self._timeout_ms)) diff --git a/reversing_tools/abbott/freestyle_hid_console.py b/reversing_tools/abbott/freestyle_hid_console.py index 654a97c..0ed4705 100755 --- a/reversing_tools/abbott/freestyle_hid_console.py +++ b/reversing_tools/abbott/freestyle_hid_console.py @@ -45,11 +45,11 @@ def main(): logging.basicConfig(level=args.vlog) - device = freestyle.FreeStyleHidDevice(args.device) - device.TEXT_CMD = args.text_cmd_type - device.TEXT_REPLY_CMD = args.text_reply_type + session = freestyle.FreeStyleHidSession( + None, args.device, args.text_cmd_type, args.text_reply_type + ) - device.connect() + session.connect() while True: if sys.stdin.isatty(): @@ -59,7 +59,7 @@ def main(): print(f">>> {command}") try: - print(device._send_text_command(bytes(command, "ascii"))) + print(session.send_text_command(bytes(command, "ascii"))) except exceptions.InvalidResponse as error: print(f"! {error}") -- cgit v1.2.3