summaryrefslogtreecommitdiffstats
path: root/glucometerutils/support/freestyle.py
diff options
context:
space:
mode:
Diffstat (limited to 'glucometerutils/support/freestyle.py')
-rw-r--r--glucometerutils/support/freestyle.py147
1 files changed, 80 insertions, 67 deletions
diff --git a/glucometerutils/support/freestyle.py b/glucometerutils/support/freestyle.py
index d48ac04..341e978 100644
--- a/glucometerutils/support/freestyle.py
+++ b/glucometerutils/support/freestyle.py
@@ -18,7 +18,7 @@ from typing import AnyStr, Callable, Iterator, List, Optional, Text, Tuple
import construct
from glucometerutils import exceptions
-from glucometerutils.support import hiddevice, driver_base
+from glucometerutils.support import driver_base, hiddevice
_INIT_COMMAND = 0x01
_INIT_RESPONSE = 0x71
@@ -30,54 +30,66 @@ _ENCRYPTION_SETUP_COMMAND = 0x14
_ENCRYPTION_SETUP_RESPONSE = 0x33
_ALWAYS_UNENCRYPTED_MESSAGES = (
- _INIT_COMMAND, 0x04, 0x05, 0x06, 0x0c, 0x0d,
- _ENCRYPTION_SETUP_COMMAND, 0x15,
- _ENCRYPTION_SETUP_RESPONSE, 0x34, 0x35,
+ _INIT_COMMAND,
+ 0x04,
+ 0x05,
+ 0x06,
+ 0x0C,
+ 0x0D,
+ _ENCRYPTION_SETUP_COMMAND,
+ 0x15,
+ _ENCRYPTION_SETUP_RESPONSE,
+ 0x34,
+ 0x35,
_INIT_RESPONSE,
_KEEPALIVE_RESPONSE,
)
+
def _create_matcher(message_type, content):
# type: (int, Optional[bytes]) -> Callable[[Tuple[int, bytes]], bool]
def _matcher(message):
- return (
- message[0] == message_type and
- (content is None or content == message[1]))
+ return message[0] == message_type and (content is None or content == message[1])
return _matcher
-_is_init_reply = _create_matcher(_INIT_RESPONSE, b'\x01')
+
+_is_init_reply = _create_matcher(_INIT_RESPONSE, b"\x01")
_is_keepalive_response = _create_matcher(_KEEPALIVE_RESPONSE, None)
-_is_unknown_message_error = _create_matcher(_UNKNOWN_MESSAGE_RESPONSE, b'\x85')
-_is_encryption_missing_error = _create_matcher(
- _ENCRYPTION_SETUP_RESPONSE, b'\x15')
-_is_encryption_setup_error = _create_matcher(
- _ENCRYPTION_SETUP_RESPONSE, b'\x14')
+_is_unknown_message_error = _create_matcher(_UNKNOWN_MESSAGE_RESPONSE, b"\x85")
+_is_encryption_missing_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x15")
+_is_encryption_setup_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x14")
_FREESTYLE_MESSAGE = construct.Struct(
- 'hid_report' / construct.Const(0, construct.Byte),
- 'message_type' / construct.Byte,
- 'command' / construct.Padded(
+ "hid_report" / construct.Const(0, construct.Byte),
+ "message_type" / construct.Byte,
+ "command"
+ / construct.Padded(
63, # command can only be up to 62 bytes, but one is used for length.
- construct.Prefixed(construct.Byte, construct.GreedyBytes)),
+ construct.Prefixed(construct.Byte, construct.GreedyBytes),
+ ),
)
_FREESTYLE_ENCRYPTED_MESSAGE = construct.Struct(
- 'hid_report' / construct.Const(0, construct.Byte),
- 'message_type' / construct.Byte,
- 'command' / construct.Padded(
+ "hid_report" / construct.Const(0, construct.Byte),
+ "message_type" / construct.Byte,
+ "command"
+ / construct.Padded(
63, # command can only be up to 62 bytes, but one is used for length.
- construct.GreedyBytes),
+ construct.GreedyBytes,
+ ),
)
-_TEXT_COMPLETION_RE = re.compile(b'CMD (?:OK|Fail!)')
+_TEXT_COMPLETION_RE = re.compile(b"CMD (?:OK|Fail!)")
_TEXT_REPLY_FORMAT = re.compile(
- b'^(?P<message>.*)CKSM:(?P<checksum>[0-9A-F]{8})\r\n'
- b'CMD (?P<status>OK|Fail!)\r\n$', re.DOTALL)
+ b"^(?P<message>.*)CKSM:(?P<checksum>[0-9A-F]{8})\r\n"
+ b"CMD (?P<status>OK|Fail!)\r\n$",
+ re.DOTALL,
+)
_MULTIRECORDS_FORMAT = re.compile(
- '^(?P<message>.+\r\n)(?P<count>[0-9]+),(?P<checksum>[0-9A-F]{8})\r\n$',
- re.DOTALL)
+ "^(?P<message>.+\r\n)(?P<count>[0-9]+),(?P<checksum>[0-9A-F]{8})\r\n$", re.DOTALL
+)
def _verify_checksum(message, expected_checksum_hex):
@@ -131,17 +143,18 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
TEXT_CMD = 0x60
TEXT_REPLY_CMD = 0x60
- USB_VENDOR_ID = 0x1a61 # type: int # Abbott Diabetes Care
+ USB_VENDOR_ID = 0x1A61 # type: int # Abbott Diabetes Care
USB_PRODUCT_ID = None # type: int
def connect(self):
"""Open connection to the device, starting the knocking sequence."""
- self._send_command(_INIT_COMMAND, b'')
+ self._send_command(_INIT_COMMAND, b"")
response = self._read_response()
if not _is_init_reply(response):
raise exceptions.ConnectionFailed(
- 'Connection error: unexpected message %02x:%s' % (
- response[0], response[1].hex()))
+ "Connection error: unexpected message %02x:%s"
+ % (response[0], response[1].hex())
+ )
def disconnect(self):
"""Disconnect the device, nothing to be done."""
@@ -162,9 +175,10 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
meta_construct = _FREESTYLE_MESSAGE
usb_packet = meta_construct.build(
- {'message_type': message_type, 'command': command})
+ {"message_type": message_type, "command": command}
+ )
- logging.debug('Sending packet: %r', usb_packet)
+ logging.debug("Sending packet: %r", usb_packet)
self._write(usb_packet)
def _read_response(self, encrypted=False):
@@ -172,14 +186,14 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
"""Read the response from the device and extracts it."""
usb_packet = self._read()
- logging.debug('Read packet: %r', usb_packet)
+ logging.debug("Read packet: %r", usb_packet)
assert usb_packet
message_type = usb_packet[0]
if not encrypted or message_type in _ALWAYS_UNENCRYPTED_MESSAGES:
message_length = usb_packet[1]
- message_content = usb_packet[2:2+message_length]
+ message_content = usb_packet[2 : 2 + message_length]
else:
message_content = usb_packet[1:]
@@ -195,15 +209,13 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
return self._read_response(encrypted=encrypted)
if _is_unknown_message_error(message):
- raise exceptions.CommandError('Invalid command')
+ raise exceptions.CommandError("Invalid command")
if _is_encryption_missing_error(message):
- raise exceptions.CommandError(
- 'Device encryption not initialized.')
+ raise exceptions.CommandError("Device encryption not initialized.")
if _is_encryption_setup_error(message):
- raise exceptions.CommandError(
- 'Device encryption initialization failed.')
+ raise exceptions.CommandError("Device encryption initialization failed.")
return message
@@ -213,18 +225,19 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
self._send_command(self.TEXT_CMD, command)
# Reply can stretch multiple buffers
- full_content = b''
+ full_content = b""
while True:
message_type, content = self._read_response()
logging.debug(
- 'Received message: type %02x content %s',
- message_type, content.hex())
+ "Received message: type %02x content %s", message_type, content.hex()
+ )
if message_type != self.TEXT_REPLY_CMD:
raise exceptions.InvalidResponse(
- 'Message type %02x does not match expectations: %r' %
- (message_type, content))
+ "Message type %02x does not match expectations: %r"
+ % (message_type, content)
+ )
full_content += content
@@ -235,17 +248,17 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
if not match:
raise exceptions.InvalidResponse(full_content)
- message = match.group('message')
- _verify_checksum(message, match.group('checksum'))
+ message = match.group("message")
+ _verify_checksum(message, match.group("checksum"))
- if match.group('status') != b'OK':
+ if match.group("status") != b"OK":
raise exceptions.InvalidResponse(message or "Command failed")
# If there is anything in the response that is not ASCII-safe, this is
# probably in the patient name. The Windows utility does not seem to
# validate those, so just replace anything non-ASCII with the correct
# unknown codepoint.
- return message.decode('ascii', 'replace')
+ return message.decode("ascii", "replace")
# Some of the commands are also shared across devices that use this HID
# protocol, but not many. Only provide here those that do seep to change
@@ -253,16 +266,16 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
def _get_version(self):
# type: () -> Text
"""Return the software version of the device."""
- return self._send_text_command(b'$swver?').rstrip('\r\n')
+ return self._send_text_command(b"$swver?").rstrip("\r\n")
def get_serial_number(self):
# type: () -> Text
"""Returns the serial number of the device."""
- return self._send_text_command(b'$serlnum?').rstrip('\r\n')
+ return self._send_text_command(b"$serlnum?").rstrip("\r\n")
def get_patient_name(self):
# type: () -> Optional[Text]
- patient_name = self._send_text_command(b'$ptname?').rstrip('\r\n')
+ patient_name = self._send_text_command(b"$ptname?").rstrip("\r\n")
if not patient_name:
return None
return patient_name
@@ -270,11 +283,11 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
def set_patient_name(self, name):
# type: (Text) -> None
try:
- encoded_name = name.encode('ascii')
+ encoded_name = name.encode("ascii")
except UnicodeDecodeError:
- raise ValueError('Only ASCII-safe names are tested working')
+ raise ValueError("Only ASCII-safe names are tested working")
- result = self._send_text_command(b'$ptname,' + encoded_name)
+ result = self._send_text_command(b"$ptname," + encoded_name)
def get_datetime(self):
# type: () -> datetime.datetime
@@ -283,12 +296,12 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
This is one of the few commands that appear common to many of the
FreeStyle devices that use the HID framing protocol.
"""
- date = self._send_text_command(b'$date?').rstrip('\r\n')
- time = self._send_text_command(b'$time?').rstrip('\r\n')
+ date = self._send_text_command(b"$date?").rstrip("\r\n")
+ time = self._send_text_command(b"$time?").rstrip("\r\n")
# Year is returned as an offset to 2000.
- month, day, year = (int(x) for x in date.split(','))
- hour, minute = (int(x) for x in time.split(','))
+ month, day, year = (int(x) for x in date.split(","))
+ hour, minute = (int(x) for x in time.split(","))
# At least Precision Neo devices can have an invalid date (bad RTC?),
# and report 255 for each field, which is not valid for
@@ -304,10 +317,10 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
# The format used by the FreeStyle devices is not composable based on
# standard strftime() (namely it includes no leading zeros), so we need
# to build it manually.
- date_cmd = '$date,{month},{day},{year}'.format(
- month=date.month, day=date.day, year=(date.year-2000))
- time_cmd = '$time,{hour},{minute}'.format(
- hour=date.hour, minute=date.minute)
+ date_cmd = "$date,{month},{day},{year}".format(
+ month=date.month, day=date.day, year=(date.year - 2000)
+ )
+ time_cmd = "$time,{hour},{minute}".format(hour=date.hour, minute=date.minute)
self._send_text_command(bytes(date_cmd, "ascii"))
self._send_text_command(bytes(time_cmd, "ascii"))
@@ -333,7 +346,7 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
in the record file.
"""
message = self._send_text_command(command)
- logging.debug('Received multirecord message:\n%s', message)
+ logging.debug("Received multirecord message:\n%s", message)
if message == "Log Empty\r\n":
return iter(())
@@ -341,9 +354,9 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
if not match:
raise exceptions.InvalidResponse(message)
- records_str = match.group('message')
- _verify_checksum(records_str, match.group('checksum'))
+ records_str = match.group("message")
+ _verify_checksum(records_str, match.group("checksum"))
- logging.debug('Received multi-record string: %s', records_str)
+ logging.debug("Received multi-record string: %s", records_str)
- return csv.reader(records_str.split('\r\n'))
+ return csv.reader(records_str.split("\r\n"))