summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--README.md (renamed from README)0
-rw-r--r--glucometerutils/common.py87
-rw-r--r--glucometerutils/drivers/accuchek_reports.py73
-rw-r--r--glucometerutils/drivers/contourusb.py39
-rw-r--r--glucometerutils/drivers/fsinsulinx.py55
-rw-r--r--glucometerutils/drivers/fslibre.py184
-rw-r--r--glucometerutils/drivers/fsoptium.py134
-rw-r--r--glucometerutils/drivers/fsprecisionneo.py61
-rw-r--r--glucometerutils/drivers/otultra2.py112
-rw-r--r--glucometerutils/drivers/otultraeasy.py149
-rw-r--r--glucometerutils/drivers/otverio2015.py123
-rw-r--r--glucometerutils/drivers/otverioiq.py104
-rw-r--r--glucometerutils/drivers/sdcodefree.py108
-rw-r--r--glucometerutils/drivers/td4277.py129
-rw-r--r--glucometerutils/exceptions.py17
-rwxr-xr-xglucometerutils/glucometer.py158
-rw-r--r--glucometerutils/support/construct_extras.py2
-rw-r--r--glucometerutils/support/contourusb.py163
-rw-r--r--glucometerutils/support/driver_base.py1
-rw-r--r--glucometerutils/support/freestyle.py147
-rw-r--r--glucometerutils/support/hiddevice.py26
-rw-r--r--glucometerutils/support/lifescan.py23
-rw-r--r--glucometerutils/support/lifescan_binary_protocol.py45
-rw-r--r--glucometerutils/support/serial.py17
-rwxr-xr-xreversing_tools/abbott/extract_freestyle.py109
-rwxr-xr-xreversing_tools/abbott/freestyle_hid_console.py42
-rw-r--r--test/test_common.py131
-rw-r--r--test/test_construct_extras.py50
-rw-r--r--test/test_contourusb.py65
-rw-r--r--test/test_freestyle.py9
-rw-r--r--test/test_fsoptium.py25
-rwxr-xr-xtest/test_lifescan.py11
-rw-r--r--test/test_otultra2.py30
-rw-r--r--test/test_otultraeasy.py13
-rw-r--r--test/test_td4277.py17
35 files changed, 1297 insertions, 1162 deletions
diff --git a/README b/README.md
index 0f97db2..0f97db2 100644
--- a/README
+++ b/README.md
diff --git a/glucometerutils/common.py b/glucometerutils/common.py
index e7be7ec..6576ebd 100644
--- a/glucometerutils/common.py
+++ b/glucometerutils/common.py
@@ -10,21 +10,24 @@ from typing import Optional, Sequence
import attr
+
class Unit(enum.Enum):
- MG_DL = 'mg/dL'
- MMOL_L = 'mmol/L'
+ MG_DL = "mg/dL"
+ MMOL_L = "mmol/L"
+
# Constants for meal information
class Meal(enum.Enum):
- NONE = ''
- BEFORE = 'Before Meal'
- AFTER = 'After Meal'
+ NONE = ""
+ BEFORE = "Before Meal"
+ AFTER = "After Meal"
+
# Constants for measure method
class MeasurementMethod(enum.Enum):
- BLOOD_SAMPLE = 'blood sample'
- CGM = 'CGM' # Continuous Glucose Monitoring
- TIME = 'time'
+ BLOOD_SAMPLE = "blood sample"
+ CGM = "CGM" # Continuous Glucose Monitoring
+ TIME = "time"
def convert_glucose_unit(value, from_unit, to_unit):
@@ -50,19 +53,19 @@ def convert_glucose_unit(value, from_unit, to_unit):
return round(value * 18.0, 0)
+
@attr.s
class GlucoseReading:
timestamp = attr.ib(type=datetime.datetime)
value = attr.ib(type=float)
- meal = attr.ib(
- default=Meal.NONE, validator=attr.validators.in_(Meal),
- type=Meal)
- comment = attr.ib(default='', type=str)
+ meal = attr.ib(default=Meal.NONE, validator=attr.validators.in_(Meal), type=Meal)
+ comment = attr.ib(default="", type=str)
measure_method = attr.ib(
default=MeasurementMethod.BLOOD_SAMPLE,
validator=attr.validators.in_(MeasurementMethod),
- type=MeasurementMethod)
+ type=MeasurementMethod,
+ )
extra_data = attr.ib(factory=dict)
def get_value_as(self, to_unit):
@@ -78,15 +81,20 @@ class GlucoseReading:
# type: (Unit) -> str
"""Returns the reading as a formatted comma-separated value string."""
return '"%s","%.2f","%s","%s","%s"' % (
- self.timestamp, self.get_value_as(unit), self.meal.value,
- self.measure_method.value, self.comment)
+ self.timestamp,
+ self.get_value_as(unit),
+ self.meal.value,
+ self.measure_method.value,
+ self.comment,
+ )
+
@attr.s
class KetoneReading:
timestamp = attr.ib(type=datetime.datetime)
value = attr.ib(type=float)
- comment = attr.ib(default='', type=str)
+ comment = attr.ib(default="", type=str)
extra_data = attr.ib(factory=dict)
def as_csv(self, unit):
@@ -94,23 +102,28 @@ class KetoneReading:
del unit # Unused for Ketone readings.
return '"%s","%.2f","%s","%s"' % (
- self.timestamp, self.value, MeasurementMethod.BLOOD_SAMPLE.value,
- self.comment)
+ self.timestamp,
+ self.value,
+ MeasurementMethod.BLOOD_SAMPLE.value,
+ self.comment,
+ )
+
@attr.s
class TimeAdjustment:
timestamp = attr.ib() # type: datetime.datetime
old_timestamp = attr.ib() # type: datetime.datetime
measure_method = attr.ib(
- default=MeasurementMethod.TIME,
- validator=attr.validators.in_(
- MeasurementMethod)) # type: MeasurementMethod
+ default=MeasurementMethod.TIME, validator=attr.validators.in_(MeasurementMethod)
+ ) # type: MeasurementMethod
extra_data = attr.ib(factory=dict)
def as_csv(self, unit):
del unit
return '"%s","","%s","%s"' % (
- self.timestamp, self.measure_method.value, self.old_timestamp
+ self.timestamp,
+ self.measure_method.value,
+ self.old_timestamp,
)
@@ -126,32 +139,38 @@ class MeterInfo:
the device. It can include hardware and software version.
native_unit: One of the Unit values to identify the meter native unit.
"""
+
model = attr.ib(type=str)
- serial_number = attr.ib(default='N/A', type=str)
+ serial_number = attr.ib(default="N/A", type=str)
version_info = attr.ib(default=(), type=Sequence[str])
native_unit = attr.ib(
- default=Unit.MG_DL, validator=attr.validators.in_(Unit),
- type=Unit)
+ default=Unit.MG_DL, validator=attr.validators.in_(Unit), type=Unit
+ )
patient_name = attr.ib(default=None, type=Optional[str])
def __str__(self):
- version_information_string = 'N/A'
+ version_information_string = "N/A"
if self.version_info:
- version_information_string = '\n '.join(
- self.version_info).strip()
+ version_information_string = "\n ".join(self.version_info).strip()
- base_output = textwrap.dedent("""\
+ base_output = textwrap.dedent(
+ """\
{model}
Serial Number: {serial_number}
Version Information:
{version_information_string}
Native Unit: {native_unit}
- """).format(model=self.model, serial_number=self.serial_number,
- version_information_string=version_information_string,
- native_unit=self.native_unit.value)
+ """
+ ).format(
+ model=self.model,
+ serial_number=self.serial_number,
+ version_information_string=version_information_string,
+ native_unit=self.native_unit.value,
+ )
if self.patient_name != None:
- base_output += 'Patient Name: {patient_name}\n'.format(
- patient_name=self.patient_name)
+ base_output += "Patient Name: {patient_name}\n".format(
+ patient_name=self.patient_name
+ )
return base_output
diff --git a/glucometerutils/drivers/accuchek_reports.py b/glucometerutils/drivers/accuchek_reports.py
index 06b69bc..c4d7527 100644
--- a/glucometerutils/drivers/accuchek_reports.py
+++ b/glucometerutils/drivers/accuchek_reports.py
@@ -19,45 +19,46 @@ import datetime
import glob
import os
-from glucometerutils import common
-from glucometerutils import exceptions
+from glucometerutils import common, exceptions
from glucometerutils.support import driver_base
_UNIT_MAP = {
- 'mmol/l': common.Unit.MMOL_L,
- 'mg/dl': common.Unit.MG_DL,
+ "mmol/l": common.Unit.MMOL_L,
+ "mg/dl": common.Unit.MG_DL,
}
-_DATE_CSV_KEY = 'Date'
-_TIME_CSV_KEY = 'Time'
-_RESULT_CSV_KEY = 'Result'
-_UNIT_CSV_KEY = 'Unit'
-_TEMPWARNING_CSV_KEY = 'Temperature warning' # ignored
-_OUTRANGE_CSV_KEY = 'Out of target range' # ignored
-_OTHER_CSV_KEY = 'Other' # ignored
-_BEFORE_MEAL_CSV_KEY = 'Before meal'
-_AFTER_MEAL_CSV_KEY = 'After meal'
+_DATE_CSV_KEY = "Date"
+_TIME_CSV_KEY = "Time"
+_RESULT_CSV_KEY = "Result"
+_UNIT_CSV_KEY = "Unit"
+_TEMPWARNING_CSV_KEY = "Temperature warning" # ignored
+_OUTRANGE_CSV_KEY = "Out of target range" # ignored
+_OTHER_CSV_KEY = "Other" # ignored
+_BEFORE_MEAL_CSV_KEY = "Before meal"
+_AFTER_MEAL_CSV_KEY = "After meal"
# Control test has extra whitespace which is not ignored.
-_CONTROL_CSV_KEY = 'Control test' + ' '*197
+_CONTROL_CSV_KEY = "Control test" + " " * 197
-_DATE_FORMAT = '%d.%m.%Y'
-_TIME_FORMAT = '%H:%M'
+_DATE_FORMAT = "%d.%m.%Y"
+_TIME_FORMAT = "%H:%M"
-_DATETIME_FORMAT = ' '.join((_DATE_FORMAT, _TIME_FORMAT))
+_DATETIME_FORMAT = " ".join((_DATE_FORMAT, _TIME_FORMAT))
class Device(driver_base.GlucometerDriver):
def __init__(self, device):
if not device or not os.path.isdir(device):
raise exceptions.CommandLineError(
- '--device parameter is required, should point to mount path '
- 'for the meter.')
+ "--device parameter is required, should point to mount path "
+ "for the meter."
+ )
- reports_path = os.path.join(device, '*', 'Reports', '*.csv')
+ reports_path = os.path.join(device, "*", "Reports", "*.csv")
report_files = glob.glob(reports_path)
if not report_files:
raise exceptions.ConnectionFailed(
- 'No report file found in path "%s".' % reports_path)
+ 'No report file found in path "%s".' % reports_path
+ )
self.report_file = report_files[0]
@@ -68,35 +69,32 @@ class Device(driver_base.GlucometerDriver):
next(self.report)
return csv.DictReader(
- self.report,
- delimiter=';',
- skipinitialspace=True,
- quoting=csv.QUOTE_NONE)
+ self.report, delimiter=";", skipinitialspace=True, quoting=csv.QUOTE_NONE
+ )
def connect(self):
- self.report = open(
- self.report_file, 'r', newline='\r\n', encoding='utf-8')
+ self.report = open(self.report_file, "r", newline="\r\n", encoding="utf-8")
def disconnect(self):
self.report.close()
def get_meter_info(self):
return common.MeterInfo(
- '%s glucometer' % self.get_model(),
+ "%s glucometer" % self.get_model(),
serial_number=self.get_serial_number(),
- native_unit=self.get_glucose_unit())
+ native_unit=self.get_glucose_unit(),
+ )
def get_model(self):
# $device/MODEL/Reports/*.csv
- return os.path.basename(
- os.path.dirname(os.path.dirname(self.report_file)))
+ return os.path.basename(os.path.dirname(os.path.dirname(self.report_file)))
def get_serial_number(self):
self.report.seek(0)
# ignore the first line.
next(self.report)
# The second line of the CSV is serial-no;report-date;report-time;;;;;;;
- return next(self.report).split(';')[0]
+ return next(self.report).split(";")[0]
def get_glucose_unit(self):
# Get the first record available and parse that.
@@ -115,13 +113,12 @@ class Device(driver_base.GlucometerDriver):
def _extract_datetime(self, record): # pylint: disable=no-self-use
# Date and time are in separate column, but we want to parse them
# together.
- date_and_time = ' '.join((record[_DATE_CSV_KEY], record[_TIME_CSV_KEY]))
+ date_and_time = " ".join((record[_DATE_CSV_KEY], record[_TIME_CSV_KEY]))
return datetime.datetime.strptime(date_and_time, _DATETIME_FORMAT)
def _extract_meal(self, record): # pylint: disable=no-self-use
if record[_AFTER_MEAL_CSV_KEY] and record[_BEFORE_MEAL_CSV_KEY]:
- raise exceptions.InvalidResponse(
- 'Reading cannot be before and after meal.')
+ raise exceptions.InvalidResponse("Reading cannot be before and after meal.")
elif record[_AFTER_MEAL_CSV_KEY]:
return common.Meal.AFTER
elif record[_BEFORE_MEAL_CSV_KEY]:
@@ -139,5 +136,7 @@ class Device(driver_base.GlucometerDriver):
common.convert_glucose_unit(
float(record[_RESULT_CSV_KEY]),
_UNIT_MAP[record[_UNIT_CSV_KEY]],
- common.Unit.MG_DL),
- meal=self._extract_meal(record))
+ common.Unit.MG_DL,
+ ),
+ meal=self._extract_meal(record),
+ )
diff --git a/glucometerutils/drivers/contourusb.py b/glucometerutils/drivers/contourusb.py
index 397eb4f..5c9ed11 100644
--- a/glucometerutils/drivers/contourusb.py
+++ b/glucometerutils/drivers/contourusb.py
@@ -24,43 +24,44 @@ from glucometerutils import common
from glucometerutils.support import contourusb, driver_base
-def _extract_timestamp(parsed_record, prefix=''):
+def _extract_timestamp(parsed_record, prefix=""):
"""Extract the timestamp from a parsed record.
This leverages the fact that all the reading records have the same base structure.
"""
- datetime_str = parsed_record['datetime']
+ datetime_str = parsed_record["datetime"]
return datetime.datetime(
- int(datetime_str[0:4]), #year
- int(datetime_str[4:6]), #month
- int(datetime_str[6:8]), #day
- int(datetime_str[8:10]), #hour
- int(datetime_str[10:12]), #minute
- 0)
+ int(datetime_str[0:4]), # year
+ int(datetime_str[4:6]), # month
+ int(datetime_str[6:8]), # day
+ int(datetime_str[8:10]), # hour
+ int(datetime_str[10:12]), # minute
+ 0,
+ )
class Device(contourusb.ContourHidDevice, driver_base.GlucometerDriver):
"""Glucometer driver for FreeStyle Libre devices."""
- USB_VENDOR_ID = 0x1a79 # type: int # Bayer Health Care LLC Contour
+ USB_VENDOR_ID = 0x1A79 # type: int # Bayer Health Care LLC Contour
USB_PRODUCT_ID = 0x6002 # type: int
def get_meter_info(self):
self._get_info_record()
return common.MeterInfo(
- 'Contour USB',
+ "Contour USB",
serial_number=self._get_serial_number(),
- version_info=(
- 'Meter versions: ' + self._get_version(),),
- native_unit= self.get_glucose_unit())
+ version_info=("Meter versions: " + self._get_version(),),
+ native_unit=self.get_glucose_unit(),
+ )
def get_glucose_unit(self): # pylint: disable=no-self-use
- if self._get_glucose_unit() == '0':
+ if self._get_glucose_unit() == "0":
return common.Unit.MG_DL
else:
return common.Unit.MMOL_L
-
+
def get_readings(self):
"""
Get reading dump from download data mode(all readings stored)
@@ -69,10 +70,10 @@ class Device(contourusb.ContourHidDevice, driver_base.GlucometerDriver):
for parsed_record in self._get_multirecord():
yield common.GlucoseReading(
_extract_timestamp(parsed_record),
- int(parsed_record['value']),
- comment=parsed_record['markers'],
- measure_method=common.MeasurementMethod.BLOOD_SAMPLE
- )
+ int(parsed_record["value"]),
+ comment=parsed_record["markers"],
+ measure_method=common.MeasurementMethod.BLOOD_SAMPLE,
+ )
def get_serial_number(self):
raise NotImplementedError
diff --git a/glucometerutils/drivers/fsinsulinx.py b/glucometerutils/drivers/fsinsulinx.py
index f3cf043..5465b3a 100644
--- a/glucometerutils/drivers/fsinsulinx.py
+++ b/glucometerutils/drivers/fsinsulinx.py
@@ -22,20 +22,30 @@ import datetime
from glucometerutils import common
from glucometerutils.support import freestyle
-
# The type is a string because it precedes the parsing of the object.
-_TYPE_GLUCOSE_READING = '0'
-
-_InsulinxReading = collections.namedtuple('_InsulinxReading', (
- 'type', # 0 = blood glucose
- 'id',
- 'month', 'day', 'year', # year is two-digits
- 'hour', 'minute',
- 'unknown1', 'unknown2', 'unknown3',
- 'unknown4', 'unknown5', 'unknown6',
- 'value',
- 'unknown7', 'unknown8',
-))
+_TYPE_GLUCOSE_READING = "0"
+
+_InsulinxReading = collections.namedtuple(
+ "_InsulinxReading",
+ (
+ "type", # 0 = blood glucose
+ "id",
+ "month",
+ "day",
+ "year", # year is two-digits
+ "hour",
+ "minute",
+ "unknown1",
+ "unknown2",
+ "unknown3",
+ "unknown4",
+ "unknown5",
+ "unknown6",
+ "value",
+ "unknown7",
+ "unknown8",
+ ),
+)
class Device(freestyle.FreeStyleHidDevice):
@@ -46,11 +56,11 @@ class Device(freestyle.FreeStyleHidDevice):
def get_meter_info(self):
"""Return the device information in structured form."""
return common.MeterInfo(
- 'FreeStyle InsuLinx',
+ "FreeStyle InsuLinx",
serial_number=self.get_serial_number(),
- version_info=(
- 'Software version: ' + self._get_version(),),
- native_unit=self.get_glucose_unit())
+ version_info=("Software version: " + self._get_version(),),
+ native_unit=self.get_glucose_unit(),
+ )
def get_glucose_unit(self): # pylint: disable=no-self-use
"""Returns the glucose unit of the device."""
@@ -58,7 +68,7 @@ class Device(freestyle.FreeStyleHidDevice):
def get_readings(self):
"""Iterate through the reading records in the device."""
- for record in self._get_multirecord(b'$result?'):
+ for record in self._get_multirecord(b"$result?"):
if not record or record[0] != _TYPE_GLUCOSE_READING:
continue
@@ -67,11 +77,14 @@ class Device(freestyle.FreeStyleHidDevice):
raw_reading = _InsulinxReading._make([int(v) for v in record])
timestamp = datetime.datetime(
- raw_reading.year + 2000, raw_reading.month, raw_reading.day,
- raw_reading.hour, raw_reading.minute)
+ raw_reading.year + 2000,
+ raw_reading.month,
+ raw_reading.day,
+ raw_reading.hour,
+ raw_reading.minute,
+ )
yield common.GlucoseReading(timestamp, raw_reading.value)
def zero_log(self):
raise NotImplementedError
-
diff --git a/glucometerutils/drivers/fslibre.py b/glucometerutils/drivers/fslibre.py
index f1ac525..29e821a 100644
--- a/glucometerutils/drivers/fslibre.py
+++ b/glucometerutils/drivers/fslibre.py
@@ -27,52 +27,47 @@ from glucometerutils.support import freestyle
# Fields of the records returned by both $history and $arresult?
# Tuple of pairs of idx and field name
_BASE_ENTRY_MAP = (
- (0, 'device_id'),
- (1, 'type'),
- (2, 'month'),
- (3, 'day'),
- (4, 'year'), # 2-digits
- (5, 'hour'),
- (6, 'minute'),
- (7, 'second'),
+ (0, "device_id"),
+ (1, "type"),
+ (2, "month"),
+ (3, "day"),
+ (4, "year"), # 2-digits
+ (5, "hour"),
+ (6, "minute"),
+ (7, "second"),
)
# Fields of the records returned by $history?
-_HISTORY_ENTRY_MAP = _BASE_ENTRY_MAP + (
- (13, 'value'),
- (15, 'errors'),
-)
+_HISTORY_ENTRY_MAP = _BASE_ENTRY_MAP + ((13, "value"), (15, "errors"),)
# Fields of the results returned by $arresult? where type = 2
_ARRESULT_TYPE2_ENTRY_MAP = (
- (9, 'reading-type'), # 0 = glucose blood strip,
- # 1 = ketone blood strip,
- # 2 = glucose sensor
- (12, 'value'),
- (15, 'sport-flag'),
- (16, 'medication-flag'),
- (17, 'rapid-acting-flag'), # see _ARRESULT_RAPID_INSULIN_ENTRY_MAP
- (18, 'long-acting-flag'),
- (19, 'custom-comments-bitfield'),
- (23, 'double-long-acting-insulin'),
- (25, 'food-flag'),
- (26, 'food-carbs-grams'),
- (28, 'errors'),
+ (9, "reading-type"), # 0 = glucose blood strip,
+ # 1 = ketone blood strip,
+ # 2 = glucose sensor
+ (12, "value"),
+ (15, "sport-flag"),
+ (16, "medication-flag"),
+ (17, "rapid-acting-flag"), # see _ARRESULT_RAPID_INSULIN_ENTRY_MAP
+ (18, "long-acting-flag"),
+ (19, "custom-comments-bitfield"),
+ (23, "double-long-acting-insulin"),
+ (25, "food-flag"),
+ (26, "food-carbs-grams"),
+ (28, "errors"),
)
_ARRESULT_TIME_ADJUSTMENT_ENTRY_MAP = (
- (9, 'old_month'),
- (10, 'old_day'),
- (11, 'old_year'),
- (12, 'old_hour'),
- (13, 'old_minute'),
- (14, 'old_second'),
+ (9, "old_month"),
+ (10, "old_day"),
+ (11, "old_year"),
+ (12, "old_hour"),
+ (13, "old_minute"),
+ (14, "old_second"),
)
# Fields only valid when rapid-acting-flag is "1"
-_ARRESULT_RAPID_INSULIN_ENTRY_MAP = (
- (43, 'double-rapid-acting-insulin'),
-)
+_ARRESULT_RAPID_INSULIN_ENTRY_MAP = ((43, "double-rapid-acting-insulin"),)
def _parse_record(record, entry_map):
@@ -82,26 +77,25 @@ def _parse_record(record, entry_map):
return {}
try:
- return {
- key: int(record[idx]) for idx, key in entry_map
- }
+ return {key: int(record[idx]) for idx, key in entry_map}
except IndexError:
return {}
-def _extract_timestamp(parsed_record, prefix=''):
+def _extract_timestamp(parsed_record, prefix=""):
"""Extract the timestamp from a parsed record.
This leverages the fact that all the records have the same base structure.
"""
return datetime.datetime(
- parsed_record[prefix + 'year'] + 2000,
- parsed_record[prefix + 'month'],
- parsed_record[prefix + 'day'],
- parsed_record[prefix + 'hour'],
- parsed_record[prefix + 'minute'],
- parsed_record[prefix + 'second'])
+ parsed_record[prefix + "year"] + 2000,
+ parsed_record[prefix + "month"],
+ parsed_record[prefix + "day"],
+ parsed_record[prefix + "hour"],
+ parsed_record[prefix + "minute"],
+ parsed_record[prefix + "second"],
+ )
def _parse_arresult(record):
@@ -112,24 +106,23 @@ def _parse_arresult(record):
# There are other record types, but we don't currently need to expose these.
if not parsed_record:
return None
- elif parsed_record['type'] == 2:
+ elif parsed_record["type"] == 2:
parsed_record.update(_parse_record(record, _ARRESULT_TYPE2_ENTRY_MAP))
- elif parsed_record['type'] == 5:
+ elif parsed_record["type"] == 5:
parsed_record.update(_parse_record(record, _ARRESULT_TIME_ADJUSTMENT_ENTRY_MAP))
return common.TimeAdjustment(
_extract_timestamp(parsed_record),
- _extract_timestamp(parsed_record, 'old_'),
- extra_data={'device_id': parsed_record['device_id']},
+ _extract_timestamp(parsed_record, "old_"),
+ extra_data={"device_id": parsed_record["device_id"]},
)
else:
return None
# Check right away if we have rapid insulin
- if parsed_record['rapid-acting-flag']:
- parsed_record.update(
- _parse_record(record, _ARRESULT_RAPID_INSULIN_ENTRY_MAP))
+ if parsed_record["rapid-acting-flag"]:
+ parsed_record.update(_parse_record(record, _ARRESULT_RAPID_INSULIN_ENTRY_MAP))
- if parsed_record['errors']:
+ if parsed_record["errors"]:
return None
comment_parts = []
@@ -137,68 +130,69 @@ def _parse_arresult(record):
cls = None
value = None
- if parsed_record['reading-type'] == 2:
- comment_parts.append('(Scan)')
+ if parsed_record["reading-type"] == 2:
+ comment_parts.append("(Scan)")
measure_method = common.MeasurementMethod.CGM
cls = common.GlucoseReading
- value = parsed_record['value']
- elif parsed_record['reading-type'] == 0:
- comment_parts.append('(Blood)')
+ value = parsed_record["value"]
+ elif parsed_record["reading-type"] == 0:
+ comment_parts.append("(Blood)")
measure_method = common.MeasurementMethod.BLOOD_SAMPLE
cls = common.GlucoseReading
- value = parsed_record['value']
- elif parsed_record['reading-type'] == 1:
- comment_parts.append('(Ketone)')
+ value = parsed_record["value"]
+ elif parsed_record["reading-type"] == 1:
+ comment_parts.append("(Ketone)")
measure_method = common.MeasurementMethod.BLOOD_SAMPLE
cls = common.KetoneReading
# automatically convert the raw value in mmol/L
- value = freestyle.convert_ketone_unit(parsed_record['value'])
+ value = freestyle.convert_ketone_unit(parsed_record["value"])
else:
# unknown reading
return None
custom_comments = record[29:35]
for comment_index in range(6):
- if parsed_record['custom-comments-bitfield'] & (1 << comment_index):
+ if parsed_record["custom-comments-bitfield"] & (1 << comment_index):
comment_parts.append(custom_comments[comment_index][1:-1])
- if parsed_record['sport-flag']:
- comment_parts.append('Sport')
+ if parsed_record["sport-flag"]:
+ comment_parts.append("Sport")
- if parsed_record['medication-flag']:
- comment_parts.append('Medication')
+ if parsed_record["medication-flag"]:
+ comment_parts.append("Medication")
- if parsed_record['food-flag']:
- if parsed_record['food-carbs-grams']:
- comment_parts.append(
- 'Food (%d g)' % parsed_record['food-carbs-grams'])
+ if parsed_record["food-flag"]:
+ if parsed_record["food-carbs-grams"]:
+ comment_parts.append("Food (%d g)" % parsed_record["food-carbs-grams"])
else:
- comment_parts.append('Food')
+ comment_parts.append("Food")
- if parsed_record['long-acting-flag']:
- if parsed_record['double-long-acting-insulin']:
+ if parsed_record["long-acting-flag"]:
+ if parsed_record["double-long-acting-insulin"]:
comment_parts.append(
- 'Long-acting insulin (%.1f)' %
- (parsed_record['double-long-acting-insulin']/2.))
+ "Long-acting insulin (%.1f)"
+ % (parsed_record["double-long-acting-insulin"] / 2.0)
+ )
else:
- comment_parts.append('Long-acting insulin')
+ comment_parts.append("Long-acting insulin")
- if parsed_record['rapid-acting-flag']:
+ if parsed_record["rapid-acting-flag"]:
# provide default value, as this record does not always exist
# (even if rapid-acting-flag is set)
- if parsed_record.get('double-rapid-acting-insulin', 0):
+ if parsed_record.get("double-rapid-acting-insulin", 0):
comment_parts.append(
- 'Rapid-acting insulin (%.1f)' %
- (parsed_record['double-rapid-acting-insulin']/2.))
+ "Rapid-acting insulin (%.1f)"
+ % (parsed_record["double-rapid-acting-insulin"] / 2.0)
+ )
else:
- comment_parts.append('Rapid-acting insulin')
+ comment_parts.append("Rapid-acting insulin")
return cls(
_extract_timestamp(parsed_record),
value,
- comment='; '.join(comment_parts),
+ comment="; ".join(comment_parts),
measure_method=measure_method,
- extra_data={'device_id': parsed_record['device_id']},
+ extra_data={"device_id": parsed_record["device_id"]},
)
@@ -210,16 +204,16 @@ class Device(freestyle.FreeStyleHidDevice):
def get_meter_info(self):
"""Return the device information in structured form."""
return common.MeterInfo(
- 'FreeStyle Libre',
+ "FreeStyle Libre",
serial_number=self.get_serial_number(),
- version_info=(
- 'Software version: ' + self._get_version(),),
+ version_info=("Software version: " + self._get_version(),),
native_unit=self.get_glucose_unit(),
- patient_name=self.get_patient_name())
+ patient_name=self.get_patient_name(),
+ )
def get_serial_number(self):
"""Overridden function as the command is not compatible."""
- return self._send_text_command(b'$sn?').rstrip('\r\n')
+ return self._send_text_command(b"$sn?").rstrip("\r\n")
def get_glucose_unit(self): # pylint: disable=no-self-use
"""Returns the glucose unit of the device."""
@@ -231,27 +225,27 @@ class Device(freestyle.FreeStyleHidDevice):
# First of all get the usually longer list of sensor readings, and
# convert them to Readings objects.
- for record in self._get_multirecord(b'$history?'):
+ for record in self._get_multirecord(b"$history?"):
parsed_record = _parse_record(record, _HISTORY_ENTRY_MAP)
- if not parsed_record or parsed_record['errors'] != 0:
+ if not parsed_record or parsed_record["errors"] != 0:
# The reading is considered invalid, so ignore it.
continue
yield common.GlucoseReading(
_extract_timestamp(parsed_record),
- parsed_record['value'],
- comment='(Sensor)',
+ parsed_record["value"],
+ comment="(Sensor)",
measure_method=common.MeasurementMethod.CGM,
- extra_data={'device_id': parsed_record['device_id']},
+ extra_data={"device_id": parsed_record["device_id"]},
)
# Then get the results of explicit scans and blood tests (and other
# events).
- for record in self._get_multirecord(b'$arresult?'):
+ for record in self._get_multirecord(b"$arresult?"):
reading = _parse_arresult(record)
if reading:
yield reading
def zero_log(self):
- self._send_text_command(b'$resetpatient')
+ self._send_text_command(b"$resetpatient")
diff --git a/glucometerutils/drivers/fsoptium.py b/glucometerutils/drivers/fsoptium.py
index 66b23ca..5c3971e 100644
--- a/glucometerutils/drivers/fsoptium.py
+++ b/glucometerutils/drivers/fsoptium.py
@@ -20,13 +20,13 @@ import datetime
import logging
import re
-from glucometerutils import common
-from glucometerutils import exceptions
-from glucometerutils.support import serial, driver_base
+from glucometerutils import common, exceptions
+from glucometerutils.support import driver_base, serial
_CLOCK_RE = re.compile(
- r'^Clock:\t(?P<month>[A-Z][a-z]{2}) (?P<day>[0-9]{2}) (?P<year>[0-9]{4})\t'
- r'(?P<time>[0-9]{2}:[0-9]{2}:[0-9]{2})$')
+ r"^Clock:\t(?P<month>[A-Z][a-z]{2}) (?P<day>[0-9]{2}) (?P<year>[0-9]{4})\t"
+ r"(?P<time>[0-9]{2}:[0-9]{2}:[0-9]{2})$"
+)
# The reading can be HI (padded to three-characters by a space) if the value was
# over what the meter was supposed to read. Unlike the "Clock:" line, the months
@@ -34,33 +34,33 @@ _CLOCK_RE = re.compile(
# characters, so accept a space or 'e'/'y' at the end of the month name. Also,
# the time does *not* include seconds.
_READING_RE = re.compile(
- r'^(?P<reading>HI |[0-9]{3}) '
- r'(?P<month>[A-Z][a-z]{2})[ ey] '
- r'(?P<day>[0-9]{2}) '
- r'(?P<year>[0-9]{4}) '
- r'(?P<time>[0-9]{2}:[0-9]{2}) '
- r'(?P<type>[GK]) 0x00$')
+ r"^(?P<reading>HI |[0-9]{3}) "
+ r"(?P<month>[A-Z][a-z]{2})[ ey] "
+ r"(?P<day>[0-9]{2}) "
+ r"(?P<year>[0-9]{4}) "
+ r"(?P<time>[0-9]{2}:[0-9]{2}) "
+ r"(?P<type>[GK]) 0x00$"
+)
-_CHECKSUM_RE = re.compile(
- r'^(?P<checksum>0x[0-9A-F]{4}) END$')
+_CHECKSUM_RE = re.compile(r"^(?P<checksum>0x[0-9A-F]{4}) END$")
# There are two date format used by the device. One uses three-letters month
# names, and that's easy enough. The other uses three-letters month names,
# except for (at least) July. So ignore the fourth character.
# explicit mapping. Note that the mapping *requires* a trailing whitespace.
_MONTH_MATCHES = {
- 'Jan': 1,
- 'Feb': 2,
- 'Mar': 3,
- 'Apr': 4,
- 'May': 5,
- 'Jun': 6,
- 'Jul': 7,
- 'Aug': 8,
- 'Sep': 9,
- 'Oct': 10,
- 'Nov': 11,
- 'Dec': 12
+ "Jan": 1,
+ "Feb": 2,
+ "Mar": 3,
+ "Apr": 4,
+ "May": 5,
+ "Jun": 6,
+ "Jul": 7,
+ "Aug": 8,
+ "Sep": 9,
+ "Oct": 10,
+ "Nov": 11,
+ "Dec": 12,
}
@@ -75,60 +75,59 @@ def _parse_clock(datestr):
raise exceptions.InvalidResponse(datestr)
# int() parses numbers in decimal, so we don't have to worry about '08'
- day = int(match.group('day'))
- month = _MONTH_MATCHES[match.group('month')]
- year = int(match.group('year'))
+ day = int(match.group("day"))
+ month = _MONTH_MATCHES[match.group("month")]
+ year = int(match.group("year"))
- hour, minute, second = (int (x) for x in match.group('time').split(':'))
+ hour, minute, second = (int(x) for x in match.group("time").split(":"))
return datetime.datetime(year, month, day, hour, minute, second)
class Device(serial.SerialDevice, driver_base.GlucometerDriver):
BAUDRATE = 19200
- DEFAULT_CABLE_ID = '1a61:3420'
+ DEFAULT_CABLE_ID = "1a61:3420"
def _send_command(self, command):
- cmd_bytes = bytes('$%s\r\n' % command, 'ascii')
- logging.debug('Sending command: %r', cmd_bytes)
+ cmd_bytes = bytes("$%s\r\n" % command, "ascii")
+ logging.debug("Sending command: %r", cmd_bytes)
self.serial_.write(cmd_bytes)
self.serial_.flush()
response = self.serial_.readlines()
- logging.debug('Received response: %r', response)
+ logging.debug("Received response: %r", response)
# We always want to decode the output, and remove stray \r\n. Any
# failure in decoding means the output is invalid anyway.
- decoded_response = [line.decode('ascii').rstrip('\r\n')
- for line in response]
+ decoded_response = [line.decode("ascii").rstrip("\r\n") for line in response]
return decoded_response
def connect(self):
- self._send_command('xmem') # ignore output this time
+ self._send_command("xmem") # ignore output this time
self._fetch_device_information()
def disconnect(self): # pylint: disable=no-self-use
return
def _fetch_device_information(self):
- data = self._send_command('colq')
+ data = self._send_command("colq")
for line in data:
- parsed_line = line.split('\t')
+ parsed_line = line.split("\t")
- if parsed_line[0] == 'S/N:':
+ if parsed_line[0] == "S/N:":
self.device_serialno_ = parsed_line[1]
- elif parsed_line[0] == 'Ver:':
+ elif parsed_line[0] == "Ver:":
self.device_version_ = parsed_line[1]
- if parsed_line[2] == 'MMOL':
+ if parsed_line[2] == "MMOL":
self.device_glucose_unit_ = common.Unit.MMOL_L
else: # I only have a mmol/l device, so I can't be sure.
self.device_glucose_unit_ = common.Unit.MG_DL
# There are more entries: Clock, Market, ROM and Usage, but we don't
# care for those here.
- elif parsed_line[0] == 'CMD OK':
+ elif parsed_line[0] == "CMD OK":
return
# I have not figured out why this happens, but sometimes it's echoing
@@ -142,11 +141,11 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
A common.MeterInfo object.
"""
return common.MeterInfo(
- 'Freestyle Optium glucometer',
+ "Freestyle Optium glucometer",
serial_number=self.get_serial_number(),
- version_info=(
- 'Software version: ' + self.get_version(),),
- native_unit=self.get_glucose_unit())
+ version_info=("Software version: " + self.get_version(),),
+ native_unit=self.get_glucose_unit(),
+ )
def get_version(self):
"""Returns an identifier of the firmware version of the glucometer.
@@ -179,21 +178,21 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
Returns:
A datetime object built according to the returned response.
"""
- data = self._send_command('colq')
+ data = self._send_command("colq")
for line in data:
- if not line.startswith('Clock:'):
+ if not line.startswith("Clock:"):
continue
return _parse_clock(line)
- raise exceptions.InvalidResponse('\n'.join(data))
+ raise exceptions.InvalidResponse("\n".join(data))
def _set_device_datetime(self, date):
- data = self._send_command(date.strftime('tim,%m,%d,%y,%H,%M'))
+ data = self._send_command(date.strftime("tim,%m,%d,%y,%H,%M"))
- parsed_data = ''.join(data)
- if parsed_data != 'CMD OK':
+ parsed_data = "".join(data)
+ if parsed_data != "CMD OK":
raise exceptions.InvalidResponse(parsed_data)
return self.get_datetime()
@@ -216,50 +215,47 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
expected.
"""
- data = self._send_command('xmem')
+ data = self._send_command("xmem")
# The first line is empty, the second is the serial number, the third
# the version, the fourth the current time, and the fifth the record
# count.. The last line has a checksum and the end.
count = int(data[4])
if count != (len(data) - 6):
- raise exceptions.InvalidResponse('\n'.join(data))
+ raise exceptions.InvalidResponse("\n".join(data))
# Extract the checksum from the last line.
checksum_match = _CHECKSUM_RE.match(data[-1])
if not checksum_match:
- raise exceptions.InvalidResponse('\n'.join(data))
+ raise exceptions.InvalidResponse("\n".join(data))
- expected_checksum = int(checksum_match.group('checksum'), 16)
+ expected_checksum = int(checksum_match.group("checksum"), 16)
# exclude the last line in the checksum calculation, as that's the
# checksum itself. The final \r\n is added separately.
- calculated_checksum = sum(
- ord(c) for c in '\r\n'.join(data[:-1])) + 0xd + 0xa
+ calculated_checksum = sum(ord(c) for c in "\r\n".join(data[:-1])) + 0xD + 0xA
if expected_checksum != calculated_checksum:
- raise exceptions.InvalidChecksum(
- expected_checksum, calculated_checksum)
+ raise exceptions.InvalidChecksum(expected_checksum, calculated_checksum)
for line in data[5:-1]:
match = _READING_RE.match(line)
if not match:
raise exceptions.InvalidResponse(line)
- if match.group('type') != 'G':
- logging.warning(
- 'Non-glucose readings are not supported, ignoring.')
+ if match.group("type") != "G":
+ logging.warning("Non-glucose readings are not supported, ignoring.")
continue
- if match.group('reading') == 'HI ':
+ if match.group("reading") == "HI ":
value = float("inf")
else:
- value = float(match.group('reading'))
+ value = float(match.group("reading"))
- day = int(match.group('day'))
- month = _MONTH_MATCHES[match.group('month')]
- year = int(match.group('year'))
+ day = int(match.group("day"))
+ month = _MONTH_MATCHES[match.group("month")]
+ year = int(match.group("year"))
- hour, minute = map(int, match.group('time').split(':'))
+ hour, minute = map(int, match.group("time").split(":"))
timestamp = datetime.datetime(year, month, day, hour, minute)
diff --git a/glucometerutils/drivers/fsprecisionneo.py b/glucometerutils/drivers/fsprecisionneo.py
index 58564e5..909fed8 100644
--- a/glucometerutils/drivers/fsprecisionneo.py
+++ b/glucometerutils/drivers/fsprecisionneo.py
@@ -27,25 +27,30 @@ import datetime
from glucometerutils import common
from glucometerutils.support import freestyle
-
# The type is a string because it precedes the parsing of the object.
-_TYPE_GLUCOSE_READING = '7'
-_TYPE_KETONE_READING = '9'
-
-_NeoReading = collections.namedtuple('_NeoReading', (
- 'type', # 7 = blood glucose, 9 = blood ketone
- 'id',
- 'month', 'day', 'year', # year is two-digits
- 'hour', 'minute',
- 'unknown2',
- 'value',
- # Extra trailing and so-far-unused fields; so discard them:
- # * for blood glucose: 10 unknown trailing fields
- #'unknown3', 'unknown4', 'unknown5', 'unknown6', 'unknown7',
- #'unknown8', 'unknown9', 'unknown10', 'unknown11', 'unknown12',
- # * for blood ketone: 2 unknown trailing fields
- #'unknown3', 'unknown4',
-))
+_TYPE_GLUCOSE_READING = "7"
+_TYPE_KETONE_READING = "9"
+
+_NeoReading = collections.namedtuple(
+ "_NeoReading",
+ (
+ "type", # 7 = blood glucose, 9 = blood ketone
+ "id",
+ "month",
+ "day",
+ "year", # year is two-digits
+ "hour",
+ "minute",
+ "unknown2",
+ "value",
+ # Extra trailing and so-far-unused fields; so discard them:
+ # * for blood glucose: 10 unknown trailing fields
+ #'unknown3', 'unknown4', 'unknown5', 'unknown6', 'unknown7',
+ #'unknown8', 'unknown9', 'unknown10', 'unknown11', 'unknown12',
+ # * for blood ketone: 2 unknown trailing fields
+ #'unknown3', 'unknown4',
+ ),
+)
class Device(freestyle.FreeStyleHidDevice):
@@ -56,12 +61,12 @@ class Device(freestyle.FreeStyleHidDevice):
def get_meter_info(self):
"""Return the device information in structured form."""
return common.MeterInfo(
- 'FreeStyle Precision Neo',
+ "FreeStyle Precision Neo",
serial_number=self.get_serial_number(),
- version_info=(
- 'Software version: ' + self._get_version(),),
+ version_info=("Software version: " + self._get_version(),),
native_unit=self.get_glucose_unit(),
- patient_name=self.get_patient_name())
+ patient_name=self.get_patient_name(),
+ )
def get_glucose_unit(self): # pylint: disable=no-self-use
"""Returns the glucose unit of the device."""
@@ -69,7 +74,7 @@ class Device(freestyle.FreeStyleHidDevice):
def get_readings(self):
"""Iterate through the reading records in the device."""
- for record in self._get_multirecord(b'$result?'):
+ for record in self._get_multirecord(b"$result?"):
cls = None
if record and record[0] == _TYPE_GLUCOSE_READING:
cls = common.GlucoseReading
@@ -85,11 +90,15 @@ class Device(freestyle.FreeStyleHidDevice):
if value == "HI":
value = float("inf")
values.append(int(value))
- raw_reading = _NeoReading._make(values[:len(_NeoReading._fields)])
+ raw_reading = _NeoReading._make(values[: len(_NeoReading._fields)])
timestamp = datetime.datetime(
- raw_reading.year + 2000, raw_reading.month, raw_reading.day,
- raw_reading.hour, raw_reading.minute)
+ raw_reading.year + 2000,
+ raw_reading.month,
+ raw_reading.day,
+ raw_reading.hour,
+ raw_reading.minute,
+ )
if record and record[0] == _TYPE_KETONE_READING:
value = freestyle.convert_ketone_unit(raw_reading.value)
diff --git a/glucometerutils/drivers/otultra2.py b/glucometerutils/drivers/otultra2.py
index 39be859..5e90b87 100644
--- a/glucometerutils/drivers/otultra2.py
+++ b/glucometerutils/drivers/otultra2.py
@@ -16,40 +16,40 @@ Expected device path: /dev/ttyUSB0 or similar serial port device.
import datetime
import re
-from glucometerutils import common
-from glucometerutils import exceptions
+from glucometerutils import common, exceptions
from glucometerutils.support import driver_base, lifescan, serial
# The following two hashes are taken directly from LifeScan's documentation
_MEAL_CODES = {
- 'N': common.Meal.NONE,
- 'B': common.Meal.BEFORE,
- 'A': common.Meal.AFTER,
+ "N": common.Meal.NONE,
+ "B": common.Meal.BEFORE,
+ "A": common.Meal.AFTER,
}
_COMMENT_CODES = {
- '00': '', # would be 'No Comment'
- '01': 'Not Enough Food',
- '02': 'Too Much Food',
- '03': 'Mild Exercise',
- '04': 'Hard Exercise',
- '05': 'Medication',
- '06': 'Stress',
- '07': 'Illness',
- '08': 'Feel Hypo',
- '09': 'Menses',
- '10': 'Vacation',
- '11': 'Other',
+ "00": "", # would be 'No Comment'
+ "01": "Not Enough Food",
+ "02": "Too Much Food",
+ "03": "Mild Exercise",
+ "04": "Hard Exercise",
+ "05": "Medication",
+ "06": "Stress",
+ "07": "Illness",
+ "08": "Feel Hypo",
+ "09": "Menses",
+ "10": "Vacation",
+ "11": "Other",
}
-_DUMP_HEADER_RE = re.compile(
- r'P ([0-9]{3}),"[0-9A-Z]{9}","(?:MG/DL |MMOL/L)"')
+_DUMP_HEADER_RE = re.compile(r'P ([0-9]{3}),"[0-9A-Z]{9}","(?:MG/DL |MMOL/L)"')
_DUMP_LINE_RE = re.compile(
r'P (?P<datetime>"[A-Z]{3}","[0-9/]{8}","[0-9:]{8} "),'
r'"(?P<control>[C ]) (?P<value>[0-9]{3})(?P<parityerror>[\? ])",'
- r'"(?P<meal>[NBA])","(?P<comment>0[0-9]|1[01])", 00')
+ r'"(?P<meal>[NBA])","(?P<comment>0[0-9]|1[01])", 00'
+)
+
+_RESPONSE_MATCH = re.compile(r"^(.+) ([0-9A-F]{4})\r$")
-_RESPONSE_MATCH = re.compile(r'^(.+) ([0-9A-F]{4})\r$')
def _calculate_checksum(bytestring):
"""Calculate the checksum used by OneTouch Ultra and Ultra2 devices
@@ -66,10 +66,11 @@ def _calculate_checksum(bytestring):
checksum = 0
for byte in bytestring:
- checksum = (checksum + byte) & 0xffff
+ checksum = (checksum + byte) & 0xFFFF
return checksum
+
def _validate_and_strip_checksum(line):
"""Verify the simple 16-bit checksum and remove it from the line.
@@ -88,20 +89,19 @@ def _validate_and_strip_checksum(line):
try:
checksum_given = int(checksum_string, 16)
- checksum_calculated = _calculate_checksum(
- bytes(response, 'ascii'))
+ checksum_calculated = _calculate_checksum(bytes(response, "ascii"))
if checksum_given != checksum_calculated:
- raise exceptions.InvalidChecksum(checksum_given,
- checksum_calculated)
+ raise exceptions.InvalidChecksum(checksum_given, checksum_calculated)
except ValueError:
raise exceptions.InvalidChecksum(checksum_given, None)
return response
+
_DATETIME_RE = re.compile(
- r'^"[A-Z]{3}",'
- r'"([0-9]{2}/[0-9]{2}/[0-9]{2})","([0-9]{2}:[0-9]{2}:[0-9]{2}) "$')
+ r'^"[A-Z]{3}",' r'"([0-9]{2}/[0-9]{2}/[0-9]{2})","([0-9]{2}:[0-9]{2}:[0-9]{2}) "$'
+)
def _parse_datetime(response):
@@ -121,8 +121,8 @@ def _parse_datetime(response):
raise exceptions.InvalidResponse(response)
date, time = match.groups()
- month, day, year = map(int, date.split('/'))
- hour, minute, second = map(int, time.split(':'))
+ month, day, year = map(int, date.split("/"))
+ hour, minute, second = map(int, time.split(":"))
# Yes, OneTouch2's firmware is not Y2K safe.
return datetime.datetime(2000 + year, month, day, hour, minute, second)
@@ -130,7 +130,7 @@ def _parse_datetime(response):
class Device(serial.SerialDevice, driver_base.GlucometerDriver):
BAUDRATE = 9600
- DEFAULT_CABLE_ID = '067b:2303' # Generic PL2303 cable.
+ DEFAULT_CABLE_ID = "067b:2303" # Generic PL2303 cable.
def connect(self): # pylint: disable=no-self-use
return
@@ -144,7 +144,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
Args:
cmd: command and parameters to send (without newline)
"""
- cmdstring = bytes('\x11\r' + cmd + '\r', 'ascii')
+ cmdstring = bytes("\x11\r" + cmd + "\r", "ascii")
self.serial_.write(cmdstring)
self.serial_.flush()
@@ -160,7 +160,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
"""
self._send_command(cmd)
- line = self.serial_.readline().decode('ascii')
+ line = self.serial_.readline().decode("ascii")
return _validate_and_strip_checksum(line)
def get_meter_info(self):
@@ -170,11 +170,11 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
A common.MeterInfo object.
"""
return common.MeterInfo(
- 'OneTouch Ultra 2 glucometer',
+ "OneTouch Ultra 2 glucometer",
serial_number=self.get_serial_number(),
- version_info=(
- 'Software version: ' + self.get_version(),),
- native_unit=self.get_glucose_unit())
+ version_info=("Software version: " + self.get_version(),),
+ native_unit=self.get_glucose_unit(),
+ )
def get_version(self):
"""Returns an identifier of the firmware version of the glucometer.
@@ -183,9 +183,9 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
The software version returned by the glucometer, such as
"P02.00.00 30/08/06".
"""
- response = self._send_oneliner_command('DM?')
+ response = self._send_oneliner_command("DM?")
- if response[0] != '?':
+ if response[0] != "?":
raise exceptions.InvalidResponse(response)
return response[1:]
@@ -204,7 +204,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
InvalidSerialNumber: if the returned serial number does not match
the OneTouch2 device as per specs.
"""
- response = self._send_oneliner_command('DM@')
+ response = self._send_oneliner_command("DM@")
match = self._SERIAL_NUMBER_RE.match(response)
if not match:
@@ -214,7 +214,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
# 'Y' at the far right of the serial number is the indication of a
# OneTouch Ultra2 device, as per specs.
- if serial_number[-1] != 'Y':
+ if serial_number[-1] != "Y":
raise lifescan.InvalidSerialNumber(serial_number)
return serial_number
@@ -225,12 +225,13 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
Returns:
A datetime object built according to the returned response.
"""
- response = self._send_oneliner_command('DMF')
+ response = self._send_oneliner_command("DMF")
return _parse_datetime(response[2:])
def _set_device_datetime(self, date):
response = self._send_oneliner_command(
- 'DMT' + date.strftime('%m/%d/%y %H:%M:%S'))
+ "DMT" + date.strftime("%m/%d/%y %H:%M:%S")
+ )
return _parse_datetime(response[2:])
def zero_log(self):
@@ -239,8 +240,8 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
This function will clear the memory of the device deleting all the
readings in an irrecoverable way.
"""
- response = self._send_oneliner_command('DMZ')
- if response != 'Z':
+ response = self._send_oneliner_command("DMZ")
+ if response != "Z":
raise exceptions.InvalidResponse(response)
_GLUCOSE_UNIT_RE = re.compile(r'^SU\?,"(MG/DL |MMOL/L)"')
@@ -260,15 +261,15 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
unit used for display. This is not settable by the user in all modern
meters.
"""
- response = self._send_oneliner_command('DMSU?')
+ response = self._send_oneliner_command("DMSU?")
match = self._GLUCOSE_UNIT_RE.match(response)
unit = match.group(1)
- if unit == 'MG/DL ':
+ if unit == "MG/DL ":
return common.Unit.MG_DL
- if unit == 'MMOL/L':
+ if unit == "MMOL/L":
return common.Unit.MMOL_L
raise exceptions.InvalidGlucoseUnit(response)
@@ -287,10 +288,10 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
expected.
"""
- self._send_command('DMP')
+ self._send_command("DMP")
data = self.serial_.readlines()
- header = data.pop(0).decode('ascii')
+ header = data.pop(0).decode("ascii")
match = _DUMP_HEADER_RE.match(header)
if not match:
raise exceptions.InvalidResponse(header)
@@ -299,7 +300,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
assert count == len(data)
for line in data:
- line = _validate_and_strip_checksum(line.decode('ascii'))
+ line = _validate_and_strip_checksum(line.decode("ascii"))
match = _DUMP_LINE_RE.match(line)
if not match:
@@ -307,11 +308,12 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
line_data = match.groupdict()
- date = _parse_datetime(line_data['datetime'])
- meal = _MEAL_CODES[line_data['meal']]
- comment = _COMMENT_CODES[line_data['comment']]
+ date = _parse_datetime(line_data["datetime"])
+ meal = _MEAL_CODES[line_data["meal"]]
+ comment = _COMMENT_CODES[line_data["comment"]]
# OneTouch2 always returns the data in mg/dL even if the glucometer
# is set to mmol/L, so there is no conversion required.
yield common.GlucoseReading(
- date, float(line_data['value']), meal=meal, comment=comment)
+ date, float(line_data["value"]), meal=meal, comment=comment
+ )
diff --git a/glucometerutils/drivers/otultraeasy.py b/glucometerutils/drivers/otultraeasy.py
index 7f4934e..0d1e7a9 100644
--- a/glucometerutils/drivers/otultraeasy.py
+++ b/glucometerutils/drivers/otultraeasy.py
@@ -22,87 +22,97 @@ import logging
import construct
from glucometerutils import common
-from glucometerutils.support import construct_extras, driver_base, lifescan, lifescan_binary_protocol, serial
+from glucometerutils.support import (
+ construct_extras,
+ driver_base,
+ lifescan,
+ lifescan_binary_protocol,
+ serial,
+)
_PACKET = lifescan_binary_protocol.LifeScanPacket(True)
_INVALID_RECORD = 501
-_COMMAND_SUCCESS = construct.Const(b'\x05\x06')
+_COMMAND_SUCCESS = construct.Const(b"\x05\x06")
-_VERSION_REQUEST = construct.Const(b'\x05\x0d\x02')
+_VERSION_REQUEST = construct.Const(b"\x05\x0d\x02")
_VERSION_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'version' / construct.PascalString(construct.Byte, encoding='ascii'),
+ "version" / construct.PascalString(construct.Byte, encoding="ascii"),
)
_SERIAL_NUMBER_REQUEST = construct.Const(
- b'\x05\x0B\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00')
+ b"\x05\x0B\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+)
_SERIAL_NUMBER_RESPONSE = construct.Struct(
- _COMMAND_SUCCESS,
- 'serial_number' / construct.GreedyString(encoding='ascii'),
+ _COMMAND_SUCCESS, "serial_number" / construct.GreedyString(encoding="ascii"),
)
_DATETIME_REQUEST = construct.Struct(
- construct.Const(b'\x05\x20'), # 0x20 is the datetime
- 'request_type' / construct.Enum(construct.Byte, write=0x01, read=0x02),
- 'timestamp' / construct.Default(
+ construct.Const(b"\x05\x20"), # 0x20 is the datetime
+ "request_type" / construct.Enum(construct.Byte, write=0x01, read=0x02),
+ "timestamp"
+ / construct.Default(
construct_extras.Timestamp(construct.Int32ul), # type: ignore
- datetime.datetime(1970, 1, 1, 0, 0)),
+ datetime.datetime(1970, 1, 1, 0, 0),
+ ),
)
_DATETIME_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'timestamp' / construct_extras.Timestamp(construct.Int32ul), # type: ignore
+ "timestamp" / construct_extras.Timestamp(construct.Int32ul), # type: ignore
)
-_GLUCOSE_UNIT_REQUEST = construct.Const(
- b'\x05\x09\x02\x09\x00\x00\x00\x00')
+_GLUCOSE_UNIT_REQUEST = construct.Const(b"\x05\x09\x02\x09\x00\x00\x00\x00")
_GLUCOSE_UNIT_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'unit' / lifescan_binary_protocol.GLUCOSE_UNIT,
+ "unit" / lifescan_binary_protocol.GLUCOSE_UNIT,
construct.Padding(3),
)
-_MEMORY_ERASE_REQUEST = construct.Const(b'\x05\x1A')
+_MEMORY_ERASE_REQUEST = construct.Const(b"\x05\x1A")
_READING_COUNT_RESPONSE = construct.Struct(
- construct.Const(b'\x0f'),
- 'count' / construct.Int16ul,
+ construct.Const(b"\x0f"), "count" / construct.Int16ul,
)
_READ_RECORD_REQUEST = construct.Struct(
- construct.Const(b'\x05\x1f'),
- 'record_id' / construct.Int16ul,
+ construct.Const(b"\x05\x1f"), "record_id" / construct.Int16ul,
)
_READING_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'timestamp' / construct_extras.Timestamp(construct.Int32ul), # type: ignore
- 'value' / construct.Int32ul,
+ "timestamp" / construct_extras.Timestamp(construct.Int32ul), # type: ignore
+ "value" / construct.Int32ul,
)
-def _make_packet(
- message, sequence_number, expect_receive, acknowledge, disconnect):
+
+def _make_packet(message, sequence_number, expect_receive, acknowledge, disconnect):
return _PACKET.build(
- {'data': {'value': {
- 'message': message,
- 'link_control': {
- 'sequence_number': sequence_number,
- 'expect_receive': expect_receive,
- 'acknowledge': acknowledge,
- 'disconnect': disconnect,
- },
- }}})
+ {
+ "data": {
+ "value": {
+ "message": message,
+ "link_control": {
+ "sequence_number": sequence_number,
+ "expect_receive": expect_receive,
+ "acknowledge": acknowledge,
+ "disconnect": disconnect,
+ },
+ }
+ }
+ }
+ )
class Device(serial.SerialDevice, driver_base.GlucometerDriver):
BAUDRATE = 9600
- DEFAULT_CABLE_ID = '067b:2303' # Generic PL2303 cable.
+ DEFAULT_CABLE_ID = "067b:2303" # Generic PL2303 cable.
TIMEOUT = 0.5
def __init__(self, device):
@@ -110,12 +120,11 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
self.sent_counter_ = False
self.expect_receive_ = False
- self.buffered_reader_ = construct.Rebuffered(
- _PACKET, tailcutoff=1024)
+ self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024)
def connect(self):
try:
- self._send_packet(b'', disconnect=True)
+ self._send_packet(b"", disconnect=True)
self._read_ack()
except construct.ConstructError as e:
raise lifescan.MalformedCommand(str(e))
@@ -125,33 +134,32 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
def _send_packet(self, message, acknowledge=False, disconnect=False):
pkt = _make_packet(
- message,
- self.sent_counter_,
- self.expect_receive_,
- acknowledge,
- disconnect)
- logging.debug('sending packet: %s', binascii.hexlify(pkt))
+ message, self.sent_counter_, self.expect_receive_, acknowledge, disconnect
+ )
+ logging.debug("sending packet: %s", binascii.hexlify(pkt))
self.serial_.write(pkt)
self.serial_.flush()
def _read_packet(self):
raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data
- logging.debug('received packet: %r', raw_pkt)
+ logging.debug("received packet: %r", raw_pkt)
# discard the checksum and copy
pkt = raw_pkt.value
if not pkt.link_control.disconnect and (
- pkt.link_control.sequence_number != self.expect_receive_):
+ pkt.link_control.sequence_number != self.expect_receive_
+ ):
raise lifescan.MalformedCommand(
- 'at position 2[0b] expected %02x, received %02x' % (
- self.expect_receive_, pkt.link_connect.sequence_count))
+ "at position 2[0b] expected %02x, received %02x"
+ % (self.expect_receive_, pkt.link_connect.sequence_count)
+ )
return pkt
def _send_ack(self):
- self._send_packet(b'', acknowledge=True, disconnect=False)
+ self._send_packet(b"", acknowledge=True, disconnect=False)
def _read_ack(self):
pkt = self._read_packet()
@@ -177,62 +185,63 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
def get_meter_info(self):
return common.MeterInfo(
- 'OneTouch Ultra Easy glucometer',
+ "OneTouch Ultra Easy glucometer",
serial_number=self.get_serial_number(),
- version_info=(
- 'Software version: ' + self.get_version(),),
- native_unit=self.get_glucose_unit())
+ version_info=("Software version: " + self.get_version(),),
+ native_unit=self.get_glucose_unit(),
+ )
def get_version(self):
- response = self._send_request(
- _VERSION_REQUEST, None, _VERSION_RESPONSE)
+ response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE)
return response.version
def get_serial_number(self):
response = self._send_request(
- _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE)
+ _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE
+ )
return response.serial_number
def get_datetime(self):
response = self._send_request(
- _DATETIME_REQUEST, {'request_type': 'read'},
- _DATETIME_RESPONSE)
+ _DATETIME_REQUEST, {"request_type": "read"}, _DATETIME_RESPONSE
+ )
return response.timestamp
def _set_device_datetime(self, date):
response = self._send_request(
- _DATETIME_REQUEST, {
- 'request_type': 'write',
- 'timestamp': date,
- }, _DATETIME_RESPONSE)
+ _DATETIME_REQUEST,
+ {"request_type": "write", "timestamp": date,},
+ _DATETIME_RESPONSE,
+ )
return response.timestamp
def zero_log(self):
- self._send_request(
- _MEMORY_ERASE_REQUEST, None,
- _COMMAND_SUCCESS)
+ self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS)
def get_glucose_unit(self):
response = self._send_request(
- _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE)
+ _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE
+ )
return response.unit
def _get_reading_count(self):
response = self._send_request(
- _READ_RECORD_REQUEST, {'record_id': _INVALID_RECORD},
- _READING_COUNT_RESPONSE)
+ _READ_RECORD_REQUEST,
+ {"record_id": _INVALID_RECORD},
+ _READING_COUNT_RESPONSE,
+ )
return response.count
def _get_reading(self, record_id):
response = self._send_request(
- _READ_RECORD_REQUEST, {'record_id': record_id}, _READING_RESPONSE)
+ _READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE
+ )
- return common.GlucoseReading(
- response.timestamp, float(response.value))
+ return common.GlucoseReading(response.timestamp, float(response.value))
def get_readings(self):
record_count = self._get_reading_count()
diff --git a/glucometerutils/drivers/otverio2015.py b/glucometerutils/drivers/otverio2015.py
index bde0af3..b8429ea 100644
--- a/glucometerutils/drivers/otverio2015.py
+++ b/glucometerutils/drivers/otverio2015.py
@@ -29,67 +29,61 @@ import construct
from pyscsi.pyscsi.scsi import SCSI
from pyscsi.pyscsi.scsi_device import SCSIDevice
-from glucometerutils import common
-from glucometerutils import exceptions
+from glucometerutils import common, exceptions
from glucometerutils.support import driver_base, lifescan, lifescan_binary_protocol
# This device uses SCSI blocks as registers.
_REGISTER_SIZE = 512
_PACKET = construct.Padded(
- _REGISTER_SIZE,
- lifescan_binary_protocol.LifeScanPacket(False))
+ _REGISTER_SIZE, lifescan_binary_protocol.LifeScanPacket(False)
+)
-_COMMAND_SUCCESS = construct.Const(b'\x03\x06')
+_COMMAND_SUCCESS = construct.Const(b"\x03\x06")
_QUERY_REQUEST = construct.Struct(
- construct.Const(b'\x03\xe6\x02'),
- 'selector' / construct.Enum(
- construct.Byte, serial=0x00, model=0x01, software=0x02),
+ construct.Const(b"\x03\xe6\x02"),
+ "selector" / construct.Enum(construct.Byte, serial=0x00, model=0x01, software=0x02),
)
_QUERY_RESPONSE = construct.Struct(
- construct.Const(b'\x03\x06'),
- 'value' / construct.CString(encoding='utf-16-le'),
+ construct.Const(b"\x03\x06"), "value" / construct.CString(encoding="utf-16-le"),
)
_READ_PARAMETER_REQUEST = construct.Struct(
- construct.Const(b'\x03'),
- 'selector' / construct.Enum(
- construct.Byte, unit=0x04),
+ construct.Const(b"\x03"), "selector" / construct.Enum(construct.Byte, unit=0x04),
)
_READ_UNIT_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'unit' / lifescan_binary_protocol.GLUCOSE_UNIT,
+ "unit" / lifescan_binary_protocol.GLUCOSE_UNIT,
construct.Padding(3),
)
-_READ_RTC_REQUEST = construct.Const(b'\x03\x20\x02')
+_READ_RTC_REQUEST = construct.Const(b"\x03\x20\x02")
_READ_RTC_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
+ "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
)
_WRITE_RTC_REQUEST = construct.Struct(
- construct.Const(b'\x03\x20\x01'),
- 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
+ construct.Const(b"\x03\x20\x01"),
+ "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
)
-_MEMORY_ERASE_REQUEST = construct.Const(b'\x03\x1a')
+_MEMORY_ERASE_REQUEST = construct.Const(b"\x03\x1a")
-_READ_RECORD_COUNT_REQUEST = construct.Const(b'\x03\x27\x00')
+_READ_RECORD_COUNT_REQUEST = construct.Const(b"\x03\x27\x00")
_READ_RECORD_COUNT_RESPONSE = construct.Struct(
- _COMMAND_SUCCESS,
- 'count' / construct.Int16ul,
+ _COMMAND_SUCCESS, "count" / construct.Int16ul,
)
_READ_RECORD_REQUEST = construct.Struct(
- construct.Const(b'\x03\x31\x02'),
- 'record_id' / construct.Int16ul,
- construct.Const(b'\x00'),
+ construct.Const(b"\x03\x31\x02"),
+ "record_id" / construct.Int16ul,
+ construct.Const(b"\x00"),
)
_MEAL_FLAG = {
@@ -100,13 +94,12 @@ _MEAL_FLAG = {
_READ_RECORD_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'inverse_counter' / construct.Int16ul,
+ "inverse_counter" / construct.Int16ul,
construct.Padding(1),
- 'lifetime_counter' / construct.Int16ul,
- 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
- 'value' / construct.Int16ul,
- 'meal' / construct.Mapping(
- construct.Byte, _MEAL_FLAG),
+ "lifetime_counter" / construct.Int16ul,
+ "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
+ "value" / construct.Int16ul,
+ "meal" / construct.Mapping(construct.Byte, _MEAL_FLAG),
construct.Padding(4),
)
@@ -115,8 +108,9 @@ class Device(driver_base.GlucometerDriver):
def __init__(self, device):
if not device:
raise exceptions.CommandLineError(
- '--device parameter is required, should point to the disk '
- 'device representing the meter.')
+ "--device parameter is required, should point to the disk "
+ "device representing the meter."
+ )
self.device_name_ = device
self.scsi_device_ = SCSIDevice(device, readwrite=True)
@@ -125,11 +119,12 @@ class Device(driver_base.GlucometerDriver):
def connect(self):
inq = self.scsi_.inquiry()
- logging.debug('Device connected: %r', inq.result)
- vendor = inq.result['t10_vendor_identification'][:32]
- if vendor != b'LifeScan':
+ logging.debug("Device connected: %r", inq.result)
+ vendor = inq.result["t10_vendor_identification"][:32]
+ if vendor != b"LifeScan":
raise exceptions.ConnectionFailed(
- 'Device %s is not a LifeScan glucometer.' % self.device_name_)
+ "Device %s is not a LifeScan glucometer." % self.device_name_
+ )
def disconnect(self): # pylint: disable=no-self-use
return
@@ -155,21 +150,19 @@ class Device(driver_base.GlucometerDriver):
"""
try:
request = request_format.build(request_obj)
- request_raw = _PACKET.build({'data': {'value': {
- 'message': request,
- }}})
- logging.debug(
- 'Request sent: %s', binascii.hexlify(request_raw))
+ request_raw = _PACKET.build({"data": {"value": {"message": request,}}})
+ logging.debug("Request sent: %s", binascii.hexlify(request_raw))
self.scsi_.write10(lba, 1, request_raw)
response_raw = self.scsi_.read10(lba, 1)
logging.debug(
- 'Response received: %s', binascii.hexlify(response_raw.datain))
+ "Response received: %s", binascii.hexlify(response_raw.datain)
+ )
response_pkt = _PACKET.parse(response_raw.datain).data
- logging.debug('Response packet: %r', response_pkt)
+ logging.debug("Response packet: %r", response_pkt)
response = response_format.parse(response_pkt.value.message)
- logging.debug('Response parsed: %r', response)
+ logging.debug("Response parsed: %r", response)
return response
except construct.ConstructError as e:
@@ -177,60 +170,58 @@ class Device(driver_base.GlucometerDriver):
def _query_string(self, selector):
response = self._send_request(
- 3, _QUERY_REQUEST, {'selector': selector}, _QUERY_RESPONSE)
+ 3, _QUERY_REQUEST, {"selector": selector}, _QUERY_RESPONSE
+ )
return response.value
def get_meter_info(self):
return common.MeterInfo(
- 'OneTouch %s glucometer' % self._query_string('model'),
+ "OneTouch %s glucometer" % self._query_string("model"),
serial_number=self.get_serial_number(),
- version_info=(
- 'Software version: ' + self.get_version(),),
- native_unit=self.get_glucose_unit())
+ version_info=("Software version: " + self.get_version(),),
+ native_unit=self.get_glucose_unit(),
+ )
def get_serial_number(self):
- return self._query_string('serial')
+ return self._query_string("serial")
def get_version(self):
- return self._query_string('software')
+ return self._query_string("software")
def get_datetime(self):
- response = self._send_request(
- 3, _READ_RTC_REQUEST, None, _READ_RTC_RESPONSE)
+ response = self._send_request(3, _READ_RTC_REQUEST, None, _READ_RTC_RESPONSE)
return response.timestamp
def _set_device_datetime(self, date):
- self._send_request(
- 3, _WRITE_RTC_REQUEST, {'timestamp': date},
- _COMMAND_SUCCESS)
+ self._send_request(3, _WRITE_RTC_REQUEST, {"timestamp": date}, _COMMAND_SUCCESS)
# The device does not return the new datetime, so confirm by calling
# READ RTC again.
return self.get_datetime()
def zero_log(self):
- self._send_request(
- 3, _MEMORY_ERASE_REQUEST, None,
- _COMMAND_SUCCESS)
+ self._send_request(3, _MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS)
def get_glucose_unit(self):
response = self._send_request(
- 4, _READ_PARAMETER_REQUEST, {'selector': 'unit'},
- _READ_UNIT_RESPONSE)
+ 4, _READ_PARAMETER_REQUEST, {"selector": "unit"}, _READ_UNIT_RESPONSE
+ )
return response.unit
def _get_reading_count(self):
response = self._send_request(
- 3, _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE)
+ 3, _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE
+ )
return response.count
def _get_reading(self, record_id):
response = self._send_request(
- 3, _READ_RECORD_REQUEST, {'record_id': record_id},
- _READ_RECORD_RESPONSE)
+ 3, _READ_RECORD_REQUEST, {"record_id": record_id}, _READ_RECORD_RESPONSE
+ )
return common.GlucoseReading(
- response.timestamp, float(response.value), meal=response.meal)
+ response.timestamp, float(response.value), meal=response.meal
+ )
def get_readings(self):
record_count = self._get_reading_count()
diff --git a/glucometerutils/drivers/otverioiq.py b/glucometerutils/drivers/otverioiq.py
index 69bdac9..24327ef 100644
--- a/glucometerutils/drivers/otverioiq.py
+++ b/glucometerutils/drivers/otverioiq.py
@@ -21,63 +21,63 @@ import logging
import construct
from glucometerutils import common
-from glucometerutils.support import driver_base, lifescan, lifescan_binary_protocol, serial
+from glucometerutils.support import (
+ driver_base,
+ lifescan,
+ lifescan_binary_protocol,
+ serial,
+)
_PACKET = lifescan_binary_protocol.LifeScanPacket(False)
-_COMMAND_SUCCESS = construct.Const(b'\x03\x06')
+_COMMAND_SUCCESS = construct.Const(b"\x03\x06")
-_VERSION_REQUEST = construct.Const(b'\x03\x0d\x01')
+_VERSION_REQUEST = construct.Const(b"\x03\x0d\x01")
_VERSION_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'version' / construct.PascalString(construct.Byte, encoding='ascii'),
+ "version" / construct.PascalString(construct.Byte, encoding="ascii"),
# NULL-termination is not included in string length.
- construct.Const(b'\x00'),
+ construct.Const(b"\x00"),
)
-_SERIAL_NUMBER_REQUEST = construct.Const(
- b'\x03\x0b\x01\x02')
+_SERIAL_NUMBER_REQUEST = construct.Const(b"\x03\x0b\x01\x02")
_SERIAL_NUMBER_RESPONSE = construct.Struct(
- _COMMAND_SUCCESS,
- 'serial_number' / construct.CString(encoding='ascii'),
+ _COMMAND_SUCCESS, "serial_number" / construct.CString(encoding="ascii"),
)
-_READ_RTC_REQUEST = construct.Const(b'\x03\x20\x02')
+_READ_RTC_REQUEST = construct.Const(b"\x03\x20\x02")
_READ_RTC_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
+ "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
)
_WRITE_RTC_REQUEST = construct.Struct(
- construct.Const(b'\x03\x20\x01'),
- 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
+ construct.Const(b"\x03\x20\x01"),
+ "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
)
-_GLUCOSE_UNIT_REQUEST = construct.Const(
- b'\x03\x09\x02\x02')
+_GLUCOSE_UNIT_REQUEST = construct.Const(b"\x03\x09\x02\x02")
_GLUCOSE_UNIT_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'unit' / lifescan_binary_protocol.GLUCOSE_UNIT,
+ "unit" / lifescan_binary_protocol.GLUCOSE_UNIT,
construct.Padding(3),
)
-_MEMORY_ERASE_REQUEST = construct.Const(b'\x03\x1a')
+_MEMORY_ERASE_REQUEST = construct.Const(b"\x03\x1a")
-_READ_RECORD_COUNT_REQUEST = construct.Const(b'\x03\x27\x00')
+_READ_RECORD_COUNT_REQUEST = construct.Const(b"\x03\x27\x00")
_READ_RECORD_COUNT_RESPONSE = construct.Struct(
- _COMMAND_SUCCESS,
- 'count' / construct.Int16ul,
+ _COMMAND_SUCCESS, "count" / construct.Int16ul,
)
_READ_RECORD_REQUEST = construct.Struct(
- construct.Const(b'\x03\x21'),
- 'record_id' / construct.Int16ul,
+ construct.Const(b"\x03\x21"), "record_id" / construct.Int16ul,
)
_MEAL_FLAG = {
@@ -88,18 +88,17 @@ _MEAL_FLAG = {
_READING_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
- 'value' / construct.Int16ul,
- 'control_test' / construct.Flag,
- 'meal' / construct.Mapping(
- construct.Byte, _MEAL_FLAG),
+ "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
+ "value" / construct.Int16ul,
+ "control_test" / construct.Flag,
+ "meal" / construct.Mapping(construct.Byte, _MEAL_FLAG),
construct.Padding(2), # unknown
)
class Device(serial.SerialDevice, driver_base.GlucometerDriver):
BAUDRATE = 38400
- DEFAULT_CABLE_ID = '10c4:85a7' # Specific ID for embedded cp210x
+ DEFAULT_CABLE_ID = "10c4:85a7" # Specific ID for embedded cp210x
TIMEOUT = 0.5
def __init__(self, device):
@@ -107,18 +106,15 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024)
def _send_packet(self, message):
- pkt = _PACKET.build(
- {'data': {'value': {
- 'message': message,
- }}})
- logging.debug('sending packet: %s', binascii.hexlify(pkt))
+ pkt = _PACKET.build({"data": {"value": {"message": message,}}})
+ logging.debug("sending packet: %s", binascii.hexlify(pkt))
self.serial_.write(pkt)
self.serial_.flush()
def _read_packet(self):
raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data
- logging.debug('received packet: %r', raw_pkt)
+ logging.debug("received packet: %r", raw_pkt)
# discard the checksum and copy
pkt = raw_pkt.value
@@ -138,66 +134,64 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
def get_meter_info(self):
return common.MeterInfo(
- 'OneTouch Verio IQ glucometer',
+ "OneTouch Verio IQ glucometer",
serial_number=self.get_serial_number(),
- version_info=(
- 'Software version: ' + self.get_version(),),
- native_unit=self.get_glucose_unit())
+ version_info=("Software version: " + self.get_version(),),
+ native_unit=self.get_glucose_unit(),
+ )
def get_version(self):
- response = self._send_request(
- _VERSION_REQUEST, None, _VERSION_RESPONSE)
+ response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE)
return response.version
def get_serial_number(self):
response = self._send_request(
- _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE)
+ _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE
+ )
return response.serial_number
def get_datetime(self):
- response = self._send_request(
- _READ_RTC_REQUEST, None, _READ_RTC_RESPONSE)
+ response = self._send_request(_READ_RTC_REQUEST, None, _READ_RTC_RESPONSE)
return response.timestamp
def _set_device_datetime(self, date):
- self._send_request(
- _WRITE_RTC_REQUEST, {
- 'timestamp': date,
- }, _COMMAND_SUCCESS)
+ self._send_request(_WRITE_RTC_REQUEST, {"timestamp": date,}, _COMMAND_SUCCESS)
# The device does not return the new datetime, so confirm by calling
# READ RTC again.
return self.get_datetime()
def zero_log(self):
- self._send_request(
- _MEMORY_ERASE_REQUEST, None,
- _COMMAND_SUCCESS)
+ self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS)
def get_glucose_unit(self):
response = self._send_request(
- _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE)
+ _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE
+ )
return response.unit
def _get_reading_count(self):
response = self._send_request(
- _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE)
+ _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE
+ )
return response.count
def _get_reading(self, record_id):
response = self._send_request(
- _READ_RECORD_REQUEST, {'record_id': record_id}, _READING_RESPONSE)
+ _READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE
+ )
if response.control_test:
- logging.debug('control solution test, ignoring.')
+ logging.debug("control solution test, ignoring.")
return None
return common.GlucoseReading(
- response.timestamp, float(response.value), meal=response.meal)
+ response.timestamp, float(response.value), meal=response.meal
+ )
def get_readings(self):
record_count = self._get_reading_count()
diff --git a/glucometerutils/drivers/sdcodefree.py b/glucometerutils/drivers/sdcodefree.py
index a6e2ce5..47dd9ca 100644
--- a/glucometerutils/drivers/sdcodefree.py
+++ b/glucometerutils/drivers/sdcodefree.py
@@ -24,46 +24,44 @@ import operator
import construct
-from glucometerutils import common
-from glucometerutils import exceptions
-from glucometerutils.support import serial, driver_base
+from glucometerutils import common, exceptions
+from glucometerutils.support import driver_base, serial
def xor_checksum(msg):
return functools.reduce(operator.xor, msg)
+
class Direction(enum.Enum):
In = 0x20
Out = 0x10
+
_PACKET = construct.Struct(
- 'stx' / construct.Const(0x53, construct.Byte),
- 'direction' / construct.Mapping(
- construct.Byte,
- {e: e.value for e in Direction}),
- 'length' / construct.Rebuild(
- construct.Byte, lambda this: len(this.message) + 2),
- 'message' / construct.Bytes(lambda this: this.length - 2),
- 'checksum' / construct.Checksum(
- construct.Byte, xor_checksum, construct.this.message),
- 'etx' / construct.Const(0xAA, construct.Byte)
+ "stx" / construct.Const(0x53, construct.Byte),
+ "direction" / construct.Mapping(construct.Byte, {e: e.value for e in Direction}),
+ "length" / construct.Rebuild(construct.Byte, lambda this: len(this.message) + 2),
+ "message" / construct.Bytes(lambda this: this.length - 2),
+ "checksum"
+ / construct.Checksum(construct.Byte, xor_checksum, construct.this.message),
+ "etx" / construct.Const(0xAA, construct.Byte),
)
_FIRST_MESSAGE = construct.Struct(
construct.Const(0x30, construct.Byte),
- 'count' / construct.Int16ub,
+ "count" / construct.Int16ub,
construct.Const(0xAA, construct.Byte)[19],
)
-_CHALLENGE_PACKET_FULL = b'\x53\x20\x04\x10\x30\x20\xAA'
-_RESPONSE_MESSAGE = b'\x10\x40'
+_CHALLENGE_PACKET_FULL = b"\x53\x20\x04\x10\x30\x20\xAA"
+_RESPONSE_MESSAGE = b"\x10\x40"
-_DATE_SET_MESSAGE = b'\x10\x10'
+_DATE_SET_MESSAGE = b"\x10\x10"
-_DISCONNECT_MESSAGE = b'\x10\x60'
-_DISCONNECTED_MESSAGE = b'\x10\x70'
+_DISCONNECT_MESSAGE = b"\x10\x60"
+_DISCONNECTED_MESSAGE = b"\x10\x70"
-_FETCH_MESSAGE = b'\x10\x60'
+_FETCH_MESSAGE = b"\x10\x60"
_MEAL_FLAG = {
common.Meal.NONE: 0x00,
@@ -73,67 +71,64 @@ _MEAL_FLAG = {
_READING = construct.Struct(
construct.Byte[2],
- 'year' / construct.Byte,
- 'month' / construct.Byte,
- 'day' / construct.Byte,
- 'hour' / construct.Byte,
- 'minute' / construct.Byte,
- 'value' / construct.Int16ub,
- 'meal' / construct.Mapping(
- construct.Byte, _MEAL_FLAG),
+ "year" / construct.Byte,
+ "month" / construct.Byte,
+ "day" / construct.Byte,
+ "hour" / construct.Byte,
+ "minute" / construct.Byte,
+ "value" / construct.Int16ub,
+ "meal" / construct.Mapping(construct.Byte, _MEAL_FLAG),
construct.Byte[7],
)
class Device(serial.SerialDevice, driver_base.GlucometerDriver):
BAUDRATE = 38400
- DEFAULT_CABLE_ID = '10c4:ea60' # Generic cable.
+ DEFAULT_CABLE_ID = "10c4:ea60" # Generic cable.
TIMEOUT = 300 # We need to wait for data from the device.
def read_message(self):
pkt = _PACKET.parse_stream(self.serial_)
- logging.debug('received packet: %r', pkt)
+ logging.debug("received packet: %r", pkt)
return pkt.message
def wait_and_ready(self):
- challenge = b'\0'
- while challenge == b'\0':
+ challenge = b"\0"
+ while challenge == b"\0":
challenge = self.serial_.read(1)
# The first packet read may have a prefixed zero, it might be a bug
# in the cp210x driver or device, but discard it if found.
- if challenge == b'\0':
- logging.debug('spurious null byte received')
+ if challenge == b"\0":
+ logging.debug("spurious null byte received")
continue
- if challenge != b'\x53':
+ if challenge != b"\x53":
raise exceptions.ConnectionFailed(
- message='Unexpected starting bytes %r' % challenge)
+ message="Unexpected starting bytes %r" % challenge
+ )
challenge += self.serial_.read(6)
if challenge != _CHALLENGE_PACKET_FULL:
raise exceptions.ConnectionFailed(
- message='Unexpected challenge %r' % challenge)
+ message="Unexpected challenge %r" % challenge
+ )
- logging.debug(
- 'challenge packet received: %s', binascii.hexlify(challenge))
+ logging.debug("challenge packet received: %s", binascii.hexlify(challenge))
self.send_message(_RESPONSE_MESSAGE)
# The first packet only contains the counter of how many readings are
# available.
first_message = _FIRST_MESSAGE.parse(self.read_message())
- logging.debug('received first message: %r', first_message)
+ logging.debug("received first message: %r", first_message)
return first_message.count
def send_message(self, message):
- pkt = _PACKET.build({
- 'message': message,
- 'direction': Direction.Out
- })
- logging.debug('sending packet: %s', binascii.hexlify(pkt))
+ pkt = _PACKET.build({"message": message, "direction": Direction.Out})
+ logging.debug("sending packet: %s", binascii.hexlify(pkt))
self.serial_.write(pkt)
def connect(self): # pylint: disable=no-self-use
@@ -146,7 +141,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
raise exceptions.InvalidResponse(response=response)
def get_meter_info(self): # pylint: disable=no-self-use
- return common.MeterInfo('SD CodeFree glucometer')
+ return common.MeterInfo("SD CodeFree glucometer")
def get_version(self): # pylint: disable=no-self-use
raise NotImplementedError
@@ -162,7 +157,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
raise NotImplementedError
def _set_device_datetime(self, date):
- setdatecmd = date.strftime('ADATE%Y%m%d%H%M').encode('ascii')
+ setdatecmd = date.strftime("ADATE%Y%m%d%H%M").encode("ascii")
# Ignore the readings count.
self.wait_and_ready()
@@ -173,8 +168,9 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
raise exceptions.InvalidResponse(response=response)
# The date we return should only include up to minute, unfortunately.
- return datetime.datetime(date.year, date.month, date.day,
- date.hour, date.minute)
+ return datetime.datetime(
+ date.year, date.month, date.day, date.hour, date.minute
+ )
def zero_log(self):
raise NotImplementedError
@@ -187,10 +183,16 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
message = self.read_message()
reading = _READING.parse(message)
- logging.debug('received reading: %r', reading)
+ logging.debug("received reading: %r", reading)
yield common.GlucoseReading(
datetime.datetime(
- 2000 + reading.year, reading.month,
- reading.day, reading.hour, reading.minute),
- reading.value, meal=reading.meal)
+ 2000 + reading.year,
+ reading.month,
+ reading.day,
+ reading.hour,
+ reading.minute,
+ ),
+ reading.value,
+ meal=reading.meal,
+ )
diff --git a/glucometerutils/drivers/td4277.py b/glucometerutils/drivers/td4277.py
index 4ab25ee..0385299 100644
--- a/glucometerutils/drivers/td4277.py
+++ b/glucometerutils/drivers/td4277.py
@@ -21,14 +21,13 @@ import operator
import construct
-from glucometerutils import common
-from glucometerutils import exceptions
-from glucometerutils.support import serial, driver_base
+from glucometerutils import common, exceptions
+from glucometerutils.support import driver_base, serial
class Direction(enum.Enum):
- In = 0xa5
- Out = 0xa3
+ In = 0xA5
+ Out = 0xA3
def byte_checksum(data):
@@ -36,18 +35,18 @@ def byte_checksum(data):
_PACKET = construct.Struct(
- 'data' / construct.RawCopy(
+ "data"
+ / construct.RawCopy(
construct.Struct(
- construct.Const(b'\x51'),
- 'command' / construct.Byte,
- 'message' / construct.Bytes(4),
- 'direction' / construct.Mapping(
- construct.Byte,
- {e: e.value for e in Direction}),
+ construct.Const(b"\x51"),
+ "command" / construct.Byte,
+ "message" / construct.Bytes(4),
+ "direction"
+ / construct.Mapping(construct.Byte, {e: e.value for e in Direction}),
),
),
- 'checksum' / construct.Checksum(
- construct.Byte, byte_checksum, construct.this.data.data),
+ "checksum"
+ / construct.Checksum(construct.Byte, byte_checksum, construct.this.data.data),
)
_EMPTY_MESSAGE = 0
@@ -60,38 +59,32 @@ _SET_DATETIME = 0x33
_GET_MODEL = 0x24
-_GET_READING_COUNT = 0x2b
+_GET_READING_COUNT = 0x2B
_GET_READING_DATETIME = 0x25
_GET_READING_VALUE = 0x26
_CLEAR_MEMORY = 0x52
_MODEL_STRUCT = construct.Struct(
- construct.Const(b'\x77\x42'),
- construct.Byte,
- construct.Byte,
+ construct.Const(b"\x77\x42"), construct.Byte, construct.Byte,
)
_DATETIME_STRUCT = construct.Struct(
- 'day' / construct.Int16ul,
- 'minute' / construct.Byte,
- 'hour' / construct.Byte,
+ "day" / construct.Int16ul, "minute" / construct.Byte, "hour" / construct.Byte,
)
_DAY_BITSTRUCT = construct.BitStruct(
- 'year' / construct.BitsInteger(7),
- 'month' / construct.BitsInteger(4),
- 'day' / construct.BitsInteger(5),
+ "year" / construct.BitsInteger(7),
+ "month" / construct.BitsInteger(4),
+ "day" / construct.BitsInteger(5),
)
_READING_COUNT_STRUCT = construct.Struct(
- 'count' / construct.Int16ul,
- construct.Int16ul,
+ "count" / construct.Int16ul, construct.Int16ul,
)
_READING_SELECTION_STRUCT = construct.Struct(
- 'record_id' / construct.Int16ul,
- construct.Const(b'\x00\x00'),
+ "record_id" / construct.Int16ul, construct.Const(b"\x00\x00"),
)
_MEAL_FLAG = {
@@ -101,21 +94,24 @@ _MEAL_FLAG = {
}
_READING_VALUE_STRUCT = construct.Struct(
- 'value' / construct.Int16ul,
- construct.Const(b'\x06'),
- 'meal'/ construct.Mapping(
- construct.Byte, _MEAL_FLAG),
+ "value" / construct.Int16ul,
+ construct.Const(b"\x06"),
+ "meal" / construct.Mapping(construct.Byte, _MEAL_FLAG),
)
+
def _make_packet(command, message, direction=Direction.Out):
return _PACKET.build(
- {'data': {
- 'value': {
- 'command': command,
- 'message': message,
- 'direction': direction,
- },
- }})
+ {
+ "data": {
+ "value": {
+ "command": command,
+ "message": message,
+ "direction": direction,
+ },
+ }
+ }
+ )
def _parse_datetime(message):
@@ -124,11 +120,12 @@ def _parse_datetime(message):
# unfortunately.
day = _DAY_BITSTRUCT.parse(construct.Int16ub.build(date.day))
return datetime.datetime(
- 2000+day.year, day.month, day.day, date.hour, date.minute)
+ 2000 + day.year, day.month, day.day, date.hour, date.minute
+ )
def _select_record(record_id):
- return _READING_SELECTION_STRUCT.build({'record_id': record_id})
+ return _READING_SELECTION_STRUCT.build({"record_id": record_id})
class Device(serial.SerialDevice, driver_base.GlucometerDriver):
@@ -137,19 +134,17 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
TIMEOUT = 0.5
def __init__(self, device):
- super(Device, self).__init__('cp2110://' + device)
- self.buffered_reader_ = construct.Rebuffered(
- _PACKET, tailcutoff=1024)
+ super(Device, self).__init__("cp2110://" + device)
+ self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024)
- def _send_command(
- self, command, message=_EMPTY_MESSAGE, validate_response=True):
+ def _send_command(self, command, message=_EMPTY_MESSAGE, validate_response=True):
pkt = _make_packet(command, message)
- logging.debug('sending packet: %s', binascii.hexlify(pkt))
+ logging.debug("sending packet: %s", binascii.hexlify(pkt))
self.serial_.write(pkt)
self.serial_.flush()
response = self.buffered_reader_.parse_stream(self.serial_)
- logging.debug('received packet: %r', response)
+ logging.debug("received packet: %r", response)
if validate_response and response.data.value.command != command:
raise InvalidResponse(response)
@@ -158,24 +153,26 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
def connect(self):
response_command, message = self._send_command(
- _CONNECT_REQUEST, validate_response=False)
+ _CONNECT_REQUEST, validate_response=False
+ )
if response_command not in _VALID_CONNECT_RESPONSE:
raise exceptions.ConnectionFailed(
- 'Invalid response received: %2x %r' % (
- response_command, message))
+ "Invalid response received: %2x %r" % (response_command, message)
+ )
_, model_message = self._send_command(_GET_MODEL)
try:
_MODEL_STRUCT.parse(model_message)
except construct.ConstructError:
raise exceptions.ConnectionFailed(
- 'Invalid model identified: %r' % model_message)
+ "Invalid model identified: %r" % model_message
+ )
def disconnect(self):
pass
def get_meter_info(self):
- return common.MeterInfo('TaiDoc TD-4277 glucometer')
+ return common.MeterInfo("TaiDoc TD-4277 glucometer")
def get_version(self): # pylint: disable=no-self-use
raise NotImplementedError
@@ -191,18 +188,15 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
def _set_device_datetime(self, date):
assert date.year >= 2000
- day_struct = _DAY_BITSTRUCT.build({
- 'year': date.year - 2000,
- 'month': date.month,
- 'day': date.day,
- })
+ day_struct = _DAY_BITSTRUCT.build(
+ {"year": date.year - 2000, "month": date.month, "day": date.day,}
+ )
day_word = construct.Int16ub.parse(day_struct)
- date_message = _DATETIME_STRUCT.build({
- 'day': day_word,
- 'minute': date.minute,
- 'hour': date.hour})
+ date_message = _DATETIME_STRUCT.build(
+ {"day": day_word, "minute": date.minute, "hour": date.hour}
+ )
_, message = self._send_command(_SET_DATETIME, message=date_message)
@@ -215,17 +209,18 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
def _get_reading(self, record_id):
_, reading_date_message = self._send_command(
- _GET_READING_DATETIME,
- _select_record(record_id))
+ _GET_READING_DATETIME, _select_record(record_id)
+ )
reading_date = _parse_datetime(reading_date_message)
_, reading_value_message = self._send_command(
- _GET_READING_VALUE,
- _select_record(record_id))
+ _GET_READING_VALUE, _select_record(record_id)
+ )
reading_value = _READING_VALUE_STRUCT.parse(reading_value_message)
return common.GlucoseReading(
- reading_date, reading_value.value, meal=reading_value.meal)
+ reading_date, reading_value.value, meal=reading_value.meal
+ )
def get_readings(self):
record_count = self._get_reading_count()
diff --git a/glucometerutils/exceptions.py b/glucometerutils/exceptions.py
index 53aa980..52e4d22 100644
--- a/glucometerutils/exceptions.py
+++ b/glucometerutils/exceptions.py
@@ -3,6 +3,7 @@
# SPDX-License-Identifier: MIT
"""Common exceptions for glucometerutils."""
+
class Error(Exception):
"""Base class for the errors."""
@@ -14,7 +15,7 @@ class CommandLineError(Error):
class ConnectionFailed(Error):
"""It was not possible to connect to the meter."""
- def __init__(self, message='Unable to connect to the meter.'):
+ def __init__(self, message="Unable to connect to the meter."):
super(ConnectionFailed, self).__init__(message)
@@ -30,14 +31,16 @@ class InvalidResponse(Error):
def __init__(self, response):
super(InvalidResponse, self).__init__(
- 'Invalid response received:\n%s' % response)
+ "Invalid response received:\n%s" % response
+ )
class InvalidChecksum(InvalidResponse):
def __init__(self, wire, calculated):
super(InvalidChecksum, self).__init__(
- 'Response checksum not matching: %08x (wire) != %08x (calculated)' %
- (wire, calculated))
+ "Response checksum not matching: %08x (wire) != %08x (calculated)"
+ % (wire, calculated)
+ )
class InvalidGlucoseUnit(Error):
@@ -45,12 +48,12 @@ class InvalidGlucoseUnit(Error):
def __init__(self, unit):
super(InvalidGlucoseUnit, self).__init__(
- 'Invalid glucose unit received:\n%s' % unit)
+ "Invalid glucose unit received:\n%s" % unit
+ )
class InvalidDateTime(Error):
"""The device has an invalid date/time setting."""
def __init__(self):
- super(InvalidDateTime, self).__init__(
- 'Invalid date and time for device')
+ super(InvalidDateTime, self).__init__("Invalid date and time for device")
diff --git a/glucometerutils/glucometer.py b/glucometerutils/glucometer.py
index 29502fd..012bdb1 100755
--- a/glucometerutils/glucometer.py
+++ b/glucometerutils/glucometer.py
@@ -10,76 +10,108 @@ import inspect
import logging
import sys
-from glucometerutils import common
-from glucometerutils import exceptions
+from glucometerutils import common, exceptions
+
def main():
if sys.version_info < (3, 5):
- raise Exception(
- 'Unsupported Python version, please use at least Python 3.5')
+ raise Exception("Unsupported Python version, please use at least Python 3.5")
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest="action")
parser.add_argument(
- '--driver', action='store', required=True,
- help='Select the driver to use for connecting to the glucometer.')
+ "--driver",
+ action="store",
+ required=True,
+ help="Select the driver to use for connecting to the glucometer.",
+ )
parser.add_argument(
- '--device', action='store', required=False,
- help=('Select the path to the glucometer device. Some devices require '
- 'this argument, others will try autodetection.'))
+ "--device",
+ action="store",
+ required=False,
+ help=(
+ "Select the path to the glucometer device. Some devices require "
+ "this argument, others will try autodetection."
+ ),
+ )
parser.add_argument(
- '--vlog', action='store', required=False, type=int,
- help=('Python logging level. See the levels at '
- 'https://docs.python.org/3/library/logging.html#logging-levels'))
+ "--vlog",
+ action="store",
+ required=False,
+ type=int,
+ help=(
+ "Python logging level. See the levels at "
+ "https://docs.python.org/3/library/logging.html#logging-levels"
+ ),
+ )
subparsers.add_parser(
- 'help', help=('Display a description of the driver, including '
- 'supported features and known quirks.'))
- subparsers.add_parser(
- 'info', help='Display information about the meter.')
- subparsers.add_parser(
- 'zero', help='Zero out the data log of the meter.')
+ "help",
+ help=(
+ "Display a description of the driver, including "
+ "supported features and known quirks."
+ ),
+ )
+ subparsers.add_parser("info", help="Display information about the meter.")
+ subparsers.add_parser("zero", help="Zero out the data log of the meter.")
parser_dump = subparsers.add_parser(
- 'dump', help='Dump the readings stored in the device.')
+ "dump", help="Dump the readings stored in the device."
+ )
parser_dump.add_argument(
- '--unit', action='store',
+ "--unit",
+ action="store",
choices=[unit.value for unit in common.Unit],
- help='Select the unit to use for the dumped data.')
+ help="Select the unit to use for the dumped data.",
+ )
parser_dump.add_argument(
- '--with-ketone', action='store_true', default=False,
- help='Enable ketone reading if available on the glucometer.')
+ "--with-ketone",
+ action="store_true",
+ default=False,
+ help="Enable ketone reading if available on the glucometer.",
+ )
parser_date = subparsers.add_parser(
- 'datetime', help='Reads or sets the date and time of the glucometer.')
+ "datetime", help="Reads or sets the date and time of the glucometer."
+ )
parser_date.add_argument(
- '--set', action='store', nargs='?', const='now', default=None,
- help='Set the date rather than just reading it from the device.')
+ "--set",
+ action="store",
+ nargs="?",
+ const="now",
+ default=None,
+ help="Set the date rather than just reading it from the device.",
+ )
parser_patient = subparsers.add_parser(
- 'patient', help='Reads or sets the patient information.')
+ "patient", help="Reads or sets the patient information."
+ )
parser_patient.add_argument(
- '--set_name', action='store', required=False,
- help='Set the patient name, if the meter supports it.')
+ "--set_name",
+ action="store",
+ required=False,
+ help="Set the patient name, if the meter supports it.",
+ )
args = parser.parse_args()
logging.basicConfig(level=args.vlog)
try:
- driver = importlib.import_module(
- 'glucometerutils.drivers.' + args.driver)
+ driver = importlib.import_module("glucometerutils.drivers." + args.driver)
except ImportError as e:
logging.error(
- 'Error importing driver "%s", please check your --driver '
- 'parameter:\n%s', args.driver, e)
+ 'Error importing driver "%s", please check your --driver ' "parameter:\n%s",
+ args.driver,
+ e,
+ )
return 1
# This check needs to happen before we try to initialize the device, as the
# help action does not require a --device at all.
- if args.action == 'help':
+ if args.action == "help":
print(inspect.getdoc(driver))
return 0
@@ -89,17 +121,20 @@ def main():
device_info = device.get_meter_info()
try:
- if args.action == 'info':
+ if args.action == "info":
try:
time_str = device.get_datetime()
except exceptions.InvalidDateTime:
- time_str = 'INVALID'
+ time_str = "INVALID"
# Also catch any leftover ValueErrors.
except (NotImplementedError, ValueError):
- time_str = 'N/A'
- print("{device_info}Time: {time}".format(
- device_info=str(device_info), time=time_str))
- elif args.action == 'dump':
+ time_str = "N/A"
+ print(
+ "{device_info}Time: {time}".format(
+ device_info=str(device_info), time=time_str
+ )
+ )
+ elif args.action == "dump":
unit = args.unit
if unit is None:
unit = device_info.native_unit
@@ -107,57 +142,58 @@ def main():
readings = device.get_readings()
if not args.with_ketone:
- readings = (reading for reading in readings
- if not isinstance(reading, common.KetoneReading))
+ readings = (
+ reading
+ for reading in readings
+ if not isinstance(reading, common.KetoneReading)
+ )
for reading in sorted(readings, key=lambda r: r.timestamp):
print(reading.as_csv(unit))
- elif args.action == 'datetime':
- if args.set == 'now':
+ elif args.action == "datetime":
+ if args.set == "now":
print(device.set_datetime())
elif args.set:
try:
from dateutil import parser as date_parser
+
new_date = date_parser.parse(args.set)
except ImportError:
logging.error(
- 'Unable to import module "dateutil", '
- 'please install it.')
+ 'Unable to import module "dateutil", ' "please install it."
+ )
return 1
except ValueError:
- logging.error('%s: not a valid date', args.set)
+ logging.error("%s: not a valid date", args.set)
return 1
print(device.set_datetime(new_date))
else:
print(device.get_datetime())
- elif args.action == 'patient':
+ elif args.action == "patient":
if args.set_name != None:
try:
device.set_patient_name(args.set_name)
except NotImplementedError:
- print(
- 'The glucometer does not support setting patient name.')
+ print("The glucometer does not support setting patient name.")
try:
patient_name = device.get_patient_name()
if patient_name is None:
- patient_name = '[N/A]'
- print('Patient Name: {patient_name}'.format(
- patient_name=patient_name))
+ patient_name = "[N/A]"
+ print("Patient Name: {patient_name}".format(patient_name=patient_name))
except NotImplementedError:
- print(
- 'The glucometer does not support retrieving patient name.')
- elif args.action == 'zero':
- confirm = input('Delete the device data log? (y/N) ')
- if confirm.lower() in ['y', 'ye', 'yes']:
+ print("The glucometer does not support retrieving patient name.")
+ elif args.action == "zero":
+ confirm = input("Delete the device data log? (y/N) ")
+ if confirm.lower() in ["y", "ye", "yes"]:
device.zero_log()
- print('\nDevice data log zeroed.')
+ print("\nDevice data log zeroed.")
else:
- print('\nDevice data log not zeroed.')
+ print("\nDevice data log not zeroed.")
return 1
else:
return 1
except exceptions.Error as err:
- print('Error while executing \'%s\': %s' % (args.action, str(err)))
+ print("Error while executing '%s': %s" % (args.action, str(err)))
return 1
device.disconnect()
diff --git a/glucometerutils/support/construct_extras.py b/glucometerutils/support/construct_extras.py
index 7abcd9e..b44ee84 100644
--- a/glucometerutils/support/construct_extras.py
+++ b/glucometerutils/support/construct_extras.py
@@ -7,6 +7,7 @@ import datetime
import construct
+
class Timestamp(construct.Adapter):
"""Adapter for converting datetime object into timestamps.
@@ -14,6 +15,7 @@ class Timestamp(construct.Adapter):
and an optional epoch offset to the UNIX Epoch.
"""
+
__slots__ = ["epoch"]
def __init__(self, subcon, epoch=0):
diff --git a/glucometerutils/support/contourusb.py b/glucometerutils/support/contourusb.py
index b4db4eb..3b0dc80 100644
--- a/glucometerutils/support/contourusb.py
+++ b/glucometerutils/support/contourusb.py
@@ -43,25 +43,31 @@ _HEADER_RECORD_RE = re.compile(
"(?P<low_low_target>[0-9]{3})(?P<upp_hi_target>[0-9]{3})"
"(?P<low_hi_target>[0-9]{3})\\^Z=(?P<trends>[0-2])\\|"
"(?P<total>[0-9]*)\\|\\|\\|\\|\\|\\|"
- "(?P<spec_ver>[0-9]+)\\|(?P<datetime>[0-9]+)")
+ "(?P<spec_ver>[0-9]+)\\|(?P<datetime>[0-9]+)"
+)
_RESULT_RECORD_RE = re.compile(
"^(?P<record_type>[a-zA-Z])\\|(?P<seq_num>[0-9]+)\\|\\w*\\^\\w*\\^\\w*\\"
"^(?P<test_id>\\w+)\\|(?P<value>[0-9]+)\\|(?P<unit>\\w+\\/\\w+)\\^"
"(?P<ref_method>[BPD])\\|\\|(?P<markers>[><BADISXCZ\\/1-12]*)\\|\\|"
- "(?P<datetime>[0-9]+)")
+ "(?P<datetime>[0-9]+)"
+)
_RECORD_FORMAT = re.compile(
- '\x02(?P<check>(?P<recno>[0-7])(?P<text>[^\x0d]*)'
- '\x0d(?P<end>[\x03\x17]))'
- '(?P<checksum>[0-9A-F][0-9A-F])\x0d\x0a')
+ "\x02(?P<check>(?P<recno>[0-7])(?P<text>[^\x0d]*)"
+ "\x0d(?P<end>[\x03\x17]))"
+ "(?P<checksum>[0-9A-F][0-9A-F])\x0d\x0a"
+)
+
class FrameError(Exception):
pass
+
class ContourHidDevice(hiddevice.HidDevice):
"""Base class implementing the ContourUSB HID common protocol.
"""
+
blocksize = 64
# Operation modes
@@ -77,85 +83,84 @@ class ContourHidDevice(hiddevice.HidDevice):
while True:
data = self._read()
dstr = data
- result.append(dstr[4:data[3]+4])
- if data[3] != self.blocksize-4:
+ result.append(dstr[4 : data[3] + 4])
+ if data[3] != self.blocksize - 4:
break
- return (b"".join(result))
+ return b"".join(result)
def write(self, data):
- data = b'ABC' + chr(len(data)).encode() + data.encode()
+ data = b"ABC" + chr(len(data)).encode() + data.encode()
pad_length = self.blocksize - len(data)
- data += pad_length * b'\x00'
+ data += pad_length * b"\x00"
self._write(data)
- USB_VENDOR_ID = 0x1a79 # type: int # Bayer Health Care LLC Contour
+ USB_VENDOR_ID = 0x1A79 # type: int # Bayer Health Care LLC Contour
USB_PRODUCT_ID = 0x6002 # type: int
def parse_header_record(self, text):
header = _HEADER_RECORD_RE.search(text)
- self.field_del = header.group('field_del')
- self.repeat_del = header.group('repeat_del')
- self.component_del = header.group('component_del')
- self.escape_del = header.group('escape_del')
-
- self.product_code = header.group('product_code')
- self.dig_ver = header.group('dig_ver')
- self.anlg_ver = header.group('anlg_ver')
- self.agp_ver = header.group('agp_ver')
-
- self.serial_num = header.group('serial_num')
- self.sku_id = header.group('sku_id')
- self.res_marking = header.group('res_marking')
- self.config_bits = header.group('config_bits')
- self.lang = header.group('lang')
- self.interv = header.group('interv')
- self.ref_method = header.group('ref_method')
- self.internal = header.group('internal')
+ self.field_del = header.group("field_del")
+ self.repeat_del = header.group("repeat_del")
+ self.component_del = header.group("component_del")
+ self.escape_del = header.group("escape_del")
+
+ self.product_code = header.group("product_code")
+ self.dig_ver = header.group("dig_ver")
+ self.anlg_ver = header.group("anlg_ver")
+ self.agp_ver = header.group("agp_ver")
+
+ self.serial_num = header.group("serial_num")
+ self.sku_id = header.group("sku_id")
+ self.res_marking = header.group("res_marking")
+ self.config_bits = header.group("config_bits")
+ self.lang = header.group("lang")
+ self.interv = header.group("interv")
+ self.ref_method = header.group("ref_method")
+ self.internal = header.group("internal")
# U limit
- self.unit = header.group('unit')
- self.lo_bound = header.group('lo_bound')
- self.hi_bound = header.group('hi_bound')
+ self.unit = header.group("unit")
+ self.lo_bound = header.group("lo_bound")
+ self.hi_bound = header.group("hi_bound")
# X field
- self.hypo_limit = header.group('hypo_limit')
- self.overall_low = header.group('overall_low')
- self.pre_food_low = header.group('pre_food_low')
- self.post_food_low = header.group('post_food_low')
- self.overall_high = header.group('overall_high')
- self.pre_food_high = header.group('pre_food_high')
- self.post_food_high = header.group('post_food_high')
- self.hyper_limit = header.group('hyper_limit')
+ self.hypo_limit = header.group("hypo_limit")
+ self.overall_low = header.group("overall_low")
+ self.pre_food_low = header.group("pre_food_low")
+ self.post_food_low = header.group("post_food_low")
+ self.overall_high = header.group("overall_high")
+ self.pre_food_high = header.group("pre_food_high")
+ self.post_food_high = header.group("post_food_high")
+ self.hyper_limit = header.group("hyper_limit")
# Y field
- self.upp_hyper = header.group('upp_hyper')
- self.low_hyper = header.group('low_hyper')
- self.upp_hypo = header.group('upp_hypo')
- self.low_hypo = header.group('low_hypo')
- self.upp_low_target = header.group('upp_low_target')
- self.low_low_target = header.group('low_low_target')
- self.upp_hi_target = header.group('upp_hi_target')
- self.low_hi_target = header.group('low_hi_target')
+ self.upp_hyper = header.group("upp_hyper")
+ self.low_hyper = header.group("low_hyper")
+ self.upp_hypo = header.group("upp_hypo")
+ self.low_hypo = header.group("low_hypo")
+ self.upp_low_target = header.group("upp_low_target")
+ self.low_low_target = header.group("low_low_target")
+ self.upp_hi_target = header.group("upp_hi_target")
+ self.low_hi_target = header.group("low_hi_target")
# Z field
- self.trends = header.group('trends')
+ self.trends = header.group("trends")
- self.total = header.group('total')
- self.spec_ver = header.group('spec_ver')
+ self.total = header.group("total")
+ self.spec_ver = header.group("spec_ver")
# Datetime string in YYYYMMDDHHMM format
- self.datetime = header.group('datetime')
-
+ self.datetime = header.group("datetime")
def checksum(self, text):
"""
Implemented by Anders Hammarquist for glucodump project
More info: https://bitbucket.org/iko/glucodump/src/default/
"""
- checksum = hex(sum(ord(c) for c in text) % 256).upper().split('X')[1]
- return ('00' + checksum)[-2:]
+ checksum = hex(sum(ord(c) for c in text) % 256).upper().split("X")[1]
+ return ("00" + checksum)[-2:]
def checkframe(self, frame):
"""
@@ -166,7 +171,7 @@ class ContourHidDevice(hiddevice.HidDevice):
if not match:
raise FrameError("Couldn't parse frame", frame)
- recno = int(match.group('recno'))
+ recno = int(match.group("recno"))
if self.currecno is None:
self.currecno = recno
@@ -174,18 +179,20 @@ class ContourHidDevice(hiddevice.HidDevice):
return None
if recno != self.currecno:
- raise FrameError("Bad recno, got %r expected %r" %
- (recno, self.currecno),
- frame)
-
- checksum = self.checksum(match.group('check'))
- if checksum != match.group('checksum'):
- raise FrameError("Checksum error: got %s expected %s" %
- (match.group('checksum'), checksum),
- frame)
+ raise FrameError(
+ "Bad recno, got %r expected %r" % (recno, self.currecno), frame
+ )
+
+ checksum = self.checksum(match.group("check"))
+ if checksum != match.group("checksum"):
+ raise FrameError(
+ "Checksum error: got %s expected %s"
+ % (match.group("checksum"), checksum),
+ frame,
+ )
self.currecno = (self.currecno + 1) % 8
- return match.group('text')
+ return match.group("text")
def connect(self):
"""Connecting the device, nothing to be done.
@@ -198,15 +205,14 @@ class ContourHidDevice(hiddevice.HidDevice):
self.state = self.mode_establish
try:
while True:
- self.write('\x04')
+ self.write("\x04")
res = self.read()
if res[0] == 4 and res[-1] == 5:
# we are connected and just got a header
header_record = res.decode()
- stx = header_record.find('\x02')
+ stx = header_record.find("\x02")
if stx != -1:
- result = _RECORD_FORMAT.match(
- header_record[stx:]).group('text')
+ result = _RECORD_FORMAT.match(header_record[stx:]).group("text")
self.parse_header_record(result)
break
else:
@@ -251,7 +257,8 @@ class ContourHidDevice(hiddevice.HidDevice):
int(datetime_str[6:8]), # day
int(datetime_str[8:10]), # hour
int(datetime_str[10:12]), # minute
- 0)
+ 0,
+ )
def sync(self):
"""
@@ -261,7 +268,7 @@ class ContourHidDevice(hiddevice.HidDevice):
"""
self.state = self.mode_establish
try:
- tometer = '\x04'
+ tometer = "\x04"
result = None
foo = 0
while True:
@@ -281,7 +288,7 @@ class ContourHidDevice(hiddevice.HidDevice):
continue
if data_bytes[-1] == 5:
# got an <ENQ>, send <ACK>
- tometer = '\x06'
+ tometer = "\x06"
self.currecno = None
continue
if self.state == self.mode_data:
@@ -289,18 +296,18 @@ class ContourHidDevice(hiddevice.HidDevice):
# got an <EOT>, done
self.state = self.mode_precommand
break
- stx = data.find('\x02')
+ stx = data.find("\x02")
if stx != -1:
# got <STX>, parse frame
try:
result = self.checkframe(data[stx:])
- tometer = '\x06'
+ tometer = "\x06"
self.state = self.mode_data
except FrameError as e:
- tometer = '\x15' # Couldn't parse, <NAK>
+ tometer = "\x15" # Couldn't parse, <NAK>
else:
# Got something we don't understand, <NAK> it
- tometer = '\x15'
+ tometer = "\x15"
except Exception as e:
raise e
@@ -321,7 +328,7 @@ class ContourHidDevice(hiddevice.HidDevice):
"""
records_arr = []
for rec in self.sync():
- if rec[0] == 'R':
+ if rec[0] == "R":
# parse using result record regular expression
rec_text = self.parse_result_record(rec)
# get dictionary to use in main driver module without import re
diff --git a/glucometerutils/support/driver_base.py b/glucometerutils/support/driver_base.py
index b7b3d0f..1caa960 100644
--- a/glucometerutils/support/driver_base.py
+++ b/glucometerutils/support/driver_base.py
@@ -3,7 +3,6 @@ from datetime import datetime
class GlucometerDriver(ABC):
-
def connect(self):
pass
diff --git a/glucometerutils/support/freestyle.py b/glucometerutils/support/freestyle.py
index d48ac04..341e978 100644
--- a/glucometerutils/support/freestyle.py
+++ b/glucometerutils/support/freestyle.py
@@ -18,7 +18,7 @@ from typing import AnyStr, Callable, Iterator, List, Optional, Text, Tuple
import construct
from glucometerutils import exceptions
-from glucometerutils.support import hiddevice, driver_base
+from glucometerutils.support import driver_base, hiddevice
_INIT_COMMAND = 0x01
_INIT_RESPONSE = 0x71
@@ -30,54 +30,66 @@ _ENCRYPTION_SETUP_COMMAND = 0x14
_ENCRYPTION_SETUP_RESPONSE = 0x33
_ALWAYS_UNENCRYPTED_MESSAGES = (
- _INIT_COMMAND, 0x04, 0x05, 0x06, 0x0c, 0x0d,
- _ENCRYPTION_SETUP_COMMAND, 0x15,
- _ENCRYPTION_SETUP_RESPONSE, 0x34, 0x35,
+ _INIT_COMMAND,
+ 0x04,
+ 0x05,
+ 0x06,
+ 0x0C,
+ 0x0D,
+ _ENCRYPTION_SETUP_COMMAND,
+ 0x15,
+ _ENCRYPTION_SETUP_RESPONSE,
+ 0x34,
+ 0x35,
_INIT_RESPONSE,
_KEEPALIVE_RESPONSE,
)
+
def _create_matcher(message_type, content):
# type: (int, Optional[bytes]) -> Callable[[Tuple[int, bytes]], bool]
def _matcher(message):
- return (
- message[0] == message_type and
- (content is None or content == message[1]))
+ return message[0] == message_type and (content is None or content == message[1])
return _matcher
-_is_init_reply = _create_matcher(_INIT_RESPONSE, b'\x01')
+
+_is_init_reply = _create_matcher(_INIT_RESPONSE, b"\x01")
_is_keepalive_response = _create_matcher(_KEEPALIVE_RESPONSE, None)
-_is_unknown_message_error = _create_matcher(_UNKNOWN_MESSAGE_RESPONSE, b'\x85')
-_is_encryption_missing_error = _create_matcher(
- _ENCRYPTION_SETUP_RESPONSE, b'\x15')
-_is_encryption_setup_error = _create_matcher(
- _ENCRYPTION_SETUP_RESPONSE, b'\x14')
+_is_unknown_message_error = _create_matcher(_UNKNOWN_MESSAGE_RESPONSE, b"\x85")
+_is_encryption_missing_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x15")
+_is_encryption_setup_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x14")
_FREESTYLE_MESSAGE = construct.Struct(
- 'hid_report' / construct.Const(0, construct.Byte),
- 'message_type' / construct.Byte,
- 'command' / construct.Padded(
+ "hid_report" / construct.Const(0, construct.Byte),
+ "message_type" / construct.Byte,
+ "command"
+ / construct.Padded(
63, # command can only be up to 62 bytes, but one is used for length.
- construct.Prefixed(construct.Byte, construct.GreedyBytes)),
+ construct.Prefixed(construct.Byte, construct.GreedyBytes),
+ ),
)
_FREESTYLE_ENCRYPTED_MESSAGE = construct.Struct(
- 'hid_report' / construct.Const(0, construct.Byte),
- 'message_type' / construct.Byte,
- 'command' / construct.Padded(
+ "hid_report" / construct.Const(0, construct.Byte),
+ "message_type" / construct.Byte,
+ "command"
+ / construct.Padded(
63, # command can only be up to 62 bytes, but one is used for length.
- construct.GreedyBytes),
+ construct.GreedyBytes,
+ ),
)
-_TEXT_COMPLETION_RE = re.compile(b'CMD (?:OK|Fail!)')
+_TEXT_COMPLETION_RE = re.compile(b"CMD (?:OK|Fail!)")
_TEXT_REPLY_FORMAT = re.compile(
- b'^(?P<message>.*)CKSM:(?P<checksum>[0-9A-F]{8})\r\n'
- b'CMD (?P<status>OK|Fail!)\r\n$', re.DOTALL)
+ b"^(?P<message>.*)CKSM:(?P<checksum>[0-9A-F]{8})\r\n"
+ b"CMD (?P<status>OK|Fail!)\r\n$",
+ re.DOTALL,
+)
_MULTIRECORDS_FORMAT = re.compile(
- '^(?P<message>.+\r\n)(?P<count>[0-9]+),(?P<checksum>[0-9A-F]{8})\r\n$',
- re.DOTALL)
+ "^(?P<message>.+\r\n)(?P<count>[0-9]+),(?P<checksum>[0-9A-F]{8})\r\n$", re.DOTALL
+)
def _verify_checksum(message, expected_checksum_hex):
@@ -131,17 +143,18 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
TEXT_CMD = 0x60
TEXT_REPLY_CMD = 0x60
- USB_VENDOR_ID = 0x1a61 # type: int # Abbott Diabetes Care
+ USB_VENDOR_ID = 0x1A61 # type: int # Abbott Diabetes Care
USB_PRODUCT_ID = None # type: int
def connect(self):
"""Open connection to the device, starting the knocking sequence."""
- self._send_command(_INIT_COMMAND, b'')
+ self._send_command(_INIT_COMMAND, b"")
response = self._read_response()
if not _is_init_reply(response):
raise exceptions.ConnectionFailed(
- 'Connection error: unexpected message %02x:%s' % (
- response[0], response[1].hex()))
+ "Connection error: unexpected message %02x:%s"
+ % (response[0], response[1].hex())
+ )
def disconnect(self):
"""Disconnect the device, nothing to be done."""
@@ -162,9 +175,10 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
meta_construct = _FREESTYLE_MESSAGE
usb_packet = meta_construct.build(
- {'message_type': message_type, 'command': command})
+ {"message_type": message_type, "command": command}
+ )
- logging.debug('Sending packet: %r', usb_packet)
+ logging.debug("Sending packet: %r", usb_packet)
self._write(usb_packet)
def _read_response(self, encrypted=False):
@@ -172,14 +186,14 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
"""Read the response from the device and extracts it."""
usb_packet = self._read()
- logging.debug('Read packet: %r', usb_packet)
+ logging.debug("Read packet: %r", usb_packet)
assert usb_packet
message_type = usb_packet[0]
if not encrypted or message_type in _ALWAYS_UNENCRYPTED_MESSAGES:
message_length = usb_packet[1]
- message_content = usb_packet[2:2+message_length]
+ message_content = usb_packet[2 : 2 + message_length]
else:
message_content = usb_packet[1:]
@@ -195,15 +209,13 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
return self._read_response(encrypted=encrypted)
if _is_unknown_message_error(message):
- raise exceptions.CommandError('Invalid command')
+ raise exceptions.CommandError("Invalid command")
if _is_encryption_missing_error(message):
- raise exceptions.CommandError(
- 'Device encryption not initialized.')
+ raise exceptions.CommandError("Device encryption not initialized.")
if _is_encryption_setup_error(message):
- raise exceptions.CommandError(
- 'Device encryption initialization failed.')
+ raise exceptions.CommandError("Device encryption initialization failed.")
return message
@@ -213,18 +225,19 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
self._send_command(self.TEXT_CMD, command)
# Reply can stretch multiple buffers
- full_content = b''
+ full_content = b""
while True:
message_type, content = self._read_response()
logging.debug(
- 'Received message: type %02x content %s',
- message_type, content.hex())
+ "Received message: type %02x content %s", message_type, content.hex()
+ )
if message_type != self.TEXT_REPLY_CMD:
raise exceptions.InvalidResponse(
- 'Message type %02x does not match expectations: %r' %
- (message_type, content))
+ "Message type %02x does not match expectations: %r"
+ % (message_type, content)
+ )
full_content += content
@@ -235,17 +248,17 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
if not match:
raise exceptions.InvalidResponse(full_content)
- message = match.group('message')
- _verify_checksum(message, match.group('checksum'))
+ message = match.group("message")
+ _verify_checksum(message, match.group("checksum"))
- if match.group('status') != b'OK':
+ if match.group("status") != b"OK":
raise exceptions.InvalidResponse(message or "Command failed")
# If there is anything in the response that is not ASCII-safe, this is
# probably in the patient name. The Windows utility does not seem to
# validate those, so just replace anything non-ASCII with the correct
# unknown codepoint.
- return message.decode('ascii', 'replace')
+ return message.decode("ascii", "replace")
# Some of the commands are also shared across devices that use this HID
# protocol, but not many. Only provide here those that do seep to change
@@ -253,16 +266,16 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
def _get_version(self):
# type: () -> Text
"""Return the software version of the device."""
- return self._send_text_command(b'$swver?').rstrip('\r\n')
+ return self._send_text_command(b"$swver?").rstrip("\r\n")
def get_serial_number(self):
# type: () -> Text
"""Returns the serial number of the device."""
- return self._send_text_command(b'$serlnum?').rstrip('\r\n')
+ return self._send_text_command(b"$serlnum?").rstrip("\r\n")
def get_patient_name(self):
# type: () -> Optional[Text]
- patient_name = self._send_text_command(b'$ptname?').rstrip('\r\n')
+ patient_name = self._send_text_command(b"$ptname?").rstrip("\r\n")
if not patient_name:
return None
return patient_name
@@ -270,11 +283,11 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
def set_patient_name(self, name):
# type: (Text) -> None
try:
- encoded_name = name.encode('ascii')
+ encoded_name = name.encode("ascii")
except UnicodeDecodeError:
- raise ValueError('Only ASCII-safe names are tested working')
+ raise ValueError("Only ASCII-safe names are tested working")
- result = self._send_text_command(b'$ptname,' + encoded_name)
+ result = self._send_text_command(b"$ptname," + encoded_name)
def get_datetime(self):
# type: () -> datetime.datetime
@@ -283,12 +296,12 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
This is one of the few commands that appear common to many of the
FreeStyle devices that use the HID framing protocol.
"""
- date = self._send_text_command(b'$date?').rstrip('\r\n')
- time = self._send_text_command(b'$time?').rstrip('\r\n')
+ date = self._send_text_command(b"$date?").rstrip("\r\n")
+ time = self._send_text_command(b"$time?").rstrip("\r\n")
# Year is returned as an offset to 2000.
- month, day, year = (int(x) for x in date.split(','))
- hour, minute = (int(x) for x in time.split(','))
+ month, day, year = (int(x) for x in date.split(","))
+ hour, minute = (int(x) for x in time.split(","))
# At least Precision Neo devices can have an invalid date (bad RTC?),
# and report 255 for each field, which is not valid for
@@ -304,10 +317,10 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
# The format used by the FreeStyle devices is not composable based on
# standard strftime() (namely it includes no leading zeros), so we need
# to build it manually.
- date_cmd = '$date,{month},{day},{year}'.format(
- month=date.month, day=date.day, year=(date.year-2000))
- time_cmd = '$time,{hour},{minute}'.format(
- hour=date.hour, minute=date.minute)
+ date_cmd = "$date,{month},{day},{year}".format(
+ month=date.month, day=date.day, year=(date.year - 2000)
+ )
+ time_cmd = "$time,{hour},{minute}".format(hour=date.hour, minute=date.minute)
self._send_text_command(bytes(date_cmd, "ascii"))
self._send_text_command(bytes(time_cmd, "ascii"))
@@ -333,7 +346,7 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
in the record file.
"""
message = self._send_text_command(command)
- logging.debug('Received multirecord message:\n%s', message)
+ logging.debug("Received multirecord message:\n%s", message)
if message == "Log Empty\r\n":
return iter(())
@@ -341,9 +354,9 @@ class FreeStyleHidDevice(hiddevice.HidDevice, driver_base.GlucometerDriver, ABC)
if not match:
raise exceptions.InvalidResponse(message)
- records_str = match.group('message')
- _verify_checksum(records_str, match.group('checksum'))
+ records_str = match.group("message")
+ _verify_checksum(records_str, match.group("checksum"))
- logging.debug('Received multi-record string: %s', records_str)
+ logging.debug("Received multi-record string: %s", records_str)
- return csv.reader(records_str.split('\r\n'))
+ return csv.reader(records_str.split("\r\n"))
diff --git a/glucometerutils/support/hiddevice.py b/glucometerutils/support/hiddevice.py
index b44e5c7..ad8f3a6 100644
--- a/glucometerutils/support/hiddevice.py
+++ b/glucometerutils/support/hiddevice.py
@@ -45,33 +45,36 @@ class HidDevice:
# type: (Optional[Text]) -> None
if None in (self.USB_VENDOR_ID, self.USB_PRODUCT_ID) and not device:
raise exceptions.CommandLineError(
- '--device parameter is required, should point to a /dev/hidraw '
- 'device node representing the meter.')
+ "--device parameter is required, should point to a /dev/hidraw "
+ "device node representing the meter."
+ )
# If the user passed a device path that does not exist, raise an
# error. This is to avoid writing to a file instead of to a device node.
if device and not os.path.exists(device):
raise exceptions.ConnectionFailed(
- message='Path %s does not exist.' % device)
+ message="Path %s does not exist." % device
+ )
# If the user passed a device, try opening it.
if device:
- self.handle_ = open(device, 'w+b') # type: Optional[BinaryIO]
+ self.handle_ = open(device, "w+b") # type: Optional[BinaryIO]
else:
self.handle_ = None
- logging.info(
- 'No --device parameter provided, using hidapi library.')
+ logging.info("No --device parameter provided, using hidapi library.")
try:
import hid
+
self.hidapi_handle_ = hid.device()
- self.hidapi_handle_.open(
- self.USB_VENDOR_ID, self.USB_PRODUCT_ID)
+ self.hidapi_handle_.open(self.USB_VENDOR_ID, self.USB_PRODUCT_ID)
except ImportError:
raise exceptions.ConnectionFailed(
- message='Missing requied "hidapi" module.')
+ message='Missing requied "hidapi" module.'
+ )
except OSError as e:
raise exceptions.ConnectionFailed(
- message='Unable to connect to meter: %s.' % e)
+ message="Unable to connect to meter: %s." % e
+ )
def _write(self, report):
# type: (bytes) -> None
@@ -95,5 +98,4 @@ class HidDevice:
if self.handle_:
return bytes(self.handle_.read(size))
- return bytes(self.hidapi_handle_.read(
- size, timeout_ms=self.TIMEOUT_MS))
+ return bytes(self.hidapi_handle_.read(size, timeout_ms=self.TIMEOUT_MS))
diff --git a/glucometerutils/support/lifescan.py b/glucometerutils/support/lifescan.py
index 19155d4..9340e49 100644
--- a/glucometerutils/support/lifescan.py
+++ b/glucometerutils/support/lifescan.py
@@ -8,22 +8,25 @@ from glucometerutils import exceptions
class MissingChecksum(exceptions.InvalidResponse):
"""The response misses the expected 4-digits checksum."""
+
def __init__(self, response):
super(MissingChecksum, self).__init__(
- 'Response is missing checksum: %s' % response)
+ "Response is missing checksum: %s" % response
+ )
class InvalidSerialNumber(exceptions.Error):
"""The serial number is not as expected."""
+
def __init__(self, serial_number):
super(InvalidSerialNumber, self).__init__(
- 'Serial number %s is invalid.' % serial_number)
+ "Serial number %s is invalid." % serial_number
+ )
class MalformedCommand(exceptions.InvalidResponse):
def __init__(self, message):
- super(MalformedCommand, self).__init__(
- 'Malformed command: %s' % message)
+ super(MalformedCommand, self).__init__("Malformed command: %s" % message)
def crc_ccitt(data):
@@ -39,13 +42,13 @@ def crc_ccitt(data):
This function uses the non-default 0xFFFF seed as used by multiple
LifeScan meters.
"""
- crc = 0xffff
+ crc = 0xFFFF
for byte in data:
- crc = (crc >> 8) & 0xffff | (crc << 8) & 0xffff
+ crc = (crc >> 8) & 0xFFFF | (crc << 8) & 0xFFFF
crc ^= byte
- crc ^= (crc & 0xff) >> 4
- crc ^= (((crc << 8) & 0xffff) << 4) & 0xffff
- crc ^= (crc & 0xff) << 5
+ crc ^= (crc & 0xFF) >> 4
+ crc ^= (((crc << 8) & 0xFFFF) << 4) & 0xFFFF
+ crc ^= (crc & 0xFF) << 5
- return crc & 0xffff
+ return crc & 0xFFFF
diff --git a/glucometerutils/support/lifescan_binary_protocol.py b/glucometerutils/support/lifescan_binary_protocol.py
index 288da83..441226e 100644
--- a/glucometerutils/support/lifescan_binary_protocol.py
+++ b/glucometerutils/support/lifescan_binary_protocol.py
@@ -12,48 +12,51 @@ This module implements an interface to send and receive these messages.
import construct
from glucometerutils import common
-from glucometerutils.support import construct_extras
-from glucometerutils.support import lifescan
+from glucometerutils.support import construct_extras, lifescan
_LINK_CONTROL = construct.BitStruct(
construct.Padding(3),
- 'more' / construct.Default(construct.Flag, False),
- 'disconnect' / construct.Default(construct.Flag, False),
- 'acknowledge' / construct.Default(construct.Flag, False),
- 'expect_receive' / construct.Default(construct.Flag, False),
- 'sequence_number' / construct.Default(construct.Flag, False),
+ "more" / construct.Default(construct.Flag, False),
+ "disconnect" / construct.Default(construct.Flag, False),
+ "acknowledge" / construct.Default(construct.Flag, False),
+ "expect_receive" / construct.Default(construct.Flag, False),
+ "sequence_number" / construct.Default(construct.Flag, False),
)
+
def LifeScanPacket(include_link_control): # pylint: disable=invalid-name
# type: (bool) -> construct.Struct
if include_link_control:
link_control_construct = _LINK_CONTROL
else:
- link_control_construct = construct.Const(b'\x00')
+ link_control_construct = construct.Const(b"\x00")
return construct.Struct(
- 'data' / construct.RawCopy(
+ "data"
+ / construct.RawCopy(
construct.Struct(
- construct.Const(b'\x02'), # stx
- 'length' / construct.Rebuild(
- construct.Byte, lambda this: len(this.message) + 6),
- 'link_control' / link_control_construct,
- 'message' / construct.Bytes(
- lambda this: this.length - 6),
- construct.Const(b'\x03'), # etx
+ construct.Const(b"\x02"), # stx
+ "length"
+ / construct.Rebuild(construct.Byte, lambda this: len(this.message) + 6),
+ "link_control" / link_control_construct,
+ "message" / construct.Bytes(lambda this: this.length - 6),
+ construct.Const(b"\x03"), # etx
),
),
- 'checksum' / construct.Checksum(
- construct.Int16ul, lifescan.crc_ccitt, construct.this.data.data),
+ "checksum"
+ / construct.Checksum(
+ construct.Int16ul, lifescan.crc_ccitt, construct.this.data.data
+ ),
)
+
VERIO_TIMESTAMP = construct_extras.Timestamp(
- construct.Int32ul, epoch=946684800) # 2000-01-01 (not 2010)
+ construct.Int32ul, epoch=946684800
+) # 2000-01-01 (not 2010)
_GLUCOSE_UNIT_MAPPING_TABLE = {
common.Unit.MG_DL: 0x00,
common.Unit.MMOL_L: 0x01,
}
-GLUCOSE_UNIT = construct.Mapping(
- construct.Byte, _GLUCOSE_UNIT_MAPPING_TABLE)
+GLUCOSE_UNIT = construct.Mapping(construct.Byte, _GLUCOSE_UNIT_MAPPING_TABLE)
diff --git a/glucometerutils/support/serial.py b/glucometerutils/support/serial.py
index 6067cf7..d9e80ea 100644
--- a/glucometerutils/support/serial.py
+++ b/glucometerutils/support/serial.py
@@ -8,7 +8,6 @@ import logging
from typing import Optional, Text
import serial
-
from glucometerutils import exceptions
@@ -48,19 +47,23 @@ class SerialDevice:
assert self.BAUDRATE is not None
if not device and self.DEFAULT_CABLE_ID:
- logging.info(
- 'No --device parameter provided, looking for default cable.')
- device = 'hwgrep://' + self.DEFAULT_CABLE_ID
+ logging.info("No --device parameter provided, looking for default cable.")
+ device = "hwgrep://" + self.DEFAULT_CABLE_ID
if not device:
raise exceptions.CommandLineError(
- 'No --device parameter provided, and no default cable known.')
+ "No --device parameter provided, and no default cable known."
+ )
self.serial_ = serial.serial_for_url(
device,
baudrate=self.BAUDRATE,
timeout=self.TIMEOUT,
writeTimeout=None,
- bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE,
+ bytesize=serial.EIGHTBITS,
+ parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
- xonxoff=False, rtscts=False, dsrdtr=False)
+ xonxoff=False,
+ rtscts=False,
+ dsrdtr=False,
+ )
diff --git a/reversing_tools/abbott/extract_freestyle.py b/reversing_tools/abbott/extract_freestyle.py
index 1227de1..8e1c0d2 100755
--- a/reversing_tools/abbott/extract_freestyle.py
+++ b/reversing_tools/abbott/extract_freestyle.py
@@ -28,48 +28,76 @@ import usbmon.pcapng
_KEEPALIVE_TYPE = 0x22
_UNENCRYPTED_TYPES = (
- 0x01, 0x04, 0x05, 0x06, 0x0c, 0x0d,
- 0x14, 0x15,
- 0x33, 0x34, 0x35,
+ 0x01,
+ 0x04,
+ 0x05,
+ 0x06,
+ 0x0C,
+ 0x0D,
+ 0x14,
+ 0x15,
+ 0x33,
+ 0x34,
+ 0x35,
0x71,
_KEEPALIVE_TYPE,
)
-_ABBOTT_VENDOR_ID = 0x1a61
+_ABBOTT_VENDOR_ID = 0x1A61
_LIBRE2_PRODUCT_ID = 0x3950
-
def main():
if sys.version_info < (3, 7):
- raise Exception(
- 'Unsupported Python version, please use at least Python 3.7.')
+ raise Exception("Unsupported Python version, please use at least Python 3.7.")
parser = argparse.ArgumentParser()
parser.add_argument(
- '--device_address', action='store', type=str,
- help=('Device address (busnum.devnum) of the device to extract capture'
- 'of. If none provided, device descriptors will be relied on.'))
+ "--device_address",
+ action="store",
+ type=str,
+ help=(
+ "Device address (busnum.devnum) of the device to extract capture"
+ "of. If none provided, device descriptors will be relied on."
+ ),
+ )
parser.add_argument(
- '--encrypted_protocol', action='store_true',
- help=('Whether to expect encrypted protocol in the capture.'
- ' Ignored if the device descriptors are present in the capture.'))
+ "--encrypted_protocol",
+ action="store_true",
+ help=(
+ "Whether to expect encrypted protocol in the capture."
+ " Ignored if the device descriptors are present in the capture."
+ ),
+ )
parser.add_argument(
- '--vlog', action='store', required=False, type=int,
- help=('Python logging level. See the levels at '
- 'https://docs.python.org/3/library/logging.html#logging-levels'))
+ "--vlog",
+ action="store",
+ required=False,
+ type=int,
+ help=(
+ "Python logging level. See the levels at "
+ "https://docs.python.org/3/library/logging.html#logging-levels"
+ ),
+ )
parser.add_argument(
- '--print_keepalive', action='store_true',
- help=('Whether to print the keepalive messages sent by the device. '
- 'Keepalive messages are usually safely ignored.'))
+ "--print_keepalive",
+ action="store_true",
+ help=(
+ "Whether to print the keepalive messages sent by the device. "
+ "Keepalive messages are usually safely ignored."
+ ),
+ )
parser.add_argument(
- 'pcap_file', action='store', type=str,
- help='Path to the pcapng file with the USB capture.')
+ "pcap_file",
+ action="store",
+ type=str,
+ help="Path to the pcapng file with the USB capture.",
+ )
args = parser.parse_args()
@@ -82,15 +110,18 @@ def main():
if descriptor.vendor_id == _ABBOTT_VENDOR_ID:
if args.device_address and args.device_address != descriptor.address:
raise Exception(
- 'Multiple Abbott device present in capture, please'
- ' provide a --device_address flag.')
+ "Multiple Abbott device present in capture, please"
+ " provide a --device_address flag."
+ )
args.device_address = descriptor.address
descriptor = session.device_descriptors.get(args.device_address, None)
if not descriptor:
logging.warning(
f"Unable to find device %s in the capture's descriptors."
- " Assuming non-encrypted protocol.", args.device_address)
+ " Assuming non-encrypted protocol.",
+ args.device_address,
+ )
else:
assert descriptor.vendor_id == _ABBOTT_VENDOR_ID
@@ -102,15 +133,17 @@ def main():
if not first.type == usbmon.constants.PacketType.SUBMISSION:
continue
- if not first.address.startswith(f'{args.device_address}.'):
+ if not first.address.startswith(f"{args.device_address}."):
# No need to check second, they will be linked.
continue
if first.xfer_type == usbmon.constants.XferType.INTERRUPT:
pass
- elif (first.xfer_type == usbmon.constants.XferType.CONTROL and
- not first.setup_packet or
- first.setup_packet.type == usbmon.setup.Type.CLASS):
+ elif (
+ first.xfer_type == usbmon.constants.XferType.CONTROL
+ and not first.setup_packet
+ or first.setup_packet.type == usbmon.setup.Type.CLASS
+ ):
pass
else:
continue
@@ -133,19 +166,19 @@ def main():
if args.encrypted_protocol and message_type not in _UNENCRYPTED_TYPES:
# When expecting encrypted communication), we ignore the
# message_length and we keep it with the whole message.
- message_type = f'x{message_type:02x}'
+ message_type = f"x{message_type:02x}"
message = packet.payload[1:]
else:
message_length = packet.payload[1]
- message_type = f' {message_type:02x}'
- message = packet.payload[2:2+message_length]
-
- print(usbmon.chatter.dump_bytes(
- packet.direction,
- message,
- prefix=f'[{message_type}]',
- print_empty=True,
- ), '\n')
+ message_type = f" {message_type:02x}"
+ message = packet.payload[2 : 2 + message_length]
+
+ print(
+ usbmon.chatter.dump_bytes(
+ packet.direction, message, prefix=f"[{message_type}]", print_empty=True,
+ ),
+ "\n",
+ )
if __name__ == "__main__":
diff --git a/reversing_tools/abbott/freestyle_hid_console.py b/reversing_tools/abbott/freestyle_hid_console.py
index 4697da9..5005e4a 100755
--- a/reversing_tools/abbott/freestyle_hid_console.py
+++ b/reversing_tools/abbott/freestyle_hid_console.py
@@ -9,22 +9,37 @@ import sys
from glucometerutils import exceptions
from glucometerutils.support import freestyle
+
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
- '--text_cmd_type', action='store', type=int, default=0x60,
- help='Message type for text commands sent to the device.')
+ "--text_cmd_type",
+ action="store",
+ type=int,
+ default=0x60,
+ help="Message type for text commands sent to the device.",
+ )
parser.add_argument(
- '--text_reply_type', action='store', type=int, default=0x60,
- help='Message type for text replies received from the device.')
+ "--text_reply_type",
+ action="store",
+ type=int,
+ default=0x60,
+ help="Message type for text replies received from the device.",
+ )
parser.add_argument(
- 'device', action='store',
- help='Path to the HID device to open.')
+ "device", action="store", help="Path to the HID device to open."
+ )
parser.add_argument(
- '--vlog', action='store', required=False, type=int,
- help=('Python logging level. See the levels at '
- 'https://docs.python.org/3/library/logging.html#logging-levels'))
+ "--vlog",
+ action="store",
+ required=False,
+ type=int,
+ help=(
+ "Python logging level. See the levels at "
+ "https://docs.python.org/3/library/logging.html#logging-levels"
+ ),
+ )
args = parser.parse_args()
@@ -38,15 +53,16 @@ def main():
while True:
if sys.stdin.isatty():
- command = input('>>> ')
+ command = input(">>> ")
else:
command = input()
- print('>>> {command}'.format(command=command))
+ print(">>> {command}".format(command=command))
try:
- print(device._send_text_command(bytes(command, 'ascii')))
+ print(device._send_text_command(bytes(command, "ascii")))
except exceptions.InvalidResponse as error:
- print('! {error}'.format(error=error))
+ print("! {error}".format(error=error))
+
if __name__ == "__main__":
main()
diff --git a/test/test_common.py b/test/test_common.py
index daa83c9..3e733e3 100644
--- a/test/test_common.py
+++ b/test/test_common.py
@@ -8,39 +8,35 @@
import datetime
from absl.testing import parameterized
-
from glucometerutils import common
class TestGlucoseConversion(parameterized.TestCase):
-
def test_convert_to_mmol(self):
self.assertEqual(
- 5.56, common.convert_glucose_unit(
- 100, common.Unit.MG_DL, common.Unit.MMOL_L))
+ 5.56,
+ common.convert_glucose_unit(100, common.Unit.MG_DL, common.Unit.MMOL_L),
+ )
def test_convert_to_mgdl(self):
self.assertEqual(
- 180, common.convert_glucose_unit(
- 10, common.Unit.MMOL_L, common.Unit.MG_DL))
+ 180, common.convert_glucose_unit(10, common.Unit.MMOL_L, common.Unit.MG_DL)
+ )
@parameterized.parameters(list(common.Unit))
def test_convert_identity(self, unit):
- self.assertEqual(
- 100, common.convert_glucose_unit(
- 100, unit, unit))
+ self.assertEqual(100, common.convert_glucose_unit(100, unit, unit))
@parameterized.parameters([unit.value for unit in common.Unit])
def test_convert_identity_str(self, unit_str):
- self.assertEqual(
- 100, common.convert_glucose_unit(
- 100, unit_str, unit_str))
+ self.assertEqual(100, common.convert_glucose_unit(100, unit_str, unit_str))
@parameterized.parameters(
- (common.Unit.MMOL_L, 'foo'),
- ('foo', common.Unit.MG_DL),
+ (common.Unit.MMOL_L, "foo"),
+ ("foo", common.Unit.MG_DL),
(None, common.Unit.MG_DL),
- (common.Meal.NONE, common.Unit.MG_DL))
+ (common.Meal.NONE, common.Unit.MG_DL),
+ )
def test_invalid_values(self, from_unit, to_unit):
with self.assertRaises(Exception):
common.convert_glucose_unit(100, from_unit, to_unit)
@@ -52,67 +48,76 @@ class TestGlucoseReading(parameterized.TestCase):
def test_minimal(self):
reading = common.GlucoseReading(self.TEST_DATETIME, 100)
- self.assertEqual(reading.as_csv(common.Unit.MG_DL),
- '"2018-01-01 00:30:45","100.00","","blood sample",""')
+ self.assertEqual(
+ reading.as_csv(common.Unit.MG_DL),
+ '"2018-01-01 00:30:45","100.00","","blood sample",""',
+ )
@parameterized.named_parameters(
- ('_mgdl', common.Unit.MG_DL, 100),
- ('_mmoll', common.Unit.MMOL_L, 5.56))
+ ("_mgdl", common.Unit.MG_DL, 100), ("_mmoll", common.Unit.MMOL_L, 5.56)
+ )
def test_value(self, unit, expected_value):
reading = common.GlucoseReading(self.TEST_DATETIME, 100)
- self.assertAlmostEqual(
- reading.get_value_as(unit), expected_value, places=2)
+ self.assertAlmostEqual(reading.get_value_as(unit), expected_value, places=2)
@parameterized.named_parameters(
- ('_meal_none',
- {'meal': common.Meal.NONE},
- '"2018-01-01 00:30:45","100.00","","blood sample",""'),
- ('_meal_before',
- {'meal': common.Meal.BEFORE},
- '"2018-01-01 00:30:45","100.00","Before Meal","blood sample",""'),
- ('_meal_after',
- {'meal': common.Meal.AFTER},
- '"2018-01-01 00:30:45","100.00","After Meal","blood sample",""'),
- ('_measurement_blood',
- {'measure_method': common.MeasurementMethod.BLOOD_SAMPLE},
- '"2018-01-01 00:30:45","100.00","","blood sample",""'),
- ('_measurement_cgm',
- {'measure_method': common.MeasurementMethod.CGM},
- '"2018-01-01 00:30:45","100.00","","CGM",""'),
- ('_comment',
- {'comment': 'too much'},
- '"2018-01-01 00:30:45","100.00","","blood sample","too much"'),
- ('_comment_quoted',
- {'comment': '"too" much'},
- '"2018-01-01 00:30:45","100.00","","blood sample","\"too\" much"'),
+ (
+ "_meal_none",
+ {"meal": common.Meal.NONE},
+ '"2018-01-01 00:30:45","100.00","","blood sample",""',
+ ),
+ (
+ "_meal_before",
+ {"meal": common.Meal.BEFORE},
+ '"2018-01-01 00:30:45","100.00","Before Meal","blood sample",""',
+ ),
+ (
+ "_meal_after",
+ {"meal": common.Meal.AFTER},
+ '"2018-01-01 00:30:45","100.00","After Meal","blood sample",""',
+ ),
+ (
+ "_measurement_blood",
+ {"measure_method": common.MeasurementMethod.BLOOD_SAMPLE},
+ '"2018-01-01 00:30:45","100.00","","blood sample",""',
+ ),
+ (
+ "_measurement_cgm",
+ {"measure_method": common.MeasurementMethod.CGM},
+ '"2018-01-01 00:30:45","100.00","","CGM",""',
+ ),
+ (
+ "_comment",
+ {"comment": "too much"},
+ '"2018-01-01 00:30:45","100.00","","blood sample","too much"',
+ ),
+ (
+ "_comment_quoted",
+ {"comment": '"too" much'},
+ '"2018-01-01 00:30:45","100.00","","blood sample",""too" much"',
+ ),
)
def test_csv(self, kwargs_dict, expected_csv):
- reading = common.GlucoseReading(
- self.TEST_DATETIME, 100, **kwargs_dict)
+ reading = common.GlucoseReading(self.TEST_DATETIME, 100, **kwargs_dict)
self.assertEqual(reading.as_csv(common.Unit.MG_DL), expected_csv)
class TestMeterInfo(parameterized.TestCase):
-
@parameterized.named_parameters(
- ('_no_serial_number',
- {},
- 'Serial Number: N/A\n'),
- ('_serial_number',
- {'serial_number': 1234},
- 'Serial Number: 1234\n'),
- ('_no_version_information',
- {},
- 'Version Information:\n N/A\n'),
- ('_version_information_1',
- {'version_info': ['test']},
- 'Version Information:\n test\n'),
- ('_version_information_2',
- {'version_info': ['test', 'test2']},
- 'Version Information:\n test\n test2\n'),
- ('_default_native_unit',
- {},
- 'Native Unit: mg/dL\n'),
+ ("_no_serial_number", {}, "Serial Number: N/A\n"),
+ ("_serial_number", {"serial_number": 1234}, "Serial Number: 1234\n"),
+ ("_no_version_information", {}, "Version Information:\n N/A\n"),
+ (
+ "_version_information_1",
+ {"version_info": ["test"]},
+ "Version Information:\n test\n",
+ ),
+ (
+ "_version_information_2",
+ {"version_info": ["test", "test2"]},
+ "Version Information:\n test\n test2\n",
+ ),
+ ("_default_native_unit", {}, "Native Unit: mg/dL\n"),
)
def test_meter_info(self, kwargs_dict, expected_fragment):
info = common.MeterInfo(self.id(), **kwargs_dict)
diff --git a/test/test_construct_extras.py b/test/test_construct_extras.py
index 476d6f8..6bba873 100644
--- a/test/test_construct_extras.py
+++ b/test/test_construct_extras.py
@@ -7,60 +7,66 @@
import datetime
-from absl.testing import absltest
import construct
+from absl.testing import absltest
from glucometerutils.support import construct_extras
-
_TEST_DATE1 = datetime.datetime(1970, 1, 2, 0, 0)
_TEST_DATE2 = datetime.datetime(1971, 1, 1, 0, 0)
_TEST_DATE3 = datetime.datetime(1970, 1, 1, 0, 0)
_NEW_EPOCH = 31536000 # datetime.datetime(1971, 1, 1, 0, 0)
-class TestTimestamp(absltest.TestCase):
+class TestTimestamp(absltest.TestCase):
def test_build_unix_epoch(self):
self.assertEqual(
construct_extras.Timestamp(construct.Int32ul).build(_TEST_DATE1),
- b'\x80\x51\x01\x00')
+ b"\x80\x51\x01\x00",
+ )
def test_parse_unix_epoch(self):
self.assertEqual(
- construct_extras.Timestamp(construct.Int32ul).parse(
- b'\x803\xe1\x01'),
- _TEST_DATE2)
+ construct_extras.Timestamp(construct.Int32ul).parse(b"\x803\xe1\x01"),
+ _TEST_DATE2,
+ )
def test_build_custom_epoch(self):
self.assertEqual(
- construct_extras.Timestamp(
- construct.Int32ul, epoch=_NEW_EPOCH).build(_TEST_DATE2),
- b'\x00\x00\x00\x00')
+ construct_extras.Timestamp(construct.Int32ul, epoch=_NEW_EPOCH).build(
+ _TEST_DATE2
+ ),
+ b"\x00\x00\x00\x00",
+ )
def test_parse_custom_epoch(self):
self.assertEqual(
- construct_extras.Timestamp(
- construct.Int32ul, epoch=_NEW_EPOCH).parse(
- b'\x00\x00\x00\x00'),
- _TEST_DATE2)
+ construct_extras.Timestamp(construct.Int32ul, epoch=_NEW_EPOCH).parse(
+ b"\x00\x00\x00\x00"
+ ),
+ _TEST_DATE2,
+ )
def test_build_custom_epoch_negative_failure(self):
with self.assertRaises(construct.core.FormatFieldError):
- construct_extras.Timestamp(
- construct.Int32ul, epoch=_NEW_EPOCH).build(_TEST_DATE1)
+ construct_extras.Timestamp(construct.Int32ul, epoch=_NEW_EPOCH).build(
+ _TEST_DATE1
+ )
def test_build_custom_epoch_negative_success(self):
self.assertEqual(
- construct_extras.Timestamp(
- construct.Int32sl, epoch=_NEW_EPOCH).build(_TEST_DATE1),
- b'\x00\x1e\x20\xfe')
+ construct_extras.Timestamp(construct.Int32sl, epoch=_NEW_EPOCH).build(
+ _TEST_DATE1
+ ),
+ b"\x00\x1e\x20\xfe",
+ )
def test_build_varint(self):
self.assertEqual(
- construct_extras.Timestamp(construct.VarInt).build(_TEST_DATE3),
- b'\x00')
+ construct_extras.Timestamp(construct.VarInt).build(_TEST_DATE3), b"\x00"
+ )
def test_invalid_value(self):
with self.assertRaises(AssertionError):
- construct_extras.Timestamp(construct.Int32ul).build('foo')
+ construct_extras.Timestamp(construct.Int32ul).build("foo")
diff --git a/test/test_contourusb.py b/test/test_contourusb.py
index e2fb6cb..3b8b547 100644
--- a/test/test_contourusb.py
+++ b/test/test_contourusb.py
@@ -5,62 +5,57 @@
# pylint: disable=protected-access,missing-docstring
-from absl.testing import absltest
-
-from glucometerutils.support import contourusb
-
from unittest.mock import Mock
+from absl.testing import absltest
+from glucometerutils.support import contourusb
class TestContourUSB(absltest.TestCase):
- header_record = b'\x04\x021H|\\^&||7w3LBL|Bayer7390^01.24\\01.04\\09.02.20^7390-2336773^7403-|A=1^C=63^G=1^I=0200^R=0^S=1^U=0^V=10600^X=070070070070180130150250^Y=360126090050099050300089^Z=1|1714||||||1|201909221304\r\x17D7\r\n\x05'
-
+ header_record = b"\x04\x021H|\\^&||7w3LBL|Bayer7390^01.24\\01.04\\09.02.20^7390-2336773^7403-|A=1^C=63^G=1^I=0200^R=0^S=1^U=0^V=10600^X=070070070070180130150250^Y=360126090050099050300089^Z=1|1714||||||1|201909221304\r\x17D7\r\n\x05"
+
mock_dev = Mock()
def test_get_datetime(self):
import datetime
- self.datetime = "201908071315" # returned by
+ self.datetime = "201908071315" # returned by
self.assertEqual(
- datetime.datetime(2019,8,7,13,15),
- contourusb.ContourHidDevice.get_datetime(self)
+ datetime.datetime(2019, 8, 7, 13, 15),
+ contourusb.ContourHidDevice.get_datetime(self),
)
-
def test_RECORD_FORMAT_match(self):
- #first decode the header record frame
+ # first decode the header record frame
header_record_decoded = self.header_record.decode()
- stx = header_record_decoded.find('\x02')
+ stx = header_record_decoded.find("\x02")
_RECORD_FORMAT = contourusb._RECORD_FORMAT
- result = _RECORD_FORMAT.match(header_record_decoded[stx:]).group('text')
+ result = _RECORD_FORMAT.match(header_record_decoded[stx:]).group("text")
self.assertEqual(
- "H|\\^&||7w3LBL|Bayer7390^01.24\\01.04\\09.02.20^7390-2336773^7403-|A=1^C=63^G=1^I=0200^R=0^S=1^U=0^V=10600^X=070070070070180130150250^Y=360126090050099050300089^Z=1|1714||||||1|201909221304",
- result
+ "H|\\^&||7w3LBL|Bayer7390^01.24\\01.04\\09.02.20^7390-2336773^7403-|A=1^C=63^G=1^I=0200^R=0^S=1^U=0^V=10600^X=070070070070180130150250^Y=360126090050099050300089^Z=1|1714||||||1|201909221304",
+ result,
)
def test_parse_header_record(self):
-
+
_HEADER_RECORD_RE = contourusb._HEADER_RECORD_RE
_RECORD_FORMAT = contourusb._RECORD_FORMAT
-
header_record_decoded = self.header_record.decode()
- stx = header_record_decoded.find('\x02')
+ stx = header_record_decoded.find("\x02")
-
- result = _RECORD_FORMAT.match(header_record_decoded[stx:]).group('text')
- contourusb.ContourHidDevice.parse_header_record(self.mock_dev,result)
+ result = _RECORD_FORMAT.match(header_record_decoded[stx:]).group("text")
+ contourusb.ContourHidDevice.parse_header_record(self.mock_dev, result)
self.assertEqual(self.mock_dev.field_del, "\\")
self.assertEqual(self.mock_dev.repeat_del, "^")
self.assertEqual(self.mock_dev.component_del, "&")
self.assertEqual(self.mock_dev.escape_del, "|")
- self.assertEqual(self.mock_dev.product_code, "Bayer7390")
+ self.assertEqual(self.mock_dev.product_code, "Bayer7390")
self.assertEqual(self.mock_dev.dig_ver, "01.24")
self.assertEqual(self.mock_dev.anlg_ver, "01.04")
@@ -100,18 +95,20 @@ class TestContourUSB(absltest.TestCase):
self.assertEqual(self.mock_dev.datetime, "201909221304")
- #TO-DO checksum and checkframe unit tests
+ # TO-DO checksum and checkframe unit tests
def test_parse_result_record(self):
- #first decode the header record frame
+ # first decode the header record frame
result_record = "R|8|^^^Glucose|133|mg/dL^P||B/X||201202052034"
- result_dict = contourusb.ContourHidDevice.parse_result_record(self.mock_dev, result_record)
-
- self.assertEqual(result_dict['record_type'], 'R')
- self.assertEqual(result_dict['seq_num'], '8')
- self.assertEqual(result_dict['test_id'], 'Glucose')
- self.assertEqual(result_dict['value'], '133')
- self.assertEqual(result_dict['unit'], 'mg/dL')
- self.assertEqual(result_dict['ref_method'], 'P')
- self.assertEqual(result_dict['markers'], 'B/X')
- self.assertEqual(result_dict['datetime'], '201202052034') \ No newline at end of file
+ result_dict = contourusb.ContourHidDevice.parse_result_record(
+ self.mock_dev, result_record
+ )
+
+ self.assertEqual(result_dict["record_type"], "R")
+ self.assertEqual(result_dict["seq_num"], "8")
+ self.assertEqual(result_dict["test_id"], "Glucose")
+ self.assertEqual(result_dict["value"], "133")
+ self.assertEqual(result_dict["unit"], "mg/dL")
+ self.assertEqual(result_dict["ref_method"], "P")
+ self.assertEqual(result_dict["markers"], "B/X")
+ self.assertEqual(result_dict["datetime"], "201202052034")
diff --git a/test/test_freestyle.py b/test/test_freestyle.py
index 0ff669d..fb3f3b9 100644
--- a/test/test_freestyle.py
+++ b/test/test_freestyle.py
@@ -6,18 +6,17 @@
# pylint: disable=protected-access,missing-docstring
from absl.testing import absltest
-
from glucometerutils.support import freestyle
class TestFreeStyle(absltest.TestCase):
-
def test_outgoing_command(self):
"""Test the generation of a new outgoing message."""
self.assertEqual(
- b'\0\x17\7command\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0'
- b'\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0',
+ b"\0\x17\7command\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"
+ b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
freestyle._FREESTYLE_MESSAGE.build(
- {'message_type': 23, 'command': b'command'}),
+ {"message_type": 23, "command": b"command"}
+ ),
)
diff --git a/test/test_fsoptium.py b/test/test_fsoptium.py
index d9a1ccb..bdc76f8 100644
--- a/test/test_fsoptium.py
+++ b/test/test_fsoptium.py
@@ -8,31 +8,24 @@
import datetime
from absl.testing import parameterized
-
-from glucometerutils.drivers import fsoptium
from glucometerutils import exceptions
+from glucometerutils.drivers import fsoptium
class TestFreeStyleOptium(parameterized.TestCase):
-
@parameterized.parameters(
- ('Clock:\tApr 22 2014\t02:14:37',
- datetime.datetime(2014, 4, 22, 2, 14, 37)),
- ('Clock:\tJul 10 2013\t14:26:44',
- datetime.datetime(2013, 7, 10, 14, 26, 44)),
- ('Clock:\tSep 29 2013\t17:35:34',
- datetime.datetime(2013, 9, 29, 17, 35, 34)),
+ ("Clock:\tApr 22 2014\t02:14:37", datetime.datetime(2014, 4, 22, 2, 14, 37)),
+ ("Clock:\tJul 10 2013\t14:26:44", datetime.datetime(2013, 7, 10, 14, 26, 44)),
+ ("Clock:\tSep 29 2013\t17:35:34", datetime.datetime(2013, 9, 29, 17, 35, 34)),
)
def test_parse_clock(self, datestr, datevalue):
- self.assertEqual(
- fsoptium._parse_clock(datestr),
- datevalue)
+ self.assertEqual(fsoptium._parse_clock(datestr), datevalue)
@parameterized.parameters(
- ('Apr 22 2014 02:14:37',),
- ('Clock:\tXxx 10 2013\t14:26',),
- ('Clock:\tSep 29 2013\t17:35:22.34',),
- ('Foo',)
+ ("Apr 22 2014 02:14:37",),
+ ("Clock:\tXxx 10 2013\t14:26",),
+ ("Clock:\tSep 29 2013\t17:35:22.34",),
+ ("Foo",),
)
def test_parse_clock_invalid(self, datestr):
with self.assertRaises(exceptions.InvalidResponse):
diff --git a/test/test_lifescan.py b/test/test_lifescan.py
index 3dc0aa6..b50b5d6 100755
--- a/test/test_lifescan.py
+++ b/test/test_lifescan.py
@@ -8,18 +8,13 @@
import array
from absl.testing import absltest
-
from glucometerutils.support import lifescan
class TestChecksum(absltest.TestCase):
def test_crc(self):
- self.assertEqual(
- 0x41cd,
- lifescan.crc_ccitt(b'\x02\x06\x06\x03'))
+ self.assertEqual(0x41CD, lifescan.crc_ccitt(b"\x02\x06\x06\x03"))
def test_crc_array(self):
- cmd_array = array.array('B', b'\x02\x06\x08\x03')
- self.assertEqual(
- 0x62C2,
- lifescan.crc_ccitt(cmd_array))
+ cmd_array = array.array("B", b"\x02\x06\x08\x03")
+ self.assertEqual(0x62C2, lifescan.crc_ccitt(cmd_array))
diff --git a/test/test_otultra2.py b/test/test_otultra2.py
index 6bea440..6b36602 100644
--- a/test/test_otultra2.py
+++ b/test/test_otultra2.py
@@ -8,36 +8,36 @@
from unittest import mock
from absl.testing import parameterized
-
+from glucometerutils import exceptions
from glucometerutils.drivers import otultra2
from glucometerutils.support import lifescan
-from glucometerutils import exceptions
class TestOTUltra2(parameterized.TestCase):
-
def test_checksum(self):
- checksum = otultra2._calculate_checksum(b'T')
+ checksum = otultra2._calculate_checksum(b"T")
self.assertEqual(0x0054, checksum)
def test_checksum_full(self):
- checksum = otultra2._calculate_checksum(
- b'T "SAT","08/03/13","22:12:00 "')
+ checksum = otultra2._calculate_checksum(b'T "SAT","08/03/13","22:12:00 "')
self.assertEqual(0x0608, checksum)
@parameterized.named_parameters(
- ('_missing_checksum', b'INVALID', lifescan.MissingChecksum),
- ('_short', b'.\r', exceptions.InvalidResponse),
- ('_generic', b'% 2500\r', exceptions.InvalidResponse),
- ('_invalid_serial_number', b'@ "12345678O" 0297\r',
- lifescan.InvalidSerialNumber),
- ('_invalid_checksum', b'% 1337\r', exceptions.InvalidChecksum),
- ('_broken_checksum', b'% 13AZ\r', lifescan.MissingChecksum),
+ ("_missing_checksum", b"INVALID", lifescan.MissingChecksum),
+ ("_short", b".\r", exceptions.InvalidResponse),
+ ("_generic", b"% 2500\r", exceptions.InvalidResponse),
+ (
+ "_invalid_serial_number",
+ b'@ "12345678O" 0297\r',
+ lifescan.InvalidSerialNumber,
+ ),
+ ("_invalid_checksum", b"% 1337\r", exceptions.InvalidChecksum),
+ ("_broken_checksum", b"% 13AZ\r", lifescan.MissingChecksum),
)
def test_invalid_response(self, returned_string, expected_exception):
- with mock.patch('serial.Serial') as mock_serial:
+ with mock.patch("serial.Serial") as mock_serial:
mock_serial.return_value.readline.return_value = returned_string
- device = otultra2.Device('mockdevice')
+ device = otultra2.Device("mockdevice")
with self.assertRaises(expected_exception):
device.get_serial_number()
diff --git a/test/test_otultraeasy.py b/test/test_otultraeasy.py
index 252b7c7..c6fce83 100644
--- a/test/test_otultraeasy.py
+++ b/test/test_otultraeasy.py
@@ -6,19 +6,18 @@
# pylint: disable=protected-access,missing-docstring
from absl.testing import absltest
-
from glucometerutils.drivers import otultraeasy
class ConstructTest(absltest.TestCase):
-
def test_make_packet_ack(self):
self.assertEqual(
- b'\x02\x06\x08\x03\xc2\x62',
- otultraeasy._make_packet(b'', False, False, False, True))
+ b"\x02\x06\x08\x03\xc2\x62",
+ otultraeasy._make_packet(b"", False, False, False, True),
+ )
def test_make_packet_version_request(self):
self.assertEqual(
- b'\x02\x09\x03\x05\x0d\x02\x03\x08\x9f',
- otultraeasy._make_packet(
- b'\x05\x0d\x02', True, True, False, False))
+ b"\x02\x09\x03\x05\x0d\x02\x03\x08\x9f",
+ otultraeasy._make_packet(b"\x05\x0d\x02", True, True, False, False),
+ )
diff --git a/test/test_td4277.py b/test/test_td4277.py
index 6f4ff9a..fbd4aa2 100644
--- a/test/test_td4277.py
+++ b/test/test_td4277.py
@@ -8,24 +8,21 @@
import datetime
from absl.testing import parameterized
-
+from glucometerutils import exceptions
from glucometerutils.drivers import td4277
from glucometerutils.support import lifescan
-from glucometerutils import exceptions
class TestTD4277Nexus(parameterized.TestCase):
-
@parameterized.parameters(
- (b'\x21\x24\x0e\x15', datetime.datetime(2018, 1, 1, 21, 14)),
- (b'\x21\x26\x0e\x15', datetime.datetime(2019, 1, 1, 21, 14)),
- (b'\x04\x27\x25\x0d', datetime.datetime(2019, 8, 4, 13, 37)),
+ (b"\x21\x24\x0e\x15", datetime.datetime(2018, 1, 1, 21, 14)),
+ (b"\x21\x26\x0e\x15", datetime.datetime(2019, 1, 1, 21, 14)),
+ (b"\x04\x27\x25\x0d", datetime.datetime(2019, 8, 4, 13, 37)),
)
def test_parse_datetime(self, message, date):
- self.assertEqual(td4277._parse_datetime(message),
- date)
+ self.assertEqual(td4277._parse_datetime(message), date)
def test_making_message(self):
self.assertEqual(
- td4277._make_packet(0x22, 0),
- b'\x51\x22\x00\x00\x00\x00\xa3\x16')
+ td4277._make_packet(0x22, 0), b"\x51\x22\x00\x00\x00\x00\xa3\x16"
+ )