summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDiego Elio Pettenò <flameeyes@flameeyes.com>2020-03-22 17:41:48 +0100
committermergify[bot] <37929162+mergify[bot]@users.noreply.github.com>2020-03-22 17:50:42 +0100
commitd0a282b552922f31f80b2e979b79d5697fbbbcda (patch)
treec6e76cd3333bf60e1db8b787c2209043589bbdf9
parentExclude mypy from Python 3.9 (diff)
downloadglucometerutils-d0a282b552922f31f80b2e979b79d5697fbbbcda.tar
glucometerutils-d0a282b552922f31f80b2e979b79d5697fbbbcda.tar.gz
glucometerutils-d0a282b552922f31f80b2e979b79d5697fbbbcda.tar.bz2
glucometerutils-d0a282b552922f31f80b2e979b79d5697fbbbcda.tar.lz
glucometerutils-d0a282b552922f31f80b2e979b79d5697fbbbcda.tar.xz
glucometerutils-d0a282b552922f31f80b2e979b79d5697fbbbcda.tar.zst
glucometerutils-d0a282b552922f31f80b2e979b79d5697fbbbcda.zip
-rw-r--r--glucometerutils/drivers/contourusb.py8
-rw-r--r--glucometerutils/drivers/fsinsulinx.py5
-rw-r--r--glucometerutils/drivers/fslibre.py11
-rw-r--r--glucometerutils/drivers/fsprecisionneo.py5
-rw-r--r--glucometerutils/support/contourusb.py15
-rw-r--r--glucometerutils/support/driver_base.py5
-rw-r--r--glucometerutils/support/freestyle.py163
-rw-r--r--glucometerutils/support/hiddevice.py58
-rwxr-xr-xreversing_tools/abbott/freestyle_hid_console.py10
9 files changed, 153 insertions, 127 deletions
diff --git a/glucometerutils/drivers/contourusb.py b/glucometerutils/drivers/contourusb.py
index 5c9ed11..095f920 100644
--- a/glucometerutils/drivers/contourusb.py
+++ b/glucometerutils/drivers/contourusb.py
@@ -21,7 +21,7 @@ http://protocols.ascensia.com/Programming-Guide.aspx
import datetime
from glucometerutils import common
-from glucometerutils.support import contourusb, driver_base
+from glucometerutils.support import contourusb
def _extract_timestamp(parsed_record, prefix=""):
@@ -41,11 +41,11 @@ def _extract_timestamp(parsed_record, prefix=""):
)
-class Device(contourusb.ContourHidDevice, driver_base.GlucometerDriver):
+class Device(contourusb.ContourHidDevice):
"""Glucometer driver for FreeStyle Libre devices."""
- USB_VENDOR_ID = 0x1A79 # type: int # Bayer Health Care LLC Contour
- USB_PRODUCT_ID = 0x6002 # type: int
+ def __init__(self, device):
+ self._hid_session = contourusb.ContourHidSession((0x1A79, 0x6002), device)
def get_meter_info(self):
self._get_info_record()
diff --git a/glucometerutils/drivers/fsinsulinx.py b/glucometerutils/drivers/fsinsulinx.py
index 5465b3a..a3e54fb 100644
--- a/glucometerutils/drivers/fsinsulinx.py
+++ b/glucometerutils/drivers/fsinsulinx.py
@@ -51,7 +51,8 @@ _InsulinxReading = collections.namedtuple(
class Device(freestyle.FreeStyleHidDevice):
"""Glucometer driver for FreeStyle InsuLinux devices."""
- USB_PRODUCT_ID = 0x3460
+ def __init__(self, device_path):
+ super().__init__(0x3460, device_path)
def get_meter_info(self):
"""Return the device information in structured form."""
@@ -68,7 +69,7 @@ class Device(freestyle.FreeStyleHidDevice):
def get_readings(self):
"""Iterate through the reading records in the device."""
- for record in self._get_multirecord(b"$result?"):
+ for record in self._session.query_multirecord(b"$result?"):
if not record or record[0] != _TYPE_GLUCOSE_READING:
continue
diff --git a/glucometerutils/drivers/fslibre.py b/glucometerutils/drivers/fslibre.py
index 4802673..2e37ec1 100644
--- a/glucometerutils/drivers/fslibre.py
+++ b/glucometerutils/drivers/fslibre.py
@@ -195,7 +195,8 @@ def _parse_arresult(record):
class Device(freestyle.FreeStyleHidDevice):
"""Glucometer driver for FreeStyle Libre devices."""
- USB_PRODUCT_ID = 0x3650
+ def __init__(self, device_path):
+ super().__init__(0x3650, device_path)
def get_meter_info(self):
"""Return the device information in structured form."""
@@ -209,7 +210,7 @@ class Device(freestyle.FreeStyleHidDevice):
def get_serial_number(self):
"""Overridden function as the command is not compatible."""
- return self._send_text_command(b"$sn?").rstrip("\r\n")
+ return self._session.send_text_command(b"$sn?").rstrip("\r\n")
def get_glucose_unit(self): # pylint: disable=no-self-use
"""Returns the glucose unit of the device."""
@@ -221,7 +222,7 @@ class Device(freestyle.FreeStyleHidDevice):
# First of all get the usually longer list of sensor readings, and
# convert them to Readings objects.
- for record in self._get_multirecord(b"$history?"):
+ for record in self._session.query_multirecord(b"$history?"):
parsed_record = _parse_record(record, _HISTORY_ENTRY_MAP)
if not parsed_record or parsed_record["errors"] != 0:
@@ -238,10 +239,10 @@ class Device(freestyle.FreeStyleHidDevice):
# Then get the results of explicit scans and blood tests (and other
# events).
- for record in self._get_multirecord(b"$arresult?"):
+ for record in self._session.query_multirecord(b"$arresult?"):
reading = _parse_arresult(record)
if reading:
yield reading
def zero_log(self):
- self._send_text_command(b"$resetpatient")
+ self._session.send_text_command(b"$resetpatient")
diff --git a/glucometerutils/drivers/fsprecisionneo.py b/glucometerutils/drivers/fsprecisionneo.py
index 909fed8..379bdf4 100644
--- a/glucometerutils/drivers/fsprecisionneo.py
+++ b/glucometerutils/drivers/fsprecisionneo.py
@@ -56,7 +56,8 @@ _NeoReading = collections.namedtuple(
class Device(freestyle.FreeStyleHidDevice):
"""Glucometer driver for FreeStyle Precision Neo devices."""
- USB_PRODUCT_ID = 0x3850
+ def __init__(self, device_path):
+ super().__init__(0x3850, device_path)
def get_meter_info(self):
"""Return the device information in structured form."""
@@ -74,7 +75,7 @@ class Device(freestyle.FreeStyleHidDevice):
def get_readings(self):
"""Iterate through the reading records in the device."""
- for record in self._get_multirecord(b"$result?"):
+ for record in self._session.query_multirecord(b"$result?"):
cls = None
if record and record[0] == _TYPE_GLUCOSE_READING:
cls = common.GlucoseReading
diff --git a/glucometerutils/support/contourusb.py b/glucometerutils/support/contourusb.py
index 9afb79d..6cd9442 100644
--- a/glucometerutils/support/contourusb.py
+++ b/glucometerutils/support/contourusb.py
@@ -15,13 +15,13 @@ import csv
import datetime
import logging
import re
-from typing import Dict, Iterator, List, Text
+from typing import Dict, Iterator, List, Optional, Text, Tuple
import construct
from glucometerutils import exceptions
from glucometerutils.exceptions import InvalidResponse
-from glucometerutils.support import hiddevice
+from glucometerutils.support import driver_base, hiddevice
# regexr.com/4k6jb
_HEADER_RECORD_RE = re.compile(
@@ -64,7 +64,7 @@ class FrameError(Exception):
pass
-class ContourHidDevice(hiddevice.HidDevice):
+class ContourHidDevice(driver_base.GlucometerDriver):
"""Base class implementing the ContourUSB HID common protocol.
"""
@@ -77,11 +77,16 @@ class ContourHidDevice(hiddevice.HidDevice):
mode_command = object()
state = None
+ def __init__(self, usb_ids, device_path):
+ # type: (Tuple[int, int], Optional[Text]) -> None
+ super().__init__(device_path)
+ self._hid_session = hiddevice.HidSession(usb_ids, device_path)
+
def read(self, r_size=blocksize):
result = []
while True:
- data = self._read()
+ data = self._hid_session.read()
dstr = data
result.append(dstr[4 : data[3] + 4])
if data[3] != self.blocksize - 4:
@@ -94,7 +99,7 @@ class ContourHidDevice(hiddevice.HidDevice):
pad_length = self.blocksize - len(data)
data += pad_length * b"\x00"
- self._write(data)
+ self._hid_session.write(data)
USB_VENDOR_ID = 0x1A79 # type: int # Bayer Health Care LLC Contour
USB_PRODUCT_ID = 0x6002 # type: int
diff --git a/glucometerutils/support/driver_base.py b/glucometerutils/support/driver_base.py
index 1caa960..d3e45b7 100644
--- a/glucometerutils/support/driver_base.py
+++ b/glucometerutils/support/driver_base.py
@@ -1,8 +1,13 @@
from abc import ABC, abstractmethod
from datetime import datetime
+from typing import Optional, Text
class GlucometerDriver(ABC):
+ def __init__(self, device_path):
+ # type: (Optional[Text]) -> None
+ pass
+
def connect(self):
pass
diff --git a/glucometerutils/support/freestyle.py b/glucometerutils/support/freestyle.py
index 86c53d0..b1eb2d2 100644
--- a/glucometerutils/support/freestyle.py
+++ b/glucometerutils/support/freestyle.py
@@ -128,38 +128,31 @@ def convert_ketone_unit(raw_value):
return raw_value / 18.0
-class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC):
- """Base class implementing the FreeStyle HID common protocol.
-
- This class implements opening, initializing the connection and sending
- commands to the device, reading the response and confirming the checksums.
+ABBOTT_VENDOR_ID = 0x1A61
- Commands sent to the devices over this protocol have a "message type"
- prefixed to the command itself. Text command are usually sent with message
- type 0x60, and the replied received with the same. Some devices may diverge
- though.
- """
- TEXT_CMD = 0x60
- TEXT_REPLY_CMD = 0x60
+class FreeStyleHidSession:
+ def __init__(
+ self, product_id, device_path, text_message_type, text_reply_message_type
+ ):
+ # type: (int, Optional[Text], int, int) -> None
- USB_VENDOR_ID = 0x1A61 # type: int # Abbott Diabetes Care
- USB_PRODUCT_ID = None # type: int
+ self._hid_session = hiddevice.HidSession(
+ (ABBOTT_VENDOR_ID, product_id), device_path
+ )
+ self._text_message_type = text_message_type
+ self._text_reply_message_type = text_reply_message_type
def connect(self):
"""Open connection to the device, starting the knocking sequence."""
- self._send_command(_INIT_COMMAND, b"")
- response = self._read_response()
+ self.send_command(_INIT_COMMAND, b"")
+ response = self.read_response()
if not _is_init_reply(response):
raise exceptions.ConnectionFailed(
f"Connection error: unexpected message %{response[0]:02x}:{response[1].hex()}"
)
- def disconnect(self):
- """Disconnect the device, nothing to be done."""
- pass
-
- def _send_command(self, message_type, command, encrypted=False):
+ def send_command(self, message_type, command, encrypted=False):
# type: (int, bytes, bool) -> None
"""Send a raw command to the device.
@@ -178,12 +171,12 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
)
logging.debug("Sending packet: %r", usb_packet)
- self._write(usb_packet)
+ self._hid_session.write(usb_packet)
- def _read_response(self, encrypted=False):
+ def read_response(self, encrypted=False):
# type: (bool) -> Tuple[int, bytes]
"""Read the response from the device and extracts it."""
- usb_packet = self._read()
+ usb_packet = self._hid_session.read()
logging.debug("Read packet: %r", usb_packet)
@@ -205,7 +198,7 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
# unencrypted, so we need to inspect them before we decide what the
# message content is.
if _is_keepalive_response(message):
- return self._read_response(encrypted=encrypted)
+ return self.read_response(encrypted=encrypted)
if _is_unknown_message_error(message):
raise exceptions.CommandError("Invalid command")
@@ -218,21 +211,21 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
return message
- def _send_text_command(self, command):
+ def send_text_command(self, command):
# type: (bytes) -> Text
"""Send a command to the device that expects a text reply."""
- self._send_command(self.TEXT_CMD, command)
+ self.send_command(self._text_message_type, command)
# Reply can stretch multiple buffers
full_content = b""
while True:
- message_type, content = self._read_response()
+ message_type, content = self.read_response()
logging.debug(
"Received message: type %02x content %s", message_type, content.hex()
)
- if message_type != self.TEXT_REPLY_CMD:
+ if message_type != self._text_reply_message_type:
raise exceptions.InvalidResponse(
f"Message type {message_type:02x}: content does not match expectations: {content!r}"
)
@@ -258,22 +251,84 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
# unknown codepoint.
return message.decode("ascii", "replace")
+ def query_multirecord(self, command):
+ # type: (bytes) -> Iterator[List[Text]]
+ """Queries for, and returns, "multirecords" results.
+
+ Multirecords are used for querying events, readings, history and similar
+ other data out of a FreeStyle device. These are comma-separated values,
+ variable-length.
+
+ The validation includes the general HID framing parsing, as well as
+ validation of the record count, and of the embedded records checksum.
+
+ Args:
+ command: The text command to send to the device for the query.
+
+ Returns:
+ A CSV reader object that returns a record for each line in the
+ reply buffer.
+ """
+ message = self.send_text_command(command)
+ logging.debug("Received multirecord message:\n%s", message)
+ if message == "Log Empty\r\n":
+ return iter(())
+
+ match = _MULTIRECORDS_FORMAT.search(message)
+ if not match:
+ raise exceptions.InvalidResponse(message)
+
+ records_str = match.group("message")
+ _verify_checksum(records_str, match.group("checksum"))
+
+ logging.debug("Received multi-record string: %s", records_str)
+
+ return csv.reader(records_str.split("\r\n"))
+
+
+class FreeStyleHidDevice(driver_base.GlucometerDriver):
+ """Base class implementing the FreeStyle HID common protocol.
+
+ This class implements opening, initializing the connection and sending
+ commands to the device, reading the response and confirming the checksums.
+
+ Commands sent to the devices over this protocol have a "message type"
+ prefixed to the command itself. Text command are usually sent with message
+ type 0x60, and the replied received with the same. Some devices may diverge
+ though.
+ """
+
+ def __init__(self, product_id, device_path, text_cmd=0x60, text_reply_cmd=0x60):
+ # type: (int, Optional[Text], int, int) -> None
+ super().__init__(device_path)
+ self._session = FreeStyleHidSession(
+ product_id, device_path, text_cmd, text_reply_cmd
+ )
+
+ def connect(self):
+ """Open connection to the device, starting the knocking sequence."""
+ self._session.connect()
+
+ def disconnect(self):
+ """Disconnect the device, nothing to be done."""
+ pass
+
# Some of the commands are also shared across devices that use this HID
# protocol, but not many. Only provide here those that do seep to change
# between them.
def _get_version(self):
# type: () -> Text
"""Return the software version of the device."""
- return self._send_text_command(b"$swver?").rstrip("\r\n")
+ return self._session.send_text_command(b"$swver?").rstrip("\r\n")
def get_serial_number(self):
# type: () -> Text
"""Returns the serial number of the device."""
- return self._send_text_command(b"$serlnum?").rstrip("\r\n")
+ return self._session.send_text_command(b"$serlnum?").rstrip("\r\n")
def get_patient_name(self):
# type: () -> Optional[Text]
- patient_name = self._send_text_command(b"$ptname?").rstrip("\r\n")
+ patient_name = self._session.send_text_command(b"$ptname?").rstrip("\r\n")
if not patient_name:
return None
return patient_name
@@ -285,7 +340,7 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
except UnicodeDecodeError:
raise ValueError("Only ASCII-safe names are tested working")
- result = self._send_text_command(b"$ptname," + encoded_name)
+ result = self._session.send_text_command(b"$ptname," + encoded_name)
def get_datetime(self):
# type: () -> datetime.datetime
@@ -294,8 +349,8 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
This is one of the few commands that appear common to many of the
FreeStyle devices that use the HID framing protocol.
"""
- date = self._send_text_command(b"$date?").rstrip("\r\n")
- time = self._send_text_command(b"$time?").rstrip("\r\n")
+ date = self._session.send_text_command(b"$date?").rstrip("\r\n")
+ time = self._session.send_text_command(b"$time?").rstrip("\r\n")
# Year is returned as an offset to 2000.
month, day, year = (int(x) for x in date.split(","))
@@ -318,41 +373,7 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
date_cmd = f"$date,{date.month},{date.day},{date.year - 2000}"
time_cmd = f"$time,{date.hour},{date.minute}"
- self._send_text_command(bytes(date_cmd, "ascii"))
- self._send_text_command(bytes(time_cmd, "ascii"))
+ self._session.send_text_command(bytes(date_cmd, "ascii"))
+ self._session.send_text_command(bytes(time_cmd, "ascii"))
return self.get_datetime()
-
- def _get_multirecord(self, command):
- # type: (bytes) -> Iterator[List[Text]]
- """Queries for, and returns, "multirecords" results.
-
- Multirecords are used for querying events, readings, history and similar
- other data out of a FreeStyle device. These are comma-separated values,
- variable-length.
-
- The validation includes the general HID framing parsing, as well as
- validation of the record count, and of the embedded records checksum.
-
- Args:
- command: (bytes) the text command to send to the device for the query.
-
- Returns:
- (csv.reader): a CSV reader object that returns a record for each line
- in the record file.
- """
- message = self._send_text_command(command)
- logging.debug("Received multirecord message:\n%s", message)
- if message == "Log Empty\r\n":
- return iter(())
-
- match = _MULTIRECORDS_FORMAT.search(message)
- if not match:
- raise exceptions.InvalidResponse(message)
-
- records_str = match.group("message")
- _verify_checksum(records_str, match.group("checksum"))
-
- logging.debug("Received multi-record string: %s", records_str)
-
- return csv.reader(records_str.split("\r\n"))
diff --git a/glucometerutils/support/hiddevice.py b/glucometerutils/support/hiddevice.py
index 43461e7..41ad17c 100644
--- a/glucometerutils/support/hiddevice.py
+++ b/glucometerutils/support/hiddevice.py
@@ -6,44 +6,34 @@
import logging
import os
-from typing import BinaryIO, Optional, Text
+from typing import BinaryIO, Optional, Text, Tuple
from glucometerutils import exceptions
-class HidDevice:
- """A device speaking USB HID protocol driver base.
+class HidSession:
+ """An access class to speak to USB HID based devices.
- This class does not implement an actual driver by itself, but provides an
- easier access to the boilerplate code required for speaking USB HID.
-
- This helper wraps around an optional dependency on hidapi library: if
- present the driver will auto-detect the device, if not the device path needs
- to be provided and should point to a device implementing Linux's hidraw
- interface.
-
- The following constants can be set by the actual drivers:
-
- USB_VENDOR_ID: (int) USB vendor ID for the device.
- USB_PRODUCT_ID: (int) USB product ID for the device.
-
- If the VID/PID pair is not provided, the driver will require a device path
- to be used.
-
- Optional parameters available:
-
- TIMEOUT_MS: (int, default: 0) the read timeout in milliseconds, used
- for hidapi reads only. If < 1, hidapi will be provided no timeout.
+ This class does not implement a full driver, but rather provide simpler read/write
+ methods abstracting the HID library.
"""
- USB_VENDOR_ID = None # type: int
- USB_PRODUCT_ID = None # type: int
+ def __init__(self, usb_id, device, timeout_ms=0):
+ # type: (Optional[Tuple[int, int]], Optional[Text], int) -> None
+ """Construct a new session object.
+
+ Args:
+ usb_id: Optional pair of vendor_id and product_id for the session.
+ This is required to use the hidapi library.
+ device: Optional path to Linux hidraw-style device path. If not provided,
+ usb_id needs to be provided instead.
+ timeout_ms: Timeout in milliseconds for read operations. Only relevant when
+ using hidapi library.
+ """
- TIMEOUT_MS = 0 # type: int
+ self._timeout_ms = timeout_ms
- def __init__(self, device):
- # type: (Optional[Text]) -> None
- if None in (self.USB_VENDOR_ID, self.USB_PRODUCT_ID) and not device:
+ if not usb_id and not device:
raise exceptions.CommandLineError(
"--device parameter is required, should point to a /dev/hidraw "
"device node representing the meter."
@@ -63,8 +53,10 @@ class HidDevice:
try:
import hid
+ assert usb_id
+ vendor_id, product_id = usb_id
self.hidapi_handle_ = hid.device()
- self.hidapi_handle_.open(self.USB_VENDOR_ID, self.USB_PRODUCT_ID)
+ self.hidapi_handle_.open(vendor_id, product_id)
except ImportError:
raise exceptions.ConnectionFailed(
message='Missing requied "hidapi" module.'
@@ -74,7 +66,7 @@ class HidDevice:
message=f"Unable to connect to meter: {e}."
)
- def _write(self, report):
+ def write(self, report):
# type: (bytes) -> None
"""Writes a report to the HID handle."""
@@ -86,7 +78,7 @@ class HidDevice:
if written < 0:
raise exceptions.CommandError()
- def _read(self, size=64):
+ def read(self, size=64):
# type: (int) -> bytes
"""Read a report from the HID handle.
@@ -96,4 +88,4 @@ class HidDevice:
if self.handle_:
return bytes(self.handle_.read(size))
- return bytes(self.hidapi_handle_.read(size, timeout_ms=self.TIMEOUT_MS))
+ return bytes(self.hidapi_handle_.read(size, timeout_ms=self._timeout_ms))
diff --git a/reversing_tools/abbott/freestyle_hid_console.py b/reversing_tools/abbott/freestyle_hid_console.py
index 654a97c..0ed4705 100755
--- a/reversing_tools/abbott/freestyle_hid_console.py
+++ b/reversing_tools/abbott/freestyle_hid_console.py
@@ -45,11 +45,11 @@ def main():
logging.basicConfig(level=args.vlog)
- device = freestyle.FreeStyleHidDevice(args.device)
- device.TEXT_CMD = args.text_cmd_type
- device.TEXT_REPLY_CMD = args.text_reply_type
+ session = freestyle.FreeStyleHidSession(
+ None, args.device, args.text_cmd_type, args.text_reply_type
+ )
- device.connect()
+ session.connect()
while True:
if sys.stdin.isatty():
@@ -59,7 +59,7 @@ def main():
print(f">>> {command}")
try:
- print(device._send_text_command(bytes(command, "ascii")))
+ print(session.send_text_command(bytes(command, "ascii")))
except exceptions.InvalidResponse as error:
print(f"! {error}")