summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorArvanitis Christos <arvchristos@gmail.com>2019-09-25 00:06:16 +0200
committerDiego Elio Pettenò <flameeyes@flameeyes.com>2019-09-25 00:06:16 +0200
commit48e89e53d9983312e36ae6c353da9b94fa46b590 (patch)
tree3e09fd179592df54faedf746be0f8329b95513df
parentRemove try/except around typing imports. (diff)
downloadglucometerutils-48e89e53d9983312e36ae6c353da9b94fa46b590.tar
glucometerutils-48e89e53d9983312e36ae6c353da9b94fa46b590.tar.gz
glucometerutils-48e89e53d9983312e36ae6c353da9b94fa46b590.tar.bz2
glucometerutils-48e89e53d9983312e36ae6c353da9b94fa46b590.tar.lz
glucometerutils-48e89e53d9983312e36ae6c353da9b94fa46b590.tar.xz
glucometerutils-48e89e53d9983312e36ae6c353da9b94fa46b590.tar.zst
glucometerutils-48e89e53d9983312e36ae6c353da9b94fa46b590.zip
-rw-r--r--AUTHORS4
-rw-r--r--README1
-rw-r--r--glucometerutils/drivers/contourusb.py81
-rw-r--r--glucometerutils/support/contourusb.py328
-rw-r--r--setup.py1
-rw-r--r--test/test_contourusb.py117
6 files changed, 531 insertions, 1 deletions
diff --git a/AUTHORS b/AUTHORS
index dc6803c..315cf65 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -1,11 +1,13 @@
-# This is the list of usbmon-tools authors for copyright purposes.
+# This is the list of glucometerutils authors for copyright purposes.
#
# This does not necessarily list everyone who has contributed code, since in
# some cases, their employer may be the copyright holder. To see the full list
# of contributors, see the revision history in source control.
+Anders Hammarquist
Andreas Sandberg
André Caldas
Arkadiusz Bulski
+Christos Arvanitis
Diego Elio Pettenò
Dorian Scholz
Jim Sifferle
diff --git a/README b/README
index c623408..79d006e 100644
--- a/README
+++ b/README
@@ -53,6 +53,7 @@ supported.
| GlucoRx | Nexus | `td4277` | [construct] [pyserial]² [hidapi] |
| Menarini | GlucoMen Nexus | `td4277` | [construct] [pyserial]² [hidapi] |
| Aktivmed | GlucoCheck XL | `td4277` | [construct] [pyserial]² [hidapi] |
+| Ascensia | ContourUSB | `contourusb` | [construct] [hidapi]‡ |
† Untested.
diff --git a/glucometerutils/drivers/contourusb.py b/glucometerutils/drivers/contourusb.py
new file mode 100644
index 0000000..6c333fb
--- /dev/null
+++ b/glucometerutils/drivers/contourusb.py
@@ -0,0 +1,81 @@
+# -*- coding: utf-8 -*-
+#
+# SPDX-License-Identifier: MIT
+"""Driver for ContourUSB devices.
+
+Supported features:
+ - get readings (blood glucose), including comments;
+ - get date and time;
+ - get serial number and software version;
+ - get device info (e.g. unit)
+
+Expected device path: /dev/hidraw4 or similar HID device. Optional when using
+HIDAPI.
+
+Further information on the device protocol can be found at
+
+http://protocols.ascensia.com/Programming-Guide.aspx
+
+"""
+
+import datetime
+
+from glucometerutils import common
+from glucometerutils.support import contourusb
+
+def _extract_timestamp(parsed_record, prefix=''):
+ """Extract the timestamp from a parsed record.
+
+ This leverages the fact that all the reading records have the same base structure.
+ """
+ datetime_str = parsed_record['datetime']
+
+ return datetime.datetime(
+ int(datetime_str[0:4]), #year
+ int(datetime_str[4:6]), #month
+ int(datetime_str[6:8]), #day
+ int(datetime_str[8:10]), #hour
+ int(datetime_str[10:12]), #minute
+ 0)
+
+
+class Device(contourusb.ContourHidDevice):
+ """Glucometer driver for FreeStyle Libre devices."""
+
+ USB_VENDOR_ID = 0x1a79 # type: int # Bayer Health Care LLC Contour
+ USB_PRODUCT_ID = 0x6002 # type: int
+
+
+ def get_meter_info(self):
+ """Return the device information in structured form."""
+ self._get_info_record()
+ return common.MeterInfo(
+ 'Contour USB',
+ serial_number=self._get_serial_number(),
+ version_info=(
+ 'Meter versions: ' + self._get_version(),),
+ native_unit= self.get_glucose_unit())
+
+ def get_glucose_unit(self): # pylint: disable=no-self-use
+ """Returns the glucose unit of the device."""
+
+ if self._get_glucose_unit() == '0':
+ return common.Unit.MG_DL
+ else:
+ return common.Unit.MMOL_L
+
+
+ def get_readings(self):
+ """
+ Get reading dump from download data mode(all readings stored)
+ This meter supports only blood samples
+ """
+ for parsed_record in self._get_multirecord():
+ yield common.GlucoseReading(
+ _extract_timestamp(parsed_record),
+ int(parsed_record['value']),
+ comment=parsed_record['markers'],
+ measure_method=common.MeasurementMethod.BLOOD_SAMPLE
+ )
+
+
diff --git a/glucometerutils/support/contourusb.py b/glucometerutils/support/contourusb.py
new file mode 100644
index 0000000..d3bd6dc
--- /dev/null
+++ b/glucometerutils/support/contourusb.py
@@ -0,0 +1,328 @@
+# -*- coding: utf-8 -*-
+#
+# SPDX-License-Identifier: MIT
+"""Common routines to implement the ContourUSB common protocol.
+
+Protocol documentation available from Ascensia at
+http://protocols.ascensia.com/Programming-Guide.aspx
+
+* glucodump code segments are developed by Anders Hammarquist
+* Rest of code is developed by Arvanitis Christos
+
+"""
+
+import csv
+import datetime
+import logging
+import re
+import construct
+
+from glucometerutils import exceptions
+from glucometerutils.exceptions import InvalidResponse
+from glucometerutils.support import hiddevice
+
+# regexr.com/4k6jb
+_HEADER_RECORD_RE = re.compile(
+ "^(?P<record_type>[a-zA-Z])\\|(?P<field_del>.)(?P<repeat_del>.)"
+ "(?P<component_del>.)(?P<escape_del>.)\\|\\w*\\|(?P<product_code>\\w+)"
+ "\\^(?P<dig_ver>[0-9]{2}\\.[0-9]{2})\\\\(?P<anlg_ver>[0-9]{2}\\.[0-9]{2})"
+ "\\\\(?P<agp_ver>[0-9]{2}\\.[0-9]{2}\\.[0-9]{2})\\"
+ "^(?P<serial_num>(\\w|-)+)\\^(?P<sku_id>(\\w|-)+)\\|"
+ "A=(?P<res_marking>[0-9])\\^C=(?P<config_bits>[0-9]+)\\"
+ "^G=(?P<lang>[0-9]+)\\^I=(?P<interv>[0-9]+)\\^R=(?P<ref_method>[0-9]+)\\"
+ "^S=(?P<internal>[0-9]+)\\^U=(?P<unit>[0-9]+)\\"
+ "^V=(?P<lo_bound>[0-9]{2})(?P<hi_bound>[0-9]{3})\\"
+ "^X=(?P<hypo_limit>[0-9]{3})(?P<overall_low>[0-9]{3})"
+ "(?P<pre_food_low>[0-9]{3})(?P<post_food_low>[0-9]{3})"
+ "(?P<overall_high>[0-9]{3})(?P<pre_food_high>[0-9]{3})"
+ "(?P<post_food_high>[0-9]{3})(?P<hyper_limit>[0-9]{3})\\"
+ "^Y=(?P<upp_hyper>[0-9]{3})(?P<low_hyper>[0-9]{3})"
+ "(?P<upp_hypo>[0-9]{3})(?P<low_hypo>[0-9]{3})(?P<upp_low_target>[0-9]{3})"
+ "(?P<low_low_target>[0-9]{3})(?P<upp_hi_target>[0-9]{3})"
+ "(?P<low_hi_target>[0-9]{3})\\^Z=(?P<trends>[0-2])\\|"
+ "(?P<total>[0-9]*)\\|\\|\\|\\|\\|\\|"
+ "(?P<spec_ver>[0-9]+)\\|(?P<datetime>[0-9]+)")
+
+_RESULT_RECORD_RE = re.compile(
+ "^(?P<record_type>[a-zA-Z])\\|(?P<seq_num>[0-9]+)\\|\\w*\\^\\w*\\^\\w*\\"
+ "^(?P<test_id>\\w+)\\|(?P<value>[0-9]+)\\|(?P<unit>\\w+\\/\\w+)\\^"
+ "(?P<ref_method>[BPD])\\|\\|(?P<markers>[><BADISXCZ\\/1-12]*)\\|\\|"
+ "(?P<datetime>[0-9]+)")
+
+_RECORD_FORMAT = re.compile(
+ '\x02(?P<check>(?P<recno>[0-7])(?P<text>[^\x0d]*)'
+ '\x0d(?P<end>[\x03\x17]))'
+ '(?P<checksum>[0-9A-F][0-9A-F])\x0d\x0a')
+
+class FrameError(Exception):
+ pass
+
+class ContourHidDevice(hiddevice.HidDevice):
+ """Base class implementing the ContourUSB HID common protocol.
+ """
+ blocksize = 64
+
+ # Operation modes
+ mode_establish = object
+ mode_data = object()
+ mode_precommand = object()
+ mode_command = object()
+ state = None
+
+ def read(self, r_size=blocksize):
+ result = []
+
+ while True:
+ data = self._read()
+ dstr = data
+ result.append(dstr[4:data[3]+4])
+ if data[3] != self.blocksize-4:
+ break
+
+ return (b"".join(result))
+
+ def write(self, data):
+ data = b'ABC' + chr(len(data)).encode() + data.encode()
+ pad_length = self.blocksize - len(data)
+ data += pad_length * b'\x00'
+
+ self._write(data)
+
+ USB_VENDOR_ID = 0x1a79 # type: int # Bayer Health Care LLC Contour
+ USB_PRODUCT_ID = 0x6002 # type: int
+
+ def parse_header_record(self, text):
+ header = _HEADER_RECORD_RE.search(text)
+
+ self.field_del = header.group('field_del')
+ self.repeat_del = header.group('repeat_del')
+ self.component_del = header.group('component_del')
+ self.escape_del = header.group('escape_del')
+
+ self.product_code = header.group('product_code')
+ self.dig_ver = header.group('dig_ver')
+ self.anlg_ver = header.group('anlg_ver')
+ self.agp_ver = header.group('agp_ver')
+
+ self.serial_num = header.group('serial_num')
+ self.sku_id = header.group('sku_id')
+ self.res_marking = header.group('res_marking')
+ self.config_bits = header.group('config_bits')
+ self.lang = header.group('lang')
+ self.interv = header.group('interv')
+ self.ref_method = header.group('ref_method')
+ self.internal = header.group('internal')
+
+ # U limit
+ self.unit = header.group('unit')
+ self.lo_bound = header.group('lo_bound')
+ self.hi_bound = header.group('hi_bound')
+
+ # X field
+ self.hypo_limit = header.group('hypo_limit')
+ self.overall_low = header.group('overall_low')
+ self.pre_food_low = header.group('pre_food_low')
+ self.post_food_low = header.group('post_food_low')
+ self.overall_high = header.group('overall_high')
+ self.pre_food_high = header.group('pre_food_high')
+ self.post_food_high = header.group('post_food_high')
+ self.hyper_limit = header.group('hyper_limit')
+
+ # Y field
+ self.upp_hyper = header.group('upp_hyper')
+ self.low_hyper = header.group('low_hyper')
+ self.upp_hypo = header.group('upp_hypo')
+ self.low_hypo = header.group('low_hypo')
+ self.upp_low_target = header.group('upp_low_target')
+ self.low_low_target = header.group('low_low_target')
+ self.upp_hi_target = header.group('upp_hi_target')
+ self.low_hi_target = header.group('low_hi_target')
+
+ # Z field
+ self.trends = header.group('trends')
+
+ self.total = header.group('total')
+ self.spec_ver = header.group('spec_ver')
+ # Datetime string in YYYYMMDDHHMM format
+ self.datetime = header.group('datetime')
+
+
+ def checksum(self, text):
+ """
+ Implemented by Anders Hammarquist for glucodump project
+ More info: https://bitbucket.org/iko/glucodump/src/default/
+ """
+ checksum = hex(sum(ord(c) for c in text) % 256).upper().split('X')[1]
+ return ('00' + checksum)[-2:]
+
+ def checkframe(self, frame):
+ """
+ Implemented by Anders Hammarquist for glucodump project
+ More info: https://bitbucket.org/iko/glucodump/src/default/
+ """
+ match = _RECORD_FORMAT.match(frame)
+ if not match:
+ raise FrameError("Couldn't parse frame", frame)
+
+ recno = int(match.group('recno'))
+ if self.currecno is None:
+ self.currecno = recno
+
+ if recno + 1 == self.currecno:
+ return None
+
+ if recno != self.currecno:
+ raise FrameError("Bad recno, got %r expected %r" %
+ (recno, self.currecno),
+ frame)
+
+ checksum = self.checksum(match.group('check'))
+ if checksum != match.group('checksum'):
+ raise FrameError("Checksum error: got %s expected %s" %
+ (match.group('checksum'), checksum),
+ frame)
+
+ self.currecno = (self.currecno + 1) % 8
+ return match.group('text')
+
+ def connect(self):
+ """Connecting the device, nothing to be done.
+ All process is hadled by hiddevice
+ """
+ pass
+
+ def _get_info_record(self):
+ self.currecno = None
+ self.state = self.mode_establish
+ try:
+ while True:
+ self.write('\x04')
+ res = self.read()
+ if res[0] == 4 and res[-1] == 5:
+ # we are connected and just got a header
+ header_record = res.decode()
+ stx = header_record.find('\x02')
+ if stx != -1:
+ result = _RECORD_FORMAT.match(
+ header_record[stx:]).group('text')
+ self.parse_header_record(result)
+ break
+ else:
+ pass
+
+ except FrameError as e:
+ print("Frame error")
+ raise e
+
+ except Exception as e:
+ print("Uknown error occured")
+ raise e
+
+ def disconnect(self):
+ """Disconnect the device, nothing to be done."""
+ pass
+
+ # Some of the commands are also shared across devices that use this HID
+ # protocol, but not many. Only provide here those that do seep to change
+ # between them.
+ def _get_version(self):
+ # type: () -> Text
+ """Return the software version of the device."""
+ return self.dig_ver + " - " + self.anlg_ver + " - " + self.agp_ver
+
+ def _get_serial_number(self):
+ # type: () -> Text
+ """Returns the serial number of the device."""
+ return self.serial_num
+
+ def _get_glucose_unit(self):
+ # type: () -> Text
+ """Return 0 for mg/dL, 1 for mmol/L"""
+ return self.unit
+
+ def get_datetime(self):
+ # type: () -> datetime.datetime
+ datetime_str = self.datetime
+ return datetime.datetime(
+ int(datetime_str[0:4]), # year
+ int(datetime_str[4:6]), # month
+ int(datetime_str[6:8]), # day
+ int(datetime_str[8:10]), # hour
+ int(datetime_str[10:12]), # minute
+ 0)
+
+ def sync(self):
+ """
+ Sync with meter and yield received data frames
+ FSM implemented by Anders Hammarquist's for glucodump
+ More info: https://bitbucket.org/iko/glucodump/src/default/
+ """
+ self.state = self.mode_establish
+ try:
+ tometer = '\x04'
+ result = None
+ foo = 0
+ while True:
+ self.write(tometer)
+ if result is not None and self.state == self.mode_data:
+ yield result
+ result = None
+ data_bytes = self.read()
+ data = data_bytes.decode()
+
+ if self.state == self.mode_establish:
+ if data_bytes[-1] == 15:
+ # got a <NAK>, send <EOT>
+ tometer = chr(foo)
+ foo += 1
+ foo %= 256
+ continue
+ if data_bytes[-1] == 5:
+ # got an <ENQ>, send <ACK>
+ tometer = '\x06'
+ self.currecno = None
+ continue
+ if self.state == self.mode_data:
+ if data_bytes[-1] == 4:
+ # got an <EOT>, done
+ self.state = self.mode_precommand
+ break
+ stx = data.find('\x02')
+ if stx != -1:
+ # got <STX>, parse frame
+ try:
+ result = self.checkframe(data[stx:])
+ tometer = '\x06'
+ self.state = self.mode_data
+ except FrameError as e:
+ tometer = '\x15' # Couldn't parse, <NAK>
+ else:
+ # Got something we don't understand, <NAK> it
+ tometer = '\x15'
+ except Exception as e:
+ raise e
+
+ def parse_result_record(self, text):
+ # type : text -> dict
+ result = _RESULT_RECORD_RE.search(text)
+ rec_text = result.groupdict()
+ return rec_text
+
+ def _get_multirecord(self):
+ # type: (bytes) -> Iterator[List[Text]]
+ """Queries for, and returns, "multirecords" results.
+
+ Returns:
+ (csv.reader): a CSV reader object that returns a record for each line
+ in the record file.
+ """
+ records_arr = []
+ for rec in self.sync():
+ if rec[0] == 'R':
+ # parse using result record regular expression
+ rec_text = self.parse_result_record(rec)
+ # get dictionary to use in main driver module without import re
+
+ records_arr.append(rec_text)
+ # return csv.reader(records_arr)
+ return records_arr # array of groupdicts \ No newline at end of file
diff --git a/setup.py b/setup.py
index f4dee67..48dd85c 100644
--- a/setup.py
+++ b/setup.py
@@ -54,6 +54,7 @@ setup(
# These are all the drivers' dependencies. Optional dependencies are
# listed as mandatory for the feature.
'accucheck_reports': [],
+ 'contourusb': ['construct', 'hidapi'],
'fsinsulinx': ['construct', 'hidapi'],
'fslibre': ['construct', 'hidapi'],
'fsoptium': ['pyserial'],
diff --git a/test/test_contourusb.py b/test/test_contourusb.py
new file mode 100644
index 0000000..e2fb6cb
--- /dev/null
+++ b/test/test_contourusb.py
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+#
+# SPDX-License-Identifier: MIT
+"""Tests for the common ContourUSB functions.."""
+
+# pylint: disable=protected-access,missing-docstring
+
+from absl.testing import absltest
+
+from glucometerutils.support import contourusb
+
+from unittest.mock import Mock
+
+
+
+class TestContourUSB(absltest.TestCase):
+
+ header_record = b'\x04\x021H|\\^&||7w3LBL|Bayer7390^01.24\\01.04\\09.02.20^7390-2336773^7403-|A=1^C=63^G=1^I=0200^R=0^S=1^U=0^V=10600^X=070070070070180130150250^Y=360126090050099050300089^Z=1|1714||||||1|201909221304\r\x17D7\r\n\x05'
+
+ mock_dev = Mock()
+
+ def test_get_datetime(self):
+ import datetime
+
+ self.datetime = "201908071315" # returned by
+ self.assertEqual(
+ datetime.datetime(2019,8,7,13,15),
+ contourusb.ContourHidDevice.get_datetime(self)
+ )
+
+
+ def test_RECORD_FORMAT_match(self):
+ #first decode the header record frame
+ header_record_decoded = self.header_record.decode()
+ stx = header_record_decoded.find('\x02')
+
+ _RECORD_FORMAT = contourusb._RECORD_FORMAT
+ result = _RECORD_FORMAT.match(header_record_decoded[stx:]).group('text')
+
+ self.assertEqual(
+ "H|\\^&||7w3LBL|Bayer7390^01.24\\01.04\\09.02.20^7390-2336773^7403-|A=1^C=63^G=1^I=0200^R=0^S=1^U=0^V=10600^X=070070070070180130150250^Y=360126090050099050300089^Z=1|1714||||||1|201909221304",
+ result
+ )
+
+ def test_parse_header_record(self):
+
+ _HEADER_RECORD_RE = contourusb._HEADER_RECORD_RE
+ _RECORD_FORMAT = contourusb._RECORD_FORMAT
+
+
+ header_record_decoded = self.header_record.decode()
+ stx = header_record_decoded.find('\x02')
+
+
+ result = _RECORD_FORMAT.match(header_record_decoded[stx:]).group('text')
+ contourusb.ContourHidDevice.parse_header_record(self.mock_dev,result)
+
+ self.assertEqual(self.mock_dev.field_del, "\\")
+ self.assertEqual(self.mock_dev.repeat_del, "^")
+ self.assertEqual(self.mock_dev.component_del, "&")
+ self.assertEqual(self.mock_dev.escape_del, "|")
+
+ self.assertEqual(self.mock_dev.product_code, "Bayer7390")
+
+ self.assertEqual(self.mock_dev.dig_ver, "01.24")
+ self.assertEqual(self.mock_dev.anlg_ver, "01.04")
+ self.assertEqual(self.mock_dev.agp_ver, "09.02.20")
+ self.assertEqual(self.mock_dev.serial_num, "7390-2336773")
+ self.assertEqual(self.mock_dev.sku_id, "7403-")
+ self.assertEqual(self.mock_dev.res_marking, "1")
+ self.assertEqual(self.mock_dev.config_bits, "63")
+ self.assertEqual(self.mock_dev.lang, "1")
+ self.assertEqual(self.mock_dev.interv, "0200")
+ self.assertEqual(self.mock_dev.ref_method, "0")
+ self.assertEqual(self.mock_dev.internal, "1")
+ self.assertEqual(self.mock_dev.unit, "0")
+ self.assertEqual(self.mock_dev.lo_bound, "10")
+ self.assertEqual(self.mock_dev.hi_bound, "600")
+
+ self.assertEqual(self.mock_dev.hypo_limit, "070")
+ self.assertEqual(self.mock_dev.overall_low, "070")
+ self.assertEqual(self.mock_dev.pre_food_low, "070")
+ self.assertEqual(self.mock_dev.post_food_low, "070")
+ self.assertEqual(self.mock_dev.overall_high, "180")
+ self.assertEqual(self.mock_dev.pre_food_high, "130")
+ self.assertEqual(self.mock_dev.post_food_high, "150")
+ self.assertEqual(self.mock_dev.hyper_limit, "250")
+
+ self.assertEqual(self.mock_dev.upp_hyper, "360")
+ self.assertEqual(self.mock_dev.low_hyper, "126")
+ self.assertEqual(self.mock_dev.upp_hypo, "090")
+ self.assertEqual(self.mock_dev.low_hypo, "050")
+ self.assertEqual(self.mock_dev.upp_low_target, "099")
+ self.assertEqual(self.mock_dev.low_low_target, "050")
+ self.assertEqual(self.mock_dev.upp_hi_target, "300")
+ self.assertEqual(self.mock_dev.low_hi_target, "089")
+ self.assertEqual(self.mock_dev.trends, "1")
+ self.assertEqual(self.mock_dev.total, "1714")
+ self.assertEqual(self.mock_dev.spec_ver, "1")
+
+ self.assertEqual(self.mock_dev.datetime, "201909221304")
+
+ #TO-DO checksum and checkframe unit tests
+
+ def test_parse_result_record(self):
+ #first decode the header record frame
+ result_record = "R|8|^^^Glucose|133|mg/dL^P||B/X||201202052034"
+ result_dict = contourusb.ContourHidDevice.parse_result_record(self.mock_dev, result_record)
+
+ self.assertEqual(result_dict['record_type'], 'R')
+ self.assertEqual(result_dict['seq_num'], '8')
+ self.assertEqual(result_dict['test_id'], 'Glucose')
+ self.assertEqual(result_dict['value'], '133')
+ self.assertEqual(result_dict['unit'], 'mg/dL')
+ self.assertEqual(result_dict['ref_method'], 'P')
+ self.assertEqual(result_dict['markers'], 'B/X')
+ self.assertEqual(result_dict['datetime'], '201202052034') \ No newline at end of file