diff options
23 files changed, 409 insertions, 305 deletions
diff --git a/glucometerutils/common.py b/glucometerutils/common.py index ddb2607..bf944c8 100644 --- a/glucometerutils/common.py +++ b/glucometerutils/common.py @@ -6,7 +6,7 @@ import datetime import enum import textwrap -from typing import Any, Dict, Optional, Sequence +from typing import Any, Dict, Optional, Sequence, Union import attr @@ -91,6 +91,10 @@ class KetoneReading: timestamp: datetime.datetime value: float comment: str = "" + measure_method: MeasurementMethod = attr.ib( + default=MeasurementMethod.BLOOD_SAMPLE, + validator=attr.validators.in_({MeasurementMethod.BLOOD_SAMPLE}), + ) extra_data: Dict[str, Any] = attr.Factory(dict) def as_csv(self, unit: Unit) -> str: @@ -100,7 +104,7 @@ class KetoneReading: return '"%s","%.2f","%s","%s"' % ( self.timestamp, self.value, - MeasurementMethod.BLOOD_SAMPLE.value, + self.measure_method.value, self.comment, ) @@ -123,6 +127,9 @@ class TimeAdjustment: ) +AnyReading = Union[GlucoseReading, KetoneReading, TimeAdjustment] + + @attr.s(auto_attribs=True) class MeterInfo: """General information about the meter. diff --git a/glucometerutils/drivers/accuchek_reports.py b/glucometerutils/drivers/accuchek_reports.py index e6dc2ea..de29064 100644 --- a/glucometerutils/drivers/accuchek_reports.py +++ b/glucometerutils/drivers/accuchek_reports.py @@ -18,6 +18,7 @@ import csv import datetime import glob import os +from typing import Dict, Generator, NoReturn, Optional from glucometerutils import common, exceptions from glucometerutils.support import driver_base @@ -46,7 +47,7 @@ _DATETIME_FORMAT = " ".join((_DATE_FORMAT, _TIME_FORMAT)) class Device(driver_base.GlucometerDriver): - def __init__(self, device): + def __init__(self, device: Optional[str]) -> None: if not device or not os.path.isdir(device): raise exceptions.CommandLineError( "--device parameter is required, should point to mount path " @@ -62,7 +63,7 @@ class Device(driver_base.GlucometerDriver): self.report_file = report_files[0] - def _get_records_reader(self): + def _get_records_reader(self) -> csv.DictReader: self.report.seek(0) # Skip the first two lines next(self.report) @@ -72,51 +73,55 @@ class Device(driver_base.GlucometerDriver): self.report, delimiter=";", skipinitialspace=True, quoting=csv.QUOTE_NONE ) - def connect(self): + def connect(self) -> None: self.report = open(self.report_file, "r", newline="\r\n", encoding="utf-8") - def disconnect(self): + def disconnect(self) -> None: self.report.close() - def get_meter_info(self): + def get_meter_info(self) -> common.MeterInfo: return common.MeterInfo( f"{self.get_model()} glucometer", serial_number=self.get_serial_number(), native_unit=self.get_glucose_unit(), ) - def get_model(self): + def get_model(self) -> str: # $device/MODEL/Reports/*.csv return os.path.basename(os.path.dirname(os.path.dirname(self.report_file))) - def get_serial_number(self): + def get_serial_number(self) -> str: self.report.seek(0) # ignore the first line. next(self.report) # The second line of the CSV is serial-no;report-date;report-time;;;;;;; return next(self.report).split(";")[0] - def get_glucose_unit(self): + def get_glucose_unit(self) -> common.Unit: # Get the first record available and parse that. record = next(self._get_records_reader()) return _UNIT_MAP[record[_UNIT_CSV_KEY]] - def get_datetime(self): + def get_datetime(self) -> NoReturn: raise NotImplementedError - def _set_device_datetime(self, date): + def _set_device_datetime(self, date: datetime.datetime) -> NoReturn: raise NotImplementedError - def zero_log(self): + def zero_log(self) -> NoReturn: raise NotImplementedError - def _extract_datetime(self, record): # pylint: disable=no-self-use + def _extract_datetime( + self, record: Dict[str, str] + ) -> datetime.datetime: # pylint: disable=no-self-use # Date and time are in separate column, but we want to parse them # together. date_and_time = " ".join((record[_DATE_CSV_KEY], record[_TIME_CSV_KEY])) return datetime.datetime.strptime(date_and_time, _DATETIME_FORMAT) - def _extract_meal(self, record): # pylint: disable=no-self-use + def _extract_meal( + self, record: Dict[str, str] + ) -> common.Meal: # pylint: disable=no-self-use if record[_AFTER_MEAL_CSV_KEY] and record[_BEFORE_MEAL_CSV_KEY]: raise exceptions.InvalidResponse("Reading cannot be before and after meal.") elif record[_AFTER_MEAL_CSV_KEY]: @@ -126,7 +131,7 @@ class Device(driver_base.GlucometerDriver): else: return common.Meal.NONE - def get_readings(self): + def get_readings(self) -> Generator[common.AnyReading, None, None]: for record in self._get_records_reader(): if record[_RESULT_CSV_KEY] is None: continue diff --git a/glucometerutils/drivers/contourusb.py b/glucometerutils/drivers/contourusb.py index 095f920..8696b04 100644 --- a/glucometerutils/drivers/contourusb.py +++ b/glucometerutils/drivers/contourusb.py @@ -19,12 +19,13 @@ http://protocols.ascensia.com/Programming-Guide.aspx """ import datetime +from typing import Dict, Generator, NoReturn, Optional from glucometerutils import common from glucometerutils.support import contourusb -def _extract_timestamp(parsed_record, prefix=""): +def _extract_timestamp(parsed_record: Dict[str, str]): """Extract the timestamp from a parsed record. This leverages the fact that all the reading records have the same base structure. @@ -42,12 +43,12 @@ def _extract_timestamp(parsed_record, prefix=""): class Device(contourusb.ContourHidDevice): - """Glucometer driver for FreeStyle Libre devices.""" + """Glucometer driver for Contour devices.""" - def __init__(self, device): - self._hid_session = contourusb.ContourHidSession((0x1A79, 0x6002), device) + def __init__(self, device: Optional[str]) -> None: + super().__init__((0x1A79, 0x6002), device) - def get_meter_info(self): + def get_meter_info(self) -> common.MeterInfo: self._get_info_record() return common.MeterInfo( "Contour USB", @@ -56,13 +57,13 @@ class Device(contourusb.ContourHidDevice): native_unit=self.get_glucose_unit(), ) - def get_glucose_unit(self): # pylint: disable=no-self-use + def get_glucose_unit(self) -> common.Unit: if self._get_glucose_unit() == "0": return common.Unit.MG_DL else: return common.Unit.MMOL_L - def get_readings(self): + def get_readings(self) -> Generator[common.AnyReading, None, None]: """ Get reading dump from download data mode(all readings stored) This meter supports only blood samples @@ -75,11 +76,11 @@ class Device(contourusb.ContourHidDevice): measure_method=common.MeasurementMethod.BLOOD_SAMPLE, ) - def get_serial_number(self): + def get_serial_number(self) -> NoReturn: raise NotImplementedError - def _set_device_datetime(self, date): + def _set_device_datetime(self, date: datetime.datetime) -> NoReturn: raise NotImplementedError - def zero_log(self): + def zero_log(self) -> NoReturn: raise NotImplementedError diff --git a/glucometerutils/drivers/fsinsulinx.py b/glucometerutils/drivers/fsinsulinx.py index a3e54fb..e984719 100644 --- a/glucometerutils/drivers/fsinsulinx.py +++ b/glucometerutils/drivers/fsinsulinx.py @@ -18,6 +18,7 @@ Xavier Claessens. import collections import datetime +from typing import Generator, NoReturn, Optional from glucometerutils import common from glucometerutils.support import freestyle @@ -51,10 +52,10 @@ _InsulinxReading = collections.namedtuple( class Device(freestyle.FreeStyleHidDevice): """Glucometer driver for FreeStyle InsuLinux devices.""" - def __init__(self, device_path): + def __init__(self, device_path: Optional[str]) -> None: super().__init__(0x3460, device_path) - def get_meter_info(self): + def get_meter_info(self) -> common.MeterInfo: """Return the device information in structured form.""" return common.MeterInfo( "FreeStyle InsuLinx", @@ -63,11 +64,11 @@ class Device(freestyle.FreeStyleHidDevice): native_unit=self.get_glucose_unit(), ) - def get_glucose_unit(self): # pylint: disable=no-self-use + def get_glucose_unit(self) -> common.Unit: # pylint: disable=no-self-use """Returns the glucose unit of the device.""" return common.Unit.MG_DL - def get_readings(self): + def get_readings(self) -> Generator[common.AnyReading, None, None]: """Iterate through the reading records in the device.""" for record in self._session.query_multirecord(b"$result?"): if not record or record[0] != _TYPE_GLUCOSE_READING: @@ -87,5 +88,5 @@ class Device(freestyle.FreeStyleHidDevice): yield common.GlucoseReading(timestamp, raw_reading.value) - def zero_log(self): + def zero_log(self) -> NoReturn: raise NotImplementedError diff --git a/glucometerutils/drivers/fslibre.py b/glucometerutils/drivers/fslibre.py index 2e37ec1..558dcf4 100644 --- a/glucometerutils/drivers/fslibre.py +++ b/glucometerutils/drivers/fslibre.py @@ -20,6 +20,7 @@ https://protocols.glucometers.tech/abbott/freestyle-libre """ import datetime +from typing import Dict, Generator, Mapping, Optional, Sequence, Tuple, Type from glucometerutils import common from glucometerutils.support import freestyle @@ -70,7 +71,9 @@ _ARRESULT_TIME_ADJUSTMENT_ENTRY_MAP = ( _ARRESULT_RAPID_INSULIN_ENTRY_MAP = ((43, "double-rapid-acting-insulin"),) -def _parse_record(record, entry_map): +def _parse_record( + record: Sequence[str], entry_map: Sequence[Tuple[int, str]] +) -> Dict[str, int]: """Parses a list of string fields into a dictionary of integers.""" if not record: @@ -82,7 +85,9 @@ def _parse_record(record, entry_map): return {} -def _extract_timestamp(parsed_record, prefix=""): +def _extract_timestamp( + parsed_record: Mapping[str, int], prefix: str = "" +) -> datetime.datetime: """Extract the timestamp from a parsed record. This leverages the fact that all the records have the same base structure. @@ -98,7 +103,7 @@ def _extract_timestamp(parsed_record, prefix=""): ) -def _parse_arresult(record): +def _parse_arresult(record: Sequence[str]) -> Optional[common.AnyReading]: """Takes an array of string fields as input and parses it into a Reading.""" parsed_record = _parse_record(record, _BASE_ENTRY_MAP) @@ -126,9 +131,9 @@ def _parse_arresult(record): return None comment_parts = [] - measure_method = None - cls = None - value = None + measure_method: Optional[common.MeasurementMethod] = None + cls: Optional[Type[common.AnyReading]] = None + value: Optional[float] = None if parsed_record["reading-type"] == 2: comment_parts.append("(Scan)") @@ -145,7 +150,10 @@ def _parse_arresult(record): measure_method = common.MeasurementMethod.BLOOD_SAMPLE cls = common.KetoneReading # automatically convert the raw value in mmol/L - value = freestyle.convert_ketone_unit(parsed_record["value"]) + raw_value = parsed_record["value"] + if raw_value is None: + raise ValueError(f"Invalid Ketone value: {parsed_record!r}") + value = freestyle.convert_ketone_unit(raw_value) else: # unknown reading return None @@ -183,7 +191,7 @@ def _parse_arresult(record): else: comment_parts.append("Rapid-acting insulin") - return cls( + reading = cls( _extract_timestamp(parsed_record), value, comment="; ".join(comment_parts), @@ -191,14 +199,16 @@ def _parse_arresult(record): extra_data={"device_id": parsed_record["device_id"]}, ) + return reading + class Device(freestyle.FreeStyleHidDevice): """Glucometer driver for FreeStyle Libre devices.""" - def __init__(self, device_path): + def __init__(self, device_path: Optional[str]) -> None: super().__init__(0x3650, device_path) - def get_meter_info(self): + def get_meter_info(self) -> common.MeterInfo: """Return the device information in structured form.""" return common.MeterInfo( "FreeStyle Libre", @@ -208,18 +218,17 @@ class Device(freestyle.FreeStyleHidDevice): patient_name=self.get_patient_name(), ) - def get_serial_number(self): + def get_serial_number(self) -> str: """Overridden function as the command is not compatible.""" return self._session.send_text_command(b"$sn?").rstrip("\r\n") - def get_glucose_unit(self): # pylint: disable=no-self-use + def get_glucose_unit(self) -> common.Unit: # pylint: disable=no-self-use """Returns the glucose unit of the device.""" # TODO(Flameeyes): figure out how to identify the actual unit on the # device. return common.Unit.MG_DL - def get_readings(self): - + def get_readings(self) -> Generator[common.AnyReading, None, None]: # First of all get the usually longer list of sensor readings, and # convert them to Readings objects. for record in self._session.query_multirecord(b"$history?"): @@ -244,5 +253,5 @@ class Device(freestyle.FreeStyleHidDevice): if reading: yield reading - def zero_log(self): + def zero_log(self) -> None: self._session.send_text_command(b"$resetpatient") diff --git a/glucometerutils/drivers/fsoptium.py b/glucometerutils/drivers/fsoptium.py index 77244af..cafd539 100644 --- a/glucometerutils/drivers/fsoptium.py +++ b/glucometerutils/drivers/fsoptium.py @@ -19,6 +19,7 @@ https://protocols.glucometers.tech/abbott/freestyle-optium import datetime import logging import re +from typing import Generator, NoReturn, Sequence from glucometerutils import common, exceptions from glucometerutils.support import driver_base, serial @@ -64,8 +65,8 @@ _MONTH_MATCHES = { } -def _parse_clock(datestr): - """Convert the date/time string used by the the device into a datetime. +def _parse_clock(datestr: str) -> datetime.datetime: + """Convert the date/time string used by the device into a datetime. Args: datestr: a string as returned by the device during information handling. @@ -88,7 +89,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): BAUDRATE = 19200 DEFAULT_CABLE_ID = "1a61:3420" - def _send_command(self, command): + def _send_command(self, command: str) -> Sequence[str]: cmd_bytes = bytes(f"$%s\r\n" % command, "ascii") logging.debug("Sending command: %r", cmd_bytes) @@ -104,14 +105,14 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): decoded_response = [line.decode("ascii").rstrip("\r\n") for line in response] return decoded_response - def connect(self): + def connect(self) -> None: self._send_command("xmem") # ignore output this time self._fetch_device_information() - def disconnect(self): # pylint: disable=no-self-use + def disconnect(self) -> None: # pylint: disable=no-self-use return - def _fetch_device_information(self): + def _fetch_device_information(self) -> None: data = self._send_command("colq") for line in data: @@ -134,7 +135,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): # back the commands and not replying to them. raise exceptions.ConnectionFailed() - def get_meter_info(self): + def get_meter_info(self) -> common.MeterInfo: """Fetch and parses the device information. Returns: @@ -147,7 +148,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): native_unit=self.get_glucose_unit(), ) - def get_version(self): + def get_version(self) -> str: """Returns an identifier of the firmware version of the glucometer. Returns: @@ -155,7 +156,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): """ return self.device_version_ - def get_serial_number(self): + def get_serial_number(self) -> str: """Retrieve the serial number of the device. Returns: @@ -163,7 +164,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): """ return self.device_serialno_ - def get_glucose_unit(self): + def get_glucose_unit(self) -> common.Unit: """Returns a constant representing the unit displayed by the meter. Returns: @@ -172,7 +173,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): """ return self.device_glucose_unit_ - def get_datetime(self): + def get_datetime(self) -> datetime.datetime: """Returns the current date and time for the glucometer. Returns: @@ -188,7 +189,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): raise exceptions.InvalidResponse("\n".join(data)) - def _set_device_datetime(self, date): + def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime: data = self._send_command(date.strftime("tim,%m,%d,%y,%H,%M")) parsed_data = "".join(data) @@ -197,10 +198,10 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): return self.get_datetime() - def zero_log(self): + def zero_log(self) -> NoReturn: raise NotImplementedError - def get_readings(self): + def get_readings(self) -> Generator[common.AnyReading, None, None]: """Iterates over the reading values stored in the glucometer. Args: diff --git a/glucometerutils/drivers/fsprecisionneo.py b/glucometerutils/drivers/fsprecisionneo.py index 3009c7b..d6c6975 100644 --- a/glucometerutils/drivers/fsprecisionneo.py +++ b/glucometerutils/drivers/fsprecisionneo.py @@ -21,8 +21,9 @@ https://protocols.glucometers.tech/abbott/freestyle-precision-neo """ -import collections +import dataclasses import datetime +from typing import Generator, NoReturn, Optional, Sequence, Type from glucometerutils import common from glucometerutils.support import freestyle @@ -31,35 +32,40 @@ from glucometerutils.support import freestyle _TYPE_GLUCOSE_READING = "7" _TYPE_KETONE_READING = "9" -_NeoReading = collections.namedtuple( - "_NeoReading", - ( - "type", # 7 = blood glucose, 9 = blood ketone - "id", - "month", - "day", - "year", # year is two-digits - "hour", - "minute", - "unknown2", - "value", - # Extra trailing and so-far-unused fields; so discard them: - # * for blood glucose: 10 unknown trailing fields - # 'unknown3', 'unknown4', 'unknown5', 'unknown6', 'unknown7', - # 'unknown8', 'unknown9', 'unknown10', 'unknown11', 'unknown12', - # * for blood ketone: 2 unknown trailing fields - # 'unknown3', 'unknown4', - ), -) + +@dataclasses.dataclass +class _NeoReading: + type: int # 7 = blood glucose, 9 = blood ketone + id: int + month: int + day: int + year: int # year is two-digits + hour: int + minute: int + unknown: int + value: float + # Extra trailing and so-far-unused fields; so discard them: + # * for blood glucose: 10 unknown trailing fields + # 'unknown3', 'unknown4', 'unknown5', 'unknown6', 'unknown7', + # 'unknown8', 'unknown9', 'unknown10', 'unknown11', 'unknown12', + # * for blood ketone: 2 unknown trailing fields + # 'unknown3', 'unknown4', + + def __init__(self, record: Sequence[str]) -> None: + for idx, field in enumerate(dataclasses.fields(self)): + if record[idx] == "HI": + setattr(self, field.name, float("inf")) + else: + setattr(self, field.name, int(record[idx])) class Device(freestyle.FreeStyleHidDevice): """Glucometer driver for FreeStyle Precision Neo devices.""" - def __init__(self, device_path): + def __init__(self, device_path: Optional[str]): super().__init__(0x3850, device_path) - def get_meter_info(self): + def get_meter_info(self) -> common.MeterInfo: """Return the device information in structured form.""" return common.MeterInfo( "FreeStyle Precision Neo", @@ -69,14 +75,14 @@ class Device(freestyle.FreeStyleHidDevice): patient_name=self.get_patient_name(), ) - def get_glucose_unit(self): # pylint: disable=no-self-use + def get_glucose_unit(self) -> common.Unit: # pylint: disable=no-self-use """Returns the glucose unit of the device.""" return common.Unit.MG_DL - def get_readings(self): + def get_readings(self) -> Generator[common.AnyReading, None, None]: """Iterate through the reading records in the device.""" for record in self._session.query_multirecord(b"$result?"): - cls = None + cls: Optional[Type[common.AnyReading]] = None if record and record[0] == _TYPE_GLUCOSE_READING: cls = common.GlucoseReading elif record and record[0] == _TYPE_KETONE_READING: @@ -84,14 +90,9 @@ class Device(freestyle.FreeStyleHidDevice): else: continue - # Build a _reading object by parsing each of the entries in the raw + # Build a _NeoReading object by parsing each of the entries in the raw # record - values = [] - for value in record: - if value == "HI": - value = float("inf") - values.append(int(value)) - raw_reading = _NeoReading._make(values[: len(_NeoReading._fields)]) + raw_reading = _NeoReading(record) timestamp = datetime.datetime( raw_reading.year + 2000, @@ -108,5 +109,5 @@ class Device(freestyle.FreeStyleHidDevice): yield cls(timestamp, value) - def zero_log(self): + def zero_log(self) -> NoReturn: raise NotImplementedError diff --git a/glucometerutils/drivers/otultra2.py b/glucometerutils/drivers/otultra2.py index 5e90b87..c1af4a0 100644 --- a/glucometerutils/drivers/otultra2.py +++ b/glucometerutils/drivers/otultra2.py @@ -15,6 +15,7 @@ Expected device path: /dev/ttyUSB0 or similar serial port device. import datetime import re +from typing import Generator from glucometerutils import common, exceptions from glucometerutils.support import driver_base, lifescan, serial @@ -51,7 +52,7 @@ _DUMP_LINE_RE = re.compile( _RESPONSE_MATCH = re.compile(r"^(.+) ([0-9A-F]{4})\r$") -def _calculate_checksum(bytestring): +def _calculate_checksum(bytestring: bytes) -> int: """Calculate the checksum used by OneTouch Ultra and Ultra2 devices Args: @@ -71,7 +72,7 @@ def _calculate_checksum(bytestring): return checksum -def _validate_and_strip_checksum(line): +def _validate_and_strip_checksum(line: str) -> str: """Verify the simple 16-bit checksum and remove it from the line. Args: @@ -104,7 +105,7 @@ _DATETIME_RE = re.compile( ) -def _parse_datetime(response): +def _parse_datetime(response: str) -> datetime.datetime: """Convert a response with date and time from the meter into a datetime. Args: @@ -132,23 +133,23 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): BAUDRATE = 9600 DEFAULT_CABLE_ID = "067b:2303" # Generic PL2303 cable. - def connect(self): # pylint: disable=no-self-use + def connect(self) -> None: # pylint: disable=no-self-use return - def disconnect(self): # pylint: disable=no-self-use + def disconnect(self) -> None: # pylint: disable=no-self-use return - def _send_command(self, cmd): + def _send_command(self, cmd: str) -> None: """Send command interface. Args: cmd: command and parameters to send (without newline) """ - cmdstring = bytes("\x11\r" + cmd + "\r", "ascii") + cmdstring = bytes(f"\x11\r{cmd}\r", "ascii") self.serial_.write(cmdstring) self.serial_.flush() - def _send_oneliner_command(self, cmd): + def _send_oneliner_command(self, cmd: str) -> str: """Send command and read a one-line response. Args: @@ -163,7 +164,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): line = self.serial_.readline().decode("ascii") return _validate_and_strip_checksum(line) - def get_meter_info(self): + def get_meter_info(self) -> common.MeterInfo: """Fetch and parses the device information. Returns: @@ -176,7 +177,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): native_unit=self.get_glucose_unit(), ) - def get_version(self): + def get_version(self) -> str: """Returns an identifier of the firmware version of the glucometer. Returns: @@ -192,7 +193,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): _SERIAL_NUMBER_RE = re.compile('^@ "([A-Z0-9]{9})"$') - def get_serial_number(self): + def get_serial_number(self) -> str: """Retrieve the serial number of the device. Returns: @@ -219,7 +220,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): return serial_number - def get_datetime(self): + def get_datetime(self) -> datetime.datetime: """Returns the current date and time for the glucometer. Returns: @@ -228,13 +229,13 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): response = self._send_oneliner_command("DMF") return _parse_datetime(response[2:]) - def _set_device_datetime(self, date): + def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime: response = self._send_oneliner_command( "DMT" + date.strftime("%m/%d/%y %H:%M:%S") ) return _parse_datetime(response[2:]) - def zero_log(self): + def zero_log(self) -> None: """Zeros out the data log of the device. This function will clear the memory of the device deleting all the @@ -246,7 +247,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): _GLUCOSE_UNIT_RE = re.compile(r'^SU\?,"(MG/DL |MMOL/L)"') - def get_glucose_unit(self): + def get_glucose_unit(self) -> common.Unit: """Returns a constant representing the unit displayed by the meter. Returns: @@ -264,6 +265,9 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): response = self._send_oneliner_command("DMSU?") match = self._GLUCOSE_UNIT_RE.match(response) + if match is None: + raise exceptions.InvalidGlucoseUnit(response) + unit = match.group(1) if unit == "MG/DL ": @@ -274,7 +278,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): raise exceptions.InvalidGlucoseUnit(response) - def get_readings(self): + def get_readings(self) -> Generator[common.AnyReading, None, None]: """Iterates over the reading values stored in the glucometer. Args: diff --git a/glucometerutils/drivers/otultraeasy.py b/glucometerutils/drivers/otultraeasy.py index 41e2acd..d5eb3c9 100644 --- a/glucometerutils/drivers/otultraeasy.py +++ b/glucometerutils/drivers/otultraeasy.py @@ -18,6 +18,7 @@ Expected device path: /dev/ttyUSB0 or similar serial port device. import binascii import datetime import logging +from typing import Any, Dict, Generator, Optional import construct @@ -92,7 +93,13 @@ _READING_RESPONSE = construct.Struct( ) -def _make_packet(message, sequence_number, expect_receive, acknowledge, disconnect): +def _make_packet( + message: bytes, + sequence_number: int, + expect_receive: bool, + acknowledge: bool, + disconnect: bool, +): return _PACKET.build( { "data": { @@ -115,24 +122,26 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): DEFAULT_CABLE_ID = "067b:2303" # Generic PL2303 cable. TIMEOUT = 0.5 - def __init__(self, device): - super(Device, self).__init__(device) + def __init__(self, device: Optional[str]) -> None: + super().__init__(device) self.sent_counter_ = False self.expect_receive_ = False self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024) - def connect(self): + def connect(self) -> None: try: self._send_packet(b"", disconnect=True) self._read_ack() except construct.ConstructError as e: raise lifescan.MalformedCommand(str(e)) - def disconnect(self): + def disconnect(self) -> None: self.connect() - def _send_packet(self, message, acknowledge=False, disconnect=False): + def _send_packet( + self, message: bytes, acknowledge: bool = False, disconnect: bool = False + ) -> None: pkt = _make_packet( message, self.sent_counter_, self.expect_receive_, acknowledge, disconnect ) @@ -141,7 +150,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): self.serial_.write(pkt) self.serial_.flush() - def _read_packet(self): + def _read_packet(self) -> construct.Container: raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data logging.debug("received packet: %r", raw_pkt) @@ -157,14 +166,19 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): return pkt - def _send_ack(self): + def _send_ack(self) -> None: self._send_packet(b"", acknowledge=True, disconnect=False) - def _read_ack(self): + def _read_ack(self) -> None: pkt = self._read_packet() assert pkt.link_control.acknowledge - def _send_request(self, request_format, request_obj, response_format): + def _send_request( + self, + request_format: construct.Struct, + request_obj: Optional[Dict[str, Any]], + response_format: construct.Struct, + ) -> construct.Container: try: request = request_format.build(request_obj) self._send_packet(request, acknowledge=False, disconnect=False) @@ -182,7 +196,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): except construct.ConstructError as e: raise lifescan.MalformedCommand(str(e)) - def get_meter_info(self): + def get_meter_info(self) -> common.MeterInfo: return common.MeterInfo( "OneTouch Ultra Easy glucometer", serial_number=self.get_serial_number(), @@ -190,26 +204,26 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): native_unit=self.get_glucose_unit(), ) - def get_version(self): + def get_version(self) -> str: response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE) return response.version - def get_serial_number(self): + def get_serial_number(self) -> str: response = self._send_request( _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE ) return response.serial_number - def get_datetime(self): + def get_datetime(self) -> datetime.datetime: response = self._send_request( _DATETIME_REQUEST, {"request_type": "read"}, _DATETIME_RESPONSE ) return response.timestamp - def _set_device_datetime(self, date): + def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime: response = self._send_request( _DATETIME_REQUEST, {"request_type": "write", "timestamp": date}, @@ -217,17 +231,17 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): ) return response.timestamp - def zero_log(self): + def zero_log(self) -> None: self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS) - def get_glucose_unit(self): + def get_glucose_unit(self) -> common.Unit: response = self._send_request( _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE ) return response.unit - def _get_reading_count(self): + def _get_reading_count(self) -> int: response = self._send_request( _READ_RECORD_REQUEST, {"record_id": _INVALID_RECORD}, @@ -235,14 +249,14 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): ) return response.count - def _get_reading(self, record_id): + def _get_reading(self, record_id: int) -> common.GlucoseReading: response = self._send_request( _READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE ) return common.GlucoseReading(response.timestamp, float(response.value)) - def get_readings(self): + def get_readings(self) -> Generator[common.AnyReading, None, None]: record_count = self._get_reading_count() for record_id in range(record_count): yield self._get_reading(record_id) diff --git a/glucometerutils/drivers/otverio2015.py b/glucometerutils/drivers/otverio2015.py index 3882d70..4e9138a 100644 --- a/glucometerutils/drivers/otverio2015.py +++ b/glucometerutils/drivers/otverio2015.py @@ -23,7 +23,9 @@ https://protocols.glucometers.tech/lifescan/onetouch-verio-2015 """ import binascii +import datetime import logging +from typing import Any, Dict, Generator, Optional import construct from pyscsi.pyscsi.scsi import SCSI @@ -105,19 +107,21 @@ _READ_RECORD_RESPONSE = construct.Struct( class Device(driver_base.GlucometerDriver): - def __init__(self, device): + def __init__(self, device: Optional[str]) -> None: if not device: raise exceptions.CommandLineError( "--device parameter is required, should point to the disk " "device representing the meter." ) + super().__init__(device) + self.device_name_ = device self.scsi_device_ = SCSIDevice(device, readwrite=True) self.scsi_ = SCSI(self.scsi_device_) self.scsi_.blocksize = _REGISTER_SIZE - def connect(self): + def connect(self) -> None: inq = self.scsi_.inquiry() logging.debug("Device connected: %r", inq.result) vendor = inq.result["t10_vendor_identification"][:32] @@ -126,14 +130,20 @@ class Device(driver_base.GlucometerDriver): f"Device {self.device_name_} is not a LifeScan glucometer." ) - def disconnect(self): # pylint: disable=no-self-use + def disconnect(self) -> None: # pylint: disable=no-self-use return - def _send_request(self, lba, request_format, request_obj, response_format): + def _send_request( + self, + lba: int, + request_format: construct.Struct, + request_obj: Optional[Dict[str, Any]], + response_format: construct.Struct, + ) -> construct.Container: """Send a request to the meter, and read its response. Args: - lba: (int) the address of the block register to use, known + lba: the address of the block register to use, known valid addresses are 3, 4 and 5. request_format: a construct format identifier of the request to send request_obj: the object to format with the provided identifier @@ -168,14 +178,14 @@ class Device(driver_base.GlucometerDriver): except construct.ConstructError as e: raise lifescan.MalformedCommand(str(e)) - def _query_string(self, selector): + def _query_string(self, selector: str) -> str: response = self._send_request( 3, _QUERY_REQUEST, {"selector": selector}, _QUERY_RESPONSE ) return response.value - def get_meter_info(self): + def get_meter_info(self) -> common.MeterInfo: model = self._query_string("model") return common.MeterInfo( f"OneTouch {model} glucometer", @@ -184,39 +194,39 @@ class Device(driver_base.GlucometerDriver): native_unit=self.get_glucose_unit(), ) - def get_serial_number(self): + def get_serial_number(self) -> str: return self._query_string("serial") - def get_version(self): + def get_version(self) -> str: return self._query_string("software") - def get_datetime(self): + def get_datetime(self) -> datetime.datetime: response = self._send_request(3, _READ_RTC_REQUEST, None, _READ_RTC_RESPONSE) return response.timestamp - def _set_device_datetime(self, date): + def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime: self._send_request(3, _WRITE_RTC_REQUEST, {"timestamp": date}, _COMMAND_SUCCESS) # The device does not return the new datetime, so confirm by calling # READ RTC again. return self.get_datetime() - def zero_log(self): + def zero_log(self) -> None: self._send_request(3, _MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS) - def get_glucose_unit(self): + def get_glucose_unit(self) -> common.Unit: response = self._send_request( 4, _READ_PARAMETER_REQUEST, {"selector": "unit"}, _READ_UNIT_RESPONSE ) return response.unit - def _get_reading_count(self): + def _get_reading_count(self) -> int: response = self._send_request( 3, _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE ) return response.count - def _get_reading(self, record_id): + def _get_reading(self, record_id: int) -> common.GlucoseReading: response = self._send_request( 3, _READ_RECORD_REQUEST, {"record_id": record_id}, _READ_RECORD_RESPONSE ) @@ -224,7 +234,7 @@ class Device(driver_base.GlucometerDriver): response.timestamp, float(response.value), meal=response.meal ) - def get_readings(self): + def get_readings(self) -> Generator[common.AnyReading, None, None]: record_count = self._get_reading_count() for record_id in range(record_count): yield self._get_reading(record_id) diff --git a/glucometerutils/drivers/otverioiq.py b/glucometerutils/drivers/otverioiq.py index 69f88de..b4a4a6c 100644 --- a/glucometerutils/drivers/otverioiq.py +++ b/glucometerutils/drivers/otverioiq.py @@ -16,7 +16,9 @@ auto-detected. """ import binascii +import datetime import logging +from typing import Any, Dict, Generator, Optional import construct @@ -101,18 +103,18 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): DEFAULT_CABLE_ID = "10c4:85a7" # Specific ID for embedded cp210x TIMEOUT = 0.5 - def __init__(self, device): - super(Device, self).__init__(device) + def __init__(self, device: Optional[str]) -> None: + super().__init__(device) self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024) - def _send_packet(self, message): + def _send_packet(self, message: bytes) -> None: pkt = _PACKET.build({"data": {"value": {"message": message}}}) logging.debug("sending packet: %s", binascii.hexlify(pkt)) self.serial_.write(pkt) self.serial_.flush() - def _read_packet(self): + def _read_packet(self) -> construct.Container: raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data logging.debug("received packet: %r", raw_pkt) @@ -121,7 +123,12 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): return pkt - def _send_request(self, request_format, request_obj, response_format): + def _send_request( + self, + request_format: construct.struct, + request_obj: Optional[Dict[str, Any]], + response_format: construct.Struct, + ) -> construct.Container: try: request = request_format.build(request_obj) self._send_packet(request) @@ -132,7 +139,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): except construct.ConstructError as e: raise lifescan.MalformedCommand(str(e)) - def get_meter_info(self): + def get_meter_info(self) -> common.MeterInfo: return common.MeterInfo( "OneTouch Verio IQ glucometer", serial_number=self.get_serial_number(), @@ -140,47 +147,47 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): native_unit=self.get_glucose_unit(), ) - def get_version(self): + def get_version(self) -> str: response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE) return response.version - def get_serial_number(self): + def get_serial_number(self) -> str: response = self._send_request( _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE ) return response.serial_number - def get_datetime(self): + def get_datetime(self) -> datetime.datetime: response = self._send_request(_READ_RTC_REQUEST, None, _READ_RTC_RESPONSE) return response.timestamp - def _set_device_datetime(self, date): + def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime: self._send_request(_WRITE_RTC_REQUEST, {"timestamp": date}, _COMMAND_SUCCESS) # The device does not return the new datetime, so confirm by calling # READ RTC again. return self.get_datetime() - def zero_log(self): + def zero_log(self) -> None: self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS) - def get_glucose_unit(self): + def get_glucose_unit(self) -> common.Unit: response = self._send_request( _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE ) return response.unit - def _get_reading_count(self): + def _get_reading_count(self) -> int: response = self._send_request( _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE ) return response.count - def _get_reading(self, record_id): + def _get_reading(self, record_id: int) -> Optional[common.GlucoseReading]: response = self._send_request( _READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE ) @@ -193,7 +200,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): response.timestamp, float(response.value), meal=response.meal ) - def get_readings(self): + def get_readings(self) -> Generator[common.AnyReading, None, None]: record_count = self._get_reading_count() for record_id in range(record_count): reading = self._get_reading(record_id) diff --git a/glucometerutils/drivers/sdcodefree.py b/glucometerutils/drivers/sdcodefree.py index 08a3afc..5d1d42c 100644 --- a/glucometerutils/drivers/sdcodefree.py +++ b/glucometerutils/drivers/sdcodefree.py @@ -21,6 +21,7 @@ import enum import functools import logging import operator +from typing import Generator, NoReturn import construct @@ -28,7 +29,7 @@ from glucometerutils import common, exceptions from glucometerutils.support import driver_base, serial -def xor_checksum(msg): +def xor_checksum(msg: bytes) -> int: return functools.reduce(operator.xor, msg) @@ -87,12 +88,12 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): DEFAULT_CABLE_ID = "10c4:ea60" # Generic cable. TIMEOUT = 300 # We need to wait for data from the device. - def read_message(self): + def read_message(self) -> bytes: pkt = _PACKET.parse_stream(self.serial_) logging.debug("received packet: %r", pkt) return pkt.message - def wait_and_ready(self): + def wait_and_ready(self) -> int: challenge = b"\0" while challenge == b"\0": @@ -126,37 +127,37 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): return first_message.count - def send_message(self, message): + def send_message(self, message: bytes) -> None: pkt = _PACKET.build({"message": message, "direction": Direction.Out}) logging.debug("sending packet: %s", binascii.hexlify(pkt)) self.serial_.write(pkt) - def connect(self): # pylint: disable=no-self-use + def connect(self) -> None: # pylint: disable=no-self-use print("Please connect and turn on the device.") - def disconnect(self): + def disconnect(self) -> None: self.send_message(_DISCONNECT_MESSAGE) response = self.read_message() if response != _DISCONNECTED_MESSAGE: - raise exceptions.InvalidResponse(response=response) + raise exceptions.InvalidResponse(response=repr(response)) - def get_meter_info(self): # pylint: disable=no-self-use + def get_meter_info(self) -> common.MeterInfo: # pylint: disable=no-self-use return common.MeterInfo("SD CodeFree glucometer") - def get_version(self): # pylint: disable=no-self-use + def get_version(self) -> NoReturn: # pylint: disable=no-self-use raise NotImplementedError - def get_serial_number(self): # pylint: disable=no-self-use + def get_serial_number(self) -> NoReturn: # pylint: disable=no-self-use raise NotImplementedError - def get_glucose_unit(self): # pylint: disable=no-self-use + def get_glucose_unit(self) -> common.Unit: # pylint: disable=no-self-use # Device does not provide information on glucose unit. return common.Unit.MG_DL - def get_datetime(self): # pylint: disable=no-self-use + def get_datetime(self) -> NoReturn: # pylint: disable=no-self-use raise NotImplementedError - def _set_device_datetime(self, date): + def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime: setdatecmd = date.strftime("ADATE%Y%m%d%H%M").encode("ascii") # Ignore the readings count. @@ -165,17 +166,17 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): self.send_message(setdatecmd) response = self.read_message() if response != _DATE_SET_MESSAGE: - raise exceptions.InvalidResponse(response=response) + raise exceptions.InvalidResponse(response=repr(response)) # The date we return should only include up to minute, unfortunately. return datetime.datetime( date.year, date.month, date.day, date.hour, date.minute ) - def zero_log(self): + def zero_log(self) -> NoReturn: raise NotImplementedError - def get_readings(self): + def get_readings(self) -> Generator[common.AnyReading, None, None]: count = self.wait_and_ready() for _ in range(count): diff --git a/glucometerutils/drivers/td4277.py b/glucometerutils/drivers/td4277.py index 31901db..be8a180 100644 --- a/glucometerutils/drivers/td4277.py +++ b/glucometerutils/drivers/td4277.py @@ -18,6 +18,7 @@ import enum import functools import logging import operator +from typing import Generator, NoReturn, Optional, Tuple import construct @@ -49,7 +50,7 @@ _PACKET = construct.Struct( / construct.Checksum(construct.Byte, byte_checksum, construct.this.data.data), ) -_EMPTY_MESSAGE = 0 +_EMPTY_MESSAGE = b"\x00\x00\x00\x00" _CONNECT_REQUEST = 0x22 _VALID_CONNECT_RESPONSE = {0x22, 0x24, 0x54} @@ -100,7 +101,9 @@ _READING_VALUE_STRUCT = construct.Struct( ) -def _make_packet(command, message, direction=Direction.Out): +def _make_packet( + command: int, message: bytes, direction: Direction = Direction.Out +) -> bytes: return _PACKET.build( { "data": { @@ -114,7 +117,7 @@ def _make_packet(command, message, direction=Direction.Out): ) -def _parse_datetime(message): +def _parse_datetime(message: bytes) -> datetime.datetime: date = _DATETIME_STRUCT.parse(message) # We can't parse the day properly with a single pass of Construct # unfortunately. @@ -124,7 +127,7 @@ def _parse_datetime(message): ) -def _select_record(record_id): +def _select_record(record_id: int) -> bytes: return _READING_SELECTION_STRUCT.build({"record_id": record_id}) @@ -133,11 +136,16 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): BAUDRATE = 19200 TIMEOUT = 0.5 - def __init__(self, device): - super(Device, self).__init__("cp2110://" + device) + def __init__(self, device: Optional[str]): + super().__init__(f"cp2110://{device}") self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024) - def _send_command(self, command, message=_EMPTY_MESSAGE, validate_response=True): + def _send_command( + self, + command: int, + message: bytes = _EMPTY_MESSAGE, + validate_response: bool = True, + ) -> Tuple[int, bytes]: pkt = _make_packet(command, message) logging.debug("sending packet: %s", binascii.hexlify(pkt)) @@ -151,7 +159,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): return response.data.value.command, response.data.value.message - def connect(self): + def connect(self) -> None: response_command, message = self._send_command( _CONNECT_REQUEST, validate_response=False ) @@ -168,24 +176,24 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): f"Invalid model identified: {model_message!r}" ) - def disconnect(self): + def disconnect(self) -> None: pass - def get_meter_info(self): + def get_meter_info(self) -> common.MeterInfo: return common.MeterInfo("TaiDoc TD-4277 glucometer") - def get_version(self): # pylint: disable=no-self-use + def get_version(self) -> NoReturn: # pylint: disable=no-self-use raise NotImplementedError - def get_serial_number(self): # pylint: disable=no-self-use + def get_serial_number(self) -> NoReturn: # pylint: disable=no-self-use raise NotImplementedError - def get_datetime(self): + def get_datetime(self) -> datetime.datetime: _, message = self._send_command(_GET_DATETIME) return _parse_datetime(message) - def _set_device_datetime(self, date): + def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime: assert date.year >= 2000 day_struct = _DAY_BITSTRUCT.build( @@ -202,12 +210,12 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): return _parse_datetime(message) - def _get_reading_count(self): + def _get_reading_count(self) -> int: _, message = self._send_command(_GET_READING_COUNT) return _READING_COUNT_STRUCT.parse(message).count - def _get_reading(self, record_id): + def _get_reading(self, record_id: int) -> common.GlucoseReading: _, reading_date_message = self._send_command( _GET_READING_DATETIME, _select_record(record_id) ) @@ -222,14 +230,14 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): reading_date, reading_value.value, meal=reading_value.meal ) - def get_readings(self): + def get_readings(self) -> Generator[common.AnyReading, None, None]: record_count = self._get_reading_count() for record_id in range(record_count): yield self._get_reading(record_id) - def zero_log(self): + def zero_log(self) -> None: self._send_command(_CLEAR_MEMORY) - def get_glucose_unit(self): + def get_glucose_unit(self) -> NoReturn: """Maybe this could be implemented by someone who knows the device""" raise NotImplementedError diff --git a/glucometerutils/exceptions.py b/glucometerutils/exceptions.py index 1f72308..d56dbe6 100644 --- a/glucometerutils/exceptions.py +++ b/glucometerutils/exceptions.py @@ -3,6 +3,8 @@ # SPDX-License-Identifier: MIT """Common exceptions for glucometerutils.""" +from typing import Any, Optional + class Error(Exception): """Base class for the errors.""" @@ -15,42 +17,43 @@ class CommandLineError(Error): class ConnectionFailed(Error): """It was not possible to connect to the meter.""" - def __init__(self, message="Unable to connect to the meter."): - super(ConnectionFailed, self).__init__(message) + def __init__(self, message: str = "Unable to connect to the meter.") -> None: + super().__init__(message) class CommandError(Error): """It was not possible to send a command to the device.""" - def __init__(self, message="Unable to send command to device."): - super(CommandError, self).__init__(message) + def __init__(self, message: str = "Unable to send command to device.") -> None: + super().__init__(message) class InvalidResponse(Error): """The response received from the meter was not understood""" - def __init__(self, response): - super(InvalidResponse, self).__init__(f"Invalid response received:\n{response}") + def __init__(self, response: str) -> None: + super().__init__(f"Invalid response received:\n{response}") class InvalidChecksum(InvalidResponse): - def __init__(self, wire, calculated): - super(InvalidChecksum, self).__init__( - f"Response checksum not matching: {wire:08x} (wire) != {calculated:08x} (calculated)" - ) + def __init__(self, wire: int, calculated: Optional[int]) -> None: + if calculated is not None: + message = f"Response checksum not matching: {wire:08x} (wire) != {calculated:08x} (calculated)" + else: + message = f"Unable to calculate checksum. Expected {wire:08x}." + + super().__init__(message) class InvalidGlucoseUnit(Error): """Unable to parse the given glucose unit""" - def __init__(self, unit): - super(InvalidGlucoseUnit, self).__init__( - f"Invalid glucose unit received:\n{unit}" - ) + def __init__(self, unit: Any) -> None: + super().__init__(f"Invalid glucose unit received:\n{unit}") class InvalidDateTime(Error): """The device has an invalid date/time setting.""" - def __init__(self): - super(InvalidDateTime, self).__init__("Invalid date and time for device") + def __init__(self) -> None: + super().__init__("Invalid date and time for device") diff --git a/glucometerutils/support/construct_extras.py b/glucometerutils/support/construct_extras.py index b44ee84..4ed8a7a 100644 --- a/glucometerutils/support/construct_extras.py +++ b/glucometerutils/support/construct_extras.py @@ -18,15 +18,15 @@ class Timestamp(construct.Adapter): __slots__ = ["epoch"] - def __init__(self, subcon, epoch=0): + def __init__(self, subcon, epoch: int = 0) -> None: super(Timestamp, self).__init__(subcon) self.epoch = epoch - def _encode(self, obj, context, path): + def _encode(self, obj: datetime.datetime, context, path) -> int: assert isinstance(obj, datetime.datetime) epoch_date = datetime.datetime.utcfromtimestamp(self.epoch) delta = obj - epoch_date return int(delta.total_seconds()) - def _decode(self, obj, context, path): + def _decode(self, obj: int, context, path) -> datetime.datetime: return datetime.datetime.utcfromtimestamp(obj + self.epoch) diff --git a/glucometerutils/support/driver_base.py b/glucometerutils/support/driver_base.py index d3e45b7..2bd4264 100644 --- a/glucometerutils/support/driver_base.py +++ b/glucometerutils/support/driver_base.py @@ -1,38 +1,45 @@ -from abc import ABC, abstractmethod -from datetime import datetime -from typing import Optional, Text +# -*- coding: utf-8 -*- +# +# SPDX-License-Identifier: MIT +import abc +import datetime +from typing import Generator, Optional, Text -class GlucometerDriver(ABC): - def __init__(self, device_path): - # type: (Optional[Text]) -> None +from glucometerutils import common + + +class GlucometerDriver(abc.ABC): + def __init__(self, device_path: Optional[Text]) -> None: pass - def connect(self): + def connect(self) -> None: pass - def disconnect(self): + def disconnect(self) -> None: pass - @abstractmethod - def get_meter_info(self): + @abc.abstractmethod + def get_meter_info(self) -> common.MeterInfo: """Return the device information in structured form.""" pass - @abstractmethod - def get_serial_number(self): + @abc.abstractmethod + def get_serial_number(self) -> str: pass - @abstractmethod - def get_glucose_unit(self): + @abc.abstractmethod + def get_glucose_unit(self) -> common.Unit: """Returns the glucose unit of the device.""" pass - @abstractmethod - def get_datetime(self): + @abc.abstractmethod + def get_datetime(self) -> datetime.datetime: pass - def set_datetime(self, date=None): + def set_datetime( + self, date: Optional[datetime.datetime] = None + ) -> datetime.datetime: """Sets the date and time of the glucometer. Args: @@ -43,17 +50,17 @@ class GlucometerDriver(ABC): A datetime object built according to the returned response. """ if not date: - date = datetime.now() + date = datetime.datetime.now() return self._set_device_datetime(date) - @abstractmethod - def _set_device_datetime(self, date): + @abc.abstractmethod + def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime: pass - @abstractmethod - def zero_log(self): + @abc.abstractmethod + def zero_log(self) -> None: pass - @abstractmethod - def get_readings(self): + @abc.abstractmethod + def get_readings(self) -> Generator[common.AnyReading, None, None]: pass diff --git a/glucometerutils/support/freestyle.py b/glucometerutils/support/freestyle.py index c77282b..245fde7 100644 --- a/glucometerutils/support/freestyle.py +++ b/glucometerutils/support/freestyle.py @@ -12,7 +12,7 @@ import csv import datetime import logging import re -from typing import AnyStr, Callable, Iterator, List, Optional, Text, Tuple +from typing import AnyStr, Callable, Iterator, List, Optional, Tuple import construct @@ -45,9 +45,10 @@ _ALWAYS_UNENCRYPTED_MESSAGES = ( ) -def _create_matcher(message_type, content): - # type: (int, Optional[bytes]) -> Callable[[Tuple[int, bytes]], bool] - def _matcher(message): +def _create_matcher( + message_type: int, content: Optional[bytes] +) -> Callable[[Tuple[int, bytes]], bool]: + def _matcher(message: Tuple[int, bytes]) -> bool: return message[0] == message_type and (content is None or content == message[1]) return _matcher @@ -91,8 +92,7 @@ _MULTIRECORDS_FORMAT = re.compile( ) -def _verify_checksum(message, expected_checksum_hex): - # type: (AnyStr, AnyStr) -> None +def _verify_checksum(message: AnyStr, expected_checksum_hex: AnyStr) -> None: """Calculate the simple checksum of the message and compare with expected. Args: @@ -116,7 +116,7 @@ def _verify_checksum(message, expected_checksum_hex): raise exceptions.InvalidChecksum(expected_checksum, calculated_checksum) -def convert_ketone_unit(raw_value): +def convert_ketone_unit(raw_value: float) -> float: """Convert raw ketone value as read in the device to its value in mmol/L. As per https://protocols.glucometers.tech/abbott/freestyle-libre this is @@ -132,9 +132,12 @@ ABBOTT_VENDOR_ID = 0x1A61 class FreeStyleHidSession: def __init__( - self, product_id, device_path, text_message_type, text_reply_message_type - ): - # type: (int, Optional[Text], int, int) -> None + self, + product_id: int, + device_path: Optional[str], + text_message_type: int, + text_reply_message_type: int, + ) -> None: self._hid_session = hiddevice.HidSession( (ABBOTT_VENDOR_ID, product_id), device_path @@ -151,13 +154,12 @@ class FreeStyleHidSession: f"Connection error: unexpected message %{response[0]:02x}:{response[1].hex()}" ) - def send_command(self, message_type, command, encrypted=False): - # type: (int, bytes, bool) -> None + def send_command(self, message_type: int, command: bytes, encrypted: bool = False): """Send a raw command to the device. Args: - message_type: (int) The first byte sent with the report to the device. - command: (bytes) The command to send out the device. + message_type: The first byte sent with the report to the device. + command: The command to send out the device. """ if encrypted: assert message_type not in _ALWAYS_UNENCRYPTED_MESSAGES @@ -172,8 +174,7 @@ class FreeStyleHidSession: logging.debug("Sending packet: %r", usb_packet) self._hid_session.write(usb_packet) - def read_response(self, encrypted=False): - # type: (bool) -> Tuple[int, bytes] + def read_response(self, encrypted: bool = False) -> Tuple[int, bytes]: """Read the response from the device and extracts it.""" usb_packet = self._hid_session.read() @@ -211,8 +212,7 @@ class FreeStyleHidSession: return message - def send_text_command(self, command): - # type: (bytes) -> Text + def send_text_command(self, command: bytes) -> str: """Send a command to the device that expects a text reply.""" self.send_command(self._text_message_type, command) @@ -237,13 +237,13 @@ class FreeStyleHidSession: match = _TEXT_REPLY_FORMAT.search(full_content) if not match: - raise exceptions.InvalidResponse(full_content) + raise exceptions.InvalidResponse(repr(full_content)) message = match.group("message") _verify_checksum(message, match.group("checksum")) if match.group("status") != b"OK": - raise exceptions.InvalidResponse(message or "Command failed") + raise exceptions.InvalidResponse(repr(message) or "Command failed") # If there is anything in the response that is not ASCII-safe, this is # probably in the patient name. The Windows utility does not seem to @@ -251,8 +251,7 @@ class FreeStyleHidSession: # unknown codepoint. return message.decode("ascii", "replace") - def query_multirecord(self, command): - # type: (bytes) -> Iterator[List[Text]] + def query_multirecord(self, command: bytes) -> Iterator[List[str]]: """Queries for, and returns, "multirecords" results. Multirecords are used for querying events, readings, history and similar @@ -298,43 +297,44 @@ class FreeStyleHidDevice(driver_base.GlucometerDriver): though. """ - def __init__(self, product_id, device_path, text_cmd=0x60, text_reply_cmd=0x60): - # type: (int, Optional[Text], int, int) -> None + def __init__( + self, + product_id: int, + device_path: Optional[str], + text_cmd: int = 0x60, + text_reply_cmd: int = 0x60, + ) -> None: super().__init__(device_path) self._session = FreeStyleHidSession( product_id, device_path, text_cmd, text_reply_cmd ) - def connect(self): + def connect(self) -> None: """Open connection to the device, starting the knocking sequence.""" self._session.connect() - def disconnect(self): + def disconnect(self) -> None: """Disconnect the device, nothing to be done.""" pass # Some of the commands are also shared across devices that use this HID # protocol, but not many. Only provide here those that do seep to change # between them. - def _get_version(self): - # type: () -> Text + def _get_version(self) -> str: """Return the software version of the device.""" return self._session.send_text_command(b"$swver?").rstrip("\r\n") - def get_serial_number(self): - # type: () -> Text + def get_serial_number(self) -> str: """Returns the serial number of the device.""" return self._session.send_text_command(b"$serlnum?").rstrip("\r\n") - def get_patient_name(self): - # type: () -> Optional[Text] + def get_patient_name(self) -> Optional[str]: patient_name = self._session.send_text_command(b"$ptname?").rstrip("\r\n") if not patient_name: return None return patient_name - def set_patient_name(self, name): - # type: (Text) -> None + def set_patient_name(self, name: str) -> None: try: encoded_name = name.encode("ascii") except UnicodeDecodeError: @@ -342,8 +342,7 @@ class FreeStyleHidDevice(driver_base.GlucometerDriver): self._session.send_text_command(b"$ptname," + encoded_name) - def get_datetime(self): - # type: () -> datetime.datetime + def get_datetime(self) -> datetime.datetime: """Gets the date and time as reported by the device. This is one of the few commands that appear common to many of the @@ -364,9 +363,7 @@ class FreeStyleHidDevice(driver_base.GlucometerDriver): except ValueError: raise exceptions.InvalidDateTime() - def _set_device_datetime(self, date): - # type: (datetime.datetime) -> datetime.datetime - + def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime: # The format used by the FreeStyle devices is not composable based on # standard strftime() (namely it includes no leading zeros), so we need # to build it manually. diff --git a/glucometerutils/support/hiddevice.py b/glucometerutils/support/hiddevice.py index 41ad17c..124b2e6 100644 --- a/glucometerutils/support/hiddevice.py +++ b/glucometerutils/support/hiddevice.py @@ -6,7 +6,7 @@ import logging import os -from typing import BinaryIO, Optional, Text, Tuple +from typing import BinaryIO, Optional, Tuple from glucometerutils import exceptions @@ -18,8 +18,12 @@ class HidSession: methods abstracting the HID library. """ - def __init__(self, usb_id, device, timeout_ms=0): - # type: (Optional[Tuple[int, int]], Optional[Text], int) -> None + def __init__( + self, + usb_id: Optional[Tuple[int, int]], + device: Optional[str], + timeout_ms: int = 0, + ) -> None: """Construct a new session object. Args: @@ -66,8 +70,7 @@ class HidSession: message=f"Unable to connect to meter: {e}." ) - def write(self, report): - # type: (bytes) -> None + def write(self, report: bytes) -> None: """Writes a report to the HID handle.""" if self.handle_: @@ -78,8 +81,7 @@ class HidSession: if written < 0: raise exceptions.CommandError() - def read(self, size=64): - # type: (int) -> bytes + def read(self, size: int = 64) -> bytes: """Read a report from the HID handle. This is important as it handles the one incompatible interface between diff --git a/glucometerutils/support/lifescan.py b/glucometerutils/support/lifescan.py index 1c329c6..20869f9 100644 --- a/glucometerutils/support/lifescan.py +++ b/glucometerutils/support/lifescan.py @@ -9,7 +9,7 @@ from glucometerutils import exceptions class MissingChecksum(exceptions.InvalidResponse): """The response misses the expected 4-digits checksum.""" - def __init__(self, response): + def __init__(self, response: str): super(MissingChecksum, self).__init__( f"Response is missing checksum: {response}" ) @@ -18,19 +18,18 @@ class MissingChecksum(exceptions.InvalidResponse): class InvalidSerialNumber(exceptions.Error): """The serial number is not as expected.""" - def __init__(self, serial_number): + def __init__(self, serial_number: str): super(InvalidSerialNumber, self).__init__( f"Serial number {serial_number} is invalid." ) class MalformedCommand(exceptions.InvalidResponse): - def __init__(self, message): + def __init__(self, message: str): super(MalformedCommand, self).__init__(f"Malformed command: {message}") -def crc_ccitt(data): - # type: (bytes) -> int +def crc_ccitt(data: bytes) -> int: """Calculate the CRC-16-CCITT with LifeScan's common seed. Args: diff --git a/glucometerutils/support/lifescan_binary_protocol.py b/glucometerutils/support/lifescan_binary_protocol.py index 441226e..1cef4d9 100644 --- a/glucometerutils/support/lifescan_binary_protocol.py +++ b/glucometerutils/support/lifescan_binary_protocol.py @@ -24,8 +24,9 @@ _LINK_CONTROL = construct.BitStruct( ) -def LifeScanPacket(include_link_control): # pylint: disable=invalid-name - # type: (bool) -> construct.Struct +def LifeScanPacket( + include_link_control: bool, +) -> construct.Struct: # pylint: disable=invalid-name if include_link_control: link_control_construct = _LINK_CONTROL else: diff --git a/glucometerutils/support/serial.py b/glucometerutils/support/serial.py index d9e80ea..6a2d142 100644 --- a/glucometerutils/support/serial.py +++ b/glucometerutils/support/serial.py @@ -5,9 +5,10 @@ """ import logging -from typing import Optional, Text +from typing import Optional import serial + from glucometerutils import exceptions @@ -37,13 +38,12 @@ class SerialDevice: """ - BAUDRATE = None # type: int - DEFAULT_CABLE_ID = None # type: Text + BAUDRATE: Optional[int] = None + DEFAULT_CABLE_ID: Optional[str] = None - TIMEOUT = 1 # type: float + TIMEOUT: float = 1 - def __init__(self, device): - # type: (Optional[Text]) -> None + def __init__(self, device: Optional[str]) -> None: assert self.BAUDRATE is not None if not device and self.DEFAULT_CABLE_ID: diff --git a/glucometerutils/tests/test_common.py b/glucometerutils/tests/test_common.py index 3e733e3..2a3a23d 100644 --- a/glucometerutils/tests/test_common.py +++ b/glucometerutils/tests/test_common.py @@ -6,10 +6,13 @@ # pylint: disable=protected-access,missing-docstring import datetime +import unittest from absl.testing import parameterized from glucometerutils import common +TEST_DATETIME = datetime.datetime(2018, 1, 1, 0, 30, 45) + class TestGlucoseConversion(parameterized.TestCase): def test_convert_to_mmol(self): @@ -43,11 +46,8 @@ class TestGlucoseConversion(parameterized.TestCase): class TestGlucoseReading(parameterized.TestCase): - - TEST_DATETIME = datetime.datetime(2018, 1, 1, 0, 30, 45) - def test_minimal(self): - reading = common.GlucoseReading(self.TEST_DATETIME, 100) + reading = common.GlucoseReading(TEST_DATETIME, 100) self.assertEqual( reading.as_csv(common.Unit.MG_DL), '"2018-01-01 00:30:45","100.00","","blood sample",""', @@ -57,7 +57,7 @@ class TestGlucoseReading(parameterized.TestCase): ("_mgdl", common.Unit.MG_DL, 100), ("_mmoll", common.Unit.MMOL_L, 5.56) ) def test_value(self, unit, expected_value): - reading = common.GlucoseReading(self.TEST_DATETIME, 100) + reading = common.GlucoseReading(TEST_DATETIME, 100) self.assertAlmostEqual(reading.get_value_as(unit), expected_value, places=2) @parameterized.named_parameters( @@ -98,10 +98,36 @@ class TestGlucoseReading(parameterized.TestCase): ), ) def test_csv(self, kwargs_dict, expected_csv): - reading = common.GlucoseReading(self.TEST_DATETIME, 100, **kwargs_dict) + reading = common.GlucoseReading(TEST_DATETIME, 100, **kwargs_dict) self.assertEqual(reading.as_csv(common.Unit.MG_DL), expected_csv) +class TestKetoneReading(unittest.TestCase): + def test_measure_method(self): + """Raise an exception if an invalid measurement method is provided. + + We allow measure_method as a parameter for compatibility with the other + Readings, but we don't want anything _but_ the BLOOD_SAMPLE method. + """ + with self.subTest("No measure_method parameter."): + self.assertIsNotNone(common.KetoneReading(TEST_DATETIME, 100)) + + with self.subTest("measure_method=MeasurementMethod.BLOOD_SAMPLE is valid"): + self.assertIsNotNone( + common.KetoneReading( + TEST_DATETIME, + 100, + measure_method=common.MeasurementMethod.BLOOD_SAMPLE, + ) + ) + + with self.subTest("measure_method=MeasurementMethod.TIME raises ValueError"): + with self.assertRaises(ValueError): + common.KetoneReading( + TEST_DATETIME, 100, measure_method=common.MeasurementMethod.TIME + ) + + class TestMeterInfo(parameterized.TestCase): @parameterized.named_parameters( ("_no_serial_number", {}, "Serial Number: N/A\n"), diff --git a/pyproject.toml b/pyproject.toml index 3ab8a28..4e2ae7e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,6 @@ multi_line_output = 3 include_trailing_comma = true known_first_party = ['glucometerutils'] -known_third_party = ['construct', 'hidapi', 'pyserial', 'pyscsi'] +known_third_party = ['construct', 'hidapi', 'pyscsi', 'serial'] [tool.setuptools_scm] |