From 48e89e53d9983312e36ae6c353da9b94fa46b590 Mon Sep 17 00:00:00 2001 From: Arvanitis Christos Date: Tue, 24 Sep 2019 23:06:16 +0100 Subject: Add driver implementation for Ascensia ContourUSB. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implemented based on the protocol documentation provide by Ascensia, with portions from glucodump by @sm6xmm (relicensed by the author under MIT license.) Squashed from pull request #62. Signed-off-by: Anders Hammarquist Signed-off-by: Diego Elio Pettenò --- AUTHORS | 4 +- README | 1 + glucometerutils/drivers/contourusb.py | 81 +++++++++ glucometerutils/support/contourusb.py | 328 ++++++++++++++++++++++++++++++++++ setup.py | 1 + test/test_contourusb.py | 117 ++++++++++++ 6 files changed, 531 insertions(+), 1 deletion(-) create mode 100644 glucometerutils/drivers/contourusb.py create mode 100644 glucometerutils/support/contourusb.py create mode 100644 test/test_contourusb.py diff --git a/AUTHORS b/AUTHORS index dc6803c..315cf65 100644 --- a/AUTHORS +++ b/AUTHORS @@ -1,11 +1,13 @@ -# This is the list of usbmon-tools authors for copyright purposes. +# This is the list of glucometerutils authors for copyright purposes. # # This does not necessarily list everyone who has contributed code, since in # some cases, their employer may be the copyright holder. To see the full list # of contributors, see the revision history in source control. +Anders Hammarquist Andreas Sandberg André Caldas Arkadiusz Bulski +Christos Arvanitis Diego Elio Pettenò Dorian Scholz Jim Sifferle diff --git a/README b/README index c623408..79d006e 100644 --- a/README +++ b/README @@ -53,6 +53,7 @@ supported. | GlucoRx | Nexus | `td4277` | [construct] [pyserial]² [hidapi] | | Menarini | GlucoMen Nexus | `td4277` | [construct] [pyserial]² [hidapi] | | Aktivmed | GlucoCheck XL | `td4277` | [construct] [pyserial]² [hidapi] | +| Ascensia | ContourUSB | `contourusb` | [construct] [hidapi]‡ | † Untested. diff --git a/glucometerutils/drivers/contourusb.py b/glucometerutils/drivers/contourusb.py new file mode 100644 index 0000000..6c333fb --- /dev/null +++ b/glucometerutils/drivers/contourusb.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- +# +# SPDX-License-Identifier: MIT +"""Driver for ContourUSB devices. + +Supported features: + - get readings (blood glucose), including comments; + - get date and time; + - get serial number and software version; + - get device info (e.g. unit) + +Expected device path: /dev/hidraw4 or similar HID device. Optional when using +HIDAPI. + +Further information on the device protocol can be found at + +http://protocols.ascensia.com/Programming-Guide.aspx + +""" + +import datetime + +from glucometerutils import common +from glucometerutils.support import contourusb + +def _extract_timestamp(parsed_record, prefix=''): + """Extract the timestamp from a parsed record. + + This leverages the fact that all the reading records have the same base structure. + """ + datetime_str = parsed_record['datetime'] + + return datetime.datetime( + int(datetime_str[0:4]), #year + int(datetime_str[4:6]), #month + int(datetime_str[6:8]), #day + int(datetime_str[8:10]), #hour + int(datetime_str[10:12]), #minute + 0) + + +class Device(contourusb.ContourHidDevice): + """Glucometer driver for FreeStyle Libre devices.""" + + USB_VENDOR_ID = 0x1a79 # type: int # Bayer Health Care LLC Contour + USB_PRODUCT_ID = 0x6002 # type: int + + + def get_meter_info(self): + """Return the device information in structured form.""" + self._get_info_record() + return common.MeterInfo( + 'Contour USB', + serial_number=self._get_serial_number(), + version_info=( + 'Meter versions: ' + self._get_version(),), + native_unit= self.get_glucose_unit()) + + def get_glucose_unit(self): # pylint: disable=no-self-use + """Returns the glucose unit of the device.""" + + if self._get_glucose_unit() == '0': + return common.Unit.MG_DL + else: + return common.Unit.MMOL_L + + + def get_readings(self): + """ + Get reading dump from download data mode(all readings stored) + This meter supports only blood samples + """ + for parsed_record in self._get_multirecord(): + yield common.GlucoseReading( + _extract_timestamp(parsed_record), + int(parsed_record['value']), + comment=parsed_record['markers'], + measure_method=common.MeasurementMethod.BLOOD_SAMPLE + ) + + diff --git a/glucometerutils/support/contourusb.py b/glucometerutils/support/contourusb.py new file mode 100644 index 0000000..d3bd6dc --- /dev/null +++ b/glucometerutils/support/contourusb.py @@ -0,0 +1,328 @@ +# -*- coding: utf-8 -*- +# +# SPDX-License-Identifier: MIT +"""Common routines to implement the ContourUSB common protocol. + +Protocol documentation available from Ascensia at +http://protocols.ascensia.com/Programming-Guide.aspx + +* glucodump code segments are developed by Anders Hammarquist +* Rest of code is developed by Arvanitis Christos + +""" + +import csv +import datetime +import logging +import re +import construct + +from glucometerutils import exceptions +from glucometerutils.exceptions import InvalidResponse +from glucometerutils.support import hiddevice + +# regexr.com/4k6jb +_HEADER_RECORD_RE = re.compile( + "^(?P[a-zA-Z])\\|(?P.)(?P.)" + "(?P.)(?P.)\\|\\w*\\|(?P\\w+)" + "\\^(?P[0-9]{2}\\.[0-9]{2})\\\\(?P[0-9]{2}\\.[0-9]{2})" + "\\\\(?P[0-9]{2}\\.[0-9]{2}\\.[0-9]{2})\\" + "^(?P(\\w|-)+)\\^(?P(\\w|-)+)\\|" + "A=(?P[0-9])\\^C=(?P[0-9]+)\\" + "^G=(?P[0-9]+)\\^I=(?P[0-9]+)\\^R=(?P[0-9]+)\\" + "^S=(?P[0-9]+)\\^U=(?P[0-9]+)\\" + "^V=(?P[0-9]{2})(?P[0-9]{3})\\" + "^X=(?P[0-9]{3})(?P[0-9]{3})" + "(?P[0-9]{3})(?P[0-9]{3})" + "(?P[0-9]{3})(?P[0-9]{3})" + "(?P[0-9]{3})(?P[0-9]{3})\\" + "^Y=(?P[0-9]{3})(?P[0-9]{3})" + "(?P[0-9]{3})(?P[0-9]{3})(?P[0-9]{3})" + "(?P[0-9]{3})(?P[0-9]{3})" + "(?P[0-9]{3})\\^Z=(?P[0-2])\\|" + "(?P[0-9]*)\\|\\|\\|\\|\\|\\|" + "(?P[0-9]+)\\|(?P[0-9]+)") + +_RESULT_RECORD_RE = re.compile( + "^(?P[a-zA-Z])\\|(?P[0-9]+)\\|\\w*\\^\\w*\\^\\w*\\" + "^(?P\\w+)\\|(?P[0-9]+)\\|(?P\\w+\\/\\w+)\\^" + "(?P[BPD])\\|\\|(?P[>[0-9]+)") + +_RECORD_FORMAT = re.compile( + '\x02(?P(?P[0-7])(?P[^\x0d]*)' + '\x0d(?P[\x03\x17]))' + '(?P[0-9A-F][0-9A-F])\x0d\x0a') + +class FrameError(Exception): + pass + +class ContourHidDevice(hiddevice.HidDevice): + """Base class implementing the ContourUSB HID common protocol. + """ + blocksize = 64 + + # Operation modes + mode_establish = object + mode_data = object() + mode_precommand = object() + mode_command = object() + state = None + + def read(self, r_size=blocksize): + result = [] + + while True: + data = self._read() + dstr = data + result.append(dstr[4:data[3]+4]) + if data[3] != self.blocksize-4: + break + + return (b"".join(result)) + + def write(self, data): + data = b'ABC' + chr(len(data)).encode() + data.encode() + pad_length = self.blocksize - len(data) + data += pad_length * b'\x00' + + self._write(data) + + USB_VENDOR_ID = 0x1a79 # type: int # Bayer Health Care LLC Contour + USB_PRODUCT_ID = 0x6002 # type: int + + def parse_header_record(self, text): + header = _HEADER_RECORD_RE.search(text) + + self.field_del = header.group('field_del') + self.repeat_del = header.group('repeat_del') + self.component_del = header.group('component_del') + self.escape_del = header.group('escape_del') + + self.product_code = header.group('product_code') + self.dig_ver = header.group('dig_ver') + self.anlg_ver = header.group('anlg_ver') + self.agp_ver = header.group('agp_ver') + + self.serial_num = header.group('serial_num') + self.sku_id = header.group('sku_id') + self.res_marking = header.group('res_marking') + self.config_bits = header.group('config_bits') + self.lang = header.group('lang') + self.interv = header.group('interv') + self.ref_method = header.group('ref_method') + self.internal = header.group('internal') + + # U limit + self.unit = header.group('unit') + self.lo_bound = header.group('lo_bound') + self.hi_bound = header.group('hi_bound') + + # X field + self.hypo_limit = header.group('hypo_limit') + self.overall_low = header.group('overall_low') + self.pre_food_low = header.group('pre_food_low') + self.post_food_low = header.group('post_food_low') + self.overall_high = header.group('overall_high') + self.pre_food_high = header.group('pre_food_high') + self.post_food_high = header.group('post_food_high') + self.hyper_limit = header.group('hyper_limit') + + # Y field + self.upp_hyper = header.group('upp_hyper') + self.low_hyper = header.group('low_hyper') + self.upp_hypo = header.group('upp_hypo') + self.low_hypo = header.group('low_hypo') + self.upp_low_target = header.group('upp_low_target') + self.low_low_target = header.group('low_low_target') + self.upp_hi_target = header.group('upp_hi_target') + self.low_hi_target = header.group('low_hi_target') + + # Z field + self.trends = header.group('trends') + + self.total = header.group('total') + self.spec_ver = header.group('spec_ver') + # Datetime string in YYYYMMDDHHMM format + self.datetime = header.group('datetime') + + + def checksum(self, text): + """ + Implemented by Anders Hammarquist for glucodump project + More info: https://bitbucket.org/iko/glucodump/src/default/ + """ + checksum = hex(sum(ord(c) for c in text) % 256).upper().split('X')[1] + return ('00' + checksum)[-2:] + + def checkframe(self, frame): + """ + Implemented by Anders Hammarquist for glucodump project + More info: https://bitbucket.org/iko/glucodump/src/default/ + """ + match = _RECORD_FORMAT.match(frame) + if not match: + raise FrameError("Couldn't parse frame", frame) + + recno = int(match.group('recno')) + if self.currecno is None: + self.currecno = recno + + if recno + 1 == self.currecno: + return None + + if recno != self.currecno: + raise FrameError("Bad recno, got %r expected %r" % + (recno, self.currecno), + frame) + + checksum = self.checksum(match.group('check')) + if checksum != match.group('checksum'): + raise FrameError("Checksum error: got %s expected %s" % + (match.group('checksum'), checksum), + frame) + + self.currecno = (self.currecno + 1) % 8 + return match.group('text') + + def connect(self): + """Connecting the device, nothing to be done. + All process is hadled by hiddevice + """ + pass + + def _get_info_record(self): + self.currecno = None + self.state = self.mode_establish + try: + while True: + self.write('\x04') + res = self.read() + if res[0] == 4 and res[-1] == 5: + # we are connected and just got a header + header_record = res.decode() + stx = header_record.find('\x02') + if stx != -1: + result = _RECORD_FORMAT.match( + header_record[stx:]).group('text') + self.parse_header_record(result) + break + else: + pass + + except FrameError as e: + print("Frame error") + raise e + + except Exception as e: + print("Uknown error occured") + raise e + + def disconnect(self): + """Disconnect the device, nothing to be done.""" + pass + + # Some of the commands are also shared across devices that use this HID + # protocol, but not many. Only provide here those that do seep to change + # between them. + def _get_version(self): + # type: () -> Text + """Return the software version of the device.""" + return self.dig_ver + " - " + self.anlg_ver + " - " + self.agp_ver + + def _get_serial_number(self): + # type: () -> Text + """Returns the serial number of the device.""" + return self.serial_num + + def _get_glucose_unit(self): + # type: () -> Text + """Return 0 for mg/dL, 1 for mmol/L""" + return self.unit + + def get_datetime(self): + # type: () -> datetime.datetime + datetime_str = self.datetime + return datetime.datetime( + int(datetime_str[0:4]), # year + int(datetime_str[4:6]), # month + int(datetime_str[6:8]), # day + int(datetime_str[8:10]), # hour + int(datetime_str[10:12]), # minute + 0) + + def sync(self): + """ + Sync with meter and yield received data frames + FSM implemented by Anders Hammarquist's for glucodump + More info: https://bitbucket.org/iko/glucodump/src/default/ + """ + self.state = self.mode_establish + try: + tometer = '\x04' + result = None + foo = 0 + while True: + self.write(tometer) + if result is not None and self.state == self.mode_data: + yield result + result = None + data_bytes = self.read() + data = data_bytes.decode() + + if self.state == self.mode_establish: + if data_bytes[-1] == 15: + # got a , send + tometer = chr(foo) + foo += 1 + foo %= 256 + continue + if data_bytes[-1] == 5: + # got an , send + tometer = '\x06' + self.currecno = None + continue + if self.state == self.mode_data: + if data_bytes[-1] == 4: + # got an , done + self.state = self.mode_precommand + break + stx = data.find('\x02') + if stx != -1: + # got , parse frame + try: + result = self.checkframe(data[stx:]) + tometer = '\x06' + self.state = self.mode_data + except FrameError as e: + tometer = '\x15' # Couldn't parse, + else: + # Got something we don't understand, it + tometer = '\x15' + except Exception as e: + raise e + + def parse_result_record(self, text): + # type : text -> dict + result = _RESULT_RECORD_RE.search(text) + rec_text = result.groupdict() + return rec_text + + def _get_multirecord(self): + # type: (bytes) -> Iterator[List[Text]] + """Queries for, and returns, "multirecords" results. + + Returns: + (csv.reader): a CSV reader object that returns a record for each line + in the record file. + """ + records_arr = [] + for rec in self.sync(): + if rec[0] == 'R': + # parse using result record regular expression + rec_text = self.parse_result_record(rec) + # get dictionary to use in main driver module without import re + + records_arr.append(rec_text) + # return csv.reader(records_arr) + return records_arr # array of groupdicts \ No newline at end of file diff --git a/setup.py b/setup.py index f4dee67..48dd85c 100644 --- a/setup.py +++ b/setup.py @@ -54,6 +54,7 @@ setup( # These are all the drivers' dependencies. Optional dependencies are # listed as mandatory for the feature. 'accucheck_reports': [], + 'contourusb': ['construct', 'hidapi'], 'fsinsulinx': ['construct', 'hidapi'], 'fslibre': ['construct', 'hidapi'], 'fsoptium': ['pyserial'], diff --git a/test/test_contourusb.py b/test/test_contourusb.py new file mode 100644 index 0000000..e2fb6cb --- /dev/null +++ b/test/test_contourusb.py @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- +# +# SPDX-License-Identifier: MIT +"""Tests for the common ContourUSB functions..""" + +# pylint: disable=protected-access,missing-docstring + +from absl.testing import absltest + +from glucometerutils.support import contourusb + +from unittest.mock import Mock + + + +class TestContourUSB(absltest.TestCase): + + header_record = b'\x04\x021H|\\^&||7w3LBL|Bayer7390^01.24\\01.04\\09.02.20^7390-2336773^7403-|A=1^C=63^G=1^I=0200^R=0^S=1^U=0^V=10600^X=070070070070180130150250^Y=360126090050099050300089^Z=1|1714||||||1|201909221304\r\x17D7\r\n\x05' + + mock_dev = Mock() + + def test_get_datetime(self): + import datetime + + self.datetime = "201908071315" # returned by + self.assertEqual( + datetime.datetime(2019,8,7,13,15), + contourusb.ContourHidDevice.get_datetime(self) + ) + + + def test_RECORD_FORMAT_match(self): + #first decode the header record frame + header_record_decoded = self.header_record.decode() + stx = header_record_decoded.find('\x02') + + _RECORD_FORMAT = contourusb._RECORD_FORMAT + result = _RECORD_FORMAT.match(header_record_decoded[stx:]).group('text') + + self.assertEqual( + "H|\\^&||7w3LBL|Bayer7390^01.24\\01.04\\09.02.20^7390-2336773^7403-|A=1^C=63^G=1^I=0200^R=0^S=1^U=0^V=10600^X=070070070070180130150250^Y=360126090050099050300089^Z=1|1714||||||1|201909221304", + result + ) + + def test_parse_header_record(self): + + _HEADER_RECORD_RE = contourusb._HEADER_RECORD_RE + _RECORD_FORMAT = contourusb._RECORD_FORMAT + + + header_record_decoded = self.header_record.decode() + stx = header_record_decoded.find('\x02') + + + result = _RECORD_FORMAT.match(header_record_decoded[stx:]).group('text') + contourusb.ContourHidDevice.parse_header_record(self.mock_dev,result) + + self.assertEqual(self.mock_dev.field_del, "\\") + self.assertEqual(self.mock_dev.repeat_del, "^") + self.assertEqual(self.mock_dev.component_del, "&") + self.assertEqual(self.mock_dev.escape_del, "|") + + self.assertEqual(self.mock_dev.product_code, "Bayer7390") + + self.assertEqual(self.mock_dev.dig_ver, "01.24") + self.assertEqual(self.mock_dev.anlg_ver, "01.04") + self.assertEqual(self.mock_dev.agp_ver, "09.02.20") + self.assertEqual(self.mock_dev.serial_num, "7390-2336773") + self.assertEqual(self.mock_dev.sku_id, "7403-") + self.assertEqual(self.mock_dev.res_marking, "1") + self.assertEqual(self.mock_dev.config_bits, "63") + self.assertEqual(self.mock_dev.lang, "1") + self.assertEqual(self.mock_dev.interv, "0200") + self.assertEqual(self.mock_dev.ref_method, "0") + self.assertEqual(self.mock_dev.internal, "1") + self.assertEqual(self.mock_dev.unit, "0") + self.assertEqual(self.mock_dev.lo_bound, "10") + self.assertEqual(self.mock_dev.hi_bound, "600") + + self.assertEqual(self.mock_dev.hypo_limit, "070") + self.assertEqual(self.mock_dev.overall_low, "070") + self.assertEqual(self.mock_dev.pre_food_low, "070") + self.assertEqual(self.mock_dev.post_food_low, "070") + self.assertEqual(self.mock_dev.overall_high, "180") + self.assertEqual(self.mock_dev.pre_food_high, "130") + self.assertEqual(self.mock_dev.post_food_high, "150") + self.assertEqual(self.mock_dev.hyper_limit, "250") + + self.assertEqual(self.mock_dev.upp_hyper, "360") + self.assertEqual(self.mock_dev.low_hyper, "126") + self.assertEqual(self.mock_dev.upp_hypo, "090") + self.assertEqual(self.mock_dev.low_hypo, "050") + self.assertEqual(self.mock_dev.upp_low_target, "099") + self.assertEqual(self.mock_dev.low_low_target, "050") + self.assertEqual(self.mock_dev.upp_hi_target, "300") + self.assertEqual(self.mock_dev.low_hi_target, "089") + self.assertEqual(self.mock_dev.trends, "1") + self.assertEqual(self.mock_dev.total, "1714") + self.assertEqual(self.mock_dev.spec_ver, "1") + + self.assertEqual(self.mock_dev.datetime, "201909221304") + + #TO-DO checksum and checkframe unit tests + + def test_parse_result_record(self): + #first decode the header record frame + result_record = "R|8|^^^Glucose|133|mg/dL^P||B/X||201202052034" + result_dict = contourusb.ContourHidDevice.parse_result_record(self.mock_dev, result_record) + + self.assertEqual(result_dict['record_type'], 'R') + self.assertEqual(result_dict['seq_num'], '8') + self.assertEqual(result_dict['test_id'], 'Glucose') + self.assertEqual(result_dict['value'], '133') + self.assertEqual(result_dict['unit'], 'mg/dL') + self.assertEqual(result_dict['ref_method'], 'P') + self.assertEqual(result_dict['markers'], 'B/X') + self.assertEqual(result_dict['datetime'], '201202052034') \ No newline at end of file -- cgit v1.2.3