summaryrefslogtreecommitdiffstats
path: root/glucometerutils/support
diff options
context:
space:
mode:
authorBen <b-schaefer@posteo.de>2020-02-21 10:45:40 +0100
committerDiego Elio Pettenò <flameeyes@flameeyes.com>2020-03-08 00:36:39 +0100
commite72b02d84e7f67cdf6107862ad580e951a5bbda1 (patch)
tree0887513d2478f55b27abccfeb307f313231bd994 /glucometerutils/support
parentpre-commit guide in README (diff)
downloadglucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar
glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.gz
glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.bz2
glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.lz
glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.xz
glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.zst
glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.zip
Diffstat (limited to 'glucometerutils/support')
-rw-r--r--glucometerutils/support/construct_extras.py2
-rw-r--r--glucometerutils/support/contourusb.py163
-rw-r--r--glucometerutils/support/driver_base.py1
-rw-r--r--glucometerutils/support/freestyle.py147
-rw-r--r--glucometerutils/support/hiddevice.py26
-rw-r--r--glucometerutils/support/lifescan.py23
-rw-r--r--glucometerutils/support/lifescan_binary_protocol.py45
-rw-r--r--glucometerutils/support/serial.py17
8 files changed, 228 insertions, 196 deletions
diff --git a/glucometerutils/support/construct_extras.py b/glucometerutils/support/construct_extras.py
index 7abcd9e..b44ee84 100644
--- a/glucometerutils/support/construct_extras.py
+++ b/glucometerutils/support/construct_extras.py
@@ -7,6 +7,7 @@ import datetime
import construct
+
class Timestamp(construct.Adapter):
"""Adapter for converting datetime object into timestamps.
@@ -14,6 +15,7 @@ class Timestamp(construct.Adapter):
and an optional epoch offset to the UNIX Epoch.
"""
+
__slots__ = ["epoch"]
def __init__(self, subcon, epoch=0):
diff --git a/glucometerutils/support/contourusb.py b/glucometerutils/support/contourusb.py
index b4db4eb..3b0dc80 100644
--- a/glucometerutils/support/contourusb.py
+++ b/glucometerutils/support/contourusb.py
@@ -43,25 +43,31 @@ _HEADER_RECORD_RE = re.compile(
"(?P<low_low_target>[0-9]{3})(?P<upp_hi_target>[0-9]{3})"
"(?P<low_hi_target>[0-9]{3})\\^Z=(?P<trends>[0-2])\\|"
"(?P<total>[0-9]*)\\|\\|\\|\\|\\|\\|"
- "(?P<spec_ver>[0-9]+)\\|(?P<datetime>[0-9]+)")
+ "(?P<spec_ver>[0-9]+)\\|(?P<datetime>[0-9]+)"
+)
_RESULT_RECORD_RE = re.compile(
"^(?P<record_type>[a-zA-Z])\\|(?P<seq_num>[0-9]+)\\|\\w*\\^\\w*\\^\\w*\\"
"^(?P<test_id>\\w+)\\|(?P<value>[0-9]+)\\|(?P<unit>\\w+\\/\\w+)\\^"
"(?P<ref_method>[BPD])\\|\\|(?P<markers>[><BADISXCZ\\/1-12]*)\\|\\|"
- "(?P<datetime>[0-9]+)")
+ "(?P<datetime>[0-9]+)"
+)
_RECORD_FORMAT = re.compile(
- '\x02(?P<check>(?P<recno>[0-7])(?P<text>[^\x0d]*)'
- '\x0d(?P<end>[\x03\x17]))'
- '(?P<checksum>[0-9A-F][0-9A-F])\x0d\x0a')
+ "\x02(?P<check>(?P<recno>[0-7])(?P<text>[^\x0d]*)"
+ "\x0d(?P<end>[\x03\x17]))"
+ "(?P<checksum>[0-9A-F][0-9A-F])\x0d\x0a"
+)
+
class FrameError(Exception):
pass
+
class ContourHidDevice(hiddevice.HidDevice):
"""Base class implementing the ContourUSB HID common protocol.
"""
+
blocksize = 64
# Operation modes
@@ -77,85 +83,84 @@ class ContourHidDevice(hiddevice.HidDevice):
while True:
data = self._read()
dstr = data
- result.append(dstr[4:data[3]+4])
- if data[3] != self.blocksize-4:
+ result.append(dstr[4 : data[3] + 4])
+ if data[3] != self.blocksize - 4:
break
- return (b"".join(result))
+ return b"".join(result)
def write(self, data):
- data = b'ABC' + chr(len(data)).encode() + data.encode()
+ data = b"ABC" + chr(len(data)).encode() + data.encode()
pad_length = self.blocksize - len(data)
- data += pad_length * b'\x00'
+ data += pad_length * b"\x00"
self._write(data)
- USB_VENDOR_ID = 0x1a79 # type: int # Bayer Health Care LLC Contour
+ USB_VENDOR_ID = 0x1A79 # type: int # Bayer Health Care LLC Contour
USB_PRODUCT_ID = 0x6002 # type: int
def parse_header_record(self, text):
header = _HEADER_RECORD_RE.search(text)
- self.field_del = header.group('field_del')
- self.repeat_del = header.group('repeat_del')
- self.component_del = header.group('component_del')
- self.escape_del = header.group('escape_del')
-
- self.product_code = header.group('product_code')
- self.dig_ver = header.group('dig_ver')
- self.anlg_ver = header.group('anlg_ver')
- self.agp_ver = header.group('agp_ver')
-
- self.serial_num = header.group('serial_num')
- self.sku_id = header.group('sku_id')
- self.res_marking = header.group('res_marking')
- self.config_bits = header.group('config_bits')
- self.lang = header.group('lang')
- self.interv = header.group('interv')
- self.ref_method = header.group('ref_method')
- self.internal = header.group('internal')
+ self.field_del = header.group("field_del")
+ self.repeat_del = header.group("repeat_del")
+ self.component_del = header.group("component_del")
+ self.escape_del = header.group("escape_del")
+
+ self.product_code = header.group("product_code")
+ self.dig_ver = header.group("dig_ver")
+ self.anlg_ver = header.group("anlg_ver")
+ self.agp_ver = header.group("agp_ver")
+
+ self.serial_num = header.group("serial_num")
+ self.sku_id = header.group("sku_id")
+ self.res_marking = header.group("res_marking")
+ self.config_bits = header.group("config_bits")
+ self.lang = header.group("lang")
+ self.interv = header.group("interv")
+ self.ref_method = header.group("ref_method")
+ self.internal = header.group("internal")
# U limit
- self.unit = header.group('unit')
- self.lo_bound = header.group('lo_bound')
- self.hi_bound = header.group('hi_bound')
+ self.unit = header.group("unit")
+ self.lo_bound = header.group("lo_bound")
+ self.hi_bound = header.group("hi_bound")
# X field
- self.hypo_limit = header.group('hypo_limit')
- self.overall_low = header.group('overall_low')
- self.pre_food_low = header.group('pre_food_low')
- self.post_food_low = header.group('post_food_low')
- self.overall_high = header.group('overall_high')
- self.pre_food_high = header.group('pre_food_high')
- self.post_food_high = header.group('post_food_high')
- self.hyper_limit = header.group('hyper_limit')
+ self.hypo_limit = header.group("hypo_limit")
+ self.overall_low = header.group("overall_low")
+ self.pre_food_low = header.group("pre_food_low")
+ self.post_food_low = header.group("post_food_low")
+ self.overall_high = header.group("overall_high")
+ self.pre_food_high = header.group("pre_food_high")
+ self.post_food_high = header.group("post_food_high")
+ self.hyper_limit = header.group("hyper_limit")
# Y field
- self.upp_hyper = header.group('upp_hyper')
- self.low_hyper = header.group('low_hyper')
- self.upp_hypo = header.group('upp_hypo')
- self.low_hypo = header.group('low_hypo')
- self.upp_low_target = header.group('upp_low_target')
- self.low_low_target = header.group('low_low_target')
- self.upp_hi_target = header.group('upp_hi_target')
- self.low_hi_target = header.group('low_hi_target')
+ self.upp_hyper = header.group("upp_hyper")
+ self.low_hyper = header.group("low_hyper")
+ self.upp_hypo = header.group("upp_hypo")
+ self.low_hypo = header.group("low_hypo")
+ self.upp_low_target = header.group("upp_low_target")
+ self.low_low_target = header.group("low_low_target")
+ self.upp_hi_target = header.group("upp_hi_target")
+ self.low_hi_target = header.group("low_hi_target")
# Z field
- self.trends = header.group('trends')
+ self.trends = header.group("trends")
- self.total = header.group('total')
- self.spec_ver = header.group('spec_ver')
+ self.total = header.group("total")
+ self.spec_ver = header.group("spec_ver")
# Datetime string in YYYYMMDDHHMM format
- self.datetime = header.group('datetime')
-
+ self.datetime = header.group("datetime")
def checksum(self, text):
"""
Implemented by Anders Hammarquist for glucodump project
More info: https://bitbucket.org/iko/glucodump/src/default/
"""
- checksum = hex(sum(ord(c) for c in text) % 256).upper().split('X')[1]
- return ('00' + checksum)[-2:]
+ checksum = hex(sum(ord(c) for c in text) % 256).upper().split("X")[1]
+ return ("00" + checksum)[-2:]
def checkframe(self, frame):
"""
@@ -166,7 +171,7 @@ class ContourHidDevice(hiddevice.HidDevice):
if not match:
raise FrameError("Couldn't parse frame", frame)
- recno = int(match.group('recno'))
+ recno = int(match.group("recno"))
if self.currecno is None:
self.currecno = recno
@@ -174,18 +179,20 @@ class ContourHidDevice(hiddevice.HidDevice):
return None
if recno != self.currecno:
- raise FrameError("Bad recno, got %r expected %r" %
- (recno, self.currecno),
- frame)
-
- checksum = self.checksum(match.group('check'))
- if checksum != match.group('checksum'):
- raise FrameError("Checksum error: got %s expected %s" %
- (match.group('checksum'), checksum),
- frame)
+ raise FrameError(
+ "Bad recno, got %r expected %r" % (recno, self.currecno), frame
+ )
+
+ checksum = self.checksum(match.group("check"))
+ if checksum != match.group("checksum"):
+ raise FrameError(
+ "Checksum error: got %s expected %s"
+ % (match.group("checksum"), checksum),
+ frame,
+ )
self.currecno = (self.currecno + 1) % 8
- return match.group('text')
+ return match.group("text")
def connect(self):
"""Connecting the device, nothing to be done.
@@ -198,15 +205,14 @@ class ContourHidDevice(hiddevice.HidDevice):
self.state = self.mode_establish
try:
while True:
- self.write('\x04')
+ self.write("\x04")
res = self.read()
if res[0] == 4 and res[-1] == 5:
# we are connected and just got a header
header_record = res.decode()
- stx = header_record.find('\x02')
+ stx = header_record.find("\x02")
if stx != -1:
- result = _RECORD_FORMAT.match(
- header_record[stx:]).group('text')
+ result = _RECORD_FORMAT.match(header_record[stx:]).group("text")
self.parse_header_record(result)
break
else:
@@ -251,7 +257,8 @@ class ContourHidDevice(hiddevice.HidDevice):
int(datetime_str[6:8]), # day
int(datetime_str[8:10]), # hour
int(datetime_str[10:12]), # minute
- 0)
+ 0,
+ )
def sync(self):
"""
@@ -261,7 +268,7 @@ class ContourHidDevice(hiddevice.HidDevice):
"""
self.state = self.mode_establish
try:
- tometer = '\x04'
+ tometer = "\x04"
result = None
foo = 0
while True:
@@ -281,7 +288,7 @@ class ContourHidDevice(hiddevice.HidDevice):
continue
if data_bytes[-1] == 5:
# got an <ENQ>, send <ACK>
- tometer = '\x06'
+ tometer = "\x06"
self.currecno = None
continue
if self.state == self.mode_data:
@@ -289,18 +296,18 @@ class ContourHidDevice(hiddevice.HidDevice):
# got an <EOT>, done
self.state = self.mode_precommand
break
- stx = data.find('\x02')
+ stx = data.find("\x02")
if stx != -1:
# got <STX>, parse frame
try:
result = self.checkframe(data[stx:])
- tometer = '\x06'
+ tometer = "\x06"
self.state = self.mode_data
except FrameError as e:
- tometer = '\x15' # Couldn't parse, <NAK>
+ tometer = "\x15" # Couldn't parse, <NAK>
else:
# Got something we don't understand, <NAK> it
- tometer = '\x15'
+ tometer = "\x15"
except Exception as e:
raise e
@@ -321,7 +328,7 @@ class ContourHidDevice(hiddevice.HidDevice):
"""
records_arr = []
for rec in self.sync():
- if rec[0] == 'R':
+ if rec[0] == "R":
# parse using result record regular expression
rec_text = self.parse_result_record(rec)
# get dictionary to use in main driver module without import re
diff --git a/glucometerutils/support/driver_base.py b/glucometerutils/support/driver_base.py
index b7b3d0f..1caa960 100644
--- a/glucometerutils/support/driver_base.py
+++ b/glucometerutils/support/driver_base.py
@@ -3,7 +3,6 @@ from datetime import datetime
class GlucometerDriver(ABC):
-
def connect(self):
pass
diff --git a/glucometerutils/support/freestyle.py b/glucometerutils/support/freestyle.py
index d48ac04..341e978 100644
--- a/glucometerutils/support/freestyle.py
+++ b/glucometerutils/support/freestyle.py
@@ -18,7 +18,7 @@ from typing import AnyStr, Callable, Iterator, List, Optional, Text, Tuple
import construct
from glucometerutils import exceptions
-from glucometerutils.support import hiddevice, driver_base
+from glucometerutils.support import driver_base, hiddevice
_INIT_COMMAND = 0x01
_INIT_RESPONSE = 0x71
@@ -30,54 +30,66 @@ _ENCRYPTION_SETUP_COMMAND = 0x14
_ENCRYPTION_SETUP_RESPONSE = 0x33
_ALWAYS_UNENCRYPTED_MESSAGES = (
- _INIT_COMMAND, 0x04, 0x05, 0x06, 0x0c, 0x0d,
- _ENCRYPTION_SETUP_COMMAND, 0x15,
- _ENCRYPTION_SETUP_RESPONSE, 0x34, 0x35,
+ _INIT_COMMAND,
+ 0x04,
+ 0x05,
+ 0x06,
+ 0x0C,
+ 0x0D,
+ _ENCRYPTION_SETUP_COMMAND,
+ 0x15,
+ _ENCRYPTION_SETUP_RESPONSE,
+ 0x34,
+ 0x35,
_INIT_RESPONSE,
_KEEPALIVE_RESPONSE,
)
+
def _create_matcher(message_type, content):
# type: (int, Optional[bytes]) -> Callable[[Tuple[int, bytes]], bool]
def _matcher(message):
- return (
- message[0] == message_type and
- (content is None or content == message[1]))
+ return message[0] == message_type and (content is None or content == message[1])
return _matcher
-_is_init_reply = _create_matcher(_INIT_RESPONSE, b'\x01')
+
+_is_init_reply = _create_matcher(_INIT_RESPONSE, b"\x01")
_is_keepalive_response = _create_matcher(_KEEPALIVE_RESPONSE, None)
-_is_unknown_message_error = _create_matcher(_UNKNOWN_MESSAGE_RESPONSE, b'\x85')
-_is_encryption_missing_error = _create_matcher(
- _ENCRYPTION_SETUP_RESPONSE, b'\x15')
-_is_encryption_setup_error = _create_matcher(
- _ENCRYPTION_SETUP_RESPONSE, b'\x14')
+_is_unknown_message_error = _create_matcher(_UNKNOWN_MESSAGE_RESPONSE, b"\x85")
+_is_encryption_missing_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x15")
+_is_encryption_setup_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x14")
_FREESTYLE_MESSAGE = construct.Struct(
- 'hid_report' / construct.Const(0, construct.Byte),
- 'message_type' / construct.Byte,
- 'command' / construct.Padded(
+ "hid_report" / construct.Const(0, construct.Byte),
+ "message_type" / construct.Byte,
+ "command"
+ / construct.Padded(
63, # command can only be up to 62 bytes, but one is used for length.
- construct.Prefixed(construct.Byte, construct.GreedyBytes)),
+ construct.Prefixed(construct.Byte, construct.GreedyBytes),
+ ),
)
_FREESTYLE_ENCRYPTED_MESSAGE = construct.Struct(
- 'hid_report' / construct.Const(0, construct.Byte),
- 'message_type' / construct.Byte,
- 'command' / construct.Padded(
+ "hid_report" / construct.Const(0, construct.Byte),
+ "message_type" / construct.Byte,
+ "command"
+ / construct.Padded(
63, # command can only be up to 62 bytes, but one is used for length.
- construct.GreedyBytes),
+ construct.GreedyBytes,
+ ),
)
-_TEXT_COMPLETION_RE = re.compile(b'CMD (?:OK|Fail!)')
+_TEXT_COMPLETION_RE = re.compile(b"CMD (?:OK|Fail!)")
_TEXT_REPLY_FORMAT = re.compile(
- b'^(?P<message>.*)CKSM:(?P<checksum>[0-9A-F]{8})\r\n'
- b'CMD (?P<status>OK|Fail!)\r\n$', re.DOTALL)
+ b"^(?P<message>.*)CKSM:(?P<checksum>[0-9A-F]{8})\r\n"
+ b"CMD (?P<status>OK|Fail!)\r\n$",
+ re.DOTALL,
+)
_MULTIRECORDS_FORMAT = re.compile(
- '^(?P<message>.+\r\n)(?P<count>[0-9]+),(?P<checksum>[0-9A-F]{8})\r\n$',
- re.DOTALL)
+ "^(?P<message>.+\r\n)(?P<count>[0-9]+),(?P<checksum>[0-9A-F]{8})\r\n$", re.DOTALL
+)
def _verify_checksum(message, expected_checksum_hex):
@@ -131,17 +143,18 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
TEXT_CMD = 0x60
TEXT_REPLY_CMD = 0x60
- USB_VENDOR_ID = 0x1a61 # type: int # Abbott Diabetes Care
+ USB_VENDOR_ID = 0x1A61 # type: int # Abbott Diabetes Care
USB_PRODUCT_ID = None # type: int
def connect(self):
"""Open connection to the device, starting the knocking sequence."""
- self._send_command(_INIT_COMMAND, b'')
+ self._send_command(_INIT_COMMAND, b"")
response = self._read_response()
if not _is_init_reply(response):
raise exceptions.ConnectionFailed(
- 'Connection error: unexpected message %02x:%s' % (
- response[0], response[1].hex()))
+ "Connection error: unexpected message %02x:%s"
+ % (response[0], response[1].hex())
+ )
def disconnect(self):
"""Disconnect the device, nothing to be done."""
@@ -162,9 +175,10 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
meta_construct = _FREESTYLE_MESSAGE
usb_packet = meta_construct.build(
- {'message_type': message_type, 'command': command})
+ {"message_type": message_type, "command": command}
+ )
- logging.debug('Sending packet: %r', usb_packet)
+ logging.debug("Sending packet: %r", usb_packet)
self._write(usb_packet)
def _read_response(self, encrypted=False):
@@ -172,14 +186,14 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
"""Read the response from the device and extracts it."""
usb_packet = self._read()
- logging.debug('Read packet: %r', usb_packet)
+ logging.debug("Read packet: %r", usb_packet)
assert usb_packet
message_type = usb_packet[0]
if not encrypted or message_type in _ALWAYS_UNENCRYPTED_MESSAGES:
message_length = usb_packet[1]
- message_content = usb_packet[2:2+message_length]
+ message_content = usb_packet[2 : 2 + message_length]
else:
message_content = usb_packet[1:]
@@ -195,15 +209,13 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
return self._read_response(encrypted=encrypted)
if _is_unknown_message_error(message):
- raise exceptions.CommandError('Invalid command')
+ raise exceptions.CommandError("Invalid command")
if _is_encryption_missing_error(message):
- raise exceptions.CommandError(
- 'Device encryption not initialized.')
+ raise exceptions.CommandError("Device encryption not initialized.")
if _is_encryption_setup_error(message):
- raise exceptions.CommandError(
- 'Device encryption initialization failed.')
+ raise exceptions.CommandError("Device encryption initialization failed.")
return message
@@ -213,18 +225,19 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
self._send_command(self.TEXT_CMD, command)
# Reply can stretch multiple buffers
- full_content = b''
+ full_content = b""
while True:
message_type, content = self._read_response()
logging.debug(
- 'Received message: type %02x content %s',
- message_type, content.hex())
+ "Received message: type %02x content %s", message_type, content.hex()
+ )
if message_type != self.TEXT_REPLY_CMD:
raise exceptions.InvalidResponse(
- 'Message type %02x does not match expectations: %r' %
- (message_type, content))
+ "Message type %02x does not match expectations: %r"
+ % (message_type, content)
+ )
full_content += content
@@ -235,17 +248,17 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
if not match:
raise exceptions.InvalidResponse(full_content)
- message = match.group('message')
- _verify_checksum(message, match.group('checksum'))
+ message = match.group("message")
+ _verify_checksum(message, match.group("checksum"))
- if match.group('status') != b'OK':
+ if match.group("status") != b"OK":
raise exceptions.InvalidResponse(message or "Command failed")
# If there is anything in the response that is not ASCII-safe, this is
# probably in the patient name. The Windows utility does not seem to
# validate those, so just replace anything non-ASCII with the correct
# unknown codepoint.
- return message.decode('ascii', 'replace')
+ return message.decode("ascii", "replace")
# Some of the commands are also shared across devices that use this HID
# protocol, but not many. Only provide here those that do seep to change
@@ -253,16 +266,16 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
def _get_version(self):
# type: () -> Text
"""Return the software version of the device."""
- return self._send_text_command(b'$swver?').rstrip('\r\n')
+ return self._send_text_command(b"$swver?").rstrip("\r\n")
def get_serial_number(self):
# type: () -> Text
"""Returns the serial number of the device."""
- return self._send_text_command(b'$serlnum?').rstrip('\r\n')
+ return self._send_text_command(b"$serlnum?").rstrip("\r\n")
def get_patient_name(self):
# type: () -> Optional[Text]
- patient_name = self._send_text_command(b'$ptname?').rstrip('\r\n')
+ patient_name = self._send_text_command(b"$ptname?").rstrip("\r\n")
if not patient_name:
return None
return patient_name
@@ -270,11 +283,11 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
def set_patient_name(self, name):
# type: (Text) -> None
try:
- encoded_name = name.encode('ascii')
+ encoded_name = name.encode("ascii")
except UnicodeDecodeError:
- raise ValueError('Only ASCII-safe names are tested working')
+ raise ValueError("Only ASCII-safe names are tested working")
- result = self._send_text_command(b'$ptname,' + encoded_name)
+ result = self._send_text_command(b"$ptname," + encoded_name)
def get_datetime(self):
# type: () -> datetime.datetime
@@ -283,12 +296,12 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
This is one of the few commands that appear common to many of the
FreeStyle devices that use the HID framing protocol.
"""
- date = self._send_text_command(b'$date?').rstrip('\r\n')
- time = self._send_text_command(b'$time?').rstrip('\r\n')
+ date = self._send_text_command(b"$date?").rstrip("\r\n")
+ time = self._send_text_command(b"$time?").rstrip("\r\n")
# Year is returned as an offset to 2000.
- month, day, year = (int(x) for x in date.split(','))
- hour, minute = (int(x) for x in time.split(','))
+ month, day, year = (int(x) for x in date.split(","))
+ hour, minute = (int(x) for x in time.split(","))
# At least Precision Neo devices can have an invalid date (bad RTC?),
# and report 255 for each field, which is not valid for
@@ -304,10 +317,10 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
# The format used by the FreeStyle devices is not composable based on
# standard strftime() (namely it includes no leading zeros), so we need
# to build it manually.
- date_cmd = '$date,{month},{day},{year}'.format(
- month=date.month, day=date.day, year=(date.year-2000))
- time_cmd = '$time,{hour},{minute}'.format(
- hour=date.hour, minute=date.minute)
+ date_cmd = "$date,{month},{day},{year}".format(
+ month=date.month, day=date.day, year=(date.year - 2000)
+ )
+ time_cmd = "$time,{hour},{minute}".format(hour=date.hour, minute=date.minute)
self._send_text_command(bytes(date_cmd, "ascii"))
self._send_text_command(bytes(time_cmd, "ascii"))
@@ -333,7 +346,7 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
in the record file.
"""
message = self._send_text_command(command)
- logging.debug('Received multirecord message:\n%s', message)
+ logging.debug("Received multirecord message:\n%s", message)
if message == "Log Empty\r\n":
return iter(())
@@ -341,9 +354,9 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
if not match:
raise exceptions.InvalidResponse(message)
- records_str = match.group('message')
- _verify_checksum(records_str, match.group('checksum'))
+ records_str = match.group("message")
+ _verify_checksum(records_str, match.group("checksum"))
- logging.debug('Received multi-record string: %s', records_str)
+ logging.debug("Received multi-record string: %s", records_str)
- return csv.reader(records_str.split('\r\n'))
+ return csv.reader(records_str.split("\r\n"))
diff --git a/glucometerutils/support/hiddevice.py b/glucometerutils/support/hiddevice.py
index b44e5c7..ad8f3a6 100644
--- a/glucometerutils/support/hiddevice.py
+++ b/glucometerutils/support/hiddevice.py
@@ -45,33 +45,36 @@ class HidDevice:
# type: (Optional[Text]) -> None
if None in (self.USB_VENDOR_ID, self.USB_PRODUCT_ID) and not device:
raise exceptions.CommandLineError(
- '--device parameter is required, should point to a /dev/hidraw '
- 'device node representing the meter.')
+ "--device parameter is required, should point to a /dev/hidraw "
+ "device node representing the meter."
+ )
# If the user passed a device path that does not exist, raise an
# error. This is to avoid writing to a file instead of to a device node.
if device and not os.path.exists(device):
raise exceptions.ConnectionFailed(
- message='Path %s does not exist.' % device)
+ message="Path %s does not exist." % device
+ )
# If the user passed a device, try opening it.
if device:
- self.handle_ = open(device, 'w+b') # type: Optional[BinaryIO]
+ self.handle_ = open(device, "w+b") # type: Optional[BinaryIO]
else:
self.handle_ = None
- logging.info(
- 'No --device parameter provided, using hidapi library.')
+ logging.info("No --device parameter provided, using hidapi library.")
try:
import hid
+
self.hidapi_handle_ = hid.device()
- self.hidapi_handle_.open(
- self.USB_VENDOR_ID, self.USB_PRODUCT_ID)
+ self.hidapi_handle_.open(self.USB_VENDOR_ID, self.USB_PRODUCT_ID)
except ImportError:
raise exceptions.ConnectionFailed(
- message='Missing requied "hidapi" module.')
+ message='Missing requied "hidapi" module.'
+ )
except OSError as e:
raise exceptions.ConnectionFailed(
- message='Unable to connect to meter: %s.' % e)
+ message="Unable to connect to meter: %s." % e
+ )
def _write(self, report):
# type: (bytes) -> None
@@ -95,5 +98,4 @@ class HidDevice:
if self.handle_:
return bytes(self.handle_.read(size))
- return bytes(self.hidapi_handle_.read(
- size, timeout_ms=self.TIMEOUT_MS))
+ return bytes(self.hidapi_handle_.read(size, timeout_ms=self.TIMEOUT_MS))
diff --git a/glucometerutils/support/lifescan.py b/glucometerutils/support/lifescan.py
index 19155d4..9340e49 100644
--- a/glucometerutils/support/lifescan.py
+++ b/glucometerutils/support/lifescan.py
@@ -8,22 +8,25 @@ from glucometerutils import exceptions
class MissingChecksum(exceptions.InvalidResponse):
"""The response misses the expected 4-digits checksum."""
+
def __init__(self, response):
super(MissingChecksum, self).__init__(
- 'Response is missing checksum: %s' % response)
+ "Response is missing checksum: %s" % response
+ )
class InvalidSerialNumber(exceptions.Error):
"""The serial number is not as expected."""
+
def __init__(self, serial_number):
super(InvalidSerialNumber, self).__init__(
- 'Serial number %s is invalid.' % serial_number)
+ "Serial number %s is invalid." % serial_number
+ )
class MalformedCommand(exceptions.InvalidResponse):
def __init__(self, message):
- super(MalformedCommand, self).__init__(
- 'Malformed command: %s' % message)
+ super(MalformedCommand, self).__init__("Malformed command: %s" % message)
def crc_ccitt(data):
@@ -39,13 +42,13 @@ def crc_ccitt(data):
This function uses the non-default 0xFFFF seed as used by multiple
LifeScan meters.
"""
- crc = 0xffff
+ crc = 0xFFFF
for byte in data:
- crc = (crc >> 8) & 0xffff | (crc << 8) & 0xffff
+ crc = (crc >> 8) & 0xFFFF | (crc << 8) & 0xFFFF
crc ^= byte
- crc ^= (crc & 0xff) >> 4
- crc ^= (((crc << 8) & 0xffff) << 4) & 0xffff
- crc ^= (crc & 0xff) << 5
+ crc ^= (crc & 0xFF) >> 4
+ crc ^= (((crc << 8) & 0xFFFF) << 4) & 0xFFFF
+ crc ^= (crc & 0xFF) << 5
- return crc & 0xffff
+ return crc & 0xFFFF
diff --git a/glucometerutils/support/lifescan_binary_protocol.py b/glucometerutils/support/lifescan_binary_protocol.py
index 288da83..441226e 100644
--- a/glucometerutils/support/lifescan_binary_protocol.py
+++ b/glucometerutils/support/lifescan_binary_protocol.py
@@ -12,48 +12,51 @@ This module implements an interface to send and receive these messages.
import construct
from glucometerutils import common
-from glucometerutils.support import construct_extras
-from glucometerutils.support import lifescan
+from glucometerutils.support import construct_extras, lifescan
_LINK_CONTROL = construct.BitStruct(
construct.Padding(3),
- 'more' / construct.Default(construct.Flag, False),
- 'disconnect' / construct.Default(construct.Flag, False),
- 'acknowledge' / construct.Default(construct.Flag, False),
- 'expect_receive' / construct.Default(construct.Flag, False),
- 'sequence_number' / construct.Default(construct.Flag, False),
+ "more" / construct.Default(construct.Flag, False),
+ "disconnect" / construct.Default(construct.Flag, False),
+ "acknowledge" / construct.Default(construct.Flag, False),
+ "expect_receive" / construct.Default(construct.Flag, False),
+ "sequence_number" / construct.Default(construct.Flag, False),
)
+
def LifeScanPacket(include_link_control): # pylint: disable=invalid-name
# type: (bool) -> construct.Struct
if include_link_control:
link_control_construct = _LINK_CONTROL
else:
- link_control_construct = construct.Const(b'\x00')
+ link_control_construct = construct.Const(b"\x00")
return construct.Struct(
- 'data' / construct.RawCopy(
+ "data"
+ / construct.RawCopy(
construct.Struct(
- construct.Const(b'\x02'), # stx
- 'length' / construct.Rebuild(
- construct.Byte, lambda this: len(this.message) + 6),
- 'link_control' / link_control_construct,
- 'message' / construct.Bytes(
- lambda this: this.length - 6),
- construct.Const(b'\x03'), # etx
+ construct.Const(b"\x02"), # stx
+ "length"
+ / construct.Rebuild(construct.Byte, lambda this: len(this.message) + 6),
+ "link_control" / link_control_construct,
+ "message" / construct.Bytes(lambda this: this.length - 6),
+ construct.Const(b"\x03"), # etx
),
),
- 'checksum' / construct.Checksum(
- construct.Int16ul, lifescan.crc_ccitt, construct.this.data.data),
+ "checksum"
+ / construct.Checksum(
+ construct.Int16ul, lifescan.crc_ccitt, construct.this.data.data
+ ),
)
+
VERIO_TIMESTAMP = construct_extras.Timestamp(
- construct.Int32ul, epoch=946684800) # 2000-01-01 (not 2010)
+ construct.Int32ul, epoch=946684800
+) # 2000-01-01 (not 2010)
_GLUCOSE_UNIT_MAPPING_TABLE = {
common.Unit.MG_DL: 0x00,
common.Unit.MMOL_L: 0x01,
}
-GLUCOSE_UNIT = construct.Mapping(
- construct.Byte, _GLUCOSE_UNIT_MAPPING_TABLE)
+GLUCOSE_UNIT = construct.Mapping(construct.Byte, _GLUCOSE_UNIT_MAPPING_TABLE)
diff --git a/glucometerutils/support/serial.py b/glucometerutils/support/serial.py
index 6067cf7..d9e80ea 100644
--- a/glucometerutils/support/serial.py
+++ b/glucometerutils/support/serial.py
@@ -8,7 +8,6 @@ import logging
from typing import Optional, Text
import serial
-
from glucometerutils import exceptions
@@ -48,19 +47,23 @@ class SerialDevice:
assert self.BAUDRATE is not None
if not device and self.DEFAULT_CABLE_ID:
- logging.info(
- 'No --device parameter provided, looking for default cable.')
- device = 'hwgrep://' + self.DEFAULT_CABLE_ID
+ logging.info("No --device parameter provided, looking for default cable.")
+ device = "hwgrep://" + self.DEFAULT_CABLE_ID
if not device:
raise exceptions.CommandLineError(
- 'No --device parameter provided, and no default cable known.')
+ "No --device parameter provided, and no default cable known."
+ )
self.serial_ = serial.serial_for_url(
device,
baudrate=self.BAUDRATE,
timeout=self.TIMEOUT,
writeTimeout=None,
- bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE,
+ bytesize=serial.EIGHTBITS,
+ parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
- xonxoff=False, rtscts=False, dsrdtr=False)
+ xonxoff=False,
+ rtscts=False,
+ dsrdtr=False,
+ )