diff options
Diffstat (limited to 'glucometerutils/drivers/sdcodefree.py')
-rw-r--r-- | glucometerutils/drivers/sdcodefree.py | 108 |
1 files changed, 55 insertions, 53 deletions
diff --git a/glucometerutils/drivers/sdcodefree.py b/glucometerutils/drivers/sdcodefree.py index a6e2ce5..47dd9ca 100644 --- a/glucometerutils/drivers/sdcodefree.py +++ b/glucometerutils/drivers/sdcodefree.py @@ -24,46 +24,44 @@ import operator import construct -from glucometerutils import common -from glucometerutils import exceptions -from glucometerutils.support import serial, driver_base +from glucometerutils import common, exceptions +from glucometerutils.support import driver_base, serial def xor_checksum(msg): return functools.reduce(operator.xor, msg) + class Direction(enum.Enum): In = 0x20 Out = 0x10 + _PACKET = construct.Struct( - 'stx' / construct.Const(0x53, construct.Byte), - 'direction' / construct.Mapping( - construct.Byte, - {e: e.value for e in Direction}), - 'length' / construct.Rebuild( - construct.Byte, lambda this: len(this.message) + 2), - 'message' / construct.Bytes(lambda this: this.length - 2), - 'checksum' / construct.Checksum( - construct.Byte, xor_checksum, construct.this.message), - 'etx' / construct.Const(0xAA, construct.Byte) + "stx" / construct.Const(0x53, construct.Byte), + "direction" / construct.Mapping(construct.Byte, {e: e.value for e in Direction}), + "length" / construct.Rebuild(construct.Byte, lambda this: len(this.message) + 2), + "message" / construct.Bytes(lambda this: this.length - 2), + "checksum" + / construct.Checksum(construct.Byte, xor_checksum, construct.this.message), + "etx" / construct.Const(0xAA, construct.Byte), ) _FIRST_MESSAGE = construct.Struct( construct.Const(0x30, construct.Byte), - 'count' / construct.Int16ub, + "count" / construct.Int16ub, construct.Const(0xAA, construct.Byte)[19], ) -_CHALLENGE_PACKET_FULL = b'\x53\x20\x04\x10\x30\x20\xAA' -_RESPONSE_MESSAGE = b'\x10\x40' +_CHALLENGE_PACKET_FULL = b"\x53\x20\x04\x10\x30\x20\xAA" +_RESPONSE_MESSAGE = b"\x10\x40" -_DATE_SET_MESSAGE = b'\x10\x10' +_DATE_SET_MESSAGE = b"\x10\x10" -_DISCONNECT_MESSAGE = b'\x10\x60' -_DISCONNECTED_MESSAGE = b'\x10\x70' +_DISCONNECT_MESSAGE = b"\x10\x60" +_DISCONNECTED_MESSAGE = b"\x10\x70" -_FETCH_MESSAGE = b'\x10\x60' +_FETCH_MESSAGE = b"\x10\x60" _MEAL_FLAG = { common.Meal.NONE: 0x00, @@ -73,67 +71,64 @@ _MEAL_FLAG = { _READING = construct.Struct( construct.Byte[2], - 'year' / construct.Byte, - 'month' / construct.Byte, - 'day' / construct.Byte, - 'hour' / construct.Byte, - 'minute' / construct.Byte, - 'value' / construct.Int16ub, - 'meal' / construct.Mapping( - construct.Byte, _MEAL_FLAG), + "year" / construct.Byte, + "month" / construct.Byte, + "day" / construct.Byte, + "hour" / construct.Byte, + "minute" / construct.Byte, + "value" / construct.Int16ub, + "meal" / construct.Mapping(construct.Byte, _MEAL_FLAG), construct.Byte[7], ) class Device(serial.SerialDevice, driver_base.GlucometerDriver): BAUDRATE = 38400 - DEFAULT_CABLE_ID = '10c4:ea60' # Generic cable. + DEFAULT_CABLE_ID = "10c4:ea60" # Generic cable. TIMEOUT = 300 # We need to wait for data from the device. def read_message(self): pkt = _PACKET.parse_stream(self.serial_) - logging.debug('received packet: %r', pkt) + logging.debug("received packet: %r", pkt) return pkt.message def wait_and_ready(self): - challenge = b'\0' - while challenge == b'\0': + challenge = b"\0" + while challenge == b"\0": challenge = self.serial_.read(1) # The first packet read may have a prefixed zero, it might be a bug # in the cp210x driver or device, but discard it if found. - if challenge == b'\0': - logging.debug('spurious null byte received') + if challenge == b"\0": + logging.debug("spurious null byte received") continue - if challenge != b'\x53': + if challenge != b"\x53": raise exceptions.ConnectionFailed( - message='Unexpected starting bytes %r' % challenge) + message="Unexpected starting bytes %r" % challenge + ) challenge += self.serial_.read(6) if challenge != _CHALLENGE_PACKET_FULL: raise exceptions.ConnectionFailed( - message='Unexpected challenge %r' % challenge) + message="Unexpected challenge %r" % challenge + ) - logging.debug( - 'challenge packet received: %s', binascii.hexlify(challenge)) + logging.debug("challenge packet received: %s", binascii.hexlify(challenge)) self.send_message(_RESPONSE_MESSAGE) # The first packet only contains the counter of how many readings are # available. first_message = _FIRST_MESSAGE.parse(self.read_message()) - logging.debug('received first message: %r', first_message) + logging.debug("received first message: %r", first_message) return first_message.count def send_message(self, message): - pkt = _PACKET.build({ - 'message': message, - 'direction': Direction.Out - }) - logging.debug('sending packet: %s', binascii.hexlify(pkt)) + pkt = _PACKET.build({"message": message, "direction": Direction.Out}) + logging.debug("sending packet: %s", binascii.hexlify(pkt)) self.serial_.write(pkt) def connect(self): # pylint: disable=no-self-use @@ -146,7 +141,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): raise exceptions.InvalidResponse(response=response) def get_meter_info(self): # pylint: disable=no-self-use - return common.MeterInfo('SD CodeFree glucometer') + return common.MeterInfo("SD CodeFree glucometer") def get_version(self): # pylint: disable=no-self-use raise NotImplementedError @@ -162,7 +157,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): raise NotImplementedError def _set_device_datetime(self, date): - setdatecmd = date.strftime('ADATE%Y%m%d%H%M').encode('ascii') + setdatecmd = date.strftime("ADATE%Y%m%d%H%M").encode("ascii") # Ignore the readings count. self.wait_and_ready() @@ -173,8 +168,9 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): raise exceptions.InvalidResponse(response=response) # The date we return should only include up to minute, unfortunately. - return datetime.datetime(date.year, date.month, date.day, - date.hour, date.minute) + return datetime.datetime( + date.year, date.month, date.day, date.hour, date.minute + ) def zero_log(self): raise NotImplementedError @@ -187,10 +183,16 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): message = self.read_message() reading = _READING.parse(message) - logging.debug('received reading: %r', reading) + logging.debug("received reading: %r", reading) yield common.GlucoseReading( datetime.datetime( - 2000 + reading.year, reading.month, - reading.day, reading.hour, reading.minute), - reading.value, meal=reading.meal) + 2000 + reading.year, + reading.month, + reading.day, + reading.hour, + reading.minute, + ), + reading.value, + meal=reading.meal, + ) |