summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--glucometerutils/common.py11
-rw-r--r--glucometerutils/drivers/accuchek_reports.py33
-rw-r--r--glucometerutils/drivers/contourusb.py21
-rw-r--r--glucometerutils/drivers/fsinsulinx.py11
-rw-r--r--glucometerutils/drivers/fslibre.py39
-rw-r--r--glucometerutils/drivers/fsoptium.py29
-rw-r--r--glucometerutils/drivers/fsprecisionneo.py69
-rw-r--r--glucometerutils/drivers/otultra2.py36
-rw-r--r--glucometerutils/drivers/otultraeasy.py54
-rw-r--r--glucometerutils/drivers/otverio2015.py42
-rw-r--r--glucometerutils/drivers/otverioiq.py37
-rw-r--r--glucometerutils/drivers/sdcodefree.py33
-rw-r--r--glucometerutils/drivers/td4277.py46
-rw-r--r--glucometerutils/exceptions.py35
-rw-r--r--glucometerutils/support/construct_extras.py6
-rw-r--r--glucometerutils/support/driver_base.py55
-rw-r--r--glucometerutils/support/freestyle.py75
-rw-r--r--glucometerutils/support/hiddevice.py16
-rw-r--r--glucometerutils/support/lifescan.py9
-rw-r--r--glucometerutils/support/lifescan_binary_protocol.py5
-rw-r--r--glucometerutils/support/serial.py12
-rw-r--r--glucometerutils/tests/test_common.py38
-rw-r--r--pyproject.toml2
23 files changed, 409 insertions, 305 deletions
diff --git a/glucometerutils/common.py b/glucometerutils/common.py
index ddb2607..bf944c8 100644
--- a/glucometerutils/common.py
+++ b/glucometerutils/common.py
@@ -6,7 +6,7 @@
import datetime
import enum
import textwrap
-from typing import Any, Dict, Optional, Sequence
+from typing import Any, Dict, Optional, Sequence, Union
import attr
@@ -91,6 +91,10 @@ class KetoneReading:
timestamp: datetime.datetime
value: float
comment: str = ""
+ measure_method: MeasurementMethod = attr.ib(
+ default=MeasurementMethod.BLOOD_SAMPLE,
+ validator=attr.validators.in_({MeasurementMethod.BLOOD_SAMPLE}),
+ )
extra_data: Dict[str, Any] = attr.Factory(dict)
def as_csv(self, unit: Unit) -> str:
@@ -100,7 +104,7 @@ class KetoneReading:
return '"%s","%.2f","%s","%s"' % (
self.timestamp,
self.value,
- MeasurementMethod.BLOOD_SAMPLE.value,
+ self.measure_method.value,
self.comment,
)
@@ -123,6 +127,9 @@ class TimeAdjustment:
)
+AnyReading = Union[GlucoseReading, KetoneReading, TimeAdjustment]
+
+
@attr.s(auto_attribs=True)
class MeterInfo:
"""General information about the meter.
diff --git a/glucometerutils/drivers/accuchek_reports.py b/glucometerutils/drivers/accuchek_reports.py
index e6dc2ea..de29064 100644
--- a/glucometerutils/drivers/accuchek_reports.py
+++ b/glucometerutils/drivers/accuchek_reports.py
@@ -18,6 +18,7 @@ import csv
import datetime
import glob
import os
+from typing import Dict, Generator, NoReturn, Optional
from glucometerutils import common, exceptions
from glucometerutils.support import driver_base
@@ -46,7 +47,7 @@ _DATETIME_FORMAT = " ".join((_DATE_FORMAT, _TIME_FORMAT))
class Device(driver_base.GlucometerDriver):
- def __init__(self, device):
+ def __init__(self, device: Optional[str]) -> None:
if not device or not os.path.isdir(device):
raise exceptions.CommandLineError(
"--device parameter is required, should point to mount path "
@@ -62,7 +63,7 @@ class Device(driver_base.GlucometerDriver):
self.report_file = report_files[0]
- def _get_records_reader(self):
+ def _get_records_reader(self) -> csv.DictReader:
self.report.seek(0)
# Skip the first two lines
next(self.report)
@@ -72,51 +73,55 @@ class Device(driver_base.GlucometerDriver):
self.report, delimiter=";", skipinitialspace=True, quoting=csv.QUOTE_NONE
)
- def connect(self):
+ def connect(self) -> None:
self.report = open(self.report_file, "r", newline="\r\n", encoding="utf-8")
- def disconnect(self):
+ def disconnect(self) -> None:
self.report.close()
- def get_meter_info(self):
+ def get_meter_info(self) -> common.MeterInfo:
return common.MeterInfo(
f"{self.get_model()} glucometer",
serial_number=self.get_serial_number(),
native_unit=self.get_glucose_unit(),
)
- def get_model(self):
+ def get_model(self) -> str:
# $device/MODEL/Reports/*.csv
return os.path.basename(os.path.dirname(os.path.dirname(self.report_file)))
- def get_serial_number(self):
+ def get_serial_number(self) -> str:
self.report.seek(0)
# ignore the first line.
next(self.report)
# The second line of the CSV is serial-no;report-date;report-time;;;;;;;
return next(self.report).split(";")[0]
- def get_glucose_unit(self):
+ def get_glucose_unit(self) -> common.Unit:
# Get the first record available and parse that.
record = next(self._get_records_reader())
return _UNIT_MAP[record[_UNIT_CSV_KEY]]
- def get_datetime(self):
+ def get_datetime(self) -> NoReturn:
raise NotImplementedError
- def _set_device_datetime(self, date):
+ def _set_device_datetime(self, date: datetime.datetime) -> NoReturn:
raise NotImplementedError
- def zero_log(self):
+ def zero_log(self) -> NoReturn:
raise NotImplementedError
- def _extract_datetime(self, record): # pylint: disable=no-self-use
+ def _extract_datetime(
+ self, record: Dict[str, str]
+ ) -> datetime.datetime: # pylint: disable=no-self-use
# Date and time are in separate column, but we want to parse them
# together.
date_and_time = " ".join((record[_DATE_CSV_KEY], record[_TIME_CSV_KEY]))
return datetime.datetime.strptime(date_and_time, _DATETIME_FORMAT)
- def _extract_meal(self, record): # pylint: disable=no-self-use
+ def _extract_meal(
+ self, record: Dict[str, str]
+ ) -> common.Meal: # pylint: disable=no-self-use
if record[_AFTER_MEAL_CSV_KEY] and record[_BEFORE_MEAL_CSV_KEY]:
raise exceptions.InvalidResponse("Reading cannot be before and after meal.")
elif record[_AFTER_MEAL_CSV_KEY]:
@@ -126,7 +131,7 @@ class Device(driver_base.GlucometerDriver):
else:
return common.Meal.NONE
- def get_readings(self):
+ def get_readings(self) -> Generator[common.AnyReading, None, None]:
for record in self._get_records_reader():
if record[_RESULT_CSV_KEY] is None:
continue
diff --git a/glucometerutils/drivers/contourusb.py b/glucometerutils/drivers/contourusb.py
index 095f920..8696b04 100644
--- a/glucometerutils/drivers/contourusb.py
+++ b/glucometerutils/drivers/contourusb.py
@@ -19,12 +19,13 @@ http://protocols.ascensia.com/Programming-Guide.aspx
"""
import datetime
+from typing import Dict, Generator, NoReturn, Optional
from glucometerutils import common
from glucometerutils.support import contourusb
-def _extract_timestamp(parsed_record, prefix=""):
+def _extract_timestamp(parsed_record: Dict[str, str]):
"""Extract the timestamp from a parsed record.
This leverages the fact that all the reading records have the same base structure.
@@ -42,12 +43,12 @@ def _extract_timestamp(parsed_record, prefix=""):
class Device(contourusb.ContourHidDevice):
- """Glucometer driver for FreeStyle Libre devices."""
+ """Glucometer driver for Contour devices."""
- def __init__(self, device):
- self._hid_session = contourusb.ContourHidSession((0x1A79, 0x6002), device)
+ def __init__(self, device: Optional[str]) -> None:
+ super().__init__((0x1A79, 0x6002), device)
- def get_meter_info(self):
+ def get_meter_info(self) -> common.MeterInfo:
self._get_info_record()
return common.MeterInfo(
"Contour USB",
@@ -56,13 +57,13 @@ class Device(contourusb.ContourHidDevice):
native_unit=self.get_glucose_unit(),
)
- def get_glucose_unit(self): # pylint: disable=no-self-use
+ def get_glucose_unit(self) -> common.Unit:
if self._get_glucose_unit() == "0":
return common.Unit.MG_DL
else:
return common.Unit.MMOL_L
- def get_readings(self):
+ def get_readings(self) -> Generator[common.AnyReading, None, None]:
"""
Get reading dump from download data mode(all readings stored)
This meter supports only blood samples
@@ -75,11 +76,11 @@ class Device(contourusb.ContourHidDevice):
measure_method=common.MeasurementMethod.BLOOD_SAMPLE,
)
- def get_serial_number(self):
+ def get_serial_number(self) -> NoReturn:
raise NotImplementedError
- def _set_device_datetime(self, date):
+ def _set_device_datetime(self, date: datetime.datetime) -> NoReturn:
raise NotImplementedError
- def zero_log(self):
+ def zero_log(self) -> NoReturn:
raise NotImplementedError
diff --git a/glucometerutils/drivers/fsinsulinx.py b/glucometerutils/drivers/fsinsulinx.py
index a3e54fb..e984719 100644
--- a/glucometerutils/drivers/fsinsulinx.py
+++ b/glucometerutils/drivers/fsinsulinx.py
@@ -18,6 +18,7 @@ Xavier Claessens.
import collections
import datetime
+from typing import Generator, NoReturn, Optional
from glucometerutils import common
from glucometerutils.support import freestyle
@@ -51,10 +52,10 @@ _InsulinxReading = collections.namedtuple(
class Device(freestyle.FreeStyleHidDevice):
"""Glucometer driver for FreeStyle InsuLinux devices."""
- def __init__(self, device_path):
+ def __init__(self, device_path: Optional[str]) -> None:
super().__init__(0x3460, device_path)
- def get_meter_info(self):
+ def get_meter_info(self) -> common.MeterInfo:
"""Return the device information in structured form."""
return common.MeterInfo(
"FreeStyle InsuLinx",
@@ -63,11 +64,11 @@ class Device(freestyle.FreeStyleHidDevice):
native_unit=self.get_glucose_unit(),
)
- def get_glucose_unit(self): # pylint: disable=no-self-use
+ def get_glucose_unit(self) -> common.Unit: # pylint: disable=no-self-use
"""Returns the glucose unit of the device."""
return common.Unit.MG_DL
- def get_readings(self):
+ def get_readings(self) -> Generator[common.AnyReading, None, None]:
"""Iterate through the reading records in the device."""
for record in self._session.query_multirecord(b"$result?"):
if not record or record[0] != _TYPE_GLUCOSE_READING:
@@ -87,5 +88,5 @@ class Device(freestyle.FreeStyleHidDevice):
yield common.GlucoseReading(timestamp, raw_reading.value)
- def zero_log(self):
+ def zero_log(self) -> NoReturn:
raise NotImplementedError
diff --git a/glucometerutils/drivers/fslibre.py b/glucometerutils/drivers/fslibre.py
index 2e37ec1..558dcf4 100644
--- a/glucometerutils/drivers/fslibre.py
+++ b/glucometerutils/drivers/fslibre.py
@@ -20,6 +20,7 @@ https://protocols.glucometers.tech/abbott/freestyle-libre
"""
import datetime
+from typing import Dict, Generator, Mapping, Optional, Sequence, Tuple, Type
from glucometerutils import common
from glucometerutils.support import freestyle
@@ -70,7 +71,9 @@ _ARRESULT_TIME_ADJUSTMENT_ENTRY_MAP = (
_ARRESULT_RAPID_INSULIN_ENTRY_MAP = ((43, "double-rapid-acting-insulin"),)
-def _parse_record(record, entry_map):
+def _parse_record(
+ record: Sequence[str], entry_map: Sequence[Tuple[int, str]]
+) -> Dict[str, int]:
"""Parses a list of string fields into a dictionary of integers."""
if not record:
@@ -82,7 +85,9 @@ def _parse_record(record, entry_map):
return {}
-def _extract_timestamp(parsed_record, prefix=""):
+def _extract_timestamp(
+ parsed_record: Mapping[str, int], prefix: str = ""
+) -> datetime.datetime:
"""Extract the timestamp from a parsed record.
This leverages the fact that all the records have the same base structure.
@@ -98,7 +103,7 @@ def _extract_timestamp(parsed_record, prefix=""):
)
-def _parse_arresult(record):
+def _parse_arresult(record: Sequence[str]) -> Optional[common.AnyReading]:
"""Takes an array of string fields as input and parses it into a Reading."""
parsed_record = _parse_record(record, _BASE_ENTRY_MAP)
@@ -126,9 +131,9 @@ def _parse_arresult(record):
return None
comment_parts = []
- measure_method = None
- cls = None
- value = None
+ measure_method: Optional[common.MeasurementMethod] = None
+ cls: Optional[Type[common.AnyReading]] = None
+ value: Optional[float] = None
if parsed_record["reading-type"] == 2:
comment_parts.append("(Scan)")
@@ -145,7 +150,10 @@ def _parse_arresult(record):
measure_method = common.MeasurementMethod.BLOOD_SAMPLE
cls = common.KetoneReading
# automatically convert the raw value in mmol/L
- value = freestyle.convert_ketone_unit(parsed_record["value"])
+ raw_value = parsed_record["value"]
+ if raw_value is None:
+ raise ValueError(f"Invalid Ketone value: {parsed_record!r}")
+ value = freestyle.convert_ketone_unit(raw_value)
else:
# unknown reading
return None
@@ -183,7 +191,7 @@ def _parse_arresult(record):
else:
comment_parts.append("Rapid-acting insulin")
- return cls(
+ reading = cls(
_extract_timestamp(parsed_record),
value,
comment="; ".join(comment_parts),
@@ -191,14 +199,16 @@ def _parse_arresult(record):
extra_data={"device_id": parsed_record["device_id"]},
)
+ return reading
+
class Device(freestyle.FreeStyleHidDevice):
"""Glucometer driver for FreeStyle Libre devices."""
- def __init__(self, device_path):
+ def __init__(self, device_path: Optional[str]) -> None:
super().__init__(0x3650, device_path)
- def get_meter_info(self):
+ def get_meter_info(self) -> common.MeterInfo:
"""Return the device information in structured form."""
return common.MeterInfo(
"FreeStyle Libre",
@@ -208,18 +218,17 @@ class Device(freestyle.FreeStyleHidDevice):
patient_name=self.get_patient_name(),
)
- def get_serial_number(self):
+ def get_serial_number(self) -> str:
"""Overridden function as the command is not compatible."""
return self._session.send_text_command(b"$sn?").rstrip("\r\n")
- def get_glucose_unit(self): # pylint: disable=no-self-use
+ def get_glucose_unit(self) -> common.Unit: # pylint: disable=no-self-use
"""Returns the glucose unit of the device."""
# TODO(Flameeyes): figure out how to identify the actual unit on the
# device.
return common.Unit.MG_DL
- def get_readings(self):
-
+ def get_readings(self) -> Generator[common.AnyReading, None, None]:
# First of all get the usually longer list of sensor readings, and
# convert them to Readings objects.
for record in self._session.query_multirecord(b"$history?"):
@@ -244,5 +253,5 @@ class Device(freestyle.FreeStyleHidDevice):
if reading:
yield reading
- def zero_log(self):
+ def zero_log(self) -> None:
self._session.send_text_command(b"$resetpatient")
diff --git a/glucometerutils/drivers/fsoptium.py b/glucometerutils/drivers/fsoptium.py
index 77244af..cafd539 100644
--- a/glucometerutils/drivers/fsoptium.py
+++ b/glucometerutils/drivers/fsoptium.py
@@ -19,6 +19,7 @@ https://protocols.glucometers.tech/abbott/freestyle-optium
import datetime
import logging
import re
+from typing import Generator, NoReturn, Sequence
from glucometerutils import common, exceptions
from glucometerutils.support import driver_base, serial
@@ -64,8 +65,8 @@ _MONTH_MATCHES = {
}
-def _parse_clock(datestr):
- """Convert the date/time string used by the the device into a datetime.
+def _parse_clock(datestr: str) -> datetime.datetime:
+ """Convert the date/time string used by the device into a datetime.
Args:
datestr: a string as returned by the device during information handling.
@@ -88,7 +89,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
BAUDRATE = 19200
DEFAULT_CABLE_ID = "1a61:3420"
- def _send_command(self, command):
+ def _send_command(self, command: str) -> Sequence[str]:
cmd_bytes = bytes(f"$%s\r\n" % command, "ascii")
logging.debug("Sending command: %r", cmd_bytes)
@@ -104,14 +105,14 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
decoded_response = [line.decode("ascii").rstrip("\r\n") for line in response]
return decoded_response
- def connect(self):
+ def connect(self) -> None:
self._send_command("xmem") # ignore output this time
self._fetch_device_information()
- def disconnect(self): # pylint: disable=no-self-use
+ def disconnect(self) -> None: # pylint: disable=no-self-use
return
- def _fetch_device_information(self):
+ def _fetch_device_information(self) -> None:
data = self._send_command("colq")
for line in data:
@@ -134,7 +135,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
# back the commands and not replying to them.
raise exceptions.ConnectionFailed()
- def get_meter_info(self):
+ def get_meter_info(self) -> common.MeterInfo:
"""Fetch and parses the device information.
Returns:
@@ -147,7 +148,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
native_unit=self.get_glucose_unit(),
)
- def get_version(self):
+ def get_version(self) -> str:
"""Returns an identifier of the firmware version of the glucometer.
Returns:
@@ -155,7 +156,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
"""
return self.device_version_
- def get_serial_number(self):
+ def get_serial_number(self) -> str:
"""Retrieve the serial number of the device.
Returns:
@@ -163,7 +164,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
"""
return self.device_serialno_
- def get_glucose_unit(self):
+ def get_glucose_unit(self) -> common.Unit:
"""Returns a constant representing the unit displayed by the meter.
Returns:
@@ -172,7 +173,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
"""
return self.device_glucose_unit_
- def get_datetime(self):
+ def get_datetime(self) -> datetime.datetime:
"""Returns the current date and time for the glucometer.
Returns:
@@ -188,7 +189,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
raise exceptions.InvalidResponse("\n".join(data))
- def _set_device_datetime(self, date):
+ def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime:
data = self._send_command(date.strftime("tim,%m,%d,%y,%H,%M"))
parsed_data = "".join(data)
@@ -197,10 +198,10 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
return self.get_datetime()
- def zero_log(self):
+ def zero_log(self) -> NoReturn:
raise NotImplementedError
- def get_readings(self):
+ def get_readings(self) -> Generator[common.AnyReading, None, None]:
"""Iterates over the reading values stored in the glucometer.
Args:
diff --git a/glucometerutils/drivers/fsprecisionneo.py b/glucometerutils/drivers/fsprecisionneo.py
index 3009c7b..d6c6975 100644
--- a/glucometerutils/drivers/fsprecisionneo.py
+++ b/glucometerutils/drivers/fsprecisionneo.py
@@ -21,8 +21,9 @@ https://protocols.glucometers.tech/abbott/freestyle-precision-neo
"""
-import collections
+import dataclasses
import datetime
+from typing import Generator, NoReturn, Optional, Sequence, Type
from glucometerutils import common
from glucometerutils.support import freestyle
@@ -31,35 +32,40 @@ from glucometerutils.support import freestyle
_TYPE_GLUCOSE_READING = "7"
_TYPE_KETONE_READING = "9"
-_NeoReading = collections.namedtuple(
- "_NeoReading",
- (
- "type", # 7 = blood glucose, 9 = blood ketone
- "id",
- "month",
- "day",
- "year", # year is two-digits
- "hour",
- "minute",
- "unknown2",
- "value",
- # Extra trailing and so-far-unused fields; so discard them:
- # * for blood glucose: 10 unknown trailing fields
- # 'unknown3', 'unknown4', 'unknown5', 'unknown6', 'unknown7',
- # 'unknown8', 'unknown9', 'unknown10', 'unknown11', 'unknown12',
- # * for blood ketone: 2 unknown trailing fields
- # 'unknown3', 'unknown4',
- ),
-)
+
+@dataclasses.dataclass
+class _NeoReading:
+ type: int # 7 = blood glucose, 9 = blood ketone
+ id: int
+ month: int
+ day: int
+ year: int # year is two-digits
+ hour: int
+ minute: int
+ unknown: int
+ value: float
+ # Extra trailing and so-far-unused fields; so discard them:
+ # * for blood glucose: 10 unknown trailing fields
+ # 'unknown3', 'unknown4', 'unknown5', 'unknown6', 'unknown7',
+ # 'unknown8', 'unknown9', 'unknown10', 'unknown11', 'unknown12',
+ # * for blood ketone: 2 unknown trailing fields
+ # 'unknown3', 'unknown4',
+
+ def __init__(self, record: Sequence[str]) -> None:
+ for idx, field in enumerate(dataclasses.fields(self)):
+ if record[idx] == "HI":
+ setattr(self, field.name, float("inf"))
+ else:
+ setattr(self, field.name, int(record[idx]))
class Device(freestyle.FreeStyleHidDevice):
"""Glucometer driver for FreeStyle Precision Neo devices."""
- def __init__(self, device_path):
+ def __init__(self, device_path: Optional[str]):
super().__init__(0x3850, device_path)
- def get_meter_info(self):
+ def get_meter_info(self) -> common.MeterInfo:
"""Return the device information in structured form."""
return common.MeterInfo(
"FreeStyle Precision Neo",
@@ -69,14 +75,14 @@ class Device(freestyle.FreeStyleHidDevice):
patient_name=self.get_patient_name(),
)
- def get_glucose_unit(self): # pylint: disable=no-self-use
+ def get_glucose_unit(self) -> common.Unit: # pylint: disable=no-self-use
"""Returns the glucose unit of the device."""
return common.Unit.MG_DL
- def get_readings(self):
+ def get_readings(self) -> Generator[common.AnyReading, None, None]:
"""Iterate through the reading records in the device."""
for record in self._session.query_multirecord(b"$result?"):
- cls = None
+ cls: Optional[Type[common.AnyReading]] = None
if record and record[0] == _TYPE_GLUCOSE_READING:
cls = common.GlucoseReading
elif record and record[0] == _TYPE_KETONE_READING:
@@ -84,14 +90,9 @@ class Device(freestyle.FreeStyleHidDevice):
else:
continue
- # Build a _reading object by parsing each of the entries in the raw
+ # Build a _NeoReading object by parsing each of the entries in the raw
# record
- values = []
- for value in record:
- if value == "HI":
- value = float("inf")
- values.append(int(value))
- raw_reading = _NeoReading._make(values[: len(_NeoReading._fields)])
+ raw_reading = _NeoReading(record)
timestamp = datetime.datetime(
raw_reading.year + 2000,
@@ -108,5 +109,5 @@ class Device(freestyle.FreeStyleHidDevice):
yield cls(timestamp, value)
- def zero_log(self):
+ def zero_log(self) -> NoReturn:
raise NotImplementedError
diff --git a/glucometerutils/drivers/otultra2.py b/glucometerutils/drivers/otultra2.py
index 5e90b87..c1af4a0 100644
--- a/glucometerutils/drivers/otultra2.py
+++ b/glucometerutils/drivers/otultra2.py
@@ -15,6 +15,7 @@ Expected device path: /dev/ttyUSB0 or similar serial port device.
import datetime
import re
+from typing import Generator
from glucometerutils import common, exceptions
from glucometerutils.support import driver_base, lifescan, serial
@@ -51,7 +52,7 @@ _DUMP_LINE_RE = re.compile(
_RESPONSE_MATCH = re.compile(r"^(.+) ([0-9A-F]{4})\r$")
-def _calculate_checksum(bytestring):
+def _calculate_checksum(bytestring: bytes) -> int:
"""Calculate the checksum used by OneTouch Ultra and Ultra2 devices
Args:
@@ -71,7 +72,7 @@ def _calculate_checksum(bytestring):
return checksum
-def _validate_and_strip_checksum(line):
+def _validate_and_strip_checksum(line: str) -> str:
"""Verify the simple 16-bit checksum and remove it from the line.
Args:
@@ -104,7 +105,7 @@ _DATETIME_RE = re.compile(
)
-def _parse_datetime(response):
+def _parse_datetime(response: str) -> datetime.datetime:
"""Convert a response with date and time from the meter into a datetime.
Args:
@@ -132,23 +133,23 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
BAUDRATE = 9600
DEFAULT_CABLE_ID = "067b:2303" # Generic PL2303 cable.
- def connect(self): # pylint: disable=no-self-use
+ def connect(self) -> None: # pylint: disable=no-self-use
return
- def disconnect(self): # pylint: disable=no-self-use
+ def disconnect(self) -> None: # pylint: disable=no-self-use
return
- def _send_command(self, cmd):
+ def _send_command(self, cmd: str) -> None:
"""Send command interface.
Args:
cmd: command and parameters to send (without newline)
"""
- cmdstring = bytes("\x11\r" + cmd + "\r", "ascii")
+ cmdstring = bytes(f"\x11\r{cmd}\r", "ascii")
self.serial_.write(cmdstring)
self.serial_.flush()
- def _send_oneliner_command(self, cmd):
+ def _send_oneliner_command(self, cmd: str) -> str:
"""Send command and read a one-line response.
Args:
@@ -163,7 +164,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
line = self.serial_.readline().decode("ascii")
return _validate_and_strip_checksum(line)
- def get_meter_info(self):
+ def get_meter_info(self) -> common.MeterInfo:
"""Fetch and parses the device information.
Returns:
@@ -176,7 +177,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
native_unit=self.get_glucose_unit(),
)
- def get_version(self):
+ def get_version(self) -> str:
"""Returns an identifier of the firmware version of the glucometer.
Returns:
@@ -192,7 +193,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
_SERIAL_NUMBER_RE = re.compile('^@ "([A-Z0-9]{9})"$')
- def get_serial_number(self):
+ def get_serial_number(self) -> str:
"""Retrieve the serial number of the device.
Returns:
@@ -219,7 +220,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
return serial_number
- def get_datetime(self):
+ def get_datetime(self) -> datetime.datetime:
"""Returns the current date and time for the glucometer.
Returns:
@@ -228,13 +229,13 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
response = self._send_oneliner_command("DMF")
return _parse_datetime(response[2:])
- def _set_device_datetime(self, date):
+ def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime:
response = self._send_oneliner_command(
"DMT" + date.strftime("%m/%d/%y %H:%M:%S")
)
return _parse_datetime(response[2:])
- def zero_log(self):
+ def zero_log(self) -> None:
"""Zeros out the data log of the device.
This function will clear the memory of the device deleting all the
@@ -246,7 +247,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
_GLUCOSE_UNIT_RE = re.compile(r'^SU\?,"(MG/DL |MMOL/L)"')
- def get_glucose_unit(self):
+ def get_glucose_unit(self) -> common.Unit:
"""Returns a constant representing the unit displayed by the meter.
Returns:
@@ -264,6 +265,9 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
response = self._send_oneliner_command("DMSU?")
match = self._GLUCOSE_UNIT_RE.match(response)
+ if match is None:
+ raise exceptions.InvalidGlucoseUnit(response)
+
unit = match.group(1)
if unit == "MG/DL ":
@@ -274,7 +278,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
raise exceptions.InvalidGlucoseUnit(response)
- def get_readings(self):
+ def get_readings(self) -> Generator[common.AnyReading, None, None]:
"""Iterates over the reading values stored in the glucometer.
Args:
diff --git a/glucometerutils/drivers/otultraeasy.py b/glucometerutils/drivers/otultraeasy.py
index 41e2acd..d5eb3c9 100644
--- a/glucometerutils/drivers/otultraeasy.py
+++ b/glucometerutils/drivers/otultraeasy.py
@@ -18,6 +18,7 @@ Expected device path: /dev/ttyUSB0 or similar serial port device.
import binascii
import datetime
import logging
+from typing import Any, Dict, Generator, Optional
import construct
@@ -92,7 +93,13 @@ _READING_RESPONSE = construct.Struct(
)
-def _make_packet(message, sequence_number, expect_receive, acknowledge, disconnect):
+def _make_packet(
+ message: bytes,
+ sequence_number: int,
+ expect_receive: bool,
+ acknowledge: bool,
+ disconnect: bool,
+):
return _PACKET.build(
{
"data": {
@@ -115,24 +122,26 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
DEFAULT_CABLE_ID = "067b:2303" # Generic PL2303 cable.
TIMEOUT = 0.5
- def __init__(self, device):
- super(Device, self).__init__(device)
+ def __init__(self, device: Optional[str]) -> None:
+ super().__init__(device)
self.sent_counter_ = False
self.expect_receive_ = False
self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024)
- def connect(self):
+ def connect(self) -> None:
try:
self._send_packet(b"", disconnect=True)
self._read_ack()
except construct.ConstructError as e:
raise lifescan.MalformedCommand(str(e))
- def disconnect(self):
+ def disconnect(self) -> None:
self.connect()
- def _send_packet(self, message, acknowledge=False, disconnect=False):
+ def _send_packet(
+ self, message: bytes, acknowledge: bool = False, disconnect: bool = False
+ ) -> None:
pkt = _make_packet(
message, self.sent_counter_, self.expect_receive_, acknowledge, disconnect
)
@@ -141,7 +150,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
self.serial_.write(pkt)
self.serial_.flush()
- def _read_packet(self):
+ def _read_packet(self) -> construct.Container:
raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data
logging.debug("received packet: %r", raw_pkt)
@@ -157,14 +166,19 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
return pkt
- def _send_ack(self):
+ def _send_ack(self) -> None:
self._send_packet(b"", acknowledge=True, disconnect=False)
- def _read_ack(self):
+ def _read_ack(self) -> None:
pkt = self._read_packet()
assert pkt.link_control.acknowledge
- def _send_request(self, request_format, request_obj, response_format):
+ def _send_request(
+ self,
+ request_format: construct.Struct,
+ request_obj: Optional[Dict[str, Any]],
+ response_format: construct.Struct,
+ ) -> construct.Container:
try:
request = request_format.build(request_obj)
self._send_packet(request, acknowledge=False, disconnect=False)
@@ -182,7 +196,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
except construct.ConstructError as e:
raise lifescan.MalformedCommand(str(e))
- def get_meter_info(self):
+ def get_meter_info(self) -> common.MeterInfo:
return common.MeterInfo(
"OneTouch Ultra Easy glucometer",
serial_number=self.get_serial_number(),
@@ -190,26 +204,26 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
native_unit=self.get_glucose_unit(),
)
- def get_version(self):
+ def get_version(self) -> str:
response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE)
return response.version
- def get_serial_number(self):
+ def get_serial_number(self) -> str:
response = self._send_request(
_SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE
)
return response.serial_number
- def get_datetime(self):
+ def get_datetime(self) -> datetime.datetime:
response = self._send_request(
_DATETIME_REQUEST, {"request_type": "read"}, _DATETIME_RESPONSE
)
return response.timestamp
- def _set_device_datetime(self, date):
+ def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime:
response = self._send_request(
_DATETIME_REQUEST,
{"request_type": "write", "timestamp": date},
@@ -217,17 +231,17 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
)
return response.timestamp
- def zero_log(self):
+ def zero_log(self) -> None:
self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS)
- def get_glucose_unit(self):
+ def get_glucose_unit(self) -> common.Unit:
response = self._send_request(
_GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE
)
return response.unit
- def _get_reading_count(self):
+ def _get_reading_count(self) -> int:
response = self._send_request(
_READ_RECORD_REQUEST,
{"record_id": _INVALID_RECORD},
@@ -235,14 +249,14 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
)
return response.count
- def _get_reading(self, record_id):
+ def _get_reading(self, record_id: int) -> common.GlucoseReading:
response = self._send_request(
_READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE
)
return common.GlucoseReading(response.timestamp, float(response.value))
- def get_readings(self):
+ def get_readings(self) -> Generator[common.AnyReading, None, None]:
record_count = self._get_reading_count()
for record_id in range(record_count):
yield self._get_reading(record_id)
diff --git a/glucometerutils/drivers/otverio2015.py b/glucometerutils/drivers/otverio2015.py
index 3882d70..4e9138a 100644
--- a/glucometerutils/drivers/otverio2015.py
+++ b/glucometerutils/drivers/otverio2015.py
@@ -23,7 +23,9 @@ https://protocols.glucometers.tech/lifescan/onetouch-verio-2015
"""
import binascii
+import datetime
import logging
+from typing import Any, Dict, Generator, Optional
import construct
from pyscsi.pyscsi.scsi import SCSI
@@ -105,19 +107,21 @@ _READ_RECORD_RESPONSE = construct.Struct(
class Device(driver_base.GlucometerDriver):
- def __init__(self, device):
+ def __init__(self, device: Optional[str]) -> None:
if not device:
raise exceptions.CommandLineError(
"--device parameter is required, should point to the disk "
"device representing the meter."
)
+ super().__init__(device)
+
self.device_name_ = device
self.scsi_device_ = SCSIDevice(device, readwrite=True)
self.scsi_ = SCSI(self.scsi_device_)
self.scsi_.blocksize = _REGISTER_SIZE
- def connect(self):
+ def connect(self) -> None:
inq = self.scsi_.inquiry()
logging.debug("Device connected: %r", inq.result)
vendor = inq.result["t10_vendor_identification"][:32]
@@ -126,14 +130,20 @@ class Device(driver_base.GlucometerDriver):
f"Device {self.device_name_} is not a LifeScan glucometer."
)
- def disconnect(self): # pylint: disable=no-self-use
+ def disconnect(self) -> None: # pylint: disable=no-self-use
return
- def _send_request(self, lba, request_format, request_obj, response_format):
+ def _send_request(
+ self,
+ lba: int,
+ request_format: construct.Struct,
+ request_obj: Optional[Dict[str, Any]],
+ response_format: construct.Struct,
+ ) -> construct.Container:
"""Send a request to the meter, and read its response.
Args:
- lba: (int) the address of the block register to use, known
+ lba: the address of the block register to use, known
valid addresses are 3, 4 and 5.
request_format: a construct format identifier of the request to send
request_obj: the object to format with the provided identifier
@@ -168,14 +178,14 @@ class Device(driver_base.GlucometerDriver):
except construct.ConstructError as e:
raise lifescan.MalformedCommand(str(e))
- def _query_string(self, selector):
+ def _query_string(self, selector: str) -> str:
response = self._send_request(
3, _QUERY_REQUEST, {"selector": selector}, _QUERY_RESPONSE
)
return response.value
- def get_meter_info(self):
+ def get_meter_info(self) -> common.MeterInfo:
model = self._query_string("model")
return common.MeterInfo(
f"OneTouch {model} glucometer",
@@ -184,39 +194,39 @@ class Device(driver_base.GlucometerDriver):
native_unit=self.get_glucose_unit(),
)
- def get_serial_number(self):
+ def get_serial_number(self) -> str:
return self._query_string("serial")
- def get_version(self):
+ def get_version(self) -> str:
return self._query_string("software")
- def get_datetime(self):
+ def get_datetime(self) -> datetime.datetime:
response = self._send_request(3, _READ_RTC_REQUEST, None, _READ_RTC_RESPONSE)
return response.timestamp
- def _set_device_datetime(self, date):
+ def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime:
self._send_request(3, _WRITE_RTC_REQUEST, {"timestamp": date}, _COMMAND_SUCCESS)
# The device does not return the new datetime, so confirm by calling
# READ RTC again.
return self.get_datetime()
- def zero_log(self):
+ def zero_log(self) -> None:
self._send_request(3, _MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS)
- def get_glucose_unit(self):
+ def get_glucose_unit(self) -> common.Unit:
response = self._send_request(
4, _READ_PARAMETER_REQUEST, {"selector": "unit"}, _READ_UNIT_RESPONSE
)
return response.unit
- def _get_reading_count(self):
+ def _get_reading_count(self) -> int:
response = self._send_request(
3, _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE
)
return response.count
- def _get_reading(self, record_id):
+ def _get_reading(self, record_id: int) -> common.GlucoseReading:
response = self._send_request(
3, _READ_RECORD_REQUEST, {"record_id": record_id}, _READ_RECORD_RESPONSE
)
@@ -224,7 +234,7 @@ class Device(driver_base.GlucometerDriver):
response.timestamp, float(response.value), meal=response.meal
)
- def get_readings(self):
+ def get_readings(self) -> Generator[common.AnyReading, None, None]:
record_count = self._get_reading_count()
for record_id in range(record_count):
yield self._get_reading(record_id)
diff --git a/glucometerutils/drivers/otverioiq.py b/glucometerutils/drivers/otverioiq.py
index 69f88de..b4a4a6c 100644
--- a/glucometerutils/drivers/otverioiq.py
+++ b/glucometerutils/drivers/otverioiq.py
@@ -16,7 +16,9 @@ auto-detected.
"""
import binascii
+import datetime
import logging
+from typing import Any, Dict, Generator, Optional
import construct
@@ -101,18 +103,18 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
DEFAULT_CABLE_ID = "10c4:85a7" # Specific ID for embedded cp210x
TIMEOUT = 0.5
- def __init__(self, device):
- super(Device, self).__init__(device)
+ def __init__(self, device: Optional[str]) -> None:
+ super().__init__(device)
self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024)
- def _send_packet(self, message):
+ def _send_packet(self, message: bytes) -> None:
pkt = _PACKET.build({"data": {"value": {"message": message}}})
logging.debug("sending packet: %s", binascii.hexlify(pkt))
self.serial_.write(pkt)
self.serial_.flush()
- def _read_packet(self):
+ def _read_packet(self) -> construct.Container:
raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data
logging.debug("received packet: %r", raw_pkt)
@@ -121,7 +123,12 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
return pkt
- def _send_request(self, request_format, request_obj, response_format):
+ def _send_request(
+ self,
+ request_format: construct.struct,
+ request_obj: Optional[Dict[str, Any]],
+ response_format: construct.Struct,
+ ) -> construct.Container:
try:
request = request_format.build(request_obj)
self._send_packet(request)
@@ -132,7 +139,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
except construct.ConstructError as e:
raise lifescan.MalformedCommand(str(e))
- def get_meter_info(self):
+ def get_meter_info(self) -> common.MeterInfo:
return common.MeterInfo(
"OneTouch Verio IQ glucometer",
serial_number=self.get_serial_number(),
@@ -140,47 +147,47 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
native_unit=self.get_glucose_unit(),
)
- def get_version(self):
+ def get_version(self) -> str:
response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE)
return response.version
- def get_serial_number(self):
+ def get_serial_number(self) -> str:
response = self._send_request(
_SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE
)
return response.serial_number
- def get_datetime(self):
+ def get_datetime(self) -> datetime.datetime:
response = self._send_request(_READ_RTC_REQUEST, None, _READ_RTC_RESPONSE)
return response.timestamp
- def _set_device_datetime(self, date):
+ def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime:
self._send_request(_WRITE_RTC_REQUEST, {"timestamp": date}, _COMMAND_SUCCESS)
# The device does not return the new datetime, so confirm by calling
# READ RTC again.
return self.get_datetime()
- def zero_log(self):
+ def zero_log(self) -> None:
self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS)
- def get_glucose_unit(self):
+ def get_glucose_unit(self) -> common.Unit:
response = self._send_request(
_GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE
)
return response.unit
- def _get_reading_count(self):
+ def _get_reading_count(self) -> int:
response = self._send_request(
_READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE
)
return response.count
- def _get_reading(self, record_id):
+ def _get_reading(self, record_id: int) -> Optional[common.GlucoseReading]:
response = self._send_request(
_READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE
)
@@ -193,7 +200,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
response.timestamp, float(response.value), meal=response.meal
)
- def get_readings(self):
+ def get_readings(self) -> Generator[common.AnyReading, None, None]:
record_count = self._get_reading_count()
for record_id in range(record_count):
reading = self._get_reading(record_id)
diff --git a/glucometerutils/drivers/sdcodefree.py b/glucometerutils/drivers/sdcodefree.py
index 08a3afc..5d1d42c 100644
--- a/glucometerutils/drivers/sdcodefree.py
+++ b/glucometerutils/drivers/sdcodefree.py
@@ -21,6 +21,7 @@ import enum
import functools
import logging
import operator
+from typing import Generator, NoReturn
import construct
@@ -28,7 +29,7 @@ from glucometerutils import common, exceptions
from glucometerutils.support import driver_base, serial
-def xor_checksum(msg):
+def xor_checksum(msg: bytes) -> int:
return functools.reduce(operator.xor, msg)
@@ -87,12 +88,12 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
DEFAULT_CABLE_ID = "10c4:ea60" # Generic cable.
TIMEOUT = 300 # We need to wait for data from the device.
- def read_message(self):
+ def read_message(self) -> bytes:
pkt = _PACKET.parse_stream(self.serial_)
logging.debug("received packet: %r", pkt)
return pkt.message
- def wait_and_ready(self):
+ def wait_and_ready(self) -> int:
challenge = b"\0"
while challenge == b"\0":
@@ -126,37 +127,37 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
return first_message.count
- def send_message(self, message):
+ def send_message(self, message: bytes) -> None:
pkt = _PACKET.build({"message": message, "direction": Direction.Out})
logging.debug("sending packet: %s", binascii.hexlify(pkt))
self.serial_.write(pkt)
- def connect(self): # pylint: disable=no-self-use
+ def connect(self) -> None: # pylint: disable=no-self-use
print("Please connect and turn on the device.")
- def disconnect(self):
+ def disconnect(self) -> None:
self.send_message(_DISCONNECT_MESSAGE)
response = self.read_message()
if response != _DISCONNECTED_MESSAGE:
- raise exceptions.InvalidResponse(response=response)
+ raise exceptions.InvalidResponse(response=repr(response))
- def get_meter_info(self): # pylint: disable=no-self-use
+ def get_meter_info(self) -> common.MeterInfo: # pylint: disable=no-self-use
return common.MeterInfo("SD CodeFree glucometer")
- def get_version(self): # pylint: disable=no-self-use
+ def get_version(self) -> NoReturn: # pylint: disable=no-self-use
raise NotImplementedError
- def get_serial_number(self): # pylint: disable=no-self-use
+ def get_serial_number(self) -> NoReturn: # pylint: disable=no-self-use
raise NotImplementedError
- def get_glucose_unit(self): # pylint: disable=no-self-use
+ def get_glucose_unit(self) -> common.Unit: # pylint: disable=no-self-use
# Device does not provide information on glucose unit.
return common.Unit.MG_DL
- def get_datetime(self): # pylint: disable=no-self-use
+ def get_datetime(self) -> NoReturn: # pylint: disable=no-self-use
raise NotImplementedError
- def _set_device_datetime(self, date):
+ def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime:
setdatecmd = date.strftime("ADATE%Y%m%d%H%M").encode("ascii")
# Ignore the readings count.
@@ -165,17 +166,17 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
self.send_message(setdatecmd)
response = self.read_message()
if response != _DATE_SET_MESSAGE:
- raise exceptions.InvalidResponse(response=response)
+ raise exceptions.InvalidResponse(response=repr(response))
# The date we return should only include up to minute, unfortunately.
return datetime.datetime(
date.year, date.month, date.day, date.hour, date.minute
)
- def zero_log(self):
+ def zero_log(self) -> NoReturn:
raise NotImplementedError
- def get_readings(self):
+ def get_readings(self) -> Generator[common.AnyReading, None, None]:
count = self.wait_and_ready()
for _ in range(count):
diff --git a/glucometerutils/drivers/td4277.py b/glucometerutils/drivers/td4277.py
index 31901db..be8a180 100644
--- a/glucometerutils/drivers/td4277.py
+++ b/glucometerutils/drivers/td4277.py
@@ -18,6 +18,7 @@ import enum
import functools
import logging
import operator
+from typing import Generator, NoReturn, Optional, Tuple
import construct
@@ -49,7 +50,7 @@ _PACKET = construct.Struct(
/ construct.Checksum(construct.Byte, byte_checksum, construct.this.data.data),
)
-_EMPTY_MESSAGE = 0
+_EMPTY_MESSAGE = b"\x00\x00\x00\x00"
_CONNECT_REQUEST = 0x22
_VALID_CONNECT_RESPONSE = {0x22, 0x24, 0x54}
@@ -100,7 +101,9 @@ _READING_VALUE_STRUCT = construct.Struct(
)
-def _make_packet(command, message, direction=Direction.Out):
+def _make_packet(
+ command: int, message: bytes, direction: Direction = Direction.Out
+) -> bytes:
return _PACKET.build(
{
"data": {
@@ -114,7 +117,7 @@ def _make_packet(command, message, direction=Direction.Out):
)
-def _parse_datetime(message):
+def _parse_datetime(message: bytes) -> datetime.datetime:
date = _DATETIME_STRUCT.parse(message)
# We can't parse the day properly with a single pass of Construct
# unfortunately.
@@ -124,7 +127,7 @@ def _parse_datetime(message):
)
-def _select_record(record_id):
+def _select_record(record_id: int) -> bytes:
return _READING_SELECTION_STRUCT.build({"record_id": record_id})
@@ -133,11 +136,16 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
BAUDRATE = 19200
TIMEOUT = 0.5
- def __init__(self, device):
- super(Device, self).__init__("cp2110://" + device)
+ def __init__(self, device: Optional[str]):
+ super().__init__(f"cp2110://{device}")
self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024)
- def _send_command(self, command, message=_EMPTY_MESSAGE, validate_response=True):
+ def _send_command(
+ self,
+ command: int,
+ message: bytes = _EMPTY_MESSAGE,
+ validate_response: bool = True,
+ ) -> Tuple[int, bytes]:
pkt = _make_packet(command, message)
logging.debug("sending packet: %s", binascii.hexlify(pkt))
@@ -151,7 +159,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
return response.data.value.command, response.data.value.message
- def connect(self):
+ def connect(self) -> None:
response_command, message = self._send_command(
_CONNECT_REQUEST, validate_response=False
)
@@ -168,24 +176,24 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
f"Invalid model identified: {model_message!r}"
)
- def disconnect(self):
+ def disconnect(self) -> None:
pass
- def get_meter_info(self):
+ def get_meter_info(self) -> common.MeterInfo:
return common.MeterInfo("TaiDoc TD-4277 glucometer")
- def get_version(self): # pylint: disable=no-self-use
+ def get_version(self) -> NoReturn: # pylint: disable=no-self-use
raise NotImplementedError
- def get_serial_number(self): # pylint: disable=no-self-use
+ def get_serial_number(self) -> NoReturn: # pylint: disable=no-self-use
raise NotImplementedError
- def get_datetime(self):
+ def get_datetime(self) -> datetime.datetime:
_, message = self._send_command(_GET_DATETIME)
return _parse_datetime(message)
- def _set_device_datetime(self, date):
+ def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime:
assert date.year >= 2000
day_struct = _DAY_BITSTRUCT.build(
@@ -202,12 +210,12 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
return _parse_datetime(message)
- def _get_reading_count(self):
+ def _get_reading_count(self) -> int:
_, message = self._send_command(_GET_READING_COUNT)
return _READING_COUNT_STRUCT.parse(message).count
- def _get_reading(self, record_id):
+ def _get_reading(self, record_id: int) -> common.GlucoseReading:
_, reading_date_message = self._send_command(
_GET_READING_DATETIME, _select_record(record_id)
)
@@ -222,14 +230,14 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
reading_date, reading_value.value, meal=reading_value.meal
)
- def get_readings(self):
+ def get_readings(self) -> Generator[common.AnyReading, None, None]:
record_count = self._get_reading_count()
for record_id in range(record_count):
yield self._get_reading(record_id)
- def zero_log(self):
+ def zero_log(self) -> None:
self._send_command(_CLEAR_MEMORY)
- def get_glucose_unit(self):
+ def get_glucose_unit(self) -> NoReturn:
"""Maybe this could be implemented by someone who knows the device"""
raise NotImplementedError
diff --git a/glucometerutils/exceptions.py b/glucometerutils/exceptions.py
index 1f72308..d56dbe6 100644
--- a/glucometerutils/exceptions.py
+++ b/glucometerutils/exceptions.py
@@ -3,6 +3,8 @@
# SPDX-License-Identifier: MIT
"""Common exceptions for glucometerutils."""
+from typing import Any, Optional
+
class Error(Exception):
"""Base class for the errors."""
@@ -15,42 +17,43 @@ class CommandLineError(Error):
class ConnectionFailed(Error):
"""It was not possible to connect to the meter."""
- def __init__(self, message="Unable to connect to the meter."):
- super(ConnectionFailed, self).__init__(message)
+ def __init__(self, message: str = "Unable to connect to the meter.") -> None:
+ super().__init__(message)
class CommandError(Error):
"""It was not possible to send a command to the device."""
- def __init__(self, message="Unable to send command to device."):
- super(CommandError, self).__init__(message)
+ def __init__(self, message: str = "Unable to send command to device.") -> None:
+ super().__init__(message)
class InvalidResponse(Error):
"""The response received from the meter was not understood"""
- def __init__(self, response):
- super(InvalidResponse, self).__init__(f"Invalid response received:\n{response}")
+ def __init__(self, response: str) -> None:
+ super().__init__(f"Invalid response received:\n{response}")
class InvalidChecksum(InvalidResponse):
- def __init__(self, wire, calculated):
- super(InvalidChecksum, self).__init__(
- f"Response checksum not matching: {wire:08x} (wire) != {calculated:08x} (calculated)"
- )
+ def __init__(self, wire: int, calculated: Optional[int]) -> None:
+ if calculated is not None:
+ message = f"Response checksum not matching: {wire:08x} (wire) != {calculated:08x} (calculated)"
+ else:
+ message = f"Unable to calculate checksum. Expected {wire:08x}."
+
+ super().__init__(message)
class InvalidGlucoseUnit(Error):
"""Unable to parse the given glucose unit"""
- def __init__(self, unit):
- super(InvalidGlucoseUnit, self).__init__(
- f"Invalid glucose unit received:\n{unit}"
- )
+ def __init__(self, unit: Any) -> None:
+ super().__init__(f"Invalid glucose unit received:\n{unit}")
class InvalidDateTime(Error):
"""The device has an invalid date/time setting."""
- def __init__(self):
- super(InvalidDateTime, self).__init__("Invalid date and time for device")
+ def __init__(self) -> None:
+ super().__init__("Invalid date and time for device")
diff --git a/glucometerutils/support/construct_extras.py b/glucometerutils/support/construct_extras.py
index b44ee84..4ed8a7a 100644
--- a/glucometerutils/support/construct_extras.py
+++ b/glucometerutils/support/construct_extras.py
@@ -18,15 +18,15 @@ class Timestamp(construct.Adapter):
__slots__ = ["epoch"]
- def __init__(self, subcon, epoch=0):
+ def __init__(self, subcon, epoch: int = 0) -> None:
super(Timestamp, self).__init__(subcon)
self.epoch = epoch
- def _encode(self, obj, context, path):
+ def _encode(self, obj: datetime.datetime, context, path) -> int:
assert isinstance(obj, datetime.datetime)
epoch_date = datetime.datetime.utcfromtimestamp(self.epoch)
delta = obj - epoch_date
return int(delta.total_seconds())
- def _decode(self, obj, context, path):
+ def _decode(self, obj: int, context, path) -> datetime.datetime:
return datetime.datetime.utcfromtimestamp(obj + self.epoch)
diff --git a/glucometerutils/support/driver_base.py b/glucometerutils/support/driver_base.py
index d3e45b7..2bd4264 100644
--- a/glucometerutils/support/driver_base.py
+++ b/glucometerutils/support/driver_base.py
@@ -1,38 +1,45 @@
-from abc import ABC, abstractmethod
-from datetime import datetime
-from typing import Optional, Text
+# -*- coding: utf-8 -*-
+#
+# SPDX-License-Identifier: MIT
+import abc
+import datetime
+from typing import Generator, Optional, Text
-class GlucometerDriver(ABC):
- def __init__(self, device_path):
- # type: (Optional[Text]) -> None
+from glucometerutils import common
+
+
+class GlucometerDriver(abc.ABC):
+ def __init__(self, device_path: Optional[Text]) -> None:
pass
- def connect(self):
+ def connect(self) -> None:
pass
- def disconnect(self):
+ def disconnect(self) -> None:
pass
- @abstractmethod
- def get_meter_info(self):
+ @abc.abstractmethod
+ def get_meter_info(self) -> common.MeterInfo:
"""Return the device information in structured form."""
pass
- @abstractmethod
- def get_serial_number(self):
+ @abc.abstractmethod
+ def get_serial_number(self) -> str:
pass
- @abstractmethod
- def get_glucose_unit(self):
+ @abc.abstractmethod
+ def get_glucose_unit(self) -> common.Unit:
"""Returns the glucose unit of the device."""
pass
- @abstractmethod
- def get_datetime(self):
+ @abc.abstractmethod
+ def get_datetime(self) -> datetime.datetime:
pass
- def set_datetime(self, date=None):
+ def set_datetime(
+ self, date: Optional[datetime.datetime] = None
+ ) -> datetime.datetime:
"""Sets the date and time of the glucometer.
Args:
@@ -43,17 +50,17 @@ class GlucometerDriver(ABC):
A datetime object built according to the returned response.
"""
if not date:
- date = datetime.now()
+ date = datetime.datetime.now()
return self._set_device_datetime(date)
- @abstractmethod
- def _set_device_datetime(self, date):
+ @abc.abstractmethod
+ def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime:
pass
- @abstractmethod
- def zero_log(self):
+ @abc.abstractmethod
+ def zero_log(self) -> None:
pass
- @abstractmethod
- def get_readings(self):
+ @abc.abstractmethod
+ def get_readings(self) -> Generator[common.AnyReading, None, None]:
pass
diff --git a/glucometerutils/support/freestyle.py b/glucometerutils/support/freestyle.py
index c77282b..245fde7 100644
--- a/glucometerutils/support/freestyle.py
+++ b/glucometerutils/support/freestyle.py
@@ -12,7 +12,7 @@ import csv
import datetime
import logging
import re
-from typing import AnyStr, Callable, Iterator, List, Optional, Text, Tuple
+from typing import AnyStr, Callable, Iterator, List, Optional, Tuple
import construct
@@ -45,9 +45,10 @@ _ALWAYS_UNENCRYPTED_MESSAGES = (
)
-def _create_matcher(message_type, content):
- # type: (int, Optional[bytes]) -> Callable[[Tuple[int, bytes]], bool]
- def _matcher(message):
+def _create_matcher(
+ message_type: int, content: Optional[bytes]
+) -> Callable[[Tuple[int, bytes]], bool]:
+ def _matcher(message: Tuple[int, bytes]) -> bool:
return message[0] == message_type and (content is None or content == message[1])
return _matcher
@@ -91,8 +92,7 @@ _MULTIRECORDS_FORMAT = re.compile(
)
-def _verify_checksum(message, expected_checksum_hex):
- # type: (AnyStr, AnyStr) -> None
+def _verify_checksum(message: AnyStr, expected_checksum_hex: AnyStr) -> None:
"""Calculate the simple checksum of the message and compare with expected.
Args:
@@ -116,7 +116,7 @@ def _verify_checksum(message, expected_checksum_hex):
raise exceptions.InvalidChecksum(expected_checksum, calculated_checksum)
-def convert_ketone_unit(raw_value):
+def convert_ketone_unit(raw_value: float) -> float:
"""Convert raw ketone value as read in the device to its value in mmol/L.
As per https://protocols.glucometers.tech/abbott/freestyle-libre this is
@@ -132,9 +132,12 @@ ABBOTT_VENDOR_ID = 0x1A61
class FreeStyleHidSession:
def __init__(
- self, product_id, device_path, text_message_type, text_reply_message_type
- ):
- # type: (int, Optional[Text], int, int) -> None
+ self,
+ product_id: int,
+ device_path: Optional[str],
+ text_message_type: int,
+ text_reply_message_type: int,
+ ) -> None:
self._hid_session = hiddevice.HidSession(
(ABBOTT_VENDOR_ID, product_id), device_path
@@ -151,13 +154,12 @@ class FreeStyleHidSession:
f"Connection error: unexpected message %{response[0]:02x}:{response[1].hex()}"
)
- def send_command(self, message_type, command, encrypted=False):
- # type: (int, bytes, bool) -> None
+ def send_command(self, message_type: int, command: bytes, encrypted: bool = False):
"""Send a raw command to the device.
Args:
- message_type: (int) The first byte sent with the report to the device.
- command: (bytes) The command to send out the device.
+ message_type: The first byte sent with the report to the device.
+ command: The command to send out the device.
"""
if encrypted:
assert message_type not in _ALWAYS_UNENCRYPTED_MESSAGES
@@ -172,8 +174,7 @@ class FreeStyleHidSession:
logging.debug("Sending packet: %r", usb_packet)
self._hid_session.write(usb_packet)
- def read_response(self, encrypted=False):
- # type: (bool) -> Tuple[int, bytes]
+ def read_response(self, encrypted: bool = False) -> Tuple[int, bytes]:
"""Read the response from the device and extracts it."""
usb_packet = self._hid_session.read()
@@ -211,8 +212,7 @@ class FreeStyleHidSession:
return message
- def send_text_command(self, command):
- # type: (bytes) -> Text
+ def send_text_command(self, command: bytes) -> str:
"""Send a command to the device that expects a text reply."""
self.send_command(self._text_message_type, command)
@@ -237,13 +237,13 @@ class FreeStyleHidSession:
match = _TEXT_REPLY_FORMAT.search(full_content)
if not match:
- raise exceptions.InvalidResponse(full_content)
+ raise exceptions.InvalidResponse(repr(full_content))
message = match.group("message")
_verify_checksum(message, match.group("checksum"))
if match.group("status") != b"OK":
- raise exceptions.InvalidResponse(message or "Command failed")
+ raise exceptions.InvalidResponse(repr(message) or "Command failed")
# If there is anything in the response that is not ASCII-safe, this is
# probably in the patient name. The Windows utility does not seem to
@@ -251,8 +251,7 @@ class FreeStyleHidSession:
# unknown codepoint.
return message.decode("ascii", "replace")
- def query_multirecord(self, command):
- # type: (bytes) -> Iterator[List[Text]]
+ def query_multirecord(self, command: bytes) -> Iterator[List[str]]:
"""Queries for, and returns, "multirecords" results.
Multirecords are used for querying events, readings, history and similar
@@ -298,43 +297,44 @@ class FreeStyleHidDevice(driver_base.GlucometerDriver):
though.
"""
- def __init__(self, product_id, device_path, text_cmd=0x60, text_reply_cmd=0x60):
- # type: (int, Optional[Text], int, int) -> None
+ def __init__(
+ self,
+ product_id: int,
+ device_path: Optional[str],
+ text_cmd: int = 0x60,
+ text_reply_cmd: int = 0x60,
+ ) -> None:
super().__init__(device_path)
self._session = FreeStyleHidSession(
product_id, device_path, text_cmd, text_reply_cmd
)
- def connect(self):
+ def connect(self) -> None:
"""Open connection to the device, starting the knocking sequence."""
self._session.connect()
- def disconnect(self):
+ def disconnect(self) -> None:
"""Disconnect the device, nothing to be done."""
pass
# Some of the commands are also shared across devices that use this HID
# protocol, but not many. Only provide here those that do seep to change
# between them.
- def _get_version(self):
- # type: () -> Text
+ def _get_version(self) -> str:
"""Return the software version of the device."""
return self._session.send_text_command(b"$swver?").rstrip("\r\n")
- def get_serial_number(self):
- # type: () -> Text
+ def get_serial_number(self) -> str:
"""Returns the serial number of the device."""
return self._session.send_text_command(b"$serlnum?").rstrip("\r\n")
- def get_patient_name(self):
- # type: () -> Optional[Text]
+ def get_patient_name(self) -> Optional[str]:
patient_name = self._session.send_text_command(b"$ptname?").rstrip("\r\n")
if not patient_name:
return None
return patient_name
- def set_patient_name(self, name):
- # type: (Text) -> None
+ def set_patient_name(self, name: str) -> None:
try:
encoded_name = name.encode("ascii")
except UnicodeDecodeError:
@@ -342,8 +342,7 @@ class FreeStyleHidDevice(driver_base.GlucometerDriver):
self._session.send_text_command(b"$ptname," + encoded_name)
- def get_datetime(self):
- # type: () -> datetime.datetime
+ def get_datetime(self) -> datetime.datetime:
"""Gets the date and time as reported by the device.
This is one of the few commands that appear common to many of the
@@ -364,9 +363,7 @@ class FreeStyleHidDevice(driver_base.GlucometerDriver):
except ValueError:
raise exceptions.InvalidDateTime()
- def _set_device_datetime(self, date):
- # type: (datetime.datetime) -> datetime.datetime
-
+ def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime:
# The format used by the FreeStyle devices is not composable based on
# standard strftime() (namely it includes no leading zeros), so we need
# to build it manually.
diff --git a/glucometerutils/support/hiddevice.py b/glucometerutils/support/hiddevice.py
index 41ad17c..124b2e6 100644
--- a/glucometerutils/support/hiddevice.py
+++ b/glucometerutils/support/hiddevice.py
@@ -6,7 +6,7 @@
import logging
import os
-from typing import BinaryIO, Optional, Text, Tuple
+from typing import BinaryIO, Optional, Tuple
from glucometerutils import exceptions
@@ -18,8 +18,12 @@ class HidSession:
methods abstracting the HID library.
"""
- def __init__(self, usb_id, device, timeout_ms=0):
- # type: (Optional[Tuple[int, int]], Optional[Text], int) -> None
+ def __init__(
+ self,
+ usb_id: Optional[Tuple[int, int]],
+ device: Optional[str],
+ timeout_ms: int = 0,
+ ) -> None:
"""Construct a new session object.
Args:
@@ -66,8 +70,7 @@ class HidSession:
message=f"Unable to connect to meter: {e}."
)
- def write(self, report):
- # type: (bytes) -> None
+ def write(self, report: bytes) -> None:
"""Writes a report to the HID handle."""
if self.handle_:
@@ -78,8 +81,7 @@ class HidSession:
if written < 0:
raise exceptions.CommandError()
- def read(self, size=64):
- # type: (int) -> bytes
+ def read(self, size: int = 64) -> bytes:
"""Read a report from the HID handle.
This is important as it handles the one incompatible interface between
diff --git a/glucometerutils/support/lifescan.py b/glucometerutils/support/lifescan.py
index 1c329c6..20869f9 100644
--- a/glucometerutils/support/lifescan.py
+++ b/glucometerutils/support/lifescan.py
@@ -9,7 +9,7 @@ from glucometerutils import exceptions
class MissingChecksum(exceptions.InvalidResponse):
"""The response misses the expected 4-digits checksum."""
- def __init__(self, response):
+ def __init__(self, response: str):
super(MissingChecksum, self).__init__(
f"Response is missing checksum: {response}"
)
@@ -18,19 +18,18 @@ class MissingChecksum(exceptions.InvalidResponse):
class InvalidSerialNumber(exceptions.Error):
"""The serial number is not as expected."""
- def __init__(self, serial_number):
+ def __init__(self, serial_number: str):
super(InvalidSerialNumber, self).__init__(
f"Serial number {serial_number} is invalid."
)
class MalformedCommand(exceptions.InvalidResponse):
- def __init__(self, message):
+ def __init__(self, message: str):
super(MalformedCommand, self).__init__(f"Malformed command: {message}")
-def crc_ccitt(data):
- # type: (bytes) -> int
+def crc_ccitt(data: bytes) -> int:
"""Calculate the CRC-16-CCITT with LifeScan's common seed.
Args:
diff --git a/glucometerutils/support/lifescan_binary_protocol.py b/glucometerutils/support/lifescan_binary_protocol.py
index 441226e..1cef4d9 100644
--- a/glucometerutils/support/lifescan_binary_protocol.py
+++ b/glucometerutils/support/lifescan_binary_protocol.py
@@ -24,8 +24,9 @@ _LINK_CONTROL = construct.BitStruct(
)
-def LifeScanPacket(include_link_control): # pylint: disable=invalid-name
- # type: (bool) -> construct.Struct
+def LifeScanPacket(
+ include_link_control: bool,
+) -> construct.Struct: # pylint: disable=invalid-name
if include_link_control:
link_control_construct = _LINK_CONTROL
else:
diff --git a/glucometerutils/support/serial.py b/glucometerutils/support/serial.py
index d9e80ea..6a2d142 100644
--- a/glucometerutils/support/serial.py
+++ b/glucometerutils/support/serial.py
@@ -5,9 +5,10 @@
"""
import logging
-from typing import Optional, Text
+from typing import Optional
import serial
+
from glucometerutils import exceptions
@@ -37,13 +38,12 @@ class SerialDevice:
"""
- BAUDRATE = None # type: int
- DEFAULT_CABLE_ID = None # type: Text
+ BAUDRATE: Optional[int] = None
+ DEFAULT_CABLE_ID: Optional[str] = None
- TIMEOUT = 1 # type: float
+ TIMEOUT: float = 1
- def __init__(self, device):
- # type: (Optional[Text]) -> None
+ def __init__(self, device: Optional[str]) -> None:
assert self.BAUDRATE is not None
if not device and self.DEFAULT_CABLE_ID:
diff --git a/glucometerutils/tests/test_common.py b/glucometerutils/tests/test_common.py
index 3e733e3..2a3a23d 100644
--- a/glucometerutils/tests/test_common.py
+++ b/glucometerutils/tests/test_common.py
@@ -6,10 +6,13 @@
# pylint: disable=protected-access,missing-docstring
import datetime
+import unittest
from absl.testing import parameterized
from glucometerutils import common
+TEST_DATETIME = datetime.datetime(2018, 1, 1, 0, 30, 45)
+
class TestGlucoseConversion(parameterized.TestCase):
def test_convert_to_mmol(self):
@@ -43,11 +46,8 @@ class TestGlucoseConversion(parameterized.TestCase):
class TestGlucoseReading(parameterized.TestCase):
-
- TEST_DATETIME = datetime.datetime(2018, 1, 1, 0, 30, 45)
-
def test_minimal(self):
- reading = common.GlucoseReading(self.TEST_DATETIME, 100)
+ reading = common.GlucoseReading(TEST_DATETIME, 100)
self.assertEqual(
reading.as_csv(common.Unit.MG_DL),
'"2018-01-01 00:30:45","100.00","","blood sample",""',
@@ -57,7 +57,7 @@ class TestGlucoseReading(parameterized.TestCase):
("_mgdl", common.Unit.MG_DL, 100), ("_mmoll", common.Unit.MMOL_L, 5.56)
)
def test_value(self, unit, expected_value):
- reading = common.GlucoseReading(self.TEST_DATETIME, 100)
+ reading = common.GlucoseReading(TEST_DATETIME, 100)
self.assertAlmostEqual(reading.get_value_as(unit), expected_value, places=2)
@parameterized.named_parameters(
@@ -98,10 +98,36 @@ class TestGlucoseReading(parameterized.TestCase):
),
)
def test_csv(self, kwargs_dict, expected_csv):
- reading = common.GlucoseReading(self.TEST_DATETIME, 100, **kwargs_dict)
+ reading = common.GlucoseReading(TEST_DATETIME, 100, **kwargs_dict)
self.assertEqual(reading.as_csv(common.Unit.MG_DL), expected_csv)
+class TestKetoneReading(unittest.TestCase):
+ def test_measure_method(self):
+ """Raise an exception if an invalid measurement method is provided.
+
+ We allow measure_method as a parameter for compatibility with the other
+ Readings, but we don't want anything _but_ the BLOOD_SAMPLE method.
+ """
+ with self.subTest("No measure_method parameter."):
+ self.assertIsNotNone(common.KetoneReading(TEST_DATETIME, 100))
+
+ with self.subTest("measure_method=MeasurementMethod.BLOOD_SAMPLE is valid"):
+ self.assertIsNotNone(
+ common.KetoneReading(
+ TEST_DATETIME,
+ 100,
+ measure_method=common.MeasurementMethod.BLOOD_SAMPLE,
+ )
+ )
+
+ with self.subTest("measure_method=MeasurementMethod.TIME raises ValueError"):
+ with self.assertRaises(ValueError):
+ common.KetoneReading(
+ TEST_DATETIME, 100, measure_method=common.MeasurementMethod.TIME
+ )
+
+
class TestMeterInfo(parameterized.TestCase):
@parameterized.named_parameters(
("_no_serial_number", {}, "Serial Number: N/A\n"),
diff --git a/pyproject.toml b/pyproject.toml
index 3ab8a28..4e2ae7e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -35,6 +35,6 @@ multi_line_output = 3
include_trailing_comma = true
known_first_party = ['glucometerutils']
-known_third_party = ['construct', 'hidapi', 'pyserial', 'pyscsi']
+known_third_party = ['construct', 'hidapi', 'pyscsi', 'serial']
[tool.setuptools_scm]