summaryrefslogtreecommitdiffstats
path: root/glucometerutils/drivers
diff options
context:
space:
mode:
authorBen <b-schaefer@posteo.de>2020-02-21 10:45:40 +0100
committerDiego Elio Pettenò <flameeyes@flameeyes.com>2020-03-08 00:36:39 +0100
commite72b02d84e7f67cdf6107862ad580e951a5bbda1 (patch)
tree0887513d2478f55b27abccfeb307f313231bd994 /glucometerutils/drivers
parentpre-commit guide in README (diff)
downloadglucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar
glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.gz
glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.bz2
glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.lz
glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.xz
glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.zst
glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.zip
Diffstat (limited to 'glucometerutils/drivers')
-rw-r--r--glucometerutils/drivers/accuchek_reports.py73
-rw-r--r--glucometerutils/drivers/contourusb.py39
-rw-r--r--glucometerutils/drivers/fsinsulinx.py55
-rw-r--r--glucometerutils/drivers/fslibre.py184
-rw-r--r--glucometerutils/drivers/fsoptium.py134
-rw-r--r--glucometerutils/drivers/fsprecisionneo.py61
-rw-r--r--glucometerutils/drivers/otultra2.py112
-rw-r--r--glucometerutils/drivers/otultraeasy.py149
-rw-r--r--glucometerutils/drivers/otverio2015.py123
-rw-r--r--glucometerutils/drivers/otverioiq.py104
-rw-r--r--glucometerutils/drivers/sdcodefree.py108
-rw-r--r--glucometerutils/drivers/td4277.py129
12 files changed, 638 insertions, 633 deletions
diff --git a/glucometerutils/drivers/accuchek_reports.py b/glucometerutils/drivers/accuchek_reports.py
index 06b69bc..c4d7527 100644
--- a/glucometerutils/drivers/accuchek_reports.py
+++ b/glucometerutils/drivers/accuchek_reports.py
@@ -19,45 +19,46 @@ import datetime
import glob
import os
-from glucometerutils import common
-from glucometerutils import exceptions
+from glucometerutils import common, exceptions
from glucometerutils.support import driver_base
_UNIT_MAP = {
- 'mmol/l': common.Unit.MMOL_L,
- 'mg/dl': common.Unit.MG_DL,
+ "mmol/l": common.Unit.MMOL_L,
+ "mg/dl": common.Unit.MG_DL,
}
-_DATE_CSV_KEY = 'Date'
-_TIME_CSV_KEY = 'Time'
-_RESULT_CSV_KEY = 'Result'
-_UNIT_CSV_KEY = 'Unit'
-_TEMPWARNING_CSV_KEY = 'Temperature warning' # ignored
-_OUTRANGE_CSV_KEY = 'Out of target range' # ignored
-_OTHER_CSV_KEY = 'Other' # ignored
-_BEFORE_MEAL_CSV_KEY = 'Before meal'
-_AFTER_MEAL_CSV_KEY = 'After meal'
+_DATE_CSV_KEY = "Date"
+_TIME_CSV_KEY = "Time"
+_RESULT_CSV_KEY = "Result"
+_UNIT_CSV_KEY = "Unit"
+_TEMPWARNING_CSV_KEY = "Temperature warning" # ignored
+_OUTRANGE_CSV_KEY = "Out of target range" # ignored
+_OTHER_CSV_KEY = "Other" # ignored
+_BEFORE_MEAL_CSV_KEY = "Before meal"
+_AFTER_MEAL_CSV_KEY = "After meal"
# Control test has extra whitespace which is not ignored.
-_CONTROL_CSV_KEY = 'Control test' + ' '*197
+_CONTROL_CSV_KEY = "Control test" + " " * 197
-_DATE_FORMAT = '%d.%m.%Y'
-_TIME_FORMAT = '%H:%M'
+_DATE_FORMAT = "%d.%m.%Y"
+_TIME_FORMAT = "%H:%M"
-_DATETIME_FORMAT = ' '.join((_DATE_FORMAT, _TIME_FORMAT))
+_DATETIME_FORMAT = " ".join((_DATE_FORMAT, _TIME_FORMAT))
class Device(driver_base.GlucometerDriver):
def __init__(self, device):
if not device or not os.path.isdir(device):
raise exceptions.CommandLineError(
- '--device parameter is required, should point to mount path '
- 'for the meter.')
+ "--device parameter is required, should point to mount path "
+ "for the meter."
+ )
- reports_path = os.path.join(device, '*', 'Reports', '*.csv')
+ reports_path = os.path.join(device, "*", "Reports", "*.csv")
report_files = glob.glob(reports_path)
if not report_files:
raise exceptions.ConnectionFailed(
- 'No report file found in path "%s".' % reports_path)
+ 'No report file found in path "%s".' % reports_path
+ )
self.report_file = report_files[0]
@@ -68,35 +69,32 @@ class Device(driver_base.GlucometerDriver):
next(self.report)
return csv.DictReader(
- self.report,
- delimiter=';',
- skipinitialspace=True,
- quoting=csv.QUOTE_NONE)
+ self.report, delimiter=";", skipinitialspace=True, quoting=csv.QUOTE_NONE
+ )
def connect(self):
- self.report = open(
- self.report_file, 'r', newline='\r\n', encoding='utf-8')
+ self.report = open(self.report_file, "r", newline="\r\n", encoding="utf-8")
def disconnect(self):
self.report.close()
def get_meter_info(self):
return common.MeterInfo(
- '%s glucometer' % self.get_model(),
+ "%s glucometer" % self.get_model(),
serial_number=self.get_serial_number(),
- native_unit=self.get_glucose_unit())
+ native_unit=self.get_glucose_unit(),
+ )
def get_model(self):
# $device/MODEL/Reports/*.csv
- return os.path.basename(
- os.path.dirname(os.path.dirname(self.report_file)))
+ return os.path.basename(os.path.dirname(os.path.dirname(self.report_file)))
def get_serial_number(self):
self.report.seek(0)
# ignore the first line.
next(self.report)
# The second line of the CSV is serial-no;report-date;report-time;;;;;;;
- return next(self.report).split(';')[0]
+ return next(self.report).split(";")[0]
def get_glucose_unit(self):
# Get the first record available and parse that.
@@ -115,13 +113,12 @@ class Device(driver_base.GlucometerDriver):
def _extract_datetime(self, record): # pylint: disable=no-self-use
# Date and time are in separate column, but we want to parse them
# together.
- date_and_time = ' '.join((record[_DATE_CSV_KEY], record[_TIME_CSV_KEY]))
+ date_and_time = " ".join((record[_DATE_CSV_KEY], record[_TIME_CSV_KEY]))
return datetime.datetime.strptime(date_and_time, _DATETIME_FORMAT)
def _extract_meal(self, record): # pylint: disable=no-self-use
if record[_AFTER_MEAL_CSV_KEY] and record[_BEFORE_MEAL_CSV_KEY]:
- raise exceptions.InvalidResponse(
- 'Reading cannot be before and after meal.')
+ raise exceptions.InvalidResponse("Reading cannot be before and after meal.")
elif record[_AFTER_MEAL_CSV_KEY]:
return common.Meal.AFTER
elif record[_BEFORE_MEAL_CSV_KEY]:
@@ -139,5 +136,7 @@ class Device(driver_base.GlucometerDriver):
common.convert_glucose_unit(
float(record[_RESULT_CSV_KEY]),
_UNIT_MAP[record[_UNIT_CSV_KEY]],
- common.Unit.MG_DL),
- meal=self._extract_meal(record))
+ common.Unit.MG_DL,
+ ),
+ meal=self._extract_meal(record),
+ )
diff --git a/glucometerutils/drivers/contourusb.py b/glucometerutils/drivers/contourusb.py
index 397eb4f..5c9ed11 100644
--- a/glucometerutils/drivers/contourusb.py
+++ b/glucometerutils/drivers/contourusb.py
@@ -24,43 +24,44 @@ from glucometerutils import common
from glucometerutils.support import contourusb, driver_base
-def _extract_timestamp(parsed_record, prefix=''):
+def _extract_timestamp(parsed_record, prefix=""):
"""Extract the timestamp from a parsed record.
This leverages the fact that all the reading records have the same base structure.
"""
- datetime_str = parsed_record['datetime']
+ datetime_str = parsed_record["datetime"]
return datetime.datetime(
- int(datetime_str[0:4]), #year
- int(datetime_str[4:6]), #month
- int(datetime_str[6:8]), #day
- int(datetime_str[8:10]), #hour
- int(datetime_str[10:12]), #minute
- 0)
+ int(datetime_str[0:4]), # year
+ int(datetime_str[4:6]), # month
+ int(datetime_str[6:8]), # day
+ int(datetime_str[8:10]), # hour
+ int(datetime_str[10:12]), # minute
+ 0,
+ )
class Device(contourusb.ContourHidDevice, driver_base.GlucometerDriver):
"""Glucometer driver for FreeStyle Libre devices."""
- USB_VENDOR_ID = 0x1a79 # type: int # Bayer Health Care LLC Contour
+ USB_VENDOR_ID = 0x1A79 # type: int # Bayer Health Care LLC Contour
USB_PRODUCT_ID = 0x6002 # type: int
def get_meter_info(self):
self._get_info_record()
return common.MeterInfo(
- 'Contour USB',
+ "Contour USB",
serial_number=self._get_serial_number(),
- version_info=(
- 'Meter versions: ' + self._get_version(),),
- native_unit= self.get_glucose_unit())
+ version_info=("Meter versions: " + self._get_version(),),
+ native_unit=self.get_glucose_unit(),
+ )
def get_glucose_unit(self): # pylint: disable=no-self-use
- if self._get_glucose_unit() == '0':
+ if self._get_glucose_unit() == "0":
return common.Unit.MG_DL
else:
return common.Unit.MMOL_L
-
+
def get_readings(self):
"""
Get reading dump from download data mode(all readings stored)
@@ -69,10 +70,10 @@ class Device(contourusb.ContourHidDevice, driver_base.GlucometerDriver):
for parsed_record in self._get_multirecord():
yield common.GlucoseReading(
_extract_timestamp(parsed_record),
- int(parsed_record['value']),
- comment=parsed_record['markers'],
- measure_method=common.MeasurementMethod.BLOOD_SAMPLE
- )
+ int(parsed_record["value"]),
+ comment=parsed_record["markers"],
+ measure_method=common.MeasurementMethod.BLOOD_SAMPLE,
+ )
def get_serial_number(self):
raise NotImplementedError
diff --git a/glucometerutils/drivers/fsinsulinx.py b/glucometerutils/drivers/fsinsulinx.py
index f3cf043..5465b3a 100644
--- a/glucometerutils/drivers/fsinsulinx.py
+++ b/glucometerutils/drivers/fsinsulinx.py
@@ -22,20 +22,30 @@ import datetime
from glucometerutils import common
from glucometerutils.support import freestyle
-
# The type is a string because it precedes the parsing of the object.
-_TYPE_GLUCOSE_READING = '0'
-
-_InsulinxReading = collections.namedtuple('_InsulinxReading', (
- 'type', # 0 = blood glucose
- 'id',
- 'month', 'day', 'year', # year is two-digits
- 'hour', 'minute',
- 'unknown1', 'unknown2', 'unknown3',
- 'unknown4', 'unknown5', 'unknown6',
- 'value',
- 'unknown7', 'unknown8',
-))
+_TYPE_GLUCOSE_READING = "0"
+
+_InsulinxReading = collections.namedtuple(
+ "_InsulinxReading",
+ (
+ "type", # 0 = blood glucose
+ "id",
+ "month",
+ "day",
+ "year", # year is two-digits
+ "hour",
+ "minute",
+ "unknown1",
+ "unknown2",
+ "unknown3",
+ "unknown4",
+ "unknown5",
+ "unknown6",
+ "value",
+ "unknown7",
+ "unknown8",
+ ),
+)
class Device(freestyle.FreeStyleHidDevice):
@@ -46,11 +56,11 @@ class Device(freestyle.FreeStyleHidDevice):
def get_meter_info(self):
"""Return the device information in structured form."""
return common.MeterInfo(
- 'FreeStyle InsuLinx',
+ "FreeStyle InsuLinx",
serial_number=self.get_serial_number(),
- version_info=(
- 'Software version: ' + self._get_version(),),
- native_unit=self.get_glucose_unit())
+ version_info=("Software version: " + self._get_version(),),
+ native_unit=self.get_glucose_unit(),
+ )
def get_glucose_unit(self): # pylint: disable=no-self-use
"""Returns the glucose unit of the device."""
@@ -58,7 +68,7 @@ class Device(freestyle.FreeStyleHidDevice):
def get_readings(self):
"""Iterate through the reading records in the device."""
- for record in self._get_multirecord(b'$result?'):
+ for record in self._get_multirecord(b"$result?"):
if not record or record[0] != _TYPE_GLUCOSE_READING:
continue
@@ -67,11 +77,14 @@ class Device(freestyle.FreeStyleHidDevice):
raw_reading = _InsulinxReading._make([int(v) for v in record])
timestamp = datetime.datetime(
- raw_reading.year + 2000, raw_reading.month, raw_reading.day,
- raw_reading.hour, raw_reading.minute)
+ raw_reading.year + 2000,
+ raw_reading.month,
+ raw_reading.day,
+ raw_reading.hour,
+ raw_reading.minute,
+ )
yield common.GlucoseReading(timestamp, raw_reading.value)
def zero_log(self):
raise NotImplementedError
-
diff --git a/glucometerutils/drivers/fslibre.py b/glucometerutils/drivers/fslibre.py
index f1ac525..29e821a 100644
--- a/glucometerutils/drivers/fslibre.py
+++ b/glucometerutils/drivers/fslibre.py
@@ -27,52 +27,47 @@ from glucometerutils.support import freestyle
# Fields of the records returned by both $history and $arresult?
# Tuple of pairs of idx and field name
_BASE_ENTRY_MAP = (
- (0, 'device_id'),
- (1, 'type'),
- (2, 'month'),
- (3, 'day'),
- (4, 'year'), # 2-digits
- (5, 'hour'),
- (6, 'minute'),
- (7, 'second'),
+ (0, "device_id"),
+ (1, "type"),
+ (2, "month"),
+ (3, "day"),
+ (4, "year"), # 2-digits
+ (5, "hour"),
+ (6, "minute"),
+ (7, "second"),
)
# Fields of the records returned by $history?
-_HISTORY_ENTRY_MAP = _BASE_ENTRY_MAP + (
- (13, 'value'),
- (15, 'errors'),
-)
+_HISTORY_ENTRY_MAP = _BASE_ENTRY_MAP + ((13, "value"), (15, "errors"),)
# Fields of the results returned by $arresult? where type = 2
_ARRESULT_TYPE2_ENTRY_MAP = (
- (9, 'reading-type'), # 0 = glucose blood strip,
- # 1 = ketone blood strip,
- # 2 = glucose sensor
- (12, 'value'),
- (15, 'sport-flag'),
- (16, 'medication-flag'),
- (17, 'rapid-acting-flag'), # see _ARRESULT_RAPID_INSULIN_ENTRY_MAP
- (18, 'long-acting-flag'),
- (19, 'custom-comments-bitfield'),
- (23, 'double-long-acting-insulin'),
- (25, 'food-flag'),
- (26, 'food-carbs-grams'),
- (28, 'errors'),
+ (9, "reading-type"), # 0 = glucose blood strip,
+ # 1 = ketone blood strip,
+ # 2 = glucose sensor
+ (12, "value"),
+ (15, "sport-flag"),
+ (16, "medication-flag"),
+ (17, "rapid-acting-flag"), # see _ARRESULT_RAPID_INSULIN_ENTRY_MAP
+ (18, "long-acting-flag"),
+ (19, "custom-comments-bitfield"),
+ (23, "double-long-acting-insulin"),
+ (25, "food-flag"),
+ (26, "food-carbs-grams"),
+ (28, "errors"),
)
_ARRESULT_TIME_ADJUSTMENT_ENTRY_MAP = (
- (9, 'old_month'),
- (10, 'old_day'),
- (11, 'old_year'),
- (12, 'old_hour'),
- (13, 'old_minute'),
- (14, 'old_second'),
+ (9, "old_month"),
+ (10, "old_day"),
+ (11, "old_year"),
+ (12, "old_hour"),
+ (13, "old_minute"),
+ (14, "old_second"),
)
# Fields only valid when rapid-acting-flag is "1"
-_ARRESULT_RAPID_INSULIN_ENTRY_MAP = (
- (43, 'double-rapid-acting-insulin'),
-)
+_ARRESULT_RAPID_INSULIN_ENTRY_MAP = ((43, "double-rapid-acting-insulin"),)
def _parse_record(record, entry_map):
@@ -82,26 +77,25 @@ def _parse_record(record, entry_map):
return {}
try:
- return {
- key: int(record[idx]) for idx, key in entry_map
- }
+ return {key: int(record[idx]) for idx, key in entry_map}
except IndexError:
return {}
-def _extract_timestamp(parsed_record, prefix=''):
+def _extract_timestamp(parsed_record, prefix=""):
"""Extract the timestamp from a parsed record.
This leverages the fact that all the records have the same base structure.
"""
return datetime.datetime(
- parsed_record[prefix + 'year'] + 2000,
- parsed_record[prefix + 'month'],
- parsed_record[prefix + 'day'],
- parsed_record[prefix + 'hour'],
- parsed_record[prefix + 'minute'],
- parsed_record[prefix + 'second'])
+ parsed_record[prefix + "year"] + 2000,
+ parsed_record[prefix + "month"],
+ parsed_record[prefix + "day"],
+ parsed_record[prefix + "hour"],
+ parsed_record[prefix + "minute"],
+ parsed_record[prefix + "second"],
+ )
def _parse_arresult(record):
@@ -112,24 +106,23 @@ def _parse_arresult(record):
# There are other record types, but we don't currently need to expose these.
if not parsed_record:
return None
- elif parsed_record['type'] == 2:
+ elif parsed_record["type"] == 2:
parsed_record.update(_parse_record(record, _ARRESULT_TYPE2_ENTRY_MAP))
- elif parsed_record['type'] == 5:
+ elif parsed_record["type"] == 5:
parsed_record.update(_parse_record(record, _ARRESULT_TIME_ADJUSTMENT_ENTRY_MAP))
return common.TimeAdjustment(
_extract_timestamp(parsed_record),
- _extract_timestamp(parsed_record, 'old_'),
- extra_data={'device_id': parsed_record['device_id']},
+ _extract_timestamp(parsed_record, "old_"),
+ extra_data={"device_id": parsed_record["device_id"]},
)
else:
return None
# Check right away if we have rapid insulin
- if parsed_record['rapid-acting-flag']:
- parsed_record.update(
- _parse_record(record, _ARRESULT_RAPID_INSULIN_ENTRY_MAP))
+ if parsed_record["rapid-acting-flag"]:
+ parsed_record.update(_parse_record(record, _ARRESULT_RAPID_INSULIN_ENTRY_MAP))
- if parsed_record['errors']:
+ if parsed_record["errors"]:
return None
comment_parts = []
@@ -137,68 +130,69 @@ def _parse_arresult(record):
cls = None
value = None
- if parsed_record['reading-type'] == 2:
- comment_parts.append('(Scan)')
+ if parsed_record["reading-type"] == 2:
+ comment_parts.append("(Scan)")
measure_method = common.MeasurementMethod.CGM
cls = common.GlucoseReading
- value = parsed_record['value']
- elif parsed_record['reading-type'] == 0:
- comment_parts.append('(Blood)')
+ value = parsed_record["value"]
+ elif parsed_record["reading-type"] == 0:
+ comment_parts.append("(Blood)")
measure_method = common.MeasurementMethod.BLOOD_SAMPLE
cls = common.GlucoseReading
- value = parsed_record['value']
- elif parsed_record['reading-type'] == 1:
- comment_parts.append('(Ketone)')
+ value = parsed_record["value"]
+ elif parsed_record["reading-type"] == 1:
+ comment_parts.append("(Ketone)")
measure_method = common.MeasurementMethod.BLOOD_SAMPLE
cls = common.KetoneReading
# automatically convert the raw value in mmol/L
- value = freestyle.convert_ketone_unit(parsed_record['value'])
+ value = freestyle.convert_ketone_unit(parsed_record["value"])
else:
# unknown reading
return None
custom_comments = record[29:35]
for comment_index in range(6):
- if parsed_record['custom-comments-bitfield'] & (1 << comment_index):
+ if parsed_record["custom-comments-bitfield"] & (1 << comment_index):
comment_parts.append(custom_comments[comment_index][1:-1])
- if parsed_record['sport-flag']:
- comment_parts.append('Sport')
+ if parsed_record["sport-flag"]:
+ comment_parts.append("Sport")
- if parsed_record['medication-flag']:
- comment_parts.append('Medication')
+ if parsed_record["medication-flag"]:
+ comment_parts.append("Medication")
- if parsed_record['food-flag']:
- if parsed_record['food-carbs-grams']:
- comment_parts.append(
- 'Food (%d g)' % parsed_record['food-carbs-grams'])
+ if parsed_record["food-flag"]:
+ if parsed_record["food-carbs-grams"]:
+ comment_parts.append("Food (%d g)" % parsed_record["food-carbs-grams"])
else:
- comment_parts.append('Food')
+ comment_parts.append("Food")
- if parsed_record['long-acting-flag']:
- if parsed_record['double-long-acting-insulin']:
+ if parsed_record["long-acting-flag"]:
+ if parsed_record["double-long-acting-insulin"]:
comment_parts.append(
- 'Long-acting insulin (%.1f)' %
- (parsed_record['double-long-acting-insulin']/2.))
+ "Long-acting insulin (%.1f)"
+ % (parsed_record["double-long-acting-insulin"] / 2.0)
+ )
else:
- comment_parts.append('Long-acting insulin')
+ comment_parts.append("Long-acting insulin")
- if parsed_record['rapid-acting-flag']:
+ if parsed_record["rapid-acting-flag"]:
# provide default value, as this record does not always exist
# (even if rapid-acting-flag is set)
- if parsed_record.get('double-rapid-acting-insulin', 0):
+ if parsed_record.get("double-rapid-acting-insulin", 0):
comment_parts.append(
- 'Rapid-acting insulin (%.1f)' %
- (parsed_record['double-rapid-acting-insulin']/2.))
+ "Rapid-acting insulin (%.1f)"
+ % (parsed_record["double-rapid-acting-insulin"] / 2.0)
+ )
else:
- comment_parts.append('Rapid-acting insulin')
+ comment_parts.append("Rapid-acting insulin")
return cls(
_extract_timestamp(parsed_record),
value,
- comment='; '.join(comment_parts),
+ comment="; ".join(comment_parts),
measure_method=measure_method,
- extra_data={'device_id': parsed_record['device_id']},
+ extra_data={"device_id": parsed_record["device_id"]},
)
@@ -210,16 +204,16 @@ class Device(freestyle.FreeStyleHidDevice):
def get_meter_info(self):
"""Return the device information in structured form."""
return common.MeterInfo(
- 'FreeStyle Libre',
+ "FreeStyle Libre",
serial_number=self.get_serial_number(),
- version_info=(
- 'Software version: ' + self._get_version(),),
+ version_info=("Software version: " + self._get_version(),),
native_unit=self.get_glucose_unit(),
- patient_name=self.get_patient_name())
+ patient_name=self.get_patient_name(),
+ )
def get_serial_number(self):
"""Overridden function as the command is not compatible."""
- return self._send_text_command(b'$sn?').rstrip('\r\n')
+ return self._send_text_command(b"$sn?").rstrip("\r\n")
def get_glucose_unit(self): # pylint: disable=no-self-use
"""Returns the glucose unit of the device."""
@@ -231,27 +225,27 @@ class Device(freestyle.FreeStyleHidDevice):
# First of all get the usually longer list of sensor readings, and
# convert them to Readings objects.
- for record in self._get_multirecord(b'$history?'):
+ for record in self._get_multirecord(b"$history?"):
parsed_record = _parse_record(record, _HISTORY_ENTRY_MAP)
- if not parsed_record or parsed_record['errors'] != 0:
+ if not parsed_record or parsed_record["errors"] != 0:
# The reading is considered invalid, so ignore it.
continue
yield common.GlucoseReading(
_extract_timestamp(parsed_record),
- parsed_record['value'],
- comment='(Sensor)',
+ parsed_record["value"],
+ comment="(Sensor)",
measure_method=common.MeasurementMethod.CGM,
- extra_data={'device_id': parsed_record['device_id']},
+ extra_data={"device_id": parsed_record["device_id"]},
)
# Then get the results of explicit scans and blood tests (and other
# events).
- for record in self._get_multirecord(b'$arresult?'):
+ for record in self._get_multirecord(b"$arresult?"):
reading = _parse_arresult(record)
if reading:
yield reading
def zero_log(self):
- self._send_text_command(b'$resetpatient')
+ self._send_text_command(b"$resetpatient")
diff --git a/glucometerutils/drivers/fsoptium.py b/glucometerutils/drivers/fsoptium.py
index 66b23ca..5c3971e 100644
--- a/glucometerutils/drivers/fsoptium.py
+++ b/glucometerutils/drivers/fsoptium.py
@@ -20,13 +20,13 @@ import datetime
import logging
import re
-from glucometerutils import common
-from glucometerutils import exceptions
-from glucometerutils.support import serial, driver_base
+from glucometerutils import common, exceptions
+from glucometerutils.support import driver_base, serial
_CLOCK_RE = re.compile(
- r'^Clock:\t(?P<month>[A-Z][a-z]{2}) (?P<day>[0-9]{2}) (?P<year>[0-9]{4})\t'
- r'(?P<time>[0-9]{2}:[0-9]{2}:[0-9]{2})$')
+ r"^Clock:\t(?P<month>[A-Z][a-z]{2}) (?P<day>[0-9]{2}) (?P<year>[0-9]{4})\t"
+ r"(?P<time>[0-9]{2}:[0-9]{2}:[0-9]{2})$"
+)
# The reading can be HI (padded to three-characters by a space) if the value was
# over what the meter was supposed to read. Unlike the "Clock:" line, the months
@@ -34,33 +34,33 @@ _CLOCK_RE = re.compile(
# characters, so accept a space or 'e'/'y' at the end of the month name. Also,
# the time does *not* include seconds.
_READING_RE = re.compile(
- r'^(?P<reading>HI |[0-9]{3}) '
- r'(?P<month>[A-Z][a-z]{2})[ ey] '
- r'(?P<day>[0-9]{2}) '
- r'(?P<year>[0-9]{4}) '
- r'(?P<time>[0-9]{2}:[0-9]{2}) '
- r'(?P<type>[GK]) 0x00$')
+ r"^(?P<reading>HI |[0-9]{3}) "
+ r"(?P<month>[A-Z][a-z]{2})[ ey] "
+ r"(?P<day>[0-9]{2}) "
+ r"(?P<year>[0-9]{4}) "
+ r"(?P<time>[0-9]{2}:[0-9]{2}) "
+ r"(?P<type>[GK]) 0x00$"
+)
-_CHECKSUM_RE = re.compile(
- r'^(?P<checksum>0x[0-9A-F]{4}) END$')
+_CHECKSUM_RE = re.compile(r"^(?P<checksum>0x[0-9A-F]{4}) END$")
# There are two date format used by the device. One uses three-letters month
# names, and that's easy enough. The other uses three-letters month names,
# except for (at least) July. So ignore the fourth character.
# explicit mapping. Note that the mapping *requires* a trailing whitespace.
_MONTH_MATCHES = {
- 'Jan': 1,
- 'Feb': 2,
- 'Mar': 3,
- 'Apr': 4,
- 'May': 5,
- 'Jun': 6,
- 'Jul': 7,
- 'Aug': 8,
- 'Sep': 9,
- 'Oct': 10,
- 'Nov': 11,
- 'Dec': 12
+ "Jan": 1,
+ "Feb": 2,
+ "Mar": 3,
+ "Apr": 4,
+ "May": 5,
+ "Jun": 6,
+ "Jul": 7,
+ "Aug": 8,
+ "Sep": 9,
+ "Oct": 10,
+ "Nov": 11,
+ "Dec": 12,
}
@@ -75,60 +75,59 @@ def _parse_clock(datestr):
raise exceptions.InvalidResponse(datestr)
# int() parses numbers in decimal, so we don't have to worry about '08'
- day = int(match.group('day'))
- month = _MONTH_MATCHES[match.group('month')]
- year = int(match.group('year'))
+ day = int(match.group("day"))
+ month = _MONTH_MATCHES[match.group("month")]
+ year = int(match.group("year"))
- hour, minute, second = (int (x) for x in match.group('time').split(':'))
+ hour, minute, second = (int(x) for x in match.group("time").split(":"))
return datetime.datetime(year, month, day, hour, minute, second)
class Device(serial.SerialDevice, driver_base.GlucometerDriver):
BAUDRATE = 19200
- DEFAULT_CABLE_ID = '1a61:3420'
+ DEFAULT_CABLE_ID = "1a61:3420"
def _send_command(self, command):
- cmd_bytes = bytes('$%s\r\n' % command, 'ascii')
- logging.debug('Sending command: %r', cmd_bytes)
+ cmd_bytes = bytes("$%s\r\n" % command, "ascii")
+ logging.debug("Sending command: %r", cmd_bytes)
self.serial_.write(cmd_bytes)
self.serial_.flush()
response = self.serial_.readlines()
- logging.debug('Received response: %r', response)
+ logging.debug("Received response: %r", response)
# We always want to decode the output, and remove stray \r\n. Any
# failure in decoding means the output is invalid anyway.
- decoded_response = [line.decode('ascii').rstrip('\r\n')
- for line in response]
+ decoded_response = [line.decode("ascii").rstrip("\r\n") for line in response]
return decoded_response
def connect(self):
- self._send_command('xmem') # ignore output this time
+ self._send_command("xmem") # ignore output this time
self._fetch_device_information()
def disconnect(self): # pylint: disable=no-self-use
return
def _fetch_device_information(self):
- data = self._send_command('colq')
+ data = self._send_command("colq")
for line in data:
- parsed_line = line.split('\t')
+ parsed_line = line.split("\t")
- if parsed_line[0] == 'S/N:':
+ if parsed_line[0] == "S/N:":
self.device_serialno_ = parsed_line[1]
- elif parsed_line[0] == 'Ver:':
+ elif parsed_line[0] == "Ver:":
self.device_version_ = parsed_line[1]
- if parsed_line[2] == 'MMOL':
+ if parsed_line[2] == "MMOL":
self.device_glucose_unit_ = common.Unit.MMOL_L
else: # I only have a mmol/l device, so I can't be sure.
self.device_glucose_unit_ = common.Unit.MG_DL
# There are more entries: Clock, Market, ROM and Usage, but we don't
# care for those here.
- elif parsed_line[0] == 'CMD OK':
+ elif parsed_line[0] == "CMD OK":
return
# I have not figured out why this happens, but sometimes it's echoing
@@ -142,11 +141,11 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
A common.MeterInfo object.
"""
return common.MeterInfo(
- 'Freestyle Optium glucometer',
+ "Freestyle Optium glucometer",
serial_number=self.get_serial_number(),
- version_info=(
- 'Software version: ' + self.get_version(),),
- native_unit=self.get_glucose_unit())
+ version_info=("Software version: " + self.get_version(),),
+ native_unit=self.get_glucose_unit(),
+ )
def get_version(self):
"""Returns an identifier of the firmware version of the glucometer.
@@ -179,21 +178,21 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
Returns:
A datetime object built according to the returned response.
"""
- data = self._send_command('colq')
+ data = self._send_command("colq")
for line in data:
- if not line.startswith('Clock:'):
+ if not line.startswith("Clock:"):
continue
return _parse_clock(line)
- raise exceptions.InvalidResponse('\n'.join(data))
+ raise exceptions.InvalidResponse("\n".join(data))
def _set_device_datetime(self, date):
- data = self._send_command(date.strftime('tim,%m,%d,%y,%H,%M'))
+ data = self._send_command(date.strftime("tim,%m,%d,%y,%H,%M"))
- parsed_data = ''.join(data)
- if parsed_data != 'CMD OK':
+ parsed_data = "".join(data)
+ if parsed_data != "CMD OK":
raise exceptions.InvalidResponse(parsed_data)
return self.get_datetime()
@@ -216,50 +215,47 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
expected.
"""
- data = self._send_command('xmem')
+ data = self._send_command("xmem")
# The first line is empty, the second is the serial number, the third
# the version, the fourth the current time, and the fifth the record
# count.. The last line has a checksum and the end.
count = int(data[4])
if count != (len(data) - 6):
- raise exceptions.InvalidResponse('\n'.join(data))
+ raise exceptions.InvalidResponse("\n".join(data))
# Extract the checksum from the last line.
checksum_match = _CHECKSUM_RE.match(data[-1])
if not checksum_match:
- raise exceptions.InvalidResponse('\n'.join(data))
+ raise exceptions.InvalidResponse("\n".join(data))
- expected_checksum = int(checksum_match.group('checksum'), 16)
+ expected_checksum = int(checksum_match.group("checksum"), 16)
# exclude the last line in the checksum calculation, as that's the
# checksum itself. The final \r\n is added separately.
- calculated_checksum = sum(
- ord(c) for c in '\r\n'.join(data[:-1])) + 0xd + 0xa
+ calculated_checksum = sum(ord(c) for c in "\r\n".join(data[:-1])) + 0xD + 0xA
if expected_checksum != calculated_checksum:
- raise exceptions.InvalidChecksum(
- expected_checksum, calculated_checksum)
+ raise exceptions.InvalidChecksum(expected_checksum, calculated_checksum)
for line in data[5:-1]:
match = _READING_RE.match(line)
if not match:
raise exceptions.InvalidResponse(line)
- if match.group('type') != 'G':
- logging.warning(
- 'Non-glucose readings are not supported, ignoring.')
+ if match.group("type") != "G":
+ logging.warning("Non-glucose readings are not supported, ignoring.")
continue
- if match.group('reading') == 'HI ':
+ if match.group("reading") == "HI ":
value = float("inf")
else:
- value = float(match.group('reading'))
+ value = float(match.group("reading"))
- day = int(match.group('day'))
- month = _MONTH_MATCHES[match.group('month')]
- year = int(match.group('year'))
+ day = int(match.group("day"))
+ month = _MONTH_MATCHES[match.group("month")]
+ year = int(match.group("year"))
- hour, minute = map(int, match.group('time').split(':'))
+ hour, minute = map(int, match.group("time").split(":"))
timestamp = datetime.datetime(year, month, day, hour, minute)
diff --git a/glucometerutils/drivers/fsprecisionneo.py b/glucometerutils/drivers/fsprecisionneo.py
index 58564e5..909fed8 100644
--- a/glucometerutils/drivers/fsprecisionneo.py
+++ b/glucometerutils/drivers/fsprecisionneo.py
@@ -27,25 +27,30 @@ import datetime
from glucometerutils import common
from glucometerutils.support import freestyle
-
# The type is a string because it precedes the parsing of the object.
-_TYPE_GLUCOSE_READING = '7'
-_TYPE_KETONE_READING = '9'
-
-_NeoReading = collections.namedtuple('_NeoReading', (
- 'type', # 7 = blood glucose, 9 = blood ketone
- 'id',
- 'month', 'day', 'year', # year is two-digits
- 'hour', 'minute',
- 'unknown2',
- 'value',
- # Extra trailing and so-far-unused fields; so discard them:
- # * for blood glucose: 10 unknown trailing fields
- #'unknown3', 'unknown4', 'unknown5', 'unknown6', 'unknown7',
- #'unknown8', 'unknown9', 'unknown10', 'unknown11', 'unknown12',
- # * for blood ketone: 2 unknown trailing fields
- #'unknown3', 'unknown4',
-))
+_TYPE_GLUCOSE_READING = "7"
+_TYPE_KETONE_READING = "9"
+
+_NeoReading = collections.namedtuple(
+ "_NeoReading",
+ (
+ "type", # 7 = blood glucose, 9 = blood ketone
+ "id",
+ "month",
+ "day",
+ "year", # year is two-digits
+ "hour",
+ "minute",
+ "unknown2",
+ "value",
+ # Extra trailing and so-far-unused fields; so discard them:
+ # * for blood glucose: 10 unknown trailing fields
+ #'unknown3', 'unknown4', 'unknown5', 'unknown6', 'unknown7',
+ #'unknown8', 'unknown9', 'unknown10', 'unknown11', 'unknown12',
+ # * for blood ketone: 2 unknown trailing fields
+ #'unknown3', 'unknown4',
+ ),
+)
class Device(freestyle.FreeStyleHidDevice):
@@ -56,12 +61,12 @@ class Device(freestyle.FreeStyleHidDevice):
def get_meter_info(self):
"""Return the device information in structured form."""
return common.MeterInfo(
- 'FreeStyle Precision Neo',
+ "FreeStyle Precision Neo",
serial_number=self.get_serial_number(),
- version_info=(
- 'Software version: ' + self._get_version(),),
+ version_info=("Software version: " + self._get_version(),),
native_unit=self.get_glucose_unit(),
- patient_name=self.get_patient_name())
+ patient_name=self.get_patient_name(),
+ )
def get_glucose_unit(self): # pylint: disable=no-self-use
"""Returns the glucose unit of the device."""
@@ -69,7 +74,7 @@ class Device(freestyle.FreeStyleHidDevice):
def get_readings(self):
"""Iterate through the reading records in the device."""
- for record in self._get_multirecord(b'$result?'):
+ for record in self._get_multirecord(b"$result?"):
cls = None
if record and record[0] == _TYPE_GLUCOSE_READING:
cls = common.GlucoseReading
@@ -85,11 +90,15 @@ class Device(freestyle.FreeStyleHidDevice):
if value == "HI":
value = float("inf")
values.append(int(value))
- raw_reading = _NeoReading._make(values[:len(_NeoReading._fields)])
+ raw_reading = _NeoReading._make(values[: len(_NeoReading._fields)])
timestamp = datetime.datetime(
- raw_reading.year + 2000, raw_reading.month, raw_reading.day,
- raw_reading.hour, raw_reading.minute)
+ raw_reading.year + 2000,
+ raw_reading.month,
+ raw_reading.day,
+ raw_reading.hour,
+ raw_reading.minute,
+ )
if record and record[0] == _TYPE_KETONE_READING:
value = freestyle.convert_ketone_unit(raw_reading.value)
diff --git a/glucometerutils/drivers/otultra2.py b/glucometerutils/drivers/otultra2.py
index 39be859..5e90b87 100644
--- a/glucometerutils/drivers/otultra2.py
+++ b/glucometerutils/drivers/otultra2.py
@@ -16,40 +16,40 @@ Expected device path: /dev/ttyUSB0 or similar serial port device.
import datetime
import re
-from glucometerutils import common
-from glucometerutils import exceptions
+from glucometerutils import common, exceptions
from glucometerutils.support import driver_base, lifescan, serial
# The following two hashes are taken directly from LifeScan's documentation
_MEAL_CODES = {
- 'N': common.Meal.NONE,
- 'B': common.Meal.BEFORE,
- 'A': common.Meal.AFTER,
+ "N": common.Meal.NONE,
+ "B": common.Meal.BEFORE,
+ "A": common.Meal.AFTER,
}
_COMMENT_CODES = {
- '00': '', # would be 'No Comment'
- '01': 'Not Enough Food',
- '02': 'Too Much Food',
- '03': 'Mild Exercise',
- '04': 'Hard Exercise',
- '05': 'Medication',
- '06': 'Stress',
- '07': 'Illness',
- '08': 'Feel Hypo',
- '09': 'Menses',
- '10': 'Vacation',
- '11': 'Other',
+ "00": "", # would be 'No Comment'
+ "01": "Not Enough Food",
+ "02": "Too Much Food",
+ "03": "Mild Exercise",
+ "04": "Hard Exercise",
+ "05": "Medication",
+ "06": "Stress",
+ "07": "Illness",
+ "08": "Feel Hypo",
+ "09": "Menses",
+ "10": "Vacation",
+ "11": "Other",
}
-_DUMP_HEADER_RE = re.compile(
- r'P ([0-9]{3}),"[0-9A-Z]{9}","(?:MG/DL |MMOL/L)"')
+_DUMP_HEADER_RE = re.compile(r'P ([0-9]{3}),"[0-9A-Z]{9}","(?:MG/DL |MMOL/L)"')
_DUMP_LINE_RE = re.compile(
r'P (?P<datetime>"[A-Z]{3}","[0-9/]{8}","[0-9:]{8} "),'
r'"(?P<control>[C ]) (?P<value>[0-9]{3})(?P<parityerror>[\? ])",'
- r'"(?P<meal>[NBA])","(?P<comment>0[0-9]|1[01])", 00')
+ r'"(?P<meal>[NBA])","(?P<comment>0[0-9]|1[01])", 00'
+)
+
+_RESPONSE_MATCH = re.compile(r"^(.+) ([0-9A-F]{4})\r$")
-_RESPONSE_MATCH = re.compile(r'^(.+) ([0-9A-F]{4})\r$')
def _calculate_checksum(bytestring):
"""Calculate the checksum used by OneTouch Ultra and Ultra2 devices
@@ -66,10 +66,11 @@ def _calculate_checksum(bytestring):
checksum = 0
for byte in bytestring:
- checksum = (checksum + byte) & 0xffff
+ checksum = (checksum + byte) & 0xFFFF
return checksum
+
def _validate_and_strip_checksum(line):
"""Verify the simple 16-bit checksum and remove it from the line.
@@ -88,20 +89,19 @@ def _validate_and_strip_checksum(line):
try:
checksum_given = int(checksum_string, 16)
- checksum_calculated = _calculate_checksum(
- bytes(response, 'ascii'))
+ checksum_calculated = _calculate_checksum(bytes(response, "ascii"))
if checksum_given != checksum_calculated:
- raise exceptions.InvalidChecksum(checksum_given,
- checksum_calculated)
+ raise exceptions.InvalidChecksum(checksum_given, checksum_calculated)
except ValueError:
raise exceptions.InvalidChecksum(checksum_given, None)
return response
+
_DATETIME_RE = re.compile(
- r'^"[A-Z]{3}",'
- r'"([0-9]{2}/[0-9]{2}/[0-9]{2})","([0-9]{2}:[0-9]{2}:[0-9]{2}) "$')
+ r'^"[A-Z]{3}",' r'"([0-9]{2}/[0-9]{2}/[0-9]{2})","([0-9]{2}:[0-9]{2}:[0-9]{2}) "$'
+)
def _parse_datetime(response):
@@ -121,8 +121,8 @@ def _parse_datetime(response):
raise exceptions.InvalidResponse(response)
date, time = match.groups()
- month, day, year = map(int, date.split('/'))
- hour, minute, second = map(int, time.split(':'))
+ month, day, year = map(int, date.split("/"))
+ hour, minute, second = map(int, time.split(":"))
# Yes, OneTouch2's firmware is not Y2K safe.
return datetime.datetime(2000 + year, month, day, hour, minute, second)
@@ -130,7 +130,7 @@ def _parse_datetime(response):
class Device(serial.SerialDevice, driver_base.GlucometerDriver):
BAUDRATE = 9600
- DEFAULT_CABLE_ID = '067b:2303' # Generic PL2303 cable.
+ DEFAULT_CABLE_ID = "067b:2303" # Generic PL2303 cable.
def connect(self): # pylint: disable=no-self-use
return
@@ -144,7 +144,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
Args:
cmd: command and parameters to send (without newline)
"""
- cmdstring = bytes('\x11\r' + cmd + '\r', 'ascii')
+ cmdstring = bytes("\x11\r" + cmd + "\r", "ascii")
self.serial_.write(cmdstring)
self.serial_.flush()
@@ -160,7 +160,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
"""
self._send_command(cmd)
- line = self.serial_.readline().decode('ascii')
+ line = self.serial_.readline().decode("ascii")
return _validate_and_strip_checksum(line)
def get_meter_info(self):
@@ -170,11 +170,11 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
A common.MeterInfo object.
"""
return common.MeterInfo(
- 'OneTouch Ultra 2 glucometer',
+ "OneTouch Ultra 2 glucometer",
serial_number=self.get_serial_number(),
- version_info=(
- 'Software version: ' + self.get_version(),),
- native_unit=self.get_glucose_unit())
+ version_info=("Software version: " + self.get_version(),),
+ native_unit=self.get_glucose_unit(),
+ )
def get_version(self):
"""Returns an identifier of the firmware version of the glucometer.
@@ -183,9 +183,9 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
The software version returned by the glucometer, such as
"P02.00.00 30/08/06".
"""
- response = self._send_oneliner_command('DM?')
+ response = self._send_oneliner_command("DM?")
- if response[0] != '?':
+ if response[0] != "?":
raise exceptions.InvalidResponse(response)
return response[1:]
@@ -204,7 +204,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
InvalidSerialNumber: if the returned serial number does not match
the OneTouch2 device as per specs.
"""
- response = self._send_oneliner_command('DM@')
+ response = self._send_oneliner_command("DM@")
match = self._SERIAL_NUMBER_RE.match(response)
if not match:
@@ -214,7 +214,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
# 'Y' at the far right of the serial number is the indication of a
# OneTouch Ultra2 device, as per specs.
- if serial_number[-1] != 'Y':
+ if serial_number[-1] != "Y":
raise lifescan.InvalidSerialNumber(serial_number)
return serial_number
@@ -225,12 +225,13 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
Returns:
A datetime object built according to the returned response.
"""
- response = self._send_oneliner_command('DMF')
+ response = self._send_oneliner_command("DMF")
return _parse_datetime(response[2:])
def _set_device_datetime(self, date):
response = self._send_oneliner_command(
- 'DMT' + date.strftime('%m/%d/%y %H:%M:%S'))
+ "DMT" + date.strftime("%m/%d/%y %H:%M:%S")
+ )
return _parse_datetime(response[2:])
def zero_log(self):
@@ -239,8 +240,8 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
This function will clear the memory of the device deleting all the
readings in an irrecoverable way.
"""
- response = self._send_oneliner_command('DMZ')
- if response != 'Z':
+ response = self._send_oneliner_command("DMZ")
+ if response != "Z":
raise exceptions.InvalidResponse(response)
_GLUCOSE_UNIT_RE = re.compile(r'^SU\?,"(MG/DL |MMOL/L)"')
@@ -260,15 +261,15 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
unit used for display. This is not settable by the user in all modern
meters.
"""
- response = self._send_oneliner_command('DMSU?')
+ response = self._send_oneliner_command("DMSU?")
match = self._GLUCOSE_UNIT_RE.match(response)
unit = match.group(1)
- if unit == 'MG/DL ':
+ if unit == "MG/DL ":
return common.Unit.MG_DL
- if unit == 'MMOL/L':
+ if unit == "MMOL/L":
return common.Unit.MMOL_L
raise exceptions.InvalidGlucoseUnit(response)
@@ -287,10 +288,10 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
expected.
"""
- self._send_command('DMP')
+ self._send_command("DMP")
data = self.serial_.readlines()
- header = data.pop(0).decode('ascii')
+ header = data.pop(0).decode("ascii")
match = _DUMP_HEADER_RE.match(header)
if not match:
raise exceptions.InvalidResponse(header)
@@ -299,7 +300,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
assert count == len(data)
for line in data:
- line = _validate_and_strip_checksum(line.decode('ascii'))
+ line = _validate_and_strip_checksum(line.decode("ascii"))
match = _DUMP_LINE_RE.match(line)
if not match:
@@ -307,11 +308,12 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
line_data = match.groupdict()
- date = _parse_datetime(line_data['datetime'])
- meal = _MEAL_CODES[line_data['meal']]
- comment = _COMMENT_CODES[line_data['comment']]
+ date = _parse_datetime(line_data["datetime"])
+ meal = _MEAL_CODES[line_data["meal"]]
+ comment = _COMMENT_CODES[line_data["comment"]]
# OneTouch2 always returns the data in mg/dL even if the glucometer
# is set to mmol/L, so there is no conversion required.
yield common.GlucoseReading(
- date, float(line_data['value']), meal=meal, comment=comment)
+ date, float(line_data["value"]), meal=meal, comment=comment
+ )
diff --git a/glucometerutils/drivers/otultraeasy.py b/glucometerutils/drivers/otultraeasy.py
index 7f4934e..0d1e7a9 100644
--- a/glucometerutils/drivers/otultraeasy.py
+++ b/glucometerutils/drivers/otultraeasy.py
@@ -22,87 +22,97 @@ import logging
import construct
from glucometerutils import common
-from glucometerutils.support import construct_extras, driver_base, lifescan, lifescan_binary_protocol, serial
+from glucometerutils.support import (
+ construct_extras,
+ driver_base,
+ lifescan,
+ lifescan_binary_protocol,
+ serial,
+)
_PACKET = lifescan_binary_protocol.LifeScanPacket(True)
_INVALID_RECORD = 501
-_COMMAND_SUCCESS = construct.Const(b'\x05\x06')
+_COMMAND_SUCCESS = construct.Const(b"\x05\x06")
-_VERSION_REQUEST = construct.Const(b'\x05\x0d\x02')
+_VERSION_REQUEST = construct.Const(b"\x05\x0d\x02")
_VERSION_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'version' / construct.PascalString(construct.Byte, encoding='ascii'),
+ "version" / construct.PascalString(construct.Byte, encoding="ascii"),
)
_SERIAL_NUMBER_REQUEST = construct.Const(
- b'\x05\x0B\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00')
+ b"\x05\x0B\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+)
_SERIAL_NUMBER_RESPONSE = construct.Struct(
- _COMMAND_SUCCESS,
- 'serial_number' / construct.GreedyString(encoding='ascii'),
+ _COMMAND_SUCCESS, "serial_number" / construct.GreedyString(encoding="ascii"),
)
_DATETIME_REQUEST = construct.Struct(
- construct.Const(b'\x05\x20'), # 0x20 is the datetime
- 'request_type' / construct.Enum(construct.Byte, write=0x01, read=0x02),
- 'timestamp' / construct.Default(
+ construct.Const(b"\x05\x20"), # 0x20 is the datetime
+ "request_type" / construct.Enum(construct.Byte, write=0x01, read=0x02),
+ "timestamp"
+ / construct.Default(
construct_extras.Timestamp(construct.Int32ul), # type: ignore
- datetime.datetime(1970, 1, 1, 0, 0)),
+ datetime.datetime(1970, 1, 1, 0, 0),
+ ),
)
_DATETIME_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'timestamp' / construct_extras.Timestamp(construct.Int32ul), # type: ignore
+ "timestamp" / construct_extras.Timestamp(construct.Int32ul), # type: ignore
)
-_GLUCOSE_UNIT_REQUEST = construct.Const(
- b'\x05\x09\x02\x09\x00\x00\x00\x00')
+_GLUCOSE_UNIT_REQUEST = construct.Const(b"\x05\x09\x02\x09\x00\x00\x00\x00")
_GLUCOSE_UNIT_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'unit' / lifescan_binary_protocol.GLUCOSE_UNIT,
+ "unit" / lifescan_binary_protocol.GLUCOSE_UNIT,
construct.Padding(3),
)
-_MEMORY_ERASE_REQUEST = construct.Const(b'\x05\x1A')
+_MEMORY_ERASE_REQUEST = construct.Const(b"\x05\x1A")
_READING_COUNT_RESPONSE = construct.Struct(
- construct.Const(b'\x0f'),
- 'count' / construct.Int16ul,
+ construct.Const(b"\x0f"), "count" / construct.Int16ul,
)
_READ_RECORD_REQUEST = construct.Struct(
- construct.Const(b'\x05\x1f'),
- 'record_id' / construct.Int16ul,
+ construct.Const(b"\x05\x1f"), "record_id" / construct.Int16ul,
)
_READING_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'timestamp' / construct_extras.Timestamp(construct.Int32ul), # type: ignore
- 'value' / construct.Int32ul,
+ "timestamp" / construct_extras.Timestamp(construct.Int32ul), # type: ignore
+ "value" / construct.Int32ul,
)
-def _make_packet(
- message, sequence_number, expect_receive, acknowledge, disconnect):
+
+def _make_packet(message, sequence_number, expect_receive, acknowledge, disconnect):
return _PACKET.build(
- {'data': {'value': {
- 'message': message,
- 'link_control': {
- 'sequence_number': sequence_number,
- 'expect_receive': expect_receive,
- 'acknowledge': acknowledge,
- 'disconnect': disconnect,
- },
- }}})
+ {
+ "data": {
+ "value": {
+ "message": message,
+ "link_control": {
+ "sequence_number": sequence_number,
+ "expect_receive": expect_receive,
+ "acknowledge": acknowledge,
+ "disconnect": disconnect,
+ },
+ }
+ }
+ }
+ )
class Device(serial.SerialDevice, driver_base.GlucometerDriver):
BAUDRATE = 9600
- DEFAULT_CABLE_ID = '067b:2303' # Generic PL2303 cable.
+ DEFAULT_CABLE_ID = "067b:2303" # Generic PL2303 cable.
TIMEOUT = 0.5
def __init__(self, device):
@@ -110,12 +120,11 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
self.sent_counter_ = False
self.expect_receive_ = False
- self.buffered_reader_ = construct.Rebuffered(
- _PACKET, tailcutoff=1024)
+ self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024)
def connect(self):
try:
- self._send_packet(b'', disconnect=True)
+ self._send_packet(b"", disconnect=True)
self._read_ack()
except construct.ConstructError as e:
raise lifescan.MalformedCommand(str(e))
@@ -125,33 +134,32 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
def _send_packet(self, message, acknowledge=False, disconnect=False):
pkt = _make_packet(
- message,
- self.sent_counter_,
- self.expect_receive_,
- acknowledge,
- disconnect)
- logging.debug('sending packet: %s', binascii.hexlify(pkt))
+ message, self.sent_counter_, self.expect_receive_, acknowledge, disconnect
+ )
+ logging.debug("sending packet: %s", binascii.hexlify(pkt))
self.serial_.write(pkt)
self.serial_.flush()
def _read_packet(self):
raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data
- logging.debug('received packet: %r', raw_pkt)
+ logging.debug("received packet: %r", raw_pkt)
# discard the checksum and copy
pkt = raw_pkt.value
if not pkt.link_control.disconnect and (
- pkt.link_control.sequence_number != self.expect_receive_):
+ pkt.link_control.sequence_number != self.expect_receive_
+ ):
raise lifescan.MalformedCommand(
- 'at position 2[0b] expected %02x, received %02x' % (
- self.expect_receive_, pkt.link_connect.sequence_count))
+ "at position 2[0b] expected %02x, received %02x"
+ % (self.expect_receive_, pkt.link_connect.sequence_count)
+ )
return pkt
def _send_ack(self):
- self._send_packet(b'', acknowledge=True, disconnect=False)
+ self._send_packet(b"", acknowledge=True, disconnect=False)
def _read_ack(self):
pkt = self._read_packet()
@@ -177,62 +185,63 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
def get_meter_info(self):
return common.MeterInfo(
- 'OneTouch Ultra Easy glucometer',
+ "OneTouch Ultra Easy glucometer",
serial_number=self.get_serial_number(),
- version_info=(
- 'Software version: ' + self.get_version(),),
- native_unit=self.get_glucose_unit())
+ version_info=("Software version: " + self.get_version(),),
+ native_unit=self.get_glucose_unit(),
+ )
def get_version(self):
- response = self._send_request(
- _VERSION_REQUEST, None, _VERSION_RESPONSE)
+ response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE)
return response.version
def get_serial_number(self):
response = self._send_request(
- _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE)
+ _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE
+ )
return response.serial_number
def get_datetime(self):
response = self._send_request(
- _DATETIME_REQUEST, {'request_type': 'read'},
- _DATETIME_RESPONSE)
+ _DATETIME_REQUEST, {"request_type": "read"}, _DATETIME_RESPONSE
+ )
return response.timestamp
def _set_device_datetime(self, date):
response = self._send_request(
- _DATETIME_REQUEST, {
- 'request_type': 'write',
- 'timestamp': date,
- }, _DATETIME_RESPONSE)
+ _DATETIME_REQUEST,
+ {"request_type": "write", "timestamp": date,},
+ _DATETIME_RESPONSE,
+ )
return response.timestamp
def zero_log(self):
- self._send_request(
- _MEMORY_ERASE_REQUEST, None,
- _COMMAND_SUCCESS)
+ self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS)
def get_glucose_unit(self):
response = self._send_request(
- _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE)
+ _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE
+ )
return response.unit
def _get_reading_count(self):
response = self._send_request(
- _READ_RECORD_REQUEST, {'record_id': _INVALID_RECORD},
- _READING_COUNT_RESPONSE)
+ _READ_RECORD_REQUEST,
+ {"record_id": _INVALID_RECORD},
+ _READING_COUNT_RESPONSE,
+ )
return response.count
def _get_reading(self, record_id):
response = self._send_request(
- _READ_RECORD_REQUEST, {'record_id': record_id}, _READING_RESPONSE)
+ _READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE
+ )
- return common.GlucoseReading(
- response.timestamp, float(response.value))
+ return common.GlucoseReading(response.timestamp, float(response.value))
def get_readings(self):
record_count = self._get_reading_count()
diff --git a/glucometerutils/drivers/otverio2015.py b/glucometerutils/drivers/otverio2015.py
index bde0af3..b8429ea 100644
--- a/glucometerutils/drivers/otverio2015.py
+++ b/glucometerutils/drivers/otverio2015.py
@@ -29,67 +29,61 @@ import construct
from pyscsi.pyscsi.scsi import SCSI
from pyscsi.pyscsi.scsi_device import SCSIDevice
-from glucometerutils import common
-from glucometerutils import exceptions
+from glucometerutils import common, exceptions
from glucometerutils.support import driver_base, lifescan, lifescan_binary_protocol
# This device uses SCSI blocks as registers.
_REGISTER_SIZE = 512
_PACKET = construct.Padded(
- _REGISTER_SIZE,
- lifescan_binary_protocol.LifeScanPacket(False))
+ _REGISTER_SIZE, lifescan_binary_protocol.LifeScanPacket(False)
+)
-_COMMAND_SUCCESS = construct.Const(b'\x03\x06')
+_COMMAND_SUCCESS = construct.Const(b"\x03\x06")
_QUERY_REQUEST = construct.Struct(
- construct.Const(b'\x03\xe6\x02'),
- 'selector' / construct.Enum(
- construct.Byte, serial=0x00, model=0x01, software=0x02),
+ construct.Const(b"\x03\xe6\x02"),
+ "selector" / construct.Enum(construct.Byte, serial=0x00, model=0x01, software=0x02),
)
_QUERY_RESPONSE = construct.Struct(
- construct.Const(b'\x03\x06'),
- 'value' / construct.CString(encoding='utf-16-le'),
+ construct.Const(b"\x03\x06"), "value" / construct.CString(encoding="utf-16-le"),
)
_READ_PARAMETER_REQUEST = construct.Struct(
- construct.Const(b'\x03'),
- 'selector' / construct.Enum(
- construct.Byte, unit=0x04),
+ construct.Const(b"\x03"), "selector" / construct.Enum(construct.Byte, unit=0x04),
)
_READ_UNIT_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'unit' / lifescan_binary_protocol.GLUCOSE_UNIT,
+ "unit" / lifescan_binary_protocol.GLUCOSE_UNIT,
construct.Padding(3),
)
-_READ_RTC_REQUEST = construct.Const(b'\x03\x20\x02')
+_READ_RTC_REQUEST = construct.Const(b"\x03\x20\x02")
_READ_RTC_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
+ "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
)
_WRITE_RTC_REQUEST = construct.Struct(
- construct.Const(b'\x03\x20\x01'),
- 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
+ construct.Const(b"\x03\x20\x01"),
+ "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
)
-_MEMORY_ERASE_REQUEST = construct.Const(b'\x03\x1a')
+_MEMORY_ERASE_REQUEST = construct.Const(b"\x03\x1a")
-_READ_RECORD_COUNT_REQUEST = construct.Const(b'\x03\x27\x00')
+_READ_RECORD_COUNT_REQUEST = construct.Const(b"\x03\x27\x00")
_READ_RECORD_COUNT_RESPONSE = construct.Struct(
- _COMMAND_SUCCESS,
- 'count' / construct.Int16ul,
+ _COMMAND_SUCCESS, "count" / construct.Int16ul,
)
_READ_RECORD_REQUEST = construct.Struct(
- construct.Const(b'\x03\x31\x02'),
- 'record_id' / construct.Int16ul,
- construct.Const(b'\x00'),
+ construct.Const(b"\x03\x31\x02"),
+ "record_id" / construct.Int16ul,
+ construct.Const(b"\x00"),
)
_MEAL_FLAG = {
@@ -100,13 +94,12 @@ _MEAL_FLAG = {
_READ_RECORD_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'inverse_counter' / construct.Int16ul,
+ "inverse_counter" / construct.Int16ul,
construct.Padding(1),
- 'lifetime_counter' / construct.Int16ul,
- 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
- 'value' / construct.Int16ul,
- 'meal' / construct.Mapping(
- construct.Byte, _MEAL_FLAG),
+ "lifetime_counter" / construct.Int16ul,
+ "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
+ "value" / construct.Int16ul,
+ "meal" / construct.Mapping(construct.Byte, _MEAL_FLAG),
construct.Padding(4),
)
@@ -115,8 +108,9 @@ class Device(driver_base.GlucometerDriver):
def __init__(self, device):
if not device:
raise exceptions.CommandLineError(
- '--device parameter is required, should point to the disk '
- 'device representing the meter.')
+ "--device parameter is required, should point to the disk "
+ "device representing the meter."
+ )
self.device_name_ = device
self.scsi_device_ = SCSIDevice(device, readwrite=True)
@@ -125,11 +119,12 @@ class Device(driver_base.GlucometerDriver):
def connect(self):
inq = self.scsi_.inquiry()
- logging.debug('Device connected: %r', inq.result)
- vendor = inq.result['t10_vendor_identification'][:32]
- if vendor != b'LifeScan':
+ logging.debug("Device connected: %r", inq.result)
+ vendor = inq.result["t10_vendor_identification"][:32]
+ if vendor != b"LifeScan":
raise exceptions.ConnectionFailed(
- 'Device %s is not a LifeScan glucometer.' % self.device_name_)
+ "Device %s is not a LifeScan glucometer." % self.device_name_
+ )
def disconnect(self): # pylint: disable=no-self-use
return
@@ -155,21 +150,19 @@ class Device(driver_base.GlucometerDriver):
"""
try:
request = request_format.build(request_obj)
- request_raw = _PACKET.build({'data': {'value': {
- 'message': request,
- }}})
- logging.debug(
- 'Request sent: %s', binascii.hexlify(request_raw))
+ request_raw = _PACKET.build({"data": {"value": {"message": request,}}})
+ logging.debug("Request sent: %s", binascii.hexlify(request_raw))
self.scsi_.write10(lba, 1, request_raw)
response_raw = self.scsi_.read10(lba, 1)
logging.debug(
- 'Response received: %s', binascii.hexlify(response_raw.datain))
+ "Response received: %s", binascii.hexlify(response_raw.datain)
+ )
response_pkt = _PACKET.parse(response_raw.datain).data
- logging.debug('Response packet: %r', response_pkt)
+ logging.debug("Response packet: %r", response_pkt)
response = response_format.parse(response_pkt.value.message)
- logging.debug('Response parsed: %r', response)
+ logging.debug("Response parsed: %r", response)
return response
except construct.ConstructError as e:
@@ -177,60 +170,58 @@ class Device(driver_base.GlucometerDriver):
def _query_string(self, selector):
response = self._send_request(
- 3, _QUERY_REQUEST, {'selector': selector}, _QUERY_RESPONSE)
+ 3, _QUERY_REQUEST, {"selector": selector}, _QUERY_RESPONSE
+ )
return response.value
def get_meter_info(self):
return common.MeterInfo(
- 'OneTouch %s glucometer' % self._query_string('model'),
+ "OneTouch %s glucometer" % self._query_string("model"),
serial_number=self.get_serial_number(),
- version_info=(
- 'Software version: ' + self.get_version(),),
- native_unit=self.get_glucose_unit())
+ version_info=("Software version: " + self.get_version(),),
+ native_unit=self.get_glucose_unit(),
+ )
def get_serial_number(self):
- return self._query_string('serial')
+ return self._query_string("serial")
def get_version(self):
- return self._query_string('software')
+ return self._query_string("software")
def get_datetime(self):
- response = self._send_request(
- 3, _READ_RTC_REQUEST, None, _READ_RTC_RESPONSE)
+ response = self._send_request(3, _READ_RTC_REQUEST, None, _READ_RTC_RESPONSE)
return response.timestamp
def _set_device_datetime(self, date):
- self._send_request(
- 3, _WRITE_RTC_REQUEST, {'timestamp': date},
- _COMMAND_SUCCESS)
+ self._send_request(3, _WRITE_RTC_REQUEST, {"timestamp": date}, _COMMAND_SUCCESS)
# The device does not return the new datetime, so confirm by calling
# READ RTC again.
return self.get_datetime()
def zero_log(self):
- self._send_request(
- 3, _MEMORY_ERASE_REQUEST, None,
- _COMMAND_SUCCESS)
+ self._send_request(3, _MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS)
def get_glucose_unit(self):
response = self._send_request(
- 4, _READ_PARAMETER_REQUEST, {'selector': 'unit'},
- _READ_UNIT_RESPONSE)
+ 4, _READ_PARAMETER_REQUEST, {"selector": "unit"}, _READ_UNIT_RESPONSE
+ )
return response.unit
def _get_reading_count(self):
response = self._send_request(
- 3, _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE)
+ 3, _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE
+ )
return response.count
def _get_reading(self, record_id):
response = self._send_request(
- 3, _READ_RECORD_REQUEST, {'record_id': record_id},
- _READ_RECORD_RESPONSE)
+ 3, _READ_RECORD_REQUEST, {"record_id": record_id}, _READ_RECORD_RESPONSE
+ )
return common.GlucoseReading(
- response.timestamp, float(response.value), meal=response.meal)
+ response.timestamp, float(response.value), meal=response.meal
+ )
def get_readings(self):
record_count = self._get_reading_count()
diff --git a/glucometerutils/drivers/otverioiq.py b/glucometerutils/drivers/otverioiq.py
index 69bdac9..24327ef 100644
--- a/glucometerutils/drivers/otverioiq.py
+++ b/glucometerutils/drivers/otverioiq.py
@@ -21,63 +21,63 @@ import logging
import construct
from glucometerutils import common
-from glucometerutils.support import driver_base, lifescan, lifescan_binary_protocol, serial
+from glucometerutils.support import (
+ driver_base,
+ lifescan,
+ lifescan_binary_protocol,
+ serial,
+)
_PACKET = lifescan_binary_protocol.LifeScanPacket(False)
-_COMMAND_SUCCESS = construct.Const(b'\x03\x06')
+_COMMAND_SUCCESS = construct.Const(b"\x03\x06")
-_VERSION_REQUEST = construct.Const(b'\x03\x0d\x01')
+_VERSION_REQUEST = construct.Const(b"\x03\x0d\x01")
_VERSION_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'version' / construct.PascalString(construct.Byte, encoding='ascii'),
+ "version" / construct.PascalString(construct.Byte, encoding="ascii"),
# NULL-termination is not included in string length.
- construct.Const(b'\x00'),
+ construct.Const(b"\x00"),
)
-_SERIAL_NUMBER_REQUEST = construct.Const(
- b'\x03\x0b\x01\x02')
+_SERIAL_NUMBER_REQUEST = construct.Const(b"\x03\x0b\x01\x02")
_SERIAL_NUMBER_RESPONSE = construct.Struct(
- _COMMAND_SUCCESS,
- 'serial_number' / construct.CString(encoding='ascii'),
+ _COMMAND_SUCCESS, "serial_number" / construct.CString(encoding="ascii"),
)
-_READ_RTC_REQUEST = construct.Const(b'\x03\x20\x02')
+_READ_RTC_REQUEST = construct.Const(b"\x03\x20\x02")
_READ_RTC_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
+ "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
)
_WRITE_RTC_REQUEST = construct.Struct(
- construct.Const(b'\x03\x20\x01'),
- 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
+ construct.Const(b"\x03\x20\x01"),
+ "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
)
-_GLUCOSE_UNIT_REQUEST = construct.Const(
- b'\x03\x09\x02\x02')
+_GLUCOSE_UNIT_REQUEST = construct.Const(b"\x03\x09\x02\x02")
_GLUCOSE_UNIT_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'unit' / lifescan_binary_protocol.GLUCOSE_UNIT,
+ "unit" / lifescan_binary_protocol.GLUCOSE_UNIT,
construct.Padding(3),
)
-_MEMORY_ERASE_REQUEST = construct.Const(b'\x03\x1a')
+_MEMORY_ERASE_REQUEST = construct.Const(b"\x03\x1a")
-_READ_RECORD_COUNT_REQUEST = construct.Const(b'\x03\x27\x00')
+_READ_RECORD_COUNT_REQUEST = construct.Const(b"\x03\x27\x00")
_READ_RECORD_COUNT_RESPONSE = construct.Struct(
- _COMMAND_SUCCESS,
- 'count' / construct.Int16ul,
+ _COMMAND_SUCCESS, "count" / construct.Int16ul,
)
_READ_RECORD_REQUEST = construct.Struct(
- construct.Const(b'\x03\x21'),
- 'record_id' / construct.Int16ul,
+ construct.Const(b"\x03\x21"), "record_id" / construct.Int16ul,
)
_MEAL_FLAG = {
@@ -88,18 +88,17 @@ _MEAL_FLAG = {
_READING_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
- 'value' / construct.Int16ul,
- 'control_test' / construct.Flag,
- 'meal' / construct.Mapping(
- construct.Byte, _MEAL_FLAG),
+ "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
+ "value" / construct.Int16ul,
+ "control_test" / construct.Flag,
+ "meal" / construct.Mapping(construct.Byte, _MEAL_FLAG),
construct.Padding(2), # unknown
)
class Device(serial.SerialDevice, driver_base.GlucometerDriver):
BAUDRATE = 38400
- DEFAULT_CABLE_ID = '10c4:85a7' # Specific ID for embedded cp210x
+ DEFAULT_CABLE_ID = "10c4:85a7" # Specific ID for embedded cp210x
TIMEOUT = 0.5
def __init__(self, device):
@@ -107,18 +106,15 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024)
def _send_packet(self, message):
- pkt = _PACKET.build(
- {'data': {'value': {
- 'message': message,
- }}})
- logging.debug('sending packet: %s', binascii.hexlify(pkt))
+ pkt = _PACKET.build({"data": {"value": {"message": message,}}})
+ logging.debug("sending packet: %s", binascii.hexlify(pkt))
self.serial_.write(pkt)
self.serial_.flush()
def _read_packet(self):
raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data
- logging.debug('received packet: %r', raw_pkt)
+ logging.debug("received packet: %r", raw_pkt)
# discard the checksum and copy
pkt = raw_pkt.value
@@ -138,66 +134,64 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
def get_meter_info(self):
return common.MeterInfo(
- 'OneTouch Verio IQ glucometer',
+ "OneTouch Verio IQ glucometer",
serial_number=self.get_serial_number(),
- version_info=(
- 'Software version: ' + self.get_version(),),
- native_unit=self.get_glucose_unit())
+ version_info=("Software version: " + self.get_version(),),
+ native_unit=self.get_glucose_unit(),
+ )
def get_version(self):
- response = self._send_request(
- _VERSION_REQUEST, None, _VERSION_RESPONSE)
+ response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE)
return response.version
def get_serial_number(self):
response = self._send_request(
- _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE)
+ _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE
+ )
return response.serial_number
def get_datetime(self):
- response = self._send_request(
- _READ_RTC_REQUEST, None, _READ_RTC_RESPONSE)
+ response = self._send_request(_READ_RTC_REQUEST, None, _READ_RTC_RESPONSE)
return response.timestamp
def _set_device_datetime(self, date):
- self._send_request(
- _WRITE_RTC_REQUEST, {
- 'timestamp': date,
- }, _COMMAND_SUCCESS)
+ self._send_request(_WRITE_RTC_REQUEST, {"timestamp": date,}, _COMMAND_SUCCESS)
# The device does not return the new datetime, so confirm by calling
# READ RTC again.
return self.get_datetime()
def zero_log(self):
- self._send_request(
- _MEMORY_ERASE_REQUEST, None,
- _COMMAND_SUCCESS)
+ self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS)
def get_glucose_unit(self):
response = self._send_request(
- _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE)
+ _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE
+ )
return response.unit
def _get_reading_count(self):
response = self._send_request(
- _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE)
+ _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE
+ )
return response.count
def _get_reading(self, record_id):
response = self._send_request(
- _READ_RECORD_REQUEST, {'record_id': record_id}, _READING_RESPONSE)
+ _READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE
+ )
if response.control_test:
- logging.debug('control solution test, ignoring.')
+ logging.debug("control solution test, ignoring.")
return None
return common.GlucoseReading(
- response.timestamp, float(response.value), meal=response.meal)
+ response.timestamp, float(response.value), meal=response.meal
+ )
def get_readings(self):
record_count = self._get_reading_count()
diff --git a/glucometerutils/drivers/sdcodefree.py b/glucometerutils/drivers/sdcodefree.py
index a6e2ce5..47dd9ca 100644
--- a/glucometerutils/drivers/sdcodefree.py
+++ b/glucometerutils/drivers/sdcodefree.py
@@ -24,46 +24,44 @@ import operator
import construct
-from glucometerutils import common
-from glucometerutils import exceptions
-from glucometerutils.support import serial, driver_base
+from glucometerutils import common, exceptions
+from glucometerutils.support import driver_base, serial
def xor_checksum(msg):
return functools.reduce(operator.xor, msg)
+
class Direction(enum.Enum):
In = 0x20
Out = 0x10
+
_PACKET = construct.Struct(
- 'stx' / construct.Const(0x53, construct.Byte),
- 'direction' / construct.Mapping(
- construct.Byte,
- {e: e.value for e in Direction}),
- 'length' / construct.Rebuild(
- construct.Byte, lambda this: len(this.message) + 2),
- 'message' / construct.Bytes(lambda this: this.length - 2),
- 'checksum' / construct.Checksum(
- construct.Byte, xor_checksum, construct.this.message),
- 'etx' / construct.Const(0xAA, construct.Byte)
+ "stx" / construct.Const(0x53, construct.Byte),
+ "direction" / construct.Mapping(construct.Byte, {e: e.value for e in Direction}),
+ "length" / construct.Rebuild(construct.Byte, lambda this: len(this.message) + 2),
+ "message" / construct.Bytes(lambda this: this.length - 2),
+ "checksum"
+ / construct.Checksum(construct.Byte, xor_checksum, construct.this.message),
+ "etx" / construct.Const(0xAA, construct.Byte),
)
_FIRST_MESSAGE = construct.Struct(
construct.Const(0x30, construct.Byte),
- 'count' / construct.Int16ub,
+ "count" / construct.Int16ub,
construct.Const(0xAA, construct.Byte)[19],
)
-_CHALLENGE_PACKET_FULL = b'\x53\x20\x04\x10\x30\x20\xAA'
-_RESPONSE_MESSAGE = b'\x10\x40'
+_CHALLENGE_PACKET_FULL = b"\x53\x20\x04\x10\x30\x20\xAA"
+_RESPONSE_MESSAGE = b"\x10\x40"
-_DATE_SET_MESSAGE = b'\x10\x10'
+_DATE_SET_MESSAGE = b"\x10\x10"
-_DISCONNECT_MESSAGE = b'\x10\x60'
-_DISCONNECTED_MESSAGE = b'\x10\x70'
+_DISCONNECT_MESSAGE = b"\x10\x60"
+_DISCONNECTED_MESSAGE = b"\x10\x70"
-_FETCH_MESSAGE = b'\x10\x60'
+_FETCH_MESSAGE = b"\x10\x60"
_MEAL_FLAG = {
common.Meal.NONE: 0x00,
@@ -73,67 +71,64 @@ _MEAL_FLAG = {
_READING = construct.Struct(
construct.Byte[2],
- 'year' / construct.Byte,
- 'month' / construct.Byte,
- 'day' / construct.Byte,
- 'hour' / construct.Byte,
- 'minute' / construct.Byte,
- 'value' / construct.Int16ub,
- 'meal' / construct.Mapping(
- construct.Byte, _MEAL_FLAG),
+ "year" / construct.Byte,
+ "month" / construct.Byte,
+ "day" / construct.Byte,
+ "hour" / construct.Byte,
+ "minute" / construct.Byte,
+ "value" / construct.Int16ub,
+ "meal" / construct.Mapping(construct.Byte, _MEAL_FLAG),
construct.Byte[7],
)
class Device(serial.SerialDevice, driver_base.GlucometerDriver):
BAUDRATE = 38400
- DEFAULT_CABLE_ID = '10c4:ea60' # Generic cable.
+ DEFAULT_CABLE_ID = "10c4:ea60" # Generic cable.
TIMEOUT = 300 # We need to wait for data from the device.
def read_message(self):
pkt = _PACKET.parse_stream(self.serial_)
- logging.debug('received packet: %r', pkt)
+ logging.debug("received packet: %r", pkt)
return pkt.message
def wait_and_ready(self):
- challenge = b'\0'
- while challenge == b'\0':
+ challenge = b"\0"
+ while challenge == b"\0":
challenge = self.serial_.read(1)
# The first packet read may have a prefixed zero, it might be a bug
# in the cp210x driver or device, but discard it if found.
- if challenge == b'\0':
- logging.debug('spurious null byte received')
+ if challenge == b"\0":
+ logging.debug("spurious null byte received")
continue
- if challenge != b'\x53':
+ if challenge != b"\x53":
raise exceptions.ConnectionFailed(
- message='Unexpected starting bytes %r' % challenge)
+ message="Unexpected starting bytes %r" % challenge
+ )
challenge += self.serial_.read(6)
if challenge != _CHALLENGE_PACKET_FULL:
raise exceptions.ConnectionFailed(
- message='Unexpected challenge %r' % challenge)
+ message="Unexpected challenge %r" % challenge
+ )
- logging.debug(
- 'challenge packet received: %s', binascii.hexlify(challenge))
+ logging.debug("challenge packet received: %s", binascii.hexlify(challenge))
self.send_message(_RESPONSE_MESSAGE)
# The first packet only contains the counter of how many readings are
# available.
first_message = _FIRST_MESSAGE.parse(self.read_message())
- logging.debug('received first message: %r', first_message)
+ logging.debug("received first message: %r", first_message)
return first_message.count
def send_message(self, message):
- pkt = _PACKET.build({
- 'message': message,
- 'direction': Direction.Out
- })
- logging.debug('sending packet: %s', binascii.hexlify(pkt))
+ pkt = _PACKET.build({"message": message, "direction": Direction.Out})
+ logging.debug("sending packet: %s", binascii.hexlify(pkt))
self.serial_.write(pkt)
def connect(self): # pylint: disable=no-self-use
@@ -146,7 +141,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
raise exceptions.InvalidResponse(response=response)
def get_meter_info(self): # pylint: disable=no-self-use
- return common.MeterInfo('SD CodeFree glucometer')
+ return common.MeterInfo("SD CodeFree glucometer")
def get_version(self): # pylint: disable=no-self-use
raise NotImplementedError
@@ -162,7 +157,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
raise NotImplementedError
def _set_device_datetime(self, date):
- setdatecmd = date.strftime('ADATE%Y%m%d%H%M').encode('ascii')
+ setdatecmd = date.strftime("ADATE%Y%m%d%H%M").encode("ascii")
# Ignore the readings count.
self.wait_and_ready()
@@ -173,8 +168,9 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
raise exceptions.InvalidResponse(response=response)
# The date we return should only include up to minute, unfortunately.
- return datetime.datetime(date.year, date.month, date.day,
- date.hour, date.minute)
+ return datetime.datetime(
+ date.year, date.month, date.day, date.hour, date.minute
+ )
def zero_log(self):
raise NotImplementedError
@@ -187,10 +183,16 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
message = self.read_message()
reading = _READING.parse(message)
- logging.debug('received reading: %r', reading)
+ logging.debug("received reading: %r", reading)
yield common.GlucoseReading(
datetime.datetime(
- 2000 + reading.year, reading.month,
- reading.day, reading.hour, reading.minute),
- reading.value, meal=reading.meal)
+ 2000 + reading.year,
+ reading.month,
+ reading.day,
+ reading.hour,
+ reading.minute,
+ ),
+ reading.value,
+ meal=reading.meal,
+ )
diff --git a/glucometerutils/drivers/td4277.py b/glucometerutils/drivers/td4277.py
index 4ab25ee..0385299 100644
--- a/glucometerutils/drivers/td4277.py
+++ b/glucometerutils/drivers/td4277.py
@@ -21,14 +21,13 @@ import operator
import construct
-from glucometerutils import common
-from glucometerutils import exceptions
-from glucometerutils.support import serial, driver_base
+from glucometerutils import common, exceptions
+from glucometerutils.support import driver_base, serial
class Direction(enum.Enum):
- In = 0xa5
- Out = 0xa3
+ In = 0xA5
+ Out = 0xA3
def byte_checksum(data):
@@ -36,18 +35,18 @@ def byte_checksum(data):
_PACKET = construct.Struct(
- 'data' / construct.RawCopy(
+ "data"
+ / construct.RawCopy(
construct.Struct(
- construct.Const(b'\x51'),
- 'command' / construct.Byte,
- 'message' / construct.Bytes(4),
- 'direction' / construct.Mapping(
- construct.Byte,
- {e: e.value for e in Direction}),
+ construct.Const(b"\x51"),
+ "command" / construct.Byte,
+ "message" / construct.Bytes(4),
+ "direction"
+ / construct.Mapping(construct.Byte, {e: e.value for e in Direction}),
),
),
- 'checksum' / construct.Checksum(
- construct.Byte, byte_checksum, construct.this.data.data),
+ "checksum"
+ / construct.Checksum(construct.Byte, byte_checksum, construct.this.data.data),
)
_EMPTY_MESSAGE = 0
@@ -60,38 +59,32 @@ _SET_DATETIME = 0x33
_GET_MODEL = 0x24
-_GET_READING_COUNT = 0x2b
+_GET_READING_COUNT = 0x2B
_GET_READING_DATETIME = 0x25
_GET_READING_VALUE = 0x26
_CLEAR_MEMORY = 0x52
_MODEL_STRUCT = construct.Struct(
- construct.Const(b'\x77\x42'),
- construct.Byte,
- construct.Byte,
+ construct.Const(b"\x77\x42"), construct.Byte, construct.Byte,
)
_DATETIME_STRUCT = construct.Struct(
- 'day' / construct.Int16ul,
- 'minute' / construct.Byte,
- 'hour' / construct.Byte,
+ "day" / construct.Int16ul, "minute" / construct.Byte, "hour" / construct.Byte,
)
_DAY_BITSTRUCT = construct.BitStruct(
- 'year' / construct.BitsInteger(7),
- 'month' / construct.BitsInteger(4),
- 'day' / construct.BitsInteger(5),
+ "year" / construct.BitsInteger(7),
+ "month" / construct.BitsInteger(4),
+ "day" / construct.BitsInteger(5),
)
_READING_COUNT_STRUCT = construct.Struct(
- 'count' / construct.Int16ul,
- construct.Int16ul,
+ "count" / construct.Int16ul, construct.Int16ul,
)
_READING_SELECTION_STRUCT = construct.Struct(
- 'record_id' / construct.Int16ul,
- construct.Const(b'\x00\x00'),
+ "record_id" / construct.Int16ul, construct.Const(b"\x00\x00"),
)
_MEAL_FLAG = {
@@ -101,21 +94,24 @@ _MEAL_FLAG = {
}
_READING_VALUE_STRUCT = construct.Struct(
- 'value' / construct.Int16ul,
- construct.Const(b'\x06'),
- 'meal'/ construct.Mapping(
- construct.Byte, _MEAL_FLAG),
+ "value" / construct.Int16ul,
+ construct.Const(b"\x06"),
+ "meal" / construct.Mapping(construct.Byte, _MEAL_FLAG),
)
+
def _make_packet(command, message, direction=Direction.Out):
return _PACKET.build(
- {'data': {
- 'value': {
- 'command': command,
- 'message': message,
- 'direction': direction,
- },
- }})
+ {
+ "data": {
+ "value": {
+ "command": command,
+ "message": message,
+ "direction": direction,
+ },
+ }
+ }
+ )
def _parse_datetime(message):
@@ -124,11 +120,12 @@ def _parse_datetime(message):
# unfortunately.
day = _DAY_BITSTRUCT.parse(construct.Int16ub.build(date.day))
return datetime.datetime(
- 2000+day.year, day.month, day.day, date.hour, date.minute)
+ 2000 + day.year, day.month, day.day, date.hour, date.minute
+ )
def _select_record(record_id):
- return _READING_SELECTION_STRUCT.build({'record_id': record_id})
+ return _READING_SELECTION_STRUCT.build({"record_id": record_id})
class Device(serial.SerialDevice, driver_base.GlucometerDriver):
@@ -137,19 +134,17 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
TIMEOUT = 0.5
def __init__(self, device):
- super(Device, self).__init__('cp2110://' + device)
- self.buffered_reader_ = construct.Rebuffered(
- _PACKET, tailcutoff=1024)
+ super(Device, self).__init__("cp2110://" + device)
+ self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024)
- def _send_command(
- self, command, message=_EMPTY_MESSAGE, validate_response=True):
+ def _send_command(self, command, message=_EMPTY_MESSAGE, validate_response=True):
pkt = _make_packet(command, message)
- logging.debug('sending packet: %s', binascii.hexlify(pkt))
+ logging.debug("sending packet: %s", binascii.hexlify(pkt))
self.serial_.write(pkt)
self.serial_.flush()
response = self.buffered_reader_.parse_stream(self.serial_)
- logging.debug('received packet: %r', response)
+ logging.debug("received packet: %r", response)
if validate_response and response.data.value.command != command:
raise InvalidResponse(response)
@@ -158,24 +153,26 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
def connect(self):
response_command, message = self._send_command(
- _CONNECT_REQUEST, validate_response=False)
+ _CONNECT_REQUEST, validate_response=False
+ )
if response_command not in _VALID_CONNECT_RESPONSE:
raise exceptions.ConnectionFailed(
- 'Invalid response received: %2x %r' % (
- response_command, message))
+ "Invalid response received: %2x %r" % (response_command, message)
+ )
_, model_message = self._send_command(_GET_MODEL)
try:
_MODEL_STRUCT.parse(model_message)
except construct.ConstructError:
raise exceptions.ConnectionFailed(
- 'Invalid model identified: %r' % model_message)
+ "Invalid model identified: %r" % model_message
+ )
def disconnect(self):
pass
def get_meter_info(self):
- return common.MeterInfo('TaiDoc TD-4277 glucometer')
+ return common.MeterInfo("TaiDoc TD-4277 glucometer")
def get_version(self): # pylint: disable=no-self-use
raise NotImplementedError
@@ -191,18 +188,15 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
def _set_device_datetime(self, date):
assert date.year >= 2000
- day_struct = _DAY_BITSTRUCT.build({
- 'year': date.year - 2000,
- 'month': date.month,
- 'day': date.day,
- })
+ day_struct = _DAY_BITSTRUCT.build(
+ {"year": date.year - 2000, "month": date.month, "day": date.day,}
+ )
day_word = construct.Int16ub.parse(day_struct)
- date_message = _DATETIME_STRUCT.build({
- 'day': day_word,
- 'minute': date.minute,
- 'hour': date.hour})
+ date_message = _DATETIME_STRUCT.build(
+ {"day": day_word, "minute": date.minute, "hour": date.hour}
+ )
_, message = self._send_command(_SET_DATETIME, message=date_message)
@@ -215,17 +209,18 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
def _get_reading(self, record_id):
_, reading_date_message = self._send_command(
- _GET_READING_DATETIME,
- _select_record(record_id))
+ _GET_READING_DATETIME, _select_record(record_id)
+ )
reading_date = _parse_datetime(reading_date_message)
_, reading_value_message = self._send_command(
- _GET_READING_VALUE,
- _select_record(record_id))
+ _GET_READING_VALUE, _select_record(record_id)
+ )
reading_value = _READING_VALUE_STRUCT.parse(reading_value_message)
return common.GlucoseReading(
- reading_date, reading_value.value, meal=reading_value.meal)
+ reading_date, reading_value.value, meal=reading_value.meal
+ )
def get_readings(self):
record_count = self._get_reading_count()