diff options
author | Ben <b-schaefer@posteo.de> | 2020-02-21 10:45:40 +0100 |
---|---|---|
committer | Diego Elio Pettenò <flameeyes@flameeyes.com> | 2020-03-08 00:36:39 +0100 |
commit | e72b02d84e7f67cdf6107862ad580e951a5bbda1 (patch) | |
tree | 0887513d2478f55b27abccfeb307f313231bd994 /glucometerutils/drivers | |
parent | pre-commit guide in README (diff) | |
download | glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.gz glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.bz2 glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.lz glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.xz glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.zst glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.zip |
Diffstat (limited to 'glucometerutils/drivers')
-rw-r--r-- | glucometerutils/drivers/accuchek_reports.py | 73 | ||||
-rw-r--r-- | glucometerutils/drivers/contourusb.py | 39 | ||||
-rw-r--r-- | glucometerutils/drivers/fsinsulinx.py | 55 | ||||
-rw-r--r-- | glucometerutils/drivers/fslibre.py | 184 | ||||
-rw-r--r-- | glucometerutils/drivers/fsoptium.py | 134 | ||||
-rw-r--r-- | glucometerutils/drivers/fsprecisionneo.py | 61 | ||||
-rw-r--r-- | glucometerutils/drivers/otultra2.py | 112 | ||||
-rw-r--r-- | glucometerutils/drivers/otultraeasy.py | 149 | ||||
-rw-r--r-- | glucometerutils/drivers/otverio2015.py | 123 | ||||
-rw-r--r-- | glucometerutils/drivers/otverioiq.py | 104 | ||||
-rw-r--r-- | glucometerutils/drivers/sdcodefree.py | 108 | ||||
-rw-r--r-- | glucometerutils/drivers/td4277.py | 129 |
12 files changed, 638 insertions, 633 deletions
diff --git a/glucometerutils/drivers/accuchek_reports.py b/glucometerutils/drivers/accuchek_reports.py index 06b69bc..c4d7527 100644 --- a/glucometerutils/drivers/accuchek_reports.py +++ b/glucometerutils/drivers/accuchek_reports.py @@ -19,45 +19,46 @@ import datetime import glob import os -from glucometerutils import common -from glucometerutils import exceptions +from glucometerutils import common, exceptions from glucometerutils.support import driver_base _UNIT_MAP = { - 'mmol/l': common.Unit.MMOL_L, - 'mg/dl': common.Unit.MG_DL, + "mmol/l": common.Unit.MMOL_L, + "mg/dl": common.Unit.MG_DL, } -_DATE_CSV_KEY = 'Date' -_TIME_CSV_KEY = 'Time' -_RESULT_CSV_KEY = 'Result' -_UNIT_CSV_KEY = 'Unit' -_TEMPWARNING_CSV_KEY = 'Temperature warning' # ignored -_OUTRANGE_CSV_KEY = 'Out of target range' # ignored -_OTHER_CSV_KEY = 'Other' # ignored -_BEFORE_MEAL_CSV_KEY = 'Before meal' -_AFTER_MEAL_CSV_KEY = 'After meal' +_DATE_CSV_KEY = "Date" +_TIME_CSV_KEY = "Time" +_RESULT_CSV_KEY = "Result" +_UNIT_CSV_KEY = "Unit" +_TEMPWARNING_CSV_KEY = "Temperature warning" # ignored +_OUTRANGE_CSV_KEY = "Out of target range" # ignored +_OTHER_CSV_KEY = "Other" # ignored +_BEFORE_MEAL_CSV_KEY = "Before meal" +_AFTER_MEAL_CSV_KEY = "After meal" # Control test has extra whitespace which is not ignored. -_CONTROL_CSV_KEY = 'Control test' + ' '*197 +_CONTROL_CSV_KEY = "Control test" + " " * 197 -_DATE_FORMAT = '%d.%m.%Y' -_TIME_FORMAT = '%H:%M' +_DATE_FORMAT = "%d.%m.%Y" +_TIME_FORMAT = "%H:%M" -_DATETIME_FORMAT = ' '.join((_DATE_FORMAT, _TIME_FORMAT)) +_DATETIME_FORMAT = " ".join((_DATE_FORMAT, _TIME_FORMAT)) class Device(driver_base.GlucometerDriver): def __init__(self, device): if not device or not os.path.isdir(device): raise exceptions.CommandLineError( - '--device parameter is required, should point to mount path ' - 'for the meter.') + "--device parameter is required, should point to mount path " + "for the meter." + ) - reports_path = os.path.join(device, '*', 'Reports', '*.csv') + reports_path = os.path.join(device, "*", "Reports", "*.csv") report_files = glob.glob(reports_path) if not report_files: raise exceptions.ConnectionFailed( - 'No report file found in path "%s".' % reports_path) + 'No report file found in path "%s".' % reports_path + ) self.report_file = report_files[0] @@ -68,35 +69,32 @@ class Device(driver_base.GlucometerDriver): next(self.report) return csv.DictReader( - self.report, - delimiter=';', - skipinitialspace=True, - quoting=csv.QUOTE_NONE) + self.report, delimiter=";", skipinitialspace=True, quoting=csv.QUOTE_NONE + ) def connect(self): - self.report = open( - self.report_file, 'r', newline='\r\n', encoding='utf-8') + self.report = open(self.report_file, "r", newline="\r\n", encoding="utf-8") def disconnect(self): self.report.close() def get_meter_info(self): return common.MeterInfo( - '%s glucometer' % self.get_model(), + "%s glucometer" % self.get_model(), serial_number=self.get_serial_number(), - native_unit=self.get_glucose_unit()) + native_unit=self.get_glucose_unit(), + ) def get_model(self): # $device/MODEL/Reports/*.csv - return os.path.basename( - os.path.dirname(os.path.dirname(self.report_file))) + return os.path.basename(os.path.dirname(os.path.dirname(self.report_file))) def get_serial_number(self): self.report.seek(0) # ignore the first line. next(self.report) # The second line of the CSV is serial-no;report-date;report-time;;;;;;; - return next(self.report).split(';')[0] + return next(self.report).split(";")[0] def get_glucose_unit(self): # Get the first record available and parse that. @@ -115,13 +113,12 @@ class Device(driver_base.GlucometerDriver): def _extract_datetime(self, record): # pylint: disable=no-self-use # Date and time are in separate column, but we want to parse them # together. - date_and_time = ' '.join((record[_DATE_CSV_KEY], record[_TIME_CSV_KEY])) + date_and_time = " ".join((record[_DATE_CSV_KEY], record[_TIME_CSV_KEY])) return datetime.datetime.strptime(date_and_time, _DATETIME_FORMAT) def _extract_meal(self, record): # pylint: disable=no-self-use if record[_AFTER_MEAL_CSV_KEY] and record[_BEFORE_MEAL_CSV_KEY]: - raise exceptions.InvalidResponse( - 'Reading cannot be before and after meal.') + raise exceptions.InvalidResponse("Reading cannot be before and after meal.") elif record[_AFTER_MEAL_CSV_KEY]: return common.Meal.AFTER elif record[_BEFORE_MEAL_CSV_KEY]: @@ -139,5 +136,7 @@ class Device(driver_base.GlucometerDriver): common.convert_glucose_unit( float(record[_RESULT_CSV_KEY]), _UNIT_MAP[record[_UNIT_CSV_KEY]], - common.Unit.MG_DL), - meal=self._extract_meal(record)) + common.Unit.MG_DL, + ), + meal=self._extract_meal(record), + ) diff --git a/glucometerutils/drivers/contourusb.py b/glucometerutils/drivers/contourusb.py index 397eb4f..5c9ed11 100644 --- a/glucometerutils/drivers/contourusb.py +++ b/glucometerutils/drivers/contourusb.py @@ -24,43 +24,44 @@ from glucometerutils import common from glucometerutils.support import contourusb, driver_base -def _extract_timestamp(parsed_record, prefix=''): +def _extract_timestamp(parsed_record, prefix=""): """Extract the timestamp from a parsed record. This leverages the fact that all the reading records have the same base structure. """ - datetime_str = parsed_record['datetime'] + datetime_str = parsed_record["datetime"] return datetime.datetime( - int(datetime_str[0:4]), #year - int(datetime_str[4:6]), #month - int(datetime_str[6:8]), #day - int(datetime_str[8:10]), #hour - int(datetime_str[10:12]), #minute - 0) + int(datetime_str[0:4]), # year + int(datetime_str[4:6]), # month + int(datetime_str[6:8]), # day + int(datetime_str[8:10]), # hour + int(datetime_str[10:12]), # minute + 0, + ) class Device(contourusb.ContourHidDevice, driver_base.GlucometerDriver): """Glucometer driver for FreeStyle Libre devices.""" - USB_VENDOR_ID = 0x1a79 # type: int # Bayer Health Care LLC Contour + USB_VENDOR_ID = 0x1A79 # type: int # Bayer Health Care LLC Contour USB_PRODUCT_ID = 0x6002 # type: int def get_meter_info(self): self._get_info_record() return common.MeterInfo( - 'Contour USB', + "Contour USB", serial_number=self._get_serial_number(), - version_info=( - 'Meter versions: ' + self._get_version(),), - native_unit= self.get_glucose_unit()) + version_info=("Meter versions: " + self._get_version(),), + native_unit=self.get_glucose_unit(), + ) def get_glucose_unit(self): # pylint: disable=no-self-use - if self._get_glucose_unit() == '0': + if self._get_glucose_unit() == "0": return common.Unit.MG_DL else: return common.Unit.MMOL_L - + def get_readings(self): """ Get reading dump from download data mode(all readings stored) @@ -69,10 +70,10 @@ class Device(contourusb.ContourHidDevice, driver_base.GlucometerDriver): for parsed_record in self._get_multirecord(): yield common.GlucoseReading( _extract_timestamp(parsed_record), - int(parsed_record['value']), - comment=parsed_record['markers'], - measure_method=common.MeasurementMethod.BLOOD_SAMPLE - ) + int(parsed_record["value"]), + comment=parsed_record["markers"], + measure_method=common.MeasurementMethod.BLOOD_SAMPLE, + ) def get_serial_number(self): raise NotImplementedError diff --git a/glucometerutils/drivers/fsinsulinx.py b/glucometerutils/drivers/fsinsulinx.py index f3cf043..5465b3a 100644 --- a/glucometerutils/drivers/fsinsulinx.py +++ b/glucometerutils/drivers/fsinsulinx.py @@ -22,20 +22,30 @@ import datetime from glucometerutils import common from glucometerutils.support import freestyle - # The type is a string because it precedes the parsing of the object. -_TYPE_GLUCOSE_READING = '0' - -_InsulinxReading = collections.namedtuple('_InsulinxReading', ( - 'type', # 0 = blood glucose - 'id', - 'month', 'day', 'year', # year is two-digits - 'hour', 'minute', - 'unknown1', 'unknown2', 'unknown3', - 'unknown4', 'unknown5', 'unknown6', - 'value', - 'unknown7', 'unknown8', -)) +_TYPE_GLUCOSE_READING = "0" + +_InsulinxReading = collections.namedtuple( + "_InsulinxReading", + ( + "type", # 0 = blood glucose + "id", + "month", + "day", + "year", # year is two-digits + "hour", + "minute", + "unknown1", + "unknown2", + "unknown3", + "unknown4", + "unknown5", + "unknown6", + "value", + "unknown7", + "unknown8", + ), +) class Device(freestyle.FreeStyleHidDevice): @@ -46,11 +56,11 @@ class Device(freestyle.FreeStyleHidDevice): def get_meter_info(self): """Return the device information in structured form.""" return common.MeterInfo( - 'FreeStyle InsuLinx', + "FreeStyle InsuLinx", serial_number=self.get_serial_number(), - version_info=( - 'Software version: ' + self._get_version(),), - native_unit=self.get_glucose_unit()) + version_info=("Software version: " + self._get_version(),), + native_unit=self.get_glucose_unit(), + ) def get_glucose_unit(self): # pylint: disable=no-self-use """Returns the glucose unit of the device.""" @@ -58,7 +68,7 @@ class Device(freestyle.FreeStyleHidDevice): def get_readings(self): """Iterate through the reading records in the device.""" - for record in self._get_multirecord(b'$result?'): + for record in self._get_multirecord(b"$result?"): if not record or record[0] != _TYPE_GLUCOSE_READING: continue @@ -67,11 +77,14 @@ class Device(freestyle.FreeStyleHidDevice): raw_reading = _InsulinxReading._make([int(v) for v in record]) timestamp = datetime.datetime( - raw_reading.year + 2000, raw_reading.month, raw_reading.day, - raw_reading.hour, raw_reading.minute) + raw_reading.year + 2000, + raw_reading.month, + raw_reading.day, + raw_reading.hour, + raw_reading.minute, + ) yield common.GlucoseReading(timestamp, raw_reading.value) def zero_log(self): raise NotImplementedError - diff --git a/glucometerutils/drivers/fslibre.py b/glucometerutils/drivers/fslibre.py index f1ac525..29e821a 100644 --- a/glucometerutils/drivers/fslibre.py +++ b/glucometerutils/drivers/fslibre.py @@ -27,52 +27,47 @@ from glucometerutils.support import freestyle # Fields of the records returned by both $history and $arresult? # Tuple of pairs of idx and field name _BASE_ENTRY_MAP = ( - (0, 'device_id'), - (1, 'type'), - (2, 'month'), - (3, 'day'), - (4, 'year'), # 2-digits - (5, 'hour'), - (6, 'minute'), - (7, 'second'), + (0, "device_id"), + (1, "type"), + (2, "month"), + (3, "day"), + (4, "year"), # 2-digits + (5, "hour"), + (6, "minute"), + (7, "second"), ) # Fields of the records returned by $history? -_HISTORY_ENTRY_MAP = _BASE_ENTRY_MAP + ( - (13, 'value'), - (15, 'errors'), -) +_HISTORY_ENTRY_MAP = _BASE_ENTRY_MAP + ((13, "value"), (15, "errors"),) # Fields of the results returned by $arresult? where type = 2 _ARRESULT_TYPE2_ENTRY_MAP = ( - (9, 'reading-type'), # 0 = glucose blood strip, - # 1 = ketone blood strip, - # 2 = glucose sensor - (12, 'value'), - (15, 'sport-flag'), - (16, 'medication-flag'), - (17, 'rapid-acting-flag'), # see _ARRESULT_RAPID_INSULIN_ENTRY_MAP - (18, 'long-acting-flag'), - (19, 'custom-comments-bitfield'), - (23, 'double-long-acting-insulin'), - (25, 'food-flag'), - (26, 'food-carbs-grams'), - (28, 'errors'), + (9, "reading-type"), # 0 = glucose blood strip, + # 1 = ketone blood strip, + # 2 = glucose sensor + (12, "value"), + (15, "sport-flag"), + (16, "medication-flag"), + (17, "rapid-acting-flag"), # see _ARRESULT_RAPID_INSULIN_ENTRY_MAP + (18, "long-acting-flag"), + (19, "custom-comments-bitfield"), + (23, "double-long-acting-insulin"), + (25, "food-flag"), + (26, "food-carbs-grams"), + (28, "errors"), ) _ARRESULT_TIME_ADJUSTMENT_ENTRY_MAP = ( - (9, 'old_month'), - (10, 'old_day'), - (11, 'old_year'), - (12, 'old_hour'), - (13, 'old_minute'), - (14, 'old_second'), + (9, "old_month"), + (10, "old_day"), + (11, "old_year"), + (12, "old_hour"), + (13, "old_minute"), + (14, "old_second"), ) # Fields only valid when rapid-acting-flag is "1" -_ARRESULT_RAPID_INSULIN_ENTRY_MAP = ( - (43, 'double-rapid-acting-insulin'), -) +_ARRESULT_RAPID_INSULIN_ENTRY_MAP = ((43, "double-rapid-acting-insulin"),) def _parse_record(record, entry_map): @@ -82,26 +77,25 @@ def _parse_record(record, entry_map): return {} try: - return { - key: int(record[idx]) for idx, key in entry_map - } + return {key: int(record[idx]) for idx, key in entry_map} except IndexError: return {} -def _extract_timestamp(parsed_record, prefix=''): +def _extract_timestamp(parsed_record, prefix=""): """Extract the timestamp from a parsed record. This leverages the fact that all the records have the same base structure. """ return datetime.datetime( - parsed_record[prefix + 'year'] + 2000, - parsed_record[prefix + 'month'], - parsed_record[prefix + 'day'], - parsed_record[prefix + 'hour'], - parsed_record[prefix + 'minute'], - parsed_record[prefix + 'second']) + parsed_record[prefix + "year"] + 2000, + parsed_record[prefix + "month"], + parsed_record[prefix + "day"], + parsed_record[prefix + "hour"], + parsed_record[prefix + "minute"], + parsed_record[prefix + "second"], + ) def _parse_arresult(record): @@ -112,24 +106,23 @@ def _parse_arresult(record): # There are other record types, but we don't currently need to expose these. if not parsed_record: return None - elif parsed_record['type'] == 2: + elif parsed_record["type"] == 2: parsed_record.update(_parse_record(record, _ARRESULT_TYPE2_ENTRY_MAP)) - elif parsed_record['type'] == 5: + elif parsed_record["type"] == 5: parsed_record.update(_parse_record(record, _ARRESULT_TIME_ADJUSTMENT_ENTRY_MAP)) return common.TimeAdjustment( _extract_timestamp(parsed_record), - _extract_timestamp(parsed_record, 'old_'), - extra_data={'device_id': parsed_record['device_id']}, + _extract_timestamp(parsed_record, "old_"), + extra_data={"device_id": parsed_record["device_id"]}, ) else: return None # Check right away if we have rapid insulin - if parsed_record['rapid-acting-flag']: - parsed_record.update( - _parse_record(record, _ARRESULT_RAPID_INSULIN_ENTRY_MAP)) + if parsed_record["rapid-acting-flag"]: + parsed_record.update(_parse_record(record, _ARRESULT_RAPID_INSULIN_ENTRY_MAP)) - if parsed_record['errors']: + if parsed_record["errors"]: return None comment_parts = [] @@ -137,68 +130,69 @@ def _parse_arresult(record): cls = None value = None - if parsed_record['reading-type'] == 2: - comment_parts.append('(Scan)') + if parsed_record["reading-type"] == 2: + comment_parts.append("(Scan)") measure_method = common.MeasurementMethod.CGM cls = common.GlucoseReading - value = parsed_record['value'] - elif parsed_record['reading-type'] == 0: - comment_parts.append('(Blood)') + value = parsed_record["value"] + elif parsed_record["reading-type"] == 0: + comment_parts.append("(Blood)") measure_method = common.MeasurementMethod.BLOOD_SAMPLE cls = common.GlucoseReading - value = parsed_record['value'] - elif parsed_record['reading-type'] == 1: - comment_parts.append('(Ketone)') + value = parsed_record["value"] + elif parsed_record["reading-type"] == 1: + comment_parts.append("(Ketone)") measure_method = common.MeasurementMethod.BLOOD_SAMPLE cls = common.KetoneReading # automatically convert the raw value in mmol/L - value = freestyle.convert_ketone_unit(parsed_record['value']) + value = freestyle.convert_ketone_unit(parsed_record["value"]) else: # unknown reading return None custom_comments = record[29:35] for comment_index in range(6): - if parsed_record['custom-comments-bitfield'] & (1 << comment_index): + if parsed_record["custom-comments-bitfield"] & (1 << comment_index): comment_parts.append(custom_comments[comment_index][1:-1]) - if parsed_record['sport-flag']: - comment_parts.append('Sport') + if parsed_record["sport-flag"]: + comment_parts.append("Sport") - if parsed_record['medication-flag']: - comment_parts.append('Medication') + if parsed_record["medication-flag"]: + comment_parts.append("Medication") - if parsed_record['food-flag']: - if parsed_record['food-carbs-grams']: - comment_parts.append( - 'Food (%d g)' % parsed_record['food-carbs-grams']) + if parsed_record["food-flag"]: + if parsed_record["food-carbs-grams"]: + comment_parts.append("Food (%d g)" % parsed_record["food-carbs-grams"]) else: - comment_parts.append('Food') + comment_parts.append("Food") - if parsed_record['long-acting-flag']: - if parsed_record['double-long-acting-insulin']: + if parsed_record["long-acting-flag"]: + if parsed_record["double-long-acting-insulin"]: comment_parts.append( - 'Long-acting insulin (%.1f)' % - (parsed_record['double-long-acting-insulin']/2.)) + "Long-acting insulin (%.1f)" + % (parsed_record["double-long-acting-insulin"] / 2.0) + ) else: - comment_parts.append('Long-acting insulin') + comment_parts.append("Long-acting insulin") - if parsed_record['rapid-acting-flag']: + if parsed_record["rapid-acting-flag"]: # provide default value, as this record does not always exist # (even if rapid-acting-flag is set) - if parsed_record.get('double-rapid-acting-insulin', 0): + if parsed_record.get("double-rapid-acting-insulin", 0): comment_parts.append( - 'Rapid-acting insulin (%.1f)' % - (parsed_record['double-rapid-acting-insulin']/2.)) + "Rapid-acting insulin (%.1f)" + % (parsed_record["double-rapid-acting-insulin"] / 2.0) + ) else: - comment_parts.append('Rapid-acting insulin') + comment_parts.append("Rapid-acting insulin") return cls( _extract_timestamp(parsed_record), value, - comment='; '.join(comment_parts), + comment="; ".join(comment_parts), measure_method=measure_method, - extra_data={'device_id': parsed_record['device_id']}, + extra_data={"device_id": parsed_record["device_id"]}, ) @@ -210,16 +204,16 @@ class Device(freestyle.FreeStyleHidDevice): def get_meter_info(self): """Return the device information in structured form.""" return common.MeterInfo( - 'FreeStyle Libre', + "FreeStyle Libre", serial_number=self.get_serial_number(), - version_info=( - 'Software version: ' + self._get_version(),), + version_info=("Software version: " + self._get_version(),), native_unit=self.get_glucose_unit(), - patient_name=self.get_patient_name()) + patient_name=self.get_patient_name(), + ) def get_serial_number(self): """Overridden function as the command is not compatible.""" - return self._send_text_command(b'$sn?').rstrip('\r\n') + return self._send_text_command(b"$sn?").rstrip("\r\n") def get_glucose_unit(self): # pylint: disable=no-self-use """Returns the glucose unit of the device.""" @@ -231,27 +225,27 @@ class Device(freestyle.FreeStyleHidDevice): # First of all get the usually longer list of sensor readings, and # convert them to Readings objects. - for record in self._get_multirecord(b'$history?'): + for record in self._get_multirecord(b"$history?"): parsed_record = _parse_record(record, _HISTORY_ENTRY_MAP) - if not parsed_record or parsed_record['errors'] != 0: + if not parsed_record or parsed_record["errors"] != 0: # The reading is considered invalid, so ignore it. continue yield common.GlucoseReading( _extract_timestamp(parsed_record), - parsed_record['value'], - comment='(Sensor)', + parsed_record["value"], + comment="(Sensor)", measure_method=common.MeasurementMethod.CGM, - extra_data={'device_id': parsed_record['device_id']}, + extra_data={"device_id": parsed_record["device_id"]}, ) # Then get the results of explicit scans and blood tests (and other # events). - for record in self._get_multirecord(b'$arresult?'): + for record in self._get_multirecord(b"$arresult?"): reading = _parse_arresult(record) if reading: yield reading def zero_log(self): - self._send_text_command(b'$resetpatient') + self._send_text_command(b"$resetpatient") diff --git a/glucometerutils/drivers/fsoptium.py b/glucometerutils/drivers/fsoptium.py index 66b23ca..5c3971e 100644 --- a/glucometerutils/drivers/fsoptium.py +++ b/glucometerutils/drivers/fsoptium.py @@ -20,13 +20,13 @@ import datetime import logging import re -from glucometerutils import common -from glucometerutils import exceptions -from glucometerutils.support import serial, driver_base +from glucometerutils import common, exceptions +from glucometerutils.support import driver_base, serial _CLOCK_RE = re.compile( - r'^Clock:\t(?P<month>[A-Z][a-z]{2}) (?P<day>[0-9]{2}) (?P<year>[0-9]{4})\t' - r'(?P<time>[0-9]{2}:[0-9]{2}:[0-9]{2})$') + r"^Clock:\t(?P<month>[A-Z][a-z]{2}) (?P<day>[0-9]{2}) (?P<year>[0-9]{4})\t" + r"(?P<time>[0-9]{2}:[0-9]{2}:[0-9]{2})$" +) # The reading can be HI (padded to three-characters by a space) if the value was # over what the meter was supposed to read. Unlike the "Clock:" line, the months @@ -34,33 +34,33 @@ _CLOCK_RE = re.compile( # characters, so accept a space or 'e'/'y' at the end of the month name. Also, # the time does *not* include seconds. _READING_RE = re.compile( - r'^(?P<reading>HI |[0-9]{3}) ' - r'(?P<month>[A-Z][a-z]{2})[ ey] ' - r'(?P<day>[0-9]{2}) ' - r'(?P<year>[0-9]{4}) ' - r'(?P<time>[0-9]{2}:[0-9]{2}) ' - r'(?P<type>[GK]) 0x00$') + r"^(?P<reading>HI |[0-9]{3}) " + r"(?P<month>[A-Z][a-z]{2})[ ey] " + r"(?P<day>[0-9]{2}) " + r"(?P<year>[0-9]{4}) " + r"(?P<time>[0-9]{2}:[0-9]{2}) " + r"(?P<type>[GK]) 0x00$" +) -_CHECKSUM_RE = re.compile( - r'^(?P<checksum>0x[0-9A-F]{4}) END$') +_CHECKSUM_RE = re.compile(r"^(?P<checksum>0x[0-9A-F]{4}) END$") # There are two date format used by the device. One uses three-letters month # names, and that's easy enough. The other uses three-letters month names, # except for (at least) July. So ignore the fourth character. # explicit mapping. Note that the mapping *requires* a trailing whitespace. _MONTH_MATCHES = { - 'Jan': 1, - 'Feb': 2, - 'Mar': 3, - 'Apr': 4, - 'May': 5, - 'Jun': 6, - 'Jul': 7, - 'Aug': 8, - 'Sep': 9, - 'Oct': 10, - 'Nov': 11, - 'Dec': 12 + "Jan": 1, + "Feb": 2, + "Mar": 3, + "Apr": 4, + "May": 5, + "Jun": 6, + "Jul": 7, + "Aug": 8, + "Sep": 9, + "Oct": 10, + "Nov": 11, + "Dec": 12, } @@ -75,60 +75,59 @@ def _parse_clock(datestr): raise exceptions.InvalidResponse(datestr) # int() parses numbers in decimal, so we don't have to worry about '08' - day = int(match.group('day')) - month = _MONTH_MATCHES[match.group('month')] - year = int(match.group('year')) + day = int(match.group("day")) + month = _MONTH_MATCHES[match.group("month")] + year = int(match.group("year")) - hour, minute, second = (int (x) for x in match.group('time').split(':')) + hour, minute, second = (int(x) for x in match.group("time").split(":")) return datetime.datetime(year, month, day, hour, minute, second) class Device(serial.SerialDevice, driver_base.GlucometerDriver): BAUDRATE = 19200 - DEFAULT_CABLE_ID = '1a61:3420' + DEFAULT_CABLE_ID = "1a61:3420" def _send_command(self, command): - cmd_bytes = bytes('$%s\r\n' % command, 'ascii') - logging.debug('Sending command: %r', cmd_bytes) + cmd_bytes = bytes("$%s\r\n" % command, "ascii") + logging.debug("Sending command: %r", cmd_bytes) self.serial_.write(cmd_bytes) self.serial_.flush() response = self.serial_.readlines() - logging.debug('Received response: %r', response) + logging.debug("Received response: %r", response) # We always want to decode the output, and remove stray \r\n. Any # failure in decoding means the output is invalid anyway. - decoded_response = [line.decode('ascii').rstrip('\r\n') - for line in response] + decoded_response = [line.decode("ascii").rstrip("\r\n") for line in response] return decoded_response def connect(self): - self._send_command('xmem') # ignore output this time + self._send_command("xmem") # ignore output this time self._fetch_device_information() def disconnect(self): # pylint: disable=no-self-use return def _fetch_device_information(self): - data = self._send_command('colq') + data = self._send_command("colq") for line in data: - parsed_line = line.split('\t') + parsed_line = line.split("\t") - if parsed_line[0] == 'S/N:': + if parsed_line[0] == "S/N:": self.device_serialno_ = parsed_line[1] - elif parsed_line[0] == 'Ver:': + elif parsed_line[0] == "Ver:": self.device_version_ = parsed_line[1] - if parsed_line[2] == 'MMOL': + if parsed_line[2] == "MMOL": self.device_glucose_unit_ = common.Unit.MMOL_L else: # I only have a mmol/l device, so I can't be sure. self.device_glucose_unit_ = common.Unit.MG_DL # There are more entries: Clock, Market, ROM and Usage, but we don't # care for those here. - elif parsed_line[0] == 'CMD OK': + elif parsed_line[0] == "CMD OK": return # I have not figured out why this happens, but sometimes it's echoing @@ -142,11 +141,11 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): A common.MeterInfo object. """ return common.MeterInfo( - 'Freestyle Optium glucometer', + "Freestyle Optium glucometer", serial_number=self.get_serial_number(), - version_info=( - 'Software version: ' + self.get_version(),), - native_unit=self.get_glucose_unit()) + version_info=("Software version: " + self.get_version(),), + native_unit=self.get_glucose_unit(), + ) def get_version(self): """Returns an identifier of the firmware version of the glucometer. @@ -179,21 +178,21 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): Returns: A datetime object built according to the returned response. """ - data = self._send_command('colq') + data = self._send_command("colq") for line in data: - if not line.startswith('Clock:'): + if not line.startswith("Clock:"): continue return _parse_clock(line) - raise exceptions.InvalidResponse('\n'.join(data)) + raise exceptions.InvalidResponse("\n".join(data)) def _set_device_datetime(self, date): - data = self._send_command(date.strftime('tim,%m,%d,%y,%H,%M')) + data = self._send_command(date.strftime("tim,%m,%d,%y,%H,%M")) - parsed_data = ''.join(data) - if parsed_data != 'CMD OK': + parsed_data = "".join(data) + if parsed_data != "CMD OK": raise exceptions.InvalidResponse(parsed_data) return self.get_datetime() @@ -216,50 +215,47 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): expected. """ - data = self._send_command('xmem') + data = self._send_command("xmem") # The first line is empty, the second is the serial number, the third # the version, the fourth the current time, and the fifth the record # count.. The last line has a checksum and the end. count = int(data[4]) if count != (len(data) - 6): - raise exceptions.InvalidResponse('\n'.join(data)) + raise exceptions.InvalidResponse("\n".join(data)) # Extract the checksum from the last line. checksum_match = _CHECKSUM_RE.match(data[-1]) if not checksum_match: - raise exceptions.InvalidResponse('\n'.join(data)) + raise exceptions.InvalidResponse("\n".join(data)) - expected_checksum = int(checksum_match.group('checksum'), 16) + expected_checksum = int(checksum_match.group("checksum"), 16) # exclude the last line in the checksum calculation, as that's the # checksum itself. The final \r\n is added separately. - calculated_checksum = sum( - ord(c) for c in '\r\n'.join(data[:-1])) + 0xd + 0xa + calculated_checksum = sum(ord(c) for c in "\r\n".join(data[:-1])) + 0xD + 0xA if expected_checksum != calculated_checksum: - raise exceptions.InvalidChecksum( - expected_checksum, calculated_checksum) + raise exceptions.InvalidChecksum(expected_checksum, calculated_checksum) for line in data[5:-1]: match = _READING_RE.match(line) if not match: raise exceptions.InvalidResponse(line) - if match.group('type') != 'G': - logging.warning( - 'Non-glucose readings are not supported, ignoring.') + if match.group("type") != "G": + logging.warning("Non-glucose readings are not supported, ignoring.") continue - if match.group('reading') == 'HI ': + if match.group("reading") == "HI ": value = float("inf") else: - value = float(match.group('reading')) + value = float(match.group("reading")) - day = int(match.group('day')) - month = _MONTH_MATCHES[match.group('month')] - year = int(match.group('year')) + day = int(match.group("day")) + month = _MONTH_MATCHES[match.group("month")] + year = int(match.group("year")) - hour, minute = map(int, match.group('time').split(':')) + hour, minute = map(int, match.group("time").split(":")) timestamp = datetime.datetime(year, month, day, hour, minute) diff --git a/glucometerutils/drivers/fsprecisionneo.py b/glucometerutils/drivers/fsprecisionneo.py index 58564e5..909fed8 100644 --- a/glucometerutils/drivers/fsprecisionneo.py +++ b/glucometerutils/drivers/fsprecisionneo.py @@ -27,25 +27,30 @@ import datetime from glucometerutils import common from glucometerutils.support import freestyle - # The type is a string because it precedes the parsing of the object. -_TYPE_GLUCOSE_READING = '7' -_TYPE_KETONE_READING = '9' - -_NeoReading = collections.namedtuple('_NeoReading', ( - 'type', # 7 = blood glucose, 9 = blood ketone - 'id', - 'month', 'day', 'year', # year is two-digits - 'hour', 'minute', - 'unknown2', - 'value', - # Extra trailing and so-far-unused fields; so discard them: - # * for blood glucose: 10 unknown trailing fields - #'unknown3', 'unknown4', 'unknown5', 'unknown6', 'unknown7', - #'unknown8', 'unknown9', 'unknown10', 'unknown11', 'unknown12', - # * for blood ketone: 2 unknown trailing fields - #'unknown3', 'unknown4', -)) +_TYPE_GLUCOSE_READING = "7" +_TYPE_KETONE_READING = "9" + +_NeoReading = collections.namedtuple( + "_NeoReading", + ( + "type", # 7 = blood glucose, 9 = blood ketone + "id", + "month", + "day", + "year", # year is two-digits + "hour", + "minute", + "unknown2", + "value", + # Extra trailing and so-far-unused fields; so discard them: + # * for blood glucose: 10 unknown trailing fields + #'unknown3', 'unknown4', 'unknown5', 'unknown6', 'unknown7', + #'unknown8', 'unknown9', 'unknown10', 'unknown11', 'unknown12', + # * for blood ketone: 2 unknown trailing fields + #'unknown3', 'unknown4', + ), +) class Device(freestyle.FreeStyleHidDevice): @@ -56,12 +61,12 @@ class Device(freestyle.FreeStyleHidDevice): def get_meter_info(self): """Return the device information in structured form.""" return common.MeterInfo( - 'FreeStyle Precision Neo', + "FreeStyle Precision Neo", serial_number=self.get_serial_number(), - version_info=( - 'Software version: ' + self._get_version(),), + version_info=("Software version: " + self._get_version(),), native_unit=self.get_glucose_unit(), - patient_name=self.get_patient_name()) + patient_name=self.get_patient_name(), + ) def get_glucose_unit(self): # pylint: disable=no-self-use """Returns the glucose unit of the device.""" @@ -69,7 +74,7 @@ class Device(freestyle.FreeStyleHidDevice): def get_readings(self): """Iterate through the reading records in the device.""" - for record in self._get_multirecord(b'$result?'): + for record in self._get_multirecord(b"$result?"): cls = None if record and record[0] == _TYPE_GLUCOSE_READING: cls = common.GlucoseReading @@ -85,11 +90,15 @@ class Device(freestyle.FreeStyleHidDevice): if value == "HI": value = float("inf") values.append(int(value)) - raw_reading = _NeoReading._make(values[:len(_NeoReading._fields)]) + raw_reading = _NeoReading._make(values[: len(_NeoReading._fields)]) timestamp = datetime.datetime( - raw_reading.year + 2000, raw_reading.month, raw_reading.day, - raw_reading.hour, raw_reading.minute) + raw_reading.year + 2000, + raw_reading.month, + raw_reading.day, + raw_reading.hour, + raw_reading.minute, + ) if record and record[0] == _TYPE_KETONE_READING: value = freestyle.convert_ketone_unit(raw_reading.value) diff --git a/glucometerutils/drivers/otultra2.py b/glucometerutils/drivers/otultra2.py index 39be859..5e90b87 100644 --- a/glucometerutils/drivers/otultra2.py +++ b/glucometerutils/drivers/otultra2.py @@ -16,40 +16,40 @@ Expected device path: /dev/ttyUSB0 or similar serial port device. import datetime import re -from glucometerutils import common -from glucometerutils import exceptions +from glucometerutils import common, exceptions from glucometerutils.support import driver_base, lifescan, serial # The following two hashes are taken directly from LifeScan's documentation _MEAL_CODES = { - 'N': common.Meal.NONE, - 'B': common.Meal.BEFORE, - 'A': common.Meal.AFTER, + "N": common.Meal.NONE, + "B": common.Meal.BEFORE, + "A": common.Meal.AFTER, } _COMMENT_CODES = { - '00': '', # would be 'No Comment' - '01': 'Not Enough Food', - '02': 'Too Much Food', - '03': 'Mild Exercise', - '04': 'Hard Exercise', - '05': 'Medication', - '06': 'Stress', - '07': 'Illness', - '08': 'Feel Hypo', - '09': 'Menses', - '10': 'Vacation', - '11': 'Other', + "00": "", # would be 'No Comment' + "01": "Not Enough Food", + "02": "Too Much Food", + "03": "Mild Exercise", + "04": "Hard Exercise", + "05": "Medication", + "06": "Stress", + "07": "Illness", + "08": "Feel Hypo", + "09": "Menses", + "10": "Vacation", + "11": "Other", } -_DUMP_HEADER_RE = re.compile( - r'P ([0-9]{3}),"[0-9A-Z]{9}","(?:MG/DL |MMOL/L)"') +_DUMP_HEADER_RE = re.compile(r'P ([0-9]{3}),"[0-9A-Z]{9}","(?:MG/DL |MMOL/L)"') _DUMP_LINE_RE = re.compile( r'P (?P<datetime>"[A-Z]{3}","[0-9/]{8}","[0-9:]{8} "),' r'"(?P<control>[C ]) (?P<value>[0-9]{3})(?P<parityerror>[\? ])",' - r'"(?P<meal>[NBA])","(?P<comment>0[0-9]|1[01])", 00') + r'"(?P<meal>[NBA])","(?P<comment>0[0-9]|1[01])", 00' +) + +_RESPONSE_MATCH = re.compile(r"^(.+) ([0-9A-F]{4})\r$") -_RESPONSE_MATCH = re.compile(r'^(.+) ([0-9A-F]{4})\r$') def _calculate_checksum(bytestring): """Calculate the checksum used by OneTouch Ultra and Ultra2 devices @@ -66,10 +66,11 @@ def _calculate_checksum(bytestring): checksum = 0 for byte in bytestring: - checksum = (checksum + byte) & 0xffff + checksum = (checksum + byte) & 0xFFFF return checksum + def _validate_and_strip_checksum(line): """Verify the simple 16-bit checksum and remove it from the line. @@ -88,20 +89,19 @@ def _validate_and_strip_checksum(line): try: checksum_given = int(checksum_string, 16) - checksum_calculated = _calculate_checksum( - bytes(response, 'ascii')) + checksum_calculated = _calculate_checksum(bytes(response, "ascii")) if checksum_given != checksum_calculated: - raise exceptions.InvalidChecksum(checksum_given, - checksum_calculated) + raise exceptions.InvalidChecksum(checksum_given, checksum_calculated) except ValueError: raise exceptions.InvalidChecksum(checksum_given, None) return response + _DATETIME_RE = re.compile( - r'^"[A-Z]{3}",' - r'"([0-9]{2}/[0-9]{2}/[0-9]{2})","([0-9]{2}:[0-9]{2}:[0-9]{2}) "$') + r'^"[A-Z]{3}",' r'"([0-9]{2}/[0-9]{2}/[0-9]{2})","([0-9]{2}:[0-9]{2}:[0-9]{2}) "$' +) def _parse_datetime(response): @@ -121,8 +121,8 @@ def _parse_datetime(response): raise exceptions.InvalidResponse(response) date, time = match.groups() - month, day, year = map(int, date.split('/')) - hour, minute, second = map(int, time.split(':')) + month, day, year = map(int, date.split("/")) + hour, minute, second = map(int, time.split(":")) # Yes, OneTouch2's firmware is not Y2K safe. return datetime.datetime(2000 + year, month, day, hour, minute, second) @@ -130,7 +130,7 @@ def _parse_datetime(response): class Device(serial.SerialDevice, driver_base.GlucometerDriver): BAUDRATE = 9600 - DEFAULT_CABLE_ID = '067b:2303' # Generic PL2303 cable. + DEFAULT_CABLE_ID = "067b:2303" # Generic PL2303 cable. def connect(self): # pylint: disable=no-self-use return @@ -144,7 +144,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): Args: cmd: command and parameters to send (without newline) """ - cmdstring = bytes('\x11\r' + cmd + '\r', 'ascii') + cmdstring = bytes("\x11\r" + cmd + "\r", "ascii") self.serial_.write(cmdstring) self.serial_.flush() @@ -160,7 +160,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): """ self._send_command(cmd) - line = self.serial_.readline().decode('ascii') + line = self.serial_.readline().decode("ascii") return _validate_and_strip_checksum(line) def get_meter_info(self): @@ -170,11 +170,11 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): A common.MeterInfo object. """ return common.MeterInfo( - 'OneTouch Ultra 2 glucometer', + "OneTouch Ultra 2 glucometer", serial_number=self.get_serial_number(), - version_info=( - 'Software version: ' + self.get_version(),), - native_unit=self.get_glucose_unit()) + version_info=("Software version: " + self.get_version(),), + native_unit=self.get_glucose_unit(), + ) def get_version(self): """Returns an identifier of the firmware version of the glucometer. @@ -183,9 +183,9 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): The software version returned by the glucometer, such as "P02.00.00 30/08/06". """ - response = self._send_oneliner_command('DM?') + response = self._send_oneliner_command("DM?") - if response[0] != '?': + if response[0] != "?": raise exceptions.InvalidResponse(response) return response[1:] @@ -204,7 +204,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): InvalidSerialNumber: if the returned serial number does not match the OneTouch2 device as per specs. """ - response = self._send_oneliner_command('DM@') + response = self._send_oneliner_command("DM@") match = self._SERIAL_NUMBER_RE.match(response) if not match: @@ -214,7 +214,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): # 'Y' at the far right of the serial number is the indication of a # OneTouch Ultra2 device, as per specs. - if serial_number[-1] != 'Y': + if serial_number[-1] != "Y": raise lifescan.InvalidSerialNumber(serial_number) return serial_number @@ -225,12 +225,13 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): Returns: A datetime object built according to the returned response. """ - response = self._send_oneliner_command('DMF') + response = self._send_oneliner_command("DMF") return _parse_datetime(response[2:]) def _set_device_datetime(self, date): response = self._send_oneliner_command( - 'DMT' + date.strftime('%m/%d/%y %H:%M:%S')) + "DMT" + date.strftime("%m/%d/%y %H:%M:%S") + ) return _parse_datetime(response[2:]) def zero_log(self): @@ -239,8 +240,8 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): This function will clear the memory of the device deleting all the readings in an irrecoverable way. """ - response = self._send_oneliner_command('DMZ') - if response != 'Z': + response = self._send_oneliner_command("DMZ") + if response != "Z": raise exceptions.InvalidResponse(response) _GLUCOSE_UNIT_RE = re.compile(r'^SU\?,"(MG/DL |MMOL/L)"') @@ -260,15 +261,15 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): unit used for display. This is not settable by the user in all modern meters. """ - response = self._send_oneliner_command('DMSU?') + response = self._send_oneliner_command("DMSU?") match = self._GLUCOSE_UNIT_RE.match(response) unit = match.group(1) - if unit == 'MG/DL ': + if unit == "MG/DL ": return common.Unit.MG_DL - if unit == 'MMOL/L': + if unit == "MMOL/L": return common.Unit.MMOL_L raise exceptions.InvalidGlucoseUnit(response) @@ -287,10 +288,10 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): expected. """ - self._send_command('DMP') + self._send_command("DMP") data = self.serial_.readlines() - header = data.pop(0).decode('ascii') + header = data.pop(0).decode("ascii") match = _DUMP_HEADER_RE.match(header) if not match: raise exceptions.InvalidResponse(header) @@ -299,7 +300,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): assert count == len(data) for line in data: - line = _validate_and_strip_checksum(line.decode('ascii')) + line = _validate_and_strip_checksum(line.decode("ascii")) match = _DUMP_LINE_RE.match(line) if not match: @@ -307,11 +308,12 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): line_data = match.groupdict() - date = _parse_datetime(line_data['datetime']) - meal = _MEAL_CODES[line_data['meal']] - comment = _COMMENT_CODES[line_data['comment']] + date = _parse_datetime(line_data["datetime"]) + meal = _MEAL_CODES[line_data["meal"]] + comment = _COMMENT_CODES[line_data["comment"]] # OneTouch2 always returns the data in mg/dL even if the glucometer # is set to mmol/L, so there is no conversion required. yield common.GlucoseReading( - date, float(line_data['value']), meal=meal, comment=comment) + date, float(line_data["value"]), meal=meal, comment=comment + ) diff --git a/glucometerutils/drivers/otultraeasy.py b/glucometerutils/drivers/otultraeasy.py index 7f4934e..0d1e7a9 100644 --- a/glucometerutils/drivers/otultraeasy.py +++ b/glucometerutils/drivers/otultraeasy.py @@ -22,87 +22,97 @@ import logging import construct from glucometerutils import common -from glucometerutils.support import construct_extras, driver_base, lifescan, lifescan_binary_protocol, serial +from glucometerutils.support import ( + construct_extras, + driver_base, + lifescan, + lifescan_binary_protocol, + serial, +) _PACKET = lifescan_binary_protocol.LifeScanPacket(True) _INVALID_RECORD = 501 -_COMMAND_SUCCESS = construct.Const(b'\x05\x06') +_COMMAND_SUCCESS = construct.Const(b"\x05\x06") -_VERSION_REQUEST = construct.Const(b'\x05\x0d\x02') +_VERSION_REQUEST = construct.Const(b"\x05\x0d\x02") _VERSION_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'version' / construct.PascalString(construct.Byte, encoding='ascii'), + "version" / construct.PascalString(construct.Byte, encoding="ascii"), ) _SERIAL_NUMBER_REQUEST = construct.Const( - b'\x05\x0B\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00') + b"\x05\x0B\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00" +) _SERIAL_NUMBER_RESPONSE = construct.Struct( - _COMMAND_SUCCESS, - 'serial_number' / construct.GreedyString(encoding='ascii'), + _COMMAND_SUCCESS, "serial_number" / construct.GreedyString(encoding="ascii"), ) _DATETIME_REQUEST = construct.Struct( - construct.Const(b'\x05\x20'), # 0x20 is the datetime - 'request_type' / construct.Enum(construct.Byte, write=0x01, read=0x02), - 'timestamp' / construct.Default( + construct.Const(b"\x05\x20"), # 0x20 is the datetime + "request_type" / construct.Enum(construct.Byte, write=0x01, read=0x02), + "timestamp" + / construct.Default( construct_extras.Timestamp(construct.Int32ul), # type: ignore - datetime.datetime(1970, 1, 1, 0, 0)), + datetime.datetime(1970, 1, 1, 0, 0), + ), ) _DATETIME_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'timestamp' / construct_extras.Timestamp(construct.Int32ul), # type: ignore + "timestamp" / construct_extras.Timestamp(construct.Int32ul), # type: ignore ) -_GLUCOSE_UNIT_REQUEST = construct.Const( - b'\x05\x09\x02\x09\x00\x00\x00\x00') +_GLUCOSE_UNIT_REQUEST = construct.Const(b"\x05\x09\x02\x09\x00\x00\x00\x00") _GLUCOSE_UNIT_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'unit' / lifescan_binary_protocol.GLUCOSE_UNIT, + "unit" / lifescan_binary_protocol.GLUCOSE_UNIT, construct.Padding(3), ) -_MEMORY_ERASE_REQUEST = construct.Const(b'\x05\x1A') +_MEMORY_ERASE_REQUEST = construct.Const(b"\x05\x1A") _READING_COUNT_RESPONSE = construct.Struct( - construct.Const(b'\x0f'), - 'count' / construct.Int16ul, + construct.Const(b"\x0f"), "count" / construct.Int16ul, ) _READ_RECORD_REQUEST = construct.Struct( - construct.Const(b'\x05\x1f'), - 'record_id' / construct.Int16ul, + construct.Const(b"\x05\x1f"), "record_id" / construct.Int16ul, ) _READING_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'timestamp' / construct_extras.Timestamp(construct.Int32ul), # type: ignore - 'value' / construct.Int32ul, + "timestamp" / construct_extras.Timestamp(construct.Int32ul), # type: ignore + "value" / construct.Int32ul, ) -def _make_packet( - message, sequence_number, expect_receive, acknowledge, disconnect): + +def _make_packet(message, sequence_number, expect_receive, acknowledge, disconnect): return _PACKET.build( - {'data': {'value': { - 'message': message, - 'link_control': { - 'sequence_number': sequence_number, - 'expect_receive': expect_receive, - 'acknowledge': acknowledge, - 'disconnect': disconnect, - }, - }}}) + { + "data": { + "value": { + "message": message, + "link_control": { + "sequence_number": sequence_number, + "expect_receive": expect_receive, + "acknowledge": acknowledge, + "disconnect": disconnect, + }, + } + } + } + ) class Device(serial.SerialDevice, driver_base.GlucometerDriver): BAUDRATE = 9600 - DEFAULT_CABLE_ID = '067b:2303' # Generic PL2303 cable. + DEFAULT_CABLE_ID = "067b:2303" # Generic PL2303 cable. TIMEOUT = 0.5 def __init__(self, device): @@ -110,12 +120,11 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): self.sent_counter_ = False self.expect_receive_ = False - self.buffered_reader_ = construct.Rebuffered( - _PACKET, tailcutoff=1024) + self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024) def connect(self): try: - self._send_packet(b'', disconnect=True) + self._send_packet(b"", disconnect=True) self._read_ack() except construct.ConstructError as e: raise lifescan.MalformedCommand(str(e)) @@ -125,33 +134,32 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): def _send_packet(self, message, acknowledge=False, disconnect=False): pkt = _make_packet( - message, - self.sent_counter_, - self.expect_receive_, - acknowledge, - disconnect) - logging.debug('sending packet: %s', binascii.hexlify(pkt)) + message, self.sent_counter_, self.expect_receive_, acknowledge, disconnect + ) + logging.debug("sending packet: %s", binascii.hexlify(pkt)) self.serial_.write(pkt) self.serial_.flush() def _read_packet(self): raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data - logging.debug('received packet: %r', raw_pkt) + logging.debug("received packet: %r", raw_pkt) # discard the checksum and copy pkt = raw_pkt.value if not pkt.link_control.disconnect and ( - pkt.link_control.sequence_number != self.expect_receive_): + pkt.link_control.sequence_number != self.expect_receive_ + ): raise lifescan.MalformedCommand( - 'at position 2[0b] expected %02x, received %02x' % ( - self.expect_receive_, pkt.link_connect.sequence_count)) + "at position 2[0b] expected %02x, received %02x" + % (self.expect_receive_, pkt.link_connect.sequence_count) + ) return pkt def _send_ack(self): - self._send_packet(b'', acknowledge=True, disconnect=False) + self._send_packet(b"", acknowledge=True, disconnect=False) def _read_ack(self): pkt = self._read_packet() @@ -177,62 +185,63 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): def get_meter_info(self): return common.MeterInfo( - 'OneTouch Ultra Easy glucometer', + "OneTouch Ultra Easy glucometer", serial_number=self.get_serial_number(), - version_info=( - 'Software version: ' + self.get_version(),), - native_unit=self.get_glucose_unit()) + version_info=("Software version: " + self.get_version(),), + native_unit=self.get_glucose_unit(), + ) def get_version(self): - response = self._send_request( - _VERSION_REQUEST, None, _VERSION_RESPONSE) + response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE) return response.version def get_serial_number(self): response = self._send_request( - _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE) + _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE + ) return response.serial_number def get_datetime(self): response = self._send_request( - _DATETIME_REQUEST, {'request_type': 'read'}, - _DATETIME_RESPONSE) + _DATETIME_REQUEST, {"request_type": "read"}, _DATETIME_RESPONSE + ) return response.timestamp def _set_device_datetime(self, date): response = self._send_request( - _DATETIME_REQUEST, { - 'request_type': 'write', - 'timestamp': date, - }, _DATETIME_RESPONSE) + _DATETIME_REQUEST, + {"request_type": "write", "timestamp": date,}, + _DATETIME_RESPONSE, + ) return response.timestamp def zero_log(self): - self._send_request( - _MEMORY_ERASE_REQUEST, None, - _COMMAND_SUCCESS) + self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS) def get_glucose_unit(self): response = self._send_request( - _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE) + _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE + ) return response.unit def _get_reading_count(self): response = self._send_request( - _READ_RECORD_REQUEST, {'record_id': _INVALID_RECORD}, - _READING_COUNT_RESPONSE) + _READ_RECORD_REQUEST, + {"record_id": _INVALID_RECORD}, + _READING_COUNT_RESPONSE, + ) return response.count def _get_reading(self, record_id): response = self._send_request( - _READ_RECORD_REQUEST, {'record_id': record_id}, _READING_RESPONSE) + _READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE + ) - return common.GlucoseReading( - response.timestamp, float(response.value)) + return common.GlucoseReading(response.timestamp, float(response.value)) def get_readings(self): record_count = self._get_reading_count() diff --git a/glucometerutils/drivers/otverio2015.py b/glucometerutils/drivers/otverio2015.py index bde0af3..b8429ea 100644 --- a/glucometerutils/drivers/otverio2015.py +++ b/glucometerutils/drivers/otverio2015.py @@ -29,67 +29,61 @@ import construct from pyscsi.pyscsi.scsi import SCSI from pyscsi.pyscsi.scsi_device import SCSIDevice -from glucometerutils import common -from glucometerutils import exceptions +from glucometerutils import common, exceptions from glucometerutils.support import driver_base, lifescan, lifescan_binary_protocol # This device uses SCSI blocks as registers. _REGISTER_SIZE = 512 _PACKET = construct.Padded( - _REGISTER_SIZE, - lifescan_binary_protocol.LifeScanPacket(False)) + _REGISTER_SIZE, lifescan_binary_protocol.LifeScanPacket(False) +) -_COMMAND_SUCCESS = construct.Const(b'\x03\x06') +_COMMAND_SUCCESS = construct.Const(b"\x03\x06") _QUERY_REQUEST = construct.Struct( - construct.Const(b'\x03\xe6\x02'), - 'selector' / construct.Enum( - construct.Byte, serial=0x00, model=0x01, software=0x02), + construct.Const(b"\x03\xe6\x02"), + "selector" / construct.Enum(construct.Byte, serial=0x00, model=0x01, software=0x02), ) _QUERY_RESPONSE = construct.Struct( - construct.Const(b'\x03\x06'), - 'value' / construct.CString(encoding='utf-16-le'), + construct.Const(b"\x03\x06"), "value" / construct.CString(encoding="utf-16-le"), ) _READ_PARAMETER_REQUEST = construct.Struct( - construct.Const(b'\x03'), - 'selector' / construct.Enum( - construct.Byte, unit=0x04), + construct.Const(b"\x03"), "selector" / construct.Enum(construct.Byte, unit=0x04), ) _READ_UNIT_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'unit' / lifescan_binary_protocol.GLUCOSE_UNIT, + "unit" / lifescan_binary_protocol.GLUCOSE_UNIT, construct.Padding(3), ) -_READ_RTC_REQUEST = construct.Const(b'\x03\x20\x02') +_READ_RTC_REQUEST = construct.Const(b"\x03\x20\x02") _READ_RTC_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore + "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore ) _WRITE_RTC_REQUEST = construct.Struct( - construct.Const(b'\x03\x20\x01'), - 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore + construct.Const(b"\x03\x20\x01"), + "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore ) -_MEMORY_ERASE_REQUEST = construct.Const(b'\x03\x1a') +_MEMORY_ERASE_REQUEST = construct.Const(b"\x03\x1a") -_READ_RECORD_COUNT_REQUEST = construct.Const(b'\x03\x27\x00') +_READ_RECORD_COUNT_REQUEST = construct.Const(b"\x03\x27\x00") _READ_RECORD_COUNT_RESPONSE = construct.Struct( - _COMMAND_SUCCESS, - 'count' / construct.Int16ul, + _COMMAND_SUCCESS, "count" / construct.Int16ul, ) _READ_RECORD_REQUEST = construct.Struct( - construct.Const(b'\x03\x31\x02'), - 'record_id' / construct.Int16ul, - construct.Const(b'\x00'), + construct.Const(b"\x03\x31\x02"), + "record_id" / construct.Int16ul, + construct.Const(b"\x00"), ) _MEAL_FLAG = { @@ -100,13 +94,12 @@ _MEAL_FLAG = { _READ_RECORD_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'inverse_counter' / construct.Int16ul, + "inverse_counter" / construct.Int16ul, construct.Padding(1), - 'lifetime_counter' / construct.Int16ul, - 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore - 'value' / construct.Int16ul, - 'meal' / construct.Mapping( - construct.Byte, _MEAL_FLAG), + "lifetime_counter" / construct.Int16ul, + "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore + "value" / construct.Int16ul, + "meal" / construct.Mapping(construct.Byte, _MEAL_FLAG), construct.Padding(4), ) @@ -115,8 +108,9 @@ class Device(driver_base.GlucometerDriver): def __init__(self, device): if not device: raise exceptions.CommandLineError( - '--device parameter is required, should point to the disk ' - 'device representing the meter.') + "--device parameter is required, should point to the disk " + "device representing the meter." + ) self.device_name_ = device self.scsi_device_ = SCSIDevice(device, readwrite=True) @@ -125,11 +119,12 @@ class Device(driver_base.GlucometerDriver): def connect(self): inq = self.scsi_.inquiry() - logging.debug('Device connected: %r', inq.result) - vendor = inq.result['t10_vendor_identification'][:32] - if vendor != b'LifeScan': + logging.debug("Device connected: %r", inq.result) + vendor = inq.result["t10_vendor_identification"][:32] + if vendor != b"LifeScan": raise exceptions.ConnectionFailed( - 'Device %s is not a LifeScan glucometer.' % self.device_name_) + "Device %s is not a LifeScan glucometer." % self.device_name_ + ) def disconnect(self): # pylint: disable=no-self-use return @@ -155,21 +150,19 @@ class Device(driver_base.GlucometerDriver): """ try: request = request_format.build(request_obj) - request_raw = _PACKET.build({'data': {'value': { - 'message': request, - }}}) - logging.debug( - 'Request sent: %s', binascii.hexlify(request_raw)) + request_raw = _PACKET.build({"data": {"value": {"message": request,}}}) + logging.debug("Request sent: %s", binascii.hexlify(request_raw)) self.scsi_.write10(lba, 1, request_raw) response_raw = self.scsi_.read10(lba, 1) logging.debug( - 'Response received: %s', binascii.hexlify(response_raw.datain)) + "Response received: %s", binascii.hexlify(response_raw.datain) + ) response_pkt = _PACKET.parse(response_raw.datain).data - logging.debug('Response packet: %r', response_pkt) + logging.debug("Response packet: %r", response_pkt) response = response_format.parse(response_pkt.value.message) - logging.debug('Response parsed: %r', response) + logging.debug("Response parsed: %r", response) return response except construct.ConstructError as e: @@ -177,60 +170,58 @@ class Device(driver_base.GlucometerDriver): def _query_string(self, selector): response = self._send_request( - 3, _QUERY_REQUEST, {'selector': selector}, _QUERY_RESPONSE) + 3, _QUERY_REQUEST, {"selector": selector}, _QUERY_RESPONSE + ) return response.value def get_meter_info(self): return common.MeterInfo( - 'OneTouch %s glucometer' % self._query_string('model'), + "OneTouch %s glucometer" % self._query_string("model"), serial_number=self.get_serial_number(), - version_info=( - 'Software version: ' + self.get_version(),), - native_unit=self.get_glucose_unit()) + version_info=("Software version: " + self.get_version(),), + native_unit=self.get_glucose_unit(), + ) def get_serial_number(self): - return self._query_string('serial') + return self._query_string("serial") def get_version(self): - return self._query_string('software') + return self._query_string("software") def get_datetime(self): - response = self._send_request( - 3, _READ_RTC_REQUEST, None, _READ_RTC_RESPONSE) + response = self._send_request(3, _READ_RTC_REQUEST, None, _READ_RTC_RESPONSE) return response.timestamp def _set_device_datetime(self, date): - self._send_request( - 3, _WRITE_RTC_REQUEST, {'timestamp': date}, - _COMMAND_SUCCESS) + self._send_request(3, _WRITE_RTC_REQUEST, {"timestamp": date}, _COMMAND_SUCCESS) # The device does not return the new datetime, so confirm by calling # READ RTC again. return self.get_datetime() def zero_log(self): - self._send_request( - 3, _MEMORY_ERASE_REQUEST, None, - _COMMAND_SUCCESS) + self._send_request(3, _MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS) def get_glucose_unit(self): response = self._send_request( - 4, _READ_PARAMETER_REQUEST, {'selector': 'unit'}, - _READ_UNIT_RESPONSE) + 4, _READ_PARAMETER_REQUEST, {"selector": "unit"}, _READ_UNIT_RESPONSE + ) return response.unit def _get_reading_count(self): response = self._send_request( - 3, _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE) + 3, _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE + ) return response.count def _get_reading(self, record_id): response = self._send_request( - 3, _READ_RECORD_REQUEST, {'record_id': record_id}, - _READ_RECORD_RESPONSE) + 3, _READ_RECORD_REQUEST, {"record_id": record_id}, _READ_RECORD_RESPONSE + ) return common.GlucoseReading( - response.timestamp, float(response.value), meal=response.meal) + response.timestamp, float(response.value), meal=response.meal + ) def get_readings(self): record_count = self._get_reading_count() diff --git a/glucometerutils/drivers/otverioiq.py b/glucometerutils/drivers/otverioiq.py index 69bdac9..24327ef 100644 --- a/glucometerutils/drivers/otverioiq.py +++ b/glucometerutils/drivers/otverioiq.py @@ -21,63 +21,63 @@ import logging import construct from glucometerutils import common -from glucometerutils.support import driver_base, lifescan, lifescan_binary_protocol, serial +from glucometerutils.support import ( + driver_base, + lifescan, + lifescan_binary_protocol, + serial, +) _PACKET = lifescan_binary_protocol.LifeScanPacket(False) -_COMMAND_SUCCESS = construct.Const(b'\x03\x06') +_COMMAND_SUCCESS = construct.Const(b"\x03\x06") -_VERSION_REQUEST = construct.Const(b'\x03\x0d\x01') +_VERSION_REQUEST = construct.Const(b"\x03\x0d\x01") _VERSION_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'version' / construct.PascalString(construct.Byte, encoding='ascii'), + "version" / construct.PascalString(construct.Byte, encoding="ascii"), # NULL-termination is not included in string length. - construct.Const(b'\x00'), + construct.Const(b"\x00"), ) -_SERIAL_NUMBER_REQUEST = construct.Const( - b'\x03\x0b\x01\x02') +_SERIAL_NUMBER_REQUEST = construct.Const(b"\x03\x0b\x01\x02") _SERIAL_NUMBER_RESPONSE = construct.Struct( - _COMMAND_SUCCESS, - 'serial_number' / construct.CString(encoding='ascii'), + _COMMAND_SUCCESS, "serial_number" / construct.CString(encoding="ascii"), ) -_READ_RTC_REQUEST = construct.Const(b'\x03\x20\x02') +_READ_RTC_REQUEST = construct.Const(b"\x03\x20\x02") _READ_RTC_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore + "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore ) _WRITE_RTC_REQUEST = construct.Struct( - construct.Const(b'\x03\x20\x01'), - 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore + construct.Const(b"\x03\x20\x01"), + "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore ) -_GLUCOSE_UNIT_REQUEST = construct.Const( - b'\x03\x09\x02\x02') +_GLUCOSE_UNIT_REQUEST = construct.Const(b"\x03\x09\x02\x02") _GLUCOSE_UNIT_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'unit' / lifescan_binary_protocol.GLUCOSE_UNIT, + "unit" / lifescan_binary_protocol.GLUCOSE_UNIT, construct.Padding(3), ) -_MEMORY_ERASE_REQUEST = construct.Const(b'\x03\x1a') +_MEMORY_ERASE_REQUEST = construct.Const(b"\x03\x1a") -_READ_RECORD_COUNT_REQUEST = construct.Const(b'\x03\x27\x00') +_READ_RECORD_COUNT_REQUEST = construct.Const(b"\x03\x27\x00") _READ_RECORD_COUNT_RESPONSE = construct.Struct( - _COMMAND_SUCCESS, - 'count' / construct.Int16ul, + _COMMAND_SUCCESS, "count" / construct.Int16ul, ) _READ_RECORD_REQUEST = construct.Struct( - construct.Const(b'\x03\x21'), - 'record_id' / construct.Int16ul, + construct.Const(b"\x03\x21"), "record_id" / construct.Int16ul, ) _MEAL_FLAG = { @@ -88,18 +88,17 @@ _MEAL_FLAG = { _READING_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore - 'value' / construct.Int16ul, - 'control_test' / construct.Flag, - 'meal' / construct.Mapping( - construct.Byte, _MEAL_FLAG), + "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore + "value" / construct.Int16ul, + "control_test" / construct.Flag, + "meal" / construct.Mapping(construct.Byte, _MEAL_FLAG), construct.Padding(2), # unknown ) class Device(serial.SerialDevice, driver_base.GlucometerDriver): BAUDRATE = 38400 - DEFAULT_CABLE_ID = '10c4:85a7' # Specific ID for embedded cp210x + DEFAULT_CABLE_ID = "10c4:85a7" # Specific ID for embedded cp210x TIMEOUT = 0.5 def __init__(self, device): @@ -107,18 +106,15 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024) def _send_packet(self, message): - pkt = _PACKET.build( - {'data': {'value': { - 'message': message, - }}}) - logging.debug('sending packet: %s', binascii.hexlify(pkt)) + pkt = _PACKET.build({"data": {"value": {"message": message,}}}) + logging.debug("sending packet: %s", binascii.hexlify(pkt)) self.serial_.write(pkt) self.serial_.flush() def _read_packet(self): raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data - logging.debug('received packet: %r', raw_pkt) + logging.debug("received packet: %r", raw_pkt) # discard the checksum and copy pkt = raw_pkt.value @@ -138,66 +134,64 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): def get_meter_info(self): return common.MeterInfo( - 'OneTouch Verio IQ glucometer', + "OneTouch Verio IQ glucometer", serial_number=self.get_serial_number(), - version_info=( - 'Software version: ' + self.get_version(),), - native_unit=self.get_glucose_unit()) + version_info=("Software version: " + self.get_version(),), + native_unit=self.get_glucose_unit(), + ) def get_version(self): - response = self._send_request( - _VERSION_REQUEST, None, _VERSION_RESPONSE) + response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE) return response.version def get_serial_number(self): response = self._send_request( - _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE) + _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE + ) return response.serial_number def get_datetime(self): - response = self._send_request( - _READ_RTC_REQUEST, None, _READ_RTC_RESPONSE) + response = self._send_request(_READ_RTC_REQUEST, None, _READ_RTC_RESPONSE) return response.timestamp def _set_device_datetime(self, date): - self._send_request( - _WRITE_RTC_REQUEST, { - 'timestamp': date, - }, _COMMAND_SUCCESS) + self._send_request(_WRITE_RTC_REQUEST, {"timestamp": date,}, _COMMAND_SUCCESS) # The device does not return the new datetime, so confirm by calling # READ RTC again. return self.get_datetime() def zero_log(self): - self._send_request( - _MEMORY_ERASE_REQUEST, None, - _COMMAND_SUCCESS) + self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS) def get_glucose_unit(self): response = self._send_request( - _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE) + _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE + ) return response.unit def _get_reading_count(self): response = self._send_request( - _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE) + _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE + ) return response.count def _get_reading(self, record_id): response = self._send_request( - _READ_RECORD_REQUEST, {'record_id': record_id}, _READING_RESPONSE) + _READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE + ) if response.control_test: - logging.debug('control solution test, ignoring.') + logging.debug("control solution test, ignoring.") return None return common.GlucoseReading( - response.timestamp, float(response.value), meal=response.meal) + response.timestamp, float(response.value), meal=response.meal + ) def get_readings(self): record_count = self._get_reading_count() diff --git a/glucometerutils/drivers/sdcodefree.py b/glucometerutils/drivers/sdcodefree.py index a6e2ce5..47dd9ca 100644 --- a/glucometerutils/drivers/sdcodefree.py +++ b/glucometerutils/drivers/sdcodefree.py @@ -24,46 +24,44 @@ import operator import construct -from glucometerutils import common -from glucometerutils import exceptions -from glucometerutils.support import serial, driver_base +from glucometerutils import common, exceptions +from glucometerutils.support import driver_base, serial def xor_checksum(msg): return functools.reduce(operator.xor, msg) + class Direction(enum.Enum): In = 0x20 Out = 0x10 + _PACKET = construct.Struct( - 'stx' / construct.Const(0x53, construct.Byte), - 'direction' / construct.Mapping( - construct.Byte, - {e: e.value for e in Direction}), - 'length' / construct.Rebuild( - construct.Byte, lambda this: len(this.message) + 2), - 'message' / construct.Bytes(lambda this: this.length - 2), - 'checksum' / construct.Checksum( - construct.Byte, xor_checksum, construct.this.message), - 'etx' / construct.Const(0xAA, construct.Byte) + "stx" / construct.Const(0x53, construct.Byte), + "direction" / construct.Mapping(construct.Byte, {e: e.value for e in Direction}), + "length" / construct.Rebuild(construct.Byte, lambda this: len(this.message) + 2), + "message" / construct.Bytes(lambda this: this.length - 2), + "checksum" + / construct.Checksum(construct.Byte, xor_checksum, construct.this.message), + "etx" / construct.Const(0xAA, construct.Byte), ) _FIRST_MESSAGE = construct.Struct( construct.Const(0x30, construct.Byte), - 'count' / construct.Int16ub, + "count" / construct.Int16ub, construct.Const(0xAA, construct.Byte)[19], ) -_CHALLENGE_PACKET_FULL = b'\x53\x20\x04\x10\x30\x20\xAA' -_RESPONSE_MESSAGE = b'\x10\x40' +_CHALLENGE_PACKET_FULL = b"\x53\x20\x04\x10\x30\x20\xAA" +_RESPONSE_MESSAGE = b"\x10\x40" -_DATE_SET_MESSAGE = b'\x10\x10' +_DATE_SET_MESSAGE = b"\x10\x10" -_DISCONNECT_MESSAGE = b'\x10\x60' -_DISCONNECTED_MESSAGE = b'\x10\x70' +_DISCONNECT_MESSAGE = b"\x10\x60" +_DISCONNECTED_MESSAGE = b"\x10\x70" -_FETCH_MESSAGE = b'\x10\x60' +_FETCH_MESSAGE = b"\x10\x60" _MEAL_FLAG = { common.Meal.NONE: 0x00, @@ -73,67 +71,64 @@ _MEAL_FLAG = { _READING = construct.Struct( construct.Byte[2], - 'year' / construct.Byte, - 'month' / construct.Byte, - 'day' / construct.Byte, - 'hour' / construct.Byte, - 'minute' / construct.Byte, - 'value' / construct.Int16ub, - 'meal' / construct.Mapping( - construct.Byte, _MEAL_FLAG), + "year" / construct.Byte, + "month" / construct.Byte, + "day" / construct.Byte, + "hour" / construct.Byte, + "minute" / construct.Byte, + "value" / construct.Int16ub, + "meal" / construct.Mapping(construct.Byte, _MEAL_FLAG), construct.Byte[7], ) class Device(serial.SerialDevice, driver_base.GlucometerDriver): BAUDRATE = 38400 - DEFAULT_CABLE_ID = '10c4:ea60' # Generic cable. + DEFAULT_CABLE_ID = "10c4:ea60" # Generic cable. TIMEOUT = 300 # We need to wait for data from the device. def read_message(self): pkt = _PACKET.parse_stream(self.serial_) - logging.debug('received packet: %r', pkt) + logging.debug("received packet: %r", pkt) return pkt.message def wait_and_ready(self): - challenge = b'\0' - while challenge == b'\0': + challenge = b"\0" + while challenge == b"\0": challenge = self.serial_.read(1) # The first packet read may have a prefixed zero, it might be a bug # in the cp210x driver or device, but discard it if found. - if challenge == b'\0': - logging.debug('spurious null byte received') + if challenge == b"\0": + logging.debug("spurious null byte received") continue - if challenge != b'\x53': + if challenge != b"\x53": raise exceptions.ConnectionFailed( - message='Unexpected starting bytes %r' % challenge) + message="Unexpected starting bytes %r" % challenge + ) challenge += self.serial_.read(6) if challenge != _CHALLENGE_PACKET_FULL: raise exceptions.ConnectionFailed( - message='Unexpected challenge %r' % challenge) + message="Unexpected challenge %r" % challenge + ) - logging.debug( - 'challenge packet received: %s', binascii.hexlify(challenge)) + logging.debug("challenge packet received: %s", binascii.hexlify(challenge)) self.send_message(_RESPONSE_MESSAGE) # The first packet only contains the counter of how many readings are # available. first_message = _FIRST_MESSAGE.parse(self.read_message()) - logging.debug('received first message: %r', first_message) + logging.debug("received first message: %r", first_message) return first_message.count def send_message(self, message): - pkt = _PACKET.build({ - 'message': message, - 'direction': Direction.Out - }) - logging.debug('sending packet: %s', binascii.hexlify(pkt)) + pkt = _PACKET.build({"message": message, "direction": Direction.Out}) + logging.debug("sending packet: %s", binascii.hexlify(pkt)) self.serial_.write(pkt) def connect(self): # pylint: disable=no-self-use @@ -146,7 +141,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): raise exceptions.InvalidResponse(response=response) def get_meter_info(self): # pylint: disable=no-self-use - return common.MeterInfo('SD CodeFree glucometer') + return common.MeterInfo("SD CodeFree glucometer") def get_version(self): # pylint: disable=no-self-use raise NotImplementedError @@ -162,7 +157,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): raise NotImplementedError def _set_device_datetime(self, date): - setdatecmd = date.strftime('ADATE%Y%m%d%H%M').encode('ascii') + setdatecmd = date.strftime("ADATE%Y%m%d%H%M").encode("ascii") # Ignore the readings count. self.wait_and_ready() @@ -173,8 +168,9 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): raise exceptions.InvalidResponse(response=response) # The date we return should only include up to minute, unfortunately. - return datetime.datetime(date.year, date.month, date.day, - date.hour, date.minute) + return datetime.datetime( + date.year, date.month, date.day, date.hour, date.minute + ) def zero_log(self): raise NotImplementedError @@ -187,10 +183,16 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): message = self.read_message() reading = _READING.parse(message) - logging.debug('received reading: %r', reading) + logging.debug("received reading: %r", reading) yield common.GlucoseReading( datetime.datetime( - 2000 + reading.year, reading.month, - reading.day, reading.hour, reading.minute), - reading.value, meal=reading.meal) + 2000 + reading.year, + reading.month, + reading.day, + reading.hour, + reading.minute, + ), + reading.value, + meal=reading.meal, + ) diff --git a/glucometerutils/drivers/td4277.py b/glucometerutils/drivers/td4277.py index 4ab25ee..0385299 100644 --- a/glucometerutils/drivers/td4277.py +++ b/glucometerutils/drivers/td4277.py @@ -21,14 +21,13 @@ import operator import construct -from glucometerutils import common -from glucometerutils import exceptions -from glucometerutils.support import serial, driver_base +from glucometerutils import common, exceptions +from glucometerutils.support import driver_base, serial class Direction(enum.Enum): - In = 0xa5 - Out = 0xa3 + In = 0xA5 + Out = 0xA3 def byte_checksum(data): @@ -36,18 +35,18 @@ def byte_checksum(data): _PACKET = construct.Struct( - 'data' / construct.RawCopy( + "data" + / construct.RawCopy( construct.Struct( - construct.Const(b'\x51'), - 'command' / construct.Byte, - 'message' / construct.Bytes(4), - 'direction' / construct.Mapping( - construct.Byte, - {e: e.value for e in Direction}), + construct.Const(b"\x51"), + "command" / construct.Byte, + "message" / construct.Bytes(4), + "direction" + / construct.Mapping(construct.Byte, {e: e.value for e in Direction}), ), ), - 'checksum' / construct.Checksum( - construct.Byte, byte_checksum, construct.this.data.data), + "checksum" + / construct.Checksum(construct.Byte, byte_checksum, construct.this.data.data), ) _EMPTY_MESSAGE = 0 @@ -60,38 +59,32 @@ _SET_DATETIME = 0x33 _GET_MODEL = 0x24 -_GET_READING_COUNT = 0x2b +_GET_READING_COUNT = 0x2B _GET_READING_DATETIME = 0x25 _GET_READING_VALUE = 0x26 _CLEAR_MEMORY = 0x52 _MODEL_STRUCT = construct.Struct( - construct.Const(b'\x77\x42'), - construct.Byte, - construct.Byte, + construct.Const(b"\x77\x42"), construct.Byte, construct.Byte, ) _DATETIME_STRUCT = construct.Struct( - 'day' / construct.Int16ul, - 'minute' / construct.Byte, - 'hour' / construct.Byte, + "day" / construct.Int16ul, "minute" / construct.Byte, "hour" / construct.Byte, ) _DAY_BITSTRUCT = construct.BitStruct( - 'year' / construct.BitsInteger(7), - 'month' / construct.BitsInteger(4), - 'day' / construct.BitsInteger(5), + "year" / construct.BitsInteger(7), + "month" / construct.BitsInteger(4), + "day" / construct.BitsInteger(5), ) _READING_COUNT_STRUCT = construct.Struct( - 'count' / construct.Int16ul, - construct.Int16ul, + "count" / construct.Int16ul, construct.Int16ul, ) _READING_SELECTION_STRUCT = construct.Struct( - 'record_id' / construct.Int16ul, - construct.Const(b'\x00\x00'), + "record_id" / construct.Int16ul, construct.Const(b"\x00\x00"), ) _MEAL_FLAG = { @@ -101,21 +94,24 @@ _MEAL_FLAG = { } _READING_VALUE_STRUCT = construct.Struct( - 'value' / construct.Int16ul, - construct.Const(b'\x06'), - 'meal'/ construct.Mapping( - construct.Byte, _MEAL_FLAG), + "value" / construct.Int16ul, + construct.Const(b"\x06"), + "meal" / construct.Mapping(construct.Byte, _MEAL_FLAG), ) + def _make_packet(command, message, direction=Direction.Out): return _PACKET.build( - {'data': { - 'value': { - 'command': command, - 'message': message, - 'direction': direction, - }, - }}) + { + "data": { + "value": { + "command": command, + "message": message, + "direction": direction, + }, + } + } + ) def _parse_datetime(message): @@ -124,11 +120,12 @@ def _parse_datetime(message): # unfortunately. day = _DAY_BITSTRUCT.parse(construct.Int16ub.build(date.day)) return datetime.datetime( - 2000+day.year, day.month, day.day, date.hour, date.minute) + 2000 + day.year, day.month, day.day, date.hour, date.minute + ) def _select_record(record_id): - return _READING_SELECTION_STRUCT.build({'record_id': record_id}) + return _READING_SELECTION_STRUCT.build({"record_id": record_id}) class Device(serial.SerialDevice, driver_base.GlucometerDriver): @@ -137,19 +134,17 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): TIMEOUT = 0.5 def __init__(self, device): - super(Device, self).__init__('cp2110://' + device) - self.buffered_reader_ = construct.Rebuffered( - _PACKET, tailcutoff=1024) + super(Device, self).__init__("cp2110://" + device) + self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024) - def _send_command( - self, command, message=_EMPTY_MESSAGE, validate_response=True): + def _send_command(self, command, message=_EMPTY_MESSAGE, validate_response=True): pkt = _make_packet(command, message) - logging.debug('sending packet: %s', binascii.hexlify(pkt)) + logging.debug("sending packet: %s", binascii.hexlify(pkt)) self.serial_.write(pkt) self.serial_.flush() response = self.buffered_reader_.parse_stream(self.serial_) - logging.debug('received packet: %r', response) + logging.debug("received packet: %r", response) if validate_response and response.data.value.command != command: raise InvalidResponse(response) @@ -158,24 +153,26 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): def connect(self): response_command, message = self._send_command( - _CONNECT_REQUEST, validate_response=False) + _CONNECT_REQUEST, validate_response=False + ) if response_command not in _VALID_CONNECT_RESPONSE: raise exceptions.ConnectionFailed( - 'Invalid response received: %2x %r' % ( - response_command, message)) + "Invalid response received: %2x %r" % (response_command, message) + ) _, model_message = self._send_command(_GET_MODEL) try: _MODEL_STRUCT.parse(model_message) except construct.ConstructError: raise exceptions.ConnectionFailed( - 'Invalid model identified: %r' % model_message) + "Invalid model identified: %r" % model_message + ) def disconnect(self): pass def get_meter_info(self): - return common.MeterInfo('TaiDoc TD-4277 glucometer') + return common.MeterInfo("TaiDoc TD-4277 glucometer") def get_version(self): # pylint: disable=no-self-use raise NotImplementedError @@ -191,18 +188,15 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): def _set_device_datetime(self, date): assert date.year >= 2000 - day_struct = _DAY_BITSTRUCT.build({ - 'year': date.year - 2000, - 'month': date.month, - 'day': date.day, - }) + day_struct = _DAY_BITSTRUCT.build( + {"year": date.year - 2000, "month": date.month, "day": date.day,} + ) day_word = construct.Int16ub.parse(day_struct) - date_message = _DATETIME_STRUCT.build({ - 'day': day_word, - 'minute': date.minute, - 'hour': date.hour}) + date_message = _DATETIME_STRUCT.build( + {"day": day_word, "minute": date.minute, "hour": date.hour} + ) _, message = self._send_command(_SET_DATETIME, message=date_message) @@ -215,17 +209,18 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): def _get_reading(self, record_id): _, reading_date_message = self._send_command( - _GET_READING_DATETIME, - _select_record(record_id)) + _GET_READING_DATETIME, _select_record(record_id) + ) reading_date = _parse_datetime(reading_date_message) _, reading_value_message = self._send_command( - _GET_READING_VALUE, - _select_record(record_id)) + _GET_READING_VALUE, _select_record(record_id) + ) reading_value = _READING_VALUE_STRUCT.parse(reading_value_message) return common.GlucoseReading( - reading_date, reading_value.value, meal=reading_value.meal) + reading_date, reading_value.value, meal=reading_value.meal + ) def get_readings(self): record_count = self._get_reading_count() |