summaryrefslogtreecommitdiffstats
path: root/glucometerutils/support/freestyle.py
diff options
context:
space:
mode:
Diffstat (limited to 'glucometerutils/support/freestyle.py')
-rw-r--r--glucometerutils/support/freestyle.py163
1 files changed, 92 insertions, 71 deletions
diff --git a/glucometerutils/support/freestyle.py b/glucometerutils/support/freestyle.py
index 86c53d0..b1eb2d2 100644
--- a/glucometerutils/support/freestyle.py
+++ b/glucometerutils/support/freestyle.py
@@ -128,38 +128,31 @@ def convert_ketone_unit(raw_value):
return raw_value / 18.0
-class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC):
- """Base class implementing the FreeStyle HID common protocol.
-
- This class implements opening, initializing the connection and sending
- commands to the device, reading the response and confirming the checksums.
+ABBOTT_VENDOR_ID = 0x1A61
- Commands sent to the devices over this protocol have a "message type"
- prefixed to the command itself. Text command are usually sent with message
- type 0x60, and the replied received with the same. Some devices may diverge
- though.
- """
- TEXT_CMD = 0x60
- TEXT_REPLY_CMD = 0x60
+class FreeStyleHidSession:
+ def __init__(
+ self, product_id, device_path, text_message_type, text_reply_message_type
+ ):
+ # type: (int, Optional[Text], int, int) -> None
- USB_VENDOR_ID = 0x1A61 # type: int # Abbott Diabetes Care
- USB_PRODUCT_ID = None # type: int
+ self._hid_session = hiddevice.HidSession(
+ (ABBOTT_VENDOR_ID, product_id), device_path
+ )
+ self._text_message_type = text_message_type
+ self._text_reply_message_type = text_reply_message_type
def connect(self):
"""Open connection to the device, starting the knocking sequence."""
- self._send_command(_INIT_COMMAND, b"")
- response = self._read_response()
+ self.send_command(_INIT_COMMAND, b"")
+ response = self.read_response()
if not _is_init_reply(response):
raise exceptions.ConnectionFailed(
f"Connection error: unexpected message %{response[0]:02x}:{response[1].hex()}"
)
- def disconnect(self):
- """Disconnect the device, nothing to be done."""
- pass
-
- def _send_command(self, message_type, command, encrypted=False):
+ def send_command(self, message_type, command, encrypted=False):
# type: (int, bytes, bool) -> None
"""Send a raw command to the device.
@@ -178,12 +171,12 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
)
logging.debug("Sending packet: %r", usb_packet)
- self._write(usb_packet)
+ self._hid_session.write(usb_packet)
- def _read_response(self, encrypted=False):
+ def read_response(self, encrypted=False):
# type: (bool) -> Tuple[int, bytes]
"""Read the response from the device and extracts it."""
- usb_packet = self._read()
+ usb_packet = self._hid_session.read()
logging.debug("Read packet: %r", usb_packet)
@@ -205,7 +198,7 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
# unencrypted, so we need to inspect them before we decide what the
# message content is.
if _is_keepalive_response(message):
- return self._read_response(encrypted=encrypted)
+ return self.read_response(encrypted=encrypted)
if _is_unknown_message_error(message):
raise exceptions.CommandError("Invalid command")
@@ -218,21 +211,21 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
return message
- def _send_text_command(self, command):
+ def send_text_command(self, command):
# type: (bytes) -> Text
"""Send a command to the device that expects a text reply."""
- self._send_command(self.TEXT_CMD, command)
+ self.send_command(self._text_message_type, command)
# Reply can stretch multiple buffers
full_content = b""
while True:
- message_type, content = self._read_response()
+ message_type, content = self.read_response()
logging.debug(
"Received message: type %02x content %s", message_type, content.hex()
)
- if message_type != self.TEXT_REPLY_CMD:
+ if message_type != self._text_reply_message_type:
raise exceptions.InvalidResponse(
f"Message type {message_type:02x}: content does not match expectations: {content!r}"
)
@@ -258,22 +251,84 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
# unknown codepoint.
return message.decode("ascii", "replace")
+ def query_multirecord(self, command):
+ # type: (bytes) -> Iterator[List[Text]]
+ """Queries for, and returns, "multirecords" results.
+
+ Multirecords are used for querying events, readings, history and similar
+ other data out of a FreeStyle device. These are comma-separated values,
+ variable-length.
+
+ The validation includes the general HID framing parsing, as well as
+ validation of the record count, and of the embedded records checksum.
+
+ Args:
+ command: The text command to send to the device for the query.
+
+ Returns:
+ A CSV reader object that returns a record for each line in the
+ reply buffer.
+ """
+ message = self.send_text_command(command)
+ logging.debug("Received multirecord message:\n%s", message)
+ if message == "Log Empty\r\n":
+ return iter(())
+
+ match = _MULTIRECORDS_FORMAT.search(message)
+ if not match:
+ raise exceptions.InvalidResponse(message)
+
+ records_str = match.group("message")
+ _verify_checksum(records_str, match.group("checksum"))
+
+ logging.debug("Received multi-record string: %s", records_str)
+
+ return csv.reader(records_str.split("\r\n"))
+
+
+class FreeStyleHidDevice(driver_base.GlucometerDriver):
+ """Base class implementing the FreeStyle HID common protocol.
+
+ This class implements opening, initializing the connection and sending
+ commands to the device, reading the response and confirming the checksums.
+
+ Commands sent to the devices over this protocol have a "message type"
+ prefixed to the command itself. Text command are usually sent with message
+ type 0x60, and the replied received with the same. Some devices may diverge
+ though.
+ """
+
+ def __init__(self, product_id, device_path, text_cmd=0x60, text_reply_cmd=0x60):
+ # type: (int, Optional[Text], int, int) -> None
+ super().__init__(device_path)
+ self._session = FreeStyleHidSession(
+ product_id, device_path, text_cmd, text_reply_cmd
+ )
+
+ def connect(self):
+ """Open connection to the device, starting the knocking sequence."""
+ self._session.connect()
+
+ def disconnect(self):
+ """Disconnect the device, nothing to be done."""
+ pass
+
# Some of the commands are also shared across devices that use this HID
# protocol, but not many. Only provide here those that do seep to change
# between them.
def _get_version(self):
# type: () -> Text
"""Return the software version of the device."""
- return self._send_text_command(b"$swver?").rstrip("\r\n")
+ return self._session.send_text_command(b"$swver?").rstrip("\r\n")
def get_serial_number(self):
# type: () -> Text
"""Returns the serial number of the device."""
- return self._send_text_command(b"$serlnum?").rstrip("\r\n")
+ return self._session.send_text_command(b"$serlnum?").rstrip("\r\n")
def get_patient_name(self):
# type: () -> Optional[Text]
- patient_name = self._send_text_command(b"$ptname?").rstrip("\r\n")
+ patient_name = self._session.send_text_command(b"$ptname?").rstrip("\r\n")
if not patient_name:
return None
return patient_name
@@ -285,7 +340,7 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
except UnicodeDecodeError:
raise ValueError("Only ASCII-safe names are tested working")
- result = self._send_text_command(b"$ptname," + encoded_name)
+ result = self._session.send_text_command(b"$ptname," + encoded_name)
def get_datetime(self):
# type: () -> datetime.datetime
@@ -294,8 +349,8 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
This is one of the few commands that appear common to many of the
FreeStyle devices that use the HID framing protocol.
"""
- date = self._send_text_command(b"$date?").rstrip("\r\n")
- time = self._send_text_command(b"$time?").rstrip("\r\n")
+ date = self._session.send_text_command(b"$date?").rstrip("\r\n")
+ time = self._session.send_text_command(b"$time?").rstrip("\r\n")
# Year is returned as an offset to 2000.
month, day, year = (int(x) for x in date.split(","))
@@ -318,41 +373,7 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
date_cmd = f"$date,{date.month},{date.day},{date.year - 2000}"
time_cmd = f"$time,{date.hour},{date.minute}"
- self._send_text_command(bytes(date_cmd, "ascii"))
- self._send_text_command(bytes(time_cmd, "ascii"))
+ self._session.send_text_command(bytes(date_cmd, "ascii"))
+ self._session.send_text_command(bytes(time_cmd, "ascii"))
return self.get_datetime()
-
- def _get_multirecord(self, command):
- # type: (bytes) -> Iterator[List[Text]]
- """Queries for, and returns, "multirecords" results.
-
- Multirecords are used for querying events, readings, history and similar
- other data out of a FreeStyle device. These are comma-separated values,
- variable-length.
-
- The validation includes the general HID framing parsing, as well as
- validation of the record count, and of the embedded records checksum.
-
- Args:
- command: (bytes) the text command to send to the device for the query.
-
- Returns:
- (csv.reader): a CSV reader object that returns a record for each line
- in the record file.
- """
- message = self._send_text_command(command)
- logging.debug("Received multirecord message:\n%s", message)
- if message == "Log Empty\r\n":
- return iter(())
-
- match = _MULTIRECORDS_FORMAT.search(message)
- if not match:
- raise exceptions.InvalidResponse(message)
-
- records_str = match.group("message")
- _verify_checksum(records_str, match.group("checksum"))
-
- logging.debug("Received multi-record string: %s", records_str)
-
- return csv.reader(records_str.split("\r\n"))