summaryrefslogtreecommitdiffstats
path: root/glucometerutils/support/contourusb.py
diff options
context:
space:
mode:
Diffstat (limited to 'glucometerutils/support/contourusb.py')
-rw-r--r--glucometerutils/support/contourusb.py163
1 files changed, 85 insertions, 78 deletions
diff --git a/glucometerutils/support/contourusb.py b/glucometerutils/support/contourusb.py
index b4db4eb..3b0dc80 100644
--- a/glucometerutils/support/contourusb.py
+++ b/glucometerutils/support/contourusb.py
@@ -43,25 +43,31 @@ _HEADER_RECORD_RE = re.compile(
"(?P<low_low_target>[0-9]{3})(?P<upp_hi_target>[0-9]{3})"
"(?P<low_hi_target>[0-9]{3})\\^Z=(?P<trends>[0-2])\\|"
"(?P<total>[0-9]*)\\|\\|\\|\\|\\|\\|"
- "(?P<spec_ver>[0-9]+)\\|(?P<datetime>[0-9]+)")
+ "(?P<spec_ver>[0-9]+)\\|(?P<datetime>[0-9]+)"
+)
_RESULT_RECORD_RE = re.compile(
"^(?P<record_type>[a-zA-Z])\\|(?P<seq_num>[0-9]+)\\|\\w*\\^\\w*\\^\\w*\\"
"^(?P<test_id>\\w+)\\|(?P<value>[0-9]+)\\|(?P<unit>\\w+\\/\\w+)\\^"
"(?P<ref_method>[BPD])\\|\\|(?P<markers>[><BADISXCZ\\/1-12]*)\\|\\|"
- "(?P<datetime>[0-9]+)")
+ "(?P<datetime>[0-9]+)"
+)
_RECORD_FORMAT = re.compile(
- '\x02(?P<check>(?P<recno>[0-7])(?P<text>[^\x0d]*)'
- '\x0d(?P<end>[\x03\x17]))'
- '(?P<checksum>[0-9A-F][0-9A-F])\x0d\x0a')
+ "\x02(?P<check>(?P<recno>[0-7])(?P<text>[^\x0d]*)"
+ "\x0d(?P<end>[\x03\x17]))"
+ "(?P<checksum>[0-9A-F][0-9A-F])\x0d\x0a"
+)
+
class FrameError(Exception):
pass
+
class ContourHidDevice(hiddevice.HidDevice):
"""Base class implementing the ContourUSB HID common protocol.
"""
+
blocksize = 64
# Operation modes
@@ -77,85 +83,84 @@ class ContourHidDevice(hiddevice.HidDevice):
while True:
data = self._read()
dstr = data
- result.append(dstr[4:data[3]+4])
- if data[3] != self.blocksize-4:
+ result.append(dstr[4 : data[3] + 4])
+ if data[3] != self.blocksize - 4:
break
- return (b"".join(result))
+ return b"".join(result)
def write(self, data):
- data = b'ABC' + chr(len(data)).encode() + data.encode()
+ data = b"ABC" + chr(len(data)).encode() + data.encode()
pad_length = self.blocksize - len(data)
- data += pad_length * b'\x00'
+ data += pad_length * b"\x00"
self._write(data)
- USB_VENDOR_ID = 0x1a79 # type: int # Bayer Health Care LLC Contour
+ USB_VENDOR_ID = 0x1A79 # type: int # Bayer Health Care LLC Contour
USB_PRODUCT_ID = 0x6002 # type: int
def parse_header_record(self, text):
header = _HEADER_RECORD_RE.search(text)
- self.field_del = header.group('field_del')
- self.repeat_del = header.group('repeat_del')
- self.component_del = header.group('component_del')
- self.escape_del = header.group('escape_del')
-
- self.product_code = header.group('product_code')
- self.dig_ver = header.group('dig_ver')
- self.anlg_ver = header.group('anlg_ver')
- self.agp_ver = header.group('agp_ver')
-
- self.serial_num = header.group('serial_num')
- self.sku_id = header.group('sku_id')
- self.res_marking = header.group('res_marking')
- self.config_bits = header.group('config_bits')
- self.lang = header.group('lang')
- self.interv = header.group('interv')
- self.ref_method = header.group('ref_method')
- self.internal = header.group('internal')
+ self.field_del = header.group("field_del")
+ self.repeat_del = header.group("repeat_del")
+ self.component_del = header.group("component_del")
+ self.escape_del = header.group("escape_del")
+
+ self.product_code = header.group("product_code")
+ self.dig_ver = header.group("dig_ver")
+ self.anlg_ver = header.group("anlg_ver")
+ self.agp_ver = header.group("agp_ver")
+
+ self.serial_num = header.group("serial_num")
+ self.sku_id = header.group("sku_id")
+ self.res_marking = header.group("res_marking")
+ self.config_bits = header.group("config_bits")
+ self.lang = header.group("lang")
+ self.interv = header.group("interv")
+ self.ref_method = header.group("ref_method")
+ self.internal = header.group("internal")
# U limit
- self.unit = header.group('unit')
- self.lo_bound = header.group('lo_bound')
- self.hi_bound = header.group('hi_bound')
+ self.unit = header.group("unit")
+ self.lo_bound = header.group("lo_bound")
+ self.hi_bound = header.group("hi_bound")
# X field
- self.hypo_limit = header.group('hypo_limit')
- self.overall_low = header.group('overall_low')
- self.pre_food_low = header.group('pre_food_low')
- self.post_food_low = header.group('post_food_low')
- self.overall_high = header.group('overall_high')
- self.pre_food_high = header.group('pre_food_high')
- self.post_food_high = header.group('post_food_high')
- self.hyper_limit = header.group('hyper_limit')
+ self.hypo_limit = header.group("hypo_limit")
+ self.overall_low = header.group("overall_low")
+ self.pre_food_low = header.group("pre_food_low")
+ self.post_food_low = header.group("post_food_low")
+ self.overall_high = header.group("overall_high")
+ self.pre_food_high = header.group("pre_food_high")
+ self.post_food_high = header.group("post_food_high")
+ self.hyper_limit = header.group("hyper_limit")
# Y field
- self.upp_hyper = header.group('upp_hyper')
- self.low_hyper = header.group('low_hyper')
- self.upp_hypo = header.group('upp_hypo')
- self.low_hypo = header.group('low_hypo')
- self.upp_low_target = header.group('upp_low_target')
- self.low_low_target = header.group('low_low_target')
- self.upp_hi_target = header.group('upp_hi_target')
- self.low_hi_target = header.group('low_hi_target')
+ self.upp_hyper = header.group("upp_hyper")
+ self.low_hyper = header.group("low_hyper")
+ self.upp_hypo = header.group("upp_hypo")
+ self.low_hypo = header.group("low_hypo")
+ self.upp_low_target = header.group("upp_low_target")
+ self.low_low_target = header.group("low_low_target")
+ self.upp_hi_target = header.group("upp_hi_target")
+ self.low_hi_target = header.group("low_hi_target")
# Z field
- self.trends = header.group('trends')
+ self.trends = header.group("trends")
- self.total = header.group('total')
- self.spec_ver = header.group('spec_ver')
+ self.total = header.group("total")
+ self.spec_ver = header.group("spec_ver")
# Datetime string in YYYYMMDDHHMM format
- self.datetime = header.group('datetime')
-
+ self.datetime = header.group("datetime")
def checksum(self, text):
"""
Implemented by Anders Hammarquist for glucodump project
More info: https://bitbucket.org/iko/glucodump/src/default/
"""
- checksum = hex(sum(ord(c) for c in text) % 256).upper().split('X')[1]
- return ('00' + checksum)[-2:]
+ checksum = hex(sum(ord(c) for c in text) % 256).upper().split("X")[1]
+ return ("00" + checksum)[-2:]
def checkframe(self, frame):
"""
@@ -166,7 +171,7 @@ class ContourHidDevice(hiddevice.HidDevice):
if not match:
raise FrameError("Couldn't parse frame", frame)
- recno = int(match.group('recno'))
+ recno = int(match.group("recno"))
if self.currecno is None:
self.currecno = recno
@@ -174,18 +179,20 @@ class ContourHidDevice(hiddevice.HidDevice):
return None
if recno != self.currecno:
- raise FrameError("Bad recno, got %r expected %r" %
- (recno, self.currecno),
- frame)
-
- checksum = self.checksum(match.group('check'))
- if checksum != match.group('checksum'):
- raise FrameError("Checksum error: got %s expected %s" %
- (match.group('checksum'), checksum),
- frame)
+ raise FrameError(
+ "Bad recno, got %r expected %r" % (recno, self.currecno), frame
+ )
+
+ checksum = self.checksum(match.group("check"))
+ if checksum != match.group("checksum"):
+ raise FrameError(
+ "Checksum error: got %s expected %s"
+ % (match.group("checksum"), checksum),
+ frame,
+ )
self.currecno = (self.currecno + 1) % 8
- return match.group('text')
+ return match.group("text")
def connect(self):
"""Connecting the device, nothing to be done.
@@ -198,15 +205,14 @@ class ContourHidDevice(hiddevice.HidDevice):
self.state = self.mode_establish
try:
while True:
- self.write('\x04')
+ self.write("\x04")
res = self.read()
if res[0] == 4 and res[-1] == 5:
# we are connected and just got a header
header_record = res.decode()
- stx = header_record.find('\x02')
+ stx = header_record.find("\x02")
if stx != -1:
- result = _RECORD_FORMAT.match(
- header_record[stx:]).group('text')
+ result = _RECORD_FORMAT.match(header_record[stx:]).group("text")
self.parse_header_record(result)
break
else:
@@ -251,7 +257,8 @@ class ContourHidDevice(hiddevice.HidDevice):
int(datetime_str[6:8]), # day
int(datetime_str[8:10]), # hour
int(datetime_str[10:12]), # minute
- 0)
+ 0,
+ )
def sync(self):
"""
@@ -261,7 +268,7 @@ class ContourHidDevice(hiddevice.HidDevice):
"""
self.state = self.mode_establish
try:
- tometer = '\x04'
+ tometer = "\x04"
result = None
foo = 0
while True:
@@ -281,7 +288,7 @@ class ContourHidDevice(hiddevice.HidDevice):
continue
if data_bytes[-1] == 5:
# got an <ENQ>, send <ACK>
- tometer = '\x06'
+ tometer = "\x06"
self.currecno = None
continue
if self.state == self.mode_data:
@@ -289,18 +296,18 @@ class ContourHidDevice(hiddevice.HidDevice):
# got an <EOT>, done
self.state = self.mode_precommand
break
- stx = data.find('\x02')
+ stx = data.find("\x02")
if stx != -1:
# got <STX>, parse frame
try:
result = self.checkframe(data[stx:])
- tometer = '\x06'
+ tometer = "\x06"
self.state = self.mode_data
except FrameError as e:
- tometer = '\x15' # Couldn't parse, <NAK>
+ tometer = "\x15" # Couldn't parse, <NAK>
else:
# Got something we don't understand, <NAK> it
- tometer = '\x15'
+ tometer = "\x15"
except Exception as e:
raise e
@@ -321,7 +328,7 @@ class ContourHidDevice(hiddevice.HidDevice):
"""
records_arr = []
for rec in self.sync():
- if rec[0] == 'R':
+ if rec[0] == "R":
# parse using result record regular expression
rec_text = self.parse_result_record(rec)
# get dictionary to use in main driver module without import re