diff options
author | Ben <b-schaefer@posteo.de> | 2020-02-21 10:45:40 +0100 |
---|---|---|
committer | Diego Elio Pettenò <flameeyes@flameeyes.com> | 2020-03-08 00:36:39 +0100 |
commit | e72b02d84e7f67cdf6107862ad580e951a5bbda1 (patch) | |
tree | 0887513d2478f55b27abccfeb307f313231bd994 /glucometerutils/support/contourusb.py | |
parent | pre-commit guide in README (diff) | |
download | glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.gz glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.bz2 glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.lz glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.xz glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.zst glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.zip |
Diffstat (limited to 'glucometerutils/support/contourusb.py')
-rw-r--r-- | glucometerutils/support/contourusb.py | 163 |
1 files changed, 85 insertions, 78 deletions
diff --git a/glucometerutils/support/contourusb.py b/glucometerutils/support/contourusb.py index b4db4eb..3b0dc80 100644 --- a/glucometerutils/support/contourusb.py +++ b/glucometerutils/support/contourusb.py @@ -43,25 +43,31 @@ _HEADER_RECORD_RE = re.compile( "(?P<low_low_target>[0-9]{3})(?P<upp_hi_target>[0-9]{3})" "(?P<low_hi_target>[0-9]{3})\\^Z=(?P<trends>[0-2])\\|" "(?P<total>[0-9]*)\\|\\|\\|\\|\\|\\|" - "(?P<spec_ver>[0-9]+)\\|(?P<datetime>[0-9]+)") + "(?P<spec_ver>[0-9]+)\\|(?P<datetime>[0-9]+)" +) _RESULT_RECORD_RE = re.compile( "^(?P<record_type>[a-zA-Z])\\|(?P<seq_num>[0-9]+)\\|\\w*\\^\\w*\\^\\w*\\" "^(?P<test_id>\\w+)\\|(?P<value>[0-9]+)\\|(?P<unit>\\w+\\/\\w+)\\^" "(?P<ref_method>[BPD])\\|\\|(?P<markers>[><BADISXCZ\\/1-12]*)\\|\\|" - "(?P<datetime>[0-9]+)") + "(?P<datetime>[0-9]+)" +) _RECORD_FORMAT = re.compile( - '\x02(?P<check>(?P<recno>[0-7])(?P<text>[^\x0d]*)' - '\x0d(?P<end>[\x03\x17]))' - '(?P<checksum>[0-9A-F][0-9A-F])\x0d\x0a') + "\x02(?P<check>(?P<recno>[0-7])(?P<text>[^\x0d]*)" + "\x0d(?P<end>[\x03\x17]))" + "(?P<checksum>[0-9A-F][0-9A-F])\x0d\x0a" +) + class FrameError(Exception): pass + class ContourHidDevice(hiddevice.HidDevice): """Base class implementing the ContourUSB HID common protocol. """ + blocksize = 64 # Operation modes @@ -77,85 +83,84 @@ class ContourHidDevice(hiddevice.HidDevice): while True: data = self._read() dstr = data - result.append(dstr[4:data[3]+4]) - if data[3] != self.blocksize-4: + result.append(dstr[4 : data[3] + 4]) + if data[3] != self.blocksize - 4: break - return (b"".join(result)) + return b"".join(result) def write(self, data): - data = b'ABC' + chr(len(data)).encode() + data.encode() + data = b"ABC" + chr(len(data)).encode() + data.encode() pad_length = self.blocksize - len(data) - data += pad_length * b'\x00' + data += pad_length * b"\x00" self._write(data) - USB_VENDOR_ID = 0x1a79 # type: int # Bayer Health Care LLC Contour + USB_VENDOR_ID = 0x1A79 # type: int # Bayer Health Care LLC Contour USB_PRODUCT_ID = 0x6002 # type: int def parse_header_record(self, text): header = _HEADER_RECORD_RE.search(text) - self.field_del = header.group('field_del') - self.repeat_del = header.group('repeat_del') - self.component_del = header.group('component_del') - self.escape_del = header.group('escape_del') - - self.product_code = header.group('product_code') - self.dig_ver = header.group('dig_ver') - self.anlg_ver = header.group('anlg_ver') - self.agp_ver = header.group('agp_ver') - - self.serial_num = header.group('serial_num') - self.sku_id = header.group('sku_id') - self.res_marking = header.group('res_marking') - self.config_bits = header.group('config_bits') - self.lang = header.group('lang') - self.interv = header.group('interv') - self.ref_method = header.group('ref_method') - self.internal = header.group('internal') + self.field_del = header.group("field_del") + self.repeat_del = header.group("repeat_del") + self.component_del = header.group("component_del") + self.escape_del = header.group("escape_del") + + self.product_code = header.group("product_code") + self.dig_ver = header.group("dig_ver") + self.anlg_ver = header.group("anlg_ver") + self.agp_ver = header.group("agp_ver") + + self.serial_num = header.group("serial_num") + self.sku_id = header.group("sku_id") + self.res_marking = header.group("res_marking") + self.config_bits = header.group("config_bits") + self.lang = header.group("lang") + self.interv = header.group("interv") + self.ref_method = header.group("ref_method") + self.internal = header.group("internal") # U limit - self.unit = header.group('unit') - self.lo_bound = header.group('lo_bound') - self.hi_bound = header.group('hi_bound') + self.unit = header.group("unit") + self.lo_bound = header.group("lo_bound") + self.hi_bound = header.group("hi_bound") # X field - self.hypo_limit = header.group('hypo_limit') - self.overall_low = header.group('overall_low') - self.pre_food_low = header.group('pre_food_low') - self.post_food_low = header.group('post_food_low') - self.overall_high = header.group('overall_high') - self.pre_food_high = header.group('pre_food_high') - self.post_food_high = header.group('post_food_high') - self.hyper_limit = header.group('hyper_limit') + self.hypo_limit = header.group("hypo_limit") + self.overall_low = header.group("overall_low") + self.pre_food_low = header.group("pre_food_low") + self.post_food_low = header.group("post_food_low") + self.overall_high = header.group("overall_high") + self.pre_food_high = header.group("pre_food_high") + self.post_food_high = header.group("post_food_high") + self.hyper_limit = header.group("hyper_limit") # Y field - self.upp_hyper = header.group('upp_hyper') - self.low_hyper = header.group('low_hyper') - self.upp_hypo = header.group('upp_hypo') - self.low_hypo = header.group('low_hypo') - self.upp_low_target = header.group('upp_low_target') - self.low_low_target = header.group('low_low_target') - self.upp_hi_target = header.group('upp_hi_target') - self.low_hi_target = header.group('low_hi_target') + self.upp_hyper = header.group("upp_hyper") + self.low_hyper = header.group("low_hyper") + self.upp_hypo = header.group("upp_hypo") + self.low_hypo = header.group("low_hypo") + self.upp_low_target = header.group("upp_low_target") + self.low_low_target = header.group("low_low_target") + self.upp_hi_target = header.group("upp_hi_target") + self.low_hi_target = header.group("low_hi_target") # Z field - self.trends = header.group('trends') + self.trends = header.group("trends") - self.total = header.group('total') - self.spec_ver = header.group('spec_ver') + self.total = header.group("total") + self.spec_ver = header.group("spec_ver") # Datetime string in YYYYMMDDHHMM format - self.datetime = header.group('datetime') - + self.datetime = header.group("datetime") def checksum(self, text): """ Implemented by Anders Hammarquist for glucodump project More info: https://bitbucket.org/iko/glucodump/src/default/ """ - checksum = hex(sum(ord(c) for c in text) % 256).upper().split('X')[1] - return ('00' + checksum)[-2:] + checksum = hex(sum(ord(c) for c in text) % 256).upper().split("X")[1] + return ("00" + checksum)[-2:] def checkframe(self, frame): """ @@ -166,7 +171,7 @@ class ContourHidDevice(hiddevice.HidDevice): if not match: raise FrameError("Couldn't parse frame", frame) - recno = int(match.group('recno')) + recno = int(match.group("recno")) if self.currecno is None: self.currecno = recno @@ -174,18 +179,20 @@ class ContourHidDevice(hiddevice.HidDevice): return None if recno != self.currecno: - raise FrameError("Bad recno, got %r expected %r" % - (recno, self.currecno), - frame) - - checksum = self.checksum(match.group('check')) - if checksum != match.group('checksum'): - raise FrameError("Checksum error: got %s expected %s" % - (match.group('checksum'), checksum), - frame) + raise FrameError( + "Bad recno, got %r expected %r" % (recno, self.currecno), frame + ) + + checksum = self.checksum(match.group("check")) + if checksum != match.group("checksum"): + raise FrameError( + "Checksum error: got %s expected %s" + % (match.group("checksum"), checksum), + frame, + ) self.currecno = (self.currecno + 1) % 8 - return match.group('text') + return match.group("text") def connect(self): """Connecting the device, nothing to be done. @@ -198,15 +205,14 @@ class ContourHidDevice(hiddevice.HidDevice): self.state = self.mode_establish try: while True: - self.write('\x04') + self.write("\x04") res = self.read() if res[0] == 4 and res[-1] == 5: # we are connected and just got a header header_record = res.decode() - stx = header_record.find('\x02') + stx = header_record.find("\x02") if stx != -1: - result = _RECORD_FORMAT.match( - header_record[stx:]).group('text') + result = _RECORD_FORMAT.match(header_record[stx:]).group("text") self.parse_header_record(result) break else: @@ -251,7 +257,8 @@ class ContourHidDevice(hiddevice.HidDevice): int(datetime_str[6:8]), # day int(datetime_str[8:10]), # hour int(datetime_str[10:12]), # minute - 0) + 0, + ) def sync(self): """ @@ -261,7 +268,7 @@ class ContourHidDevice(hiddevice.HidDevice): """ self.state = self.mode_establish try: - tometer = '\x04' + tometer = "\x04" result = None foo = 0 while True: @@ -281,7 +288,7 @@ class ContourHidDevice(hiddevice.HidDevice): continue if data_bytes[-1] == 5: # got an <ENQ>, send <ACK> - tometer = '\x06' + tometer = "\x06" self.currecno = None continue if self.state == self.mode_data: @@ -289,18 +296,18 @@ class ContourHidDevice(hiddevice.HidDevice): # got an <EOT>, done self.state = self.mode_precommand break - stx = data.find('\x02') + stx = data.find("\x02") if stx != -1: # got <STX>, parse frame try: result = self.checkframe(data[stx:]) - tometer = '\x06' + tometer = "\x06" self.state = self.mode_data except FrameError as e: - tometer = '\x15' # Couldn't parse, <NAK> + tometer = "\x15" # Couldn't parse, <NAK> else: # Got something we don't understand, <NAK> it - tometer = '\x15' + tometer = "\x15" except Exception as e: raise e @@ -321,7 +328,7 @@ class ContourHidDevice(hiddevice.HidDevice): """ records_arr = [] for rec in self.sync(): - if rec[0] == 'R': + if rec[0] == "R": # parse using result record regular expression rec_text = self.parse_result_record(rec) # get dictionary to use in main driver module without import re |