diff options
author | Ben <b-schaefer@posteo.de> | 2020-02-21 10:45:40 +0100 |
---|---|---|
committer | Diego Elio Pettenò <flameeyes@flameeyes.com> | 2020-03-08 00:36:39 +0100 |
commit | e72b02d84e7f67cdf6107862ad580e951a5bbda1 (patch) | |
tree | 0887513d2478f55b27abccfeb307f313231bd994 /glucometerutils/drivers/otultraeasy.py | |
parent | pre-commit guide in README (diff) | |
download | glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.gz glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.bz2 glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.lz glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.xz glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.tar.zst glucometerutils-e72b02d84e7f67cdf6107862ad580e951a5bbda1.zip |
Diffstat (limited to 'glucometerutils/drivers/otultraeasy.py')
-rw-r--r-- | glucometerutils/drivers/otultraeasy.py | 149 |
1 files changed, 79 insertions, 70 deletions
diff --git a/glucometerutils/drivers/otultraeasy.py b/glucometerutils/drivers/otultraeasy.py index 7f4934e..0d1e7a9 100644 --- a/glucometerutils/drivers/otultraeasy.py +++ b/glucometerutils/drivers/otultraeasy.py @@ -22,87 +22,97 @@ import logging import construct from glucometerutils import common -from glucometerutils.support import construct_extras, driver_base, lifescan, lifescan_binary_protocol, serial +from glucometerutils.support import ( + construct_extras, + driver_base, + lifescan, + lifescan_binary_protocol, + serial, +) _PACKET = lifescan_binary_protocol.LifeScanPacket(True) _INVALID_RECORD = 501 -_COMMAND_SUCCESS = construct.Const(b'\x05\x06') +_COMMAND_SUCCESS = construct.Const(b"\x05\x06") -_VERSION_REQUEST = construct.Const(b'\x05\x0d\x02') +_VERSION_REQUEST = construct.Const(b"\x05\x0d\x02") _VERSION_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'version' / construct.PascalString(construct.Byte, encoding='ascii'), + "version" / construct.PascalString(construct.Byte, encoding="ascii"), ) _SERIAL_NUMBER_REQUEST = construct.Const( - b'\x05\x0B\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00') + b"\x05\x0B\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00" +) _SERIAL_NUMBER_RESPONSE = construct.Struct( - _COMMAND_SUCCESS, - 'serial_number' / construct.GreedyString(encoding='ascii'), + _COMMAND_SUCCESS, "serial_number" / construct.GreedyString(encoding="ascii"), ) _DATETIME_REQUEST = construct.Struct( - construct.Const(b'\x05\x20'), # 0x20 is the datetime - 'request_type' / construct.Enum(construct.Byte, write=0x01, read=0x02), - 'timestamp' / construct.Default( + construct.Const(b"\x05\x20"), # 0x20 is the datetime + "request_type" / construct.Enum(construct.Byte, write=0x01, read=0x02), + "timestamp" + / construct.Default( construct_extras.Timestamp(construct.Int32ul), # type: ignore - datetime.datetime(1970, 1, 1, 0, 0)), + datetime.datetime(1970, 1, 1, 0, 0), + ), ) _DATETIME_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'timestamp' / construct_extras.Timestamp(construct.Int32ul), # type: ignore + "timestamp" / construct_extras.Timestamp(construct.Int32ul), # type: ignore ) -_GLUCOSE_UNIT_REQUEST = construct.Const( - b'\x05\x09\x02\x09\x00\x00\x00\x00') +_GLUCOSE_UNIT_REQUEST = construct.Const(b"\x05\x09\x02\x09\x00\x00\x00\x00") _GLUCOSE_UNIT_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'unit' / lifescan_binary_protocol.GLUCOSE_UNIT, + "unit" / lifescan_binary_protocol.GLUCOSE_UNIT, construct.Padding(3), ) -_MEMORY_ERASE_REQUEST = construct.Const(b'\x05\x1A') +_MEMORY_ERASE_REQUEST = construct.Const(b"\x05\x1A") _READING_COUNT_RESPONSE = construct.Struct( - construct.Const(b'\x0f'), - 'count' / construct.Int16ul, + construct.Const(b"\x0f"), "count" / construct.Int16ul, ) _READ_RECORD_REQUEST = construct.Struct( - construct.Const(b'\x05\x1f'), - 'record_id' / construct.Int16ul, + construct.Const(b"\x05\x1f"), "record_id" / construct.Int16ul, ) _READING_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'timestamp' / construct_extras.Timestamp(construct.Int32ul), # type: ignore - 'value' / construct.Int32ul, + "timestamp" / construct_extras.Timestamp(construct.Int32ul), # type: ignore + "value" / construct.Int32ul, ) -def _make_packet( - message, sequence_number, expect_receive, acknowledge, disconnect): + +def _make_packet(message, sequence_number, expect_receive, acknowledge, disconnect): return _PACKET.build( - {'data': {'value': { - 'message': message, - 'link_control': { - 'sequence_number': sequence_number, - 'expect_receive': expect_receive, - 'acknowledge': acknowledge, - 'disconnect': disconnect, - }, - }}}) + { + "data": { + "value": { + "message": message, + "link_control": { + "sequence_number": sequence_number, + "expect_receive": expect_receive, + "acknowledge": acknowledge, + "disconnect": disconnect, + }, + } + } + } + ) class Device(serial.SerialDevice, driver_base.GlucometerDriver): BAUDRATE = 9600 - DEFAULT_CABLE_ID = '067b:2303' # Generic PL2303 cable. + DEFAULT_CABLE_ID = "067b:2303" # Generic PL2303 cable. TIMEOUT = 0.5 def __init__(self, device): @@ -110,12 +120,11 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): self.sent_counter_ = False self.expect_receive_ = False - self.buffered_reader_ = construct.Rebuffered( - _PACKET, tailcutoff=1024) + self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024) def connect(self): try: - self._send_packet(b'', disconnect=True) + self._send_packet(b"", disconnect=True) self._read_ack() except construct.ConstructError as e: raise lifescan.MalformedCommand(str(e)) @@ -125,33 +134,32 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): def _send_packet(self, message, acknowledge=False, disconnect=False): pkt = _make_packet( - message, - self.sent_counter_, - self.expect_receive_, - acknowledge, - disconnect) - logging.debug('sending packet: %s', binascii.hexlify(pkt)) + message, self.sent_counter_, self.expect_receive_, acknowledge, disconnect + ) + logging.debug("sending packet: %s", binascii.hexlify(pkt)) self.serial_.write(pkt) self.serial_.flush() def _read_packet(self): raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data - logging.debug('received packet: %r', raw_pkt) + logging.debug("received packet: %r", raw_pkt) # discard the checksum and copy pkt = raw_pkt.value if not pkt.link_control.disconnect and ( - pkt.link_control.sequence_number != self.expect_receive_): + pkt.link_control.sequence_number != self.expect_receive_ + ): raise lifescan.MalformedCommand( - 'at position 2[0b] expected %02x, received %02x' % ( - self.expect_receive_, pkt.link_connect.sequence_count)) + "at position 2[0b] expected %02x, received %02x" + % (self.expect_receive_, pkt.link_connect.sequence_count) + ) return pkt def _send_ack(self): - self._send_packet(b'', acknowledge=True, disconnect=False) + self._send_packet(b"", acknowledge=True, disconnect=False) def _read_ack(self): pkt = self._read_packet() @@ -177,62 +185,63 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver): def get_meter_info(self): return common.MeterInfo( - 'OneTouch Ultra Easy glucometer', + "OneTouch Ultra Easy glucometer", serial_number=self.get_serial_number(), - version_info=( - 'Software version: ' + self.get_version(),), - native_unit=self.get_glucose_unit()) + version_info=("Software version: " + self.get_version(),), + native_unit=self.get_glucose_unit(), + ) def get_version(self): - response = self._send_request( - _VERSION_REQUEST, None, _VERSION_RESPONSE) + response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE) return response.version def get_serial_number(self): response = self._send_request( - _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE) + _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE + ) return response.serial_number def get_datetime(self): response = self._send_request( - _DATETIME_REQUEST, {'request_type': 'read'}, - _DATETIME_RESPONSE) + _DATETIME_REQUEST, {"request_type": "read"}, _DATETIME_RESPONSE + ) return response.timestamp def _set_device_datetime(self, date): response = self._send_request( - _DATETIME_REQUEST, { - 'request_type': 'write', - 'timestamp': date, - }, _DATETIME_RESPONSE) + _DATETIME_REQUEST, + {"request_type": "write", "timestamp": date,}, + _DATETIME_RESPONSE, + ) return response.timestamp def zero_log(self): - self._send_request( - _MEMORY_ERASE_REQUEST, None, - _COMMAND_SUCCESS) + self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS) def get_glucose_unit(self): response = self._send_request( - _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE) + _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE + ) return response.unit def _get_reading_count(self): response = self._send_request( - _READ_RECORD_REQUEST, {'record_id': _INVALID_RECORD}, - _READING_COUNT_RESPONSE) + _READ_RECORD_REQUEST, + {"record_id": _INVALID_RECORD}, + _READING_COUNT_RESPONSE, + ) return response.count def _get_reading(self, record_id): response = self._send_request( - _READ_RECORD_REQUEST, {'record_id': record_id}, _READING_RESPONSE) + _READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE + ) - return common.GlucoseReading( - response.timestamp, float(response.value)) + return common.GlucoseReading(response.timestamp, float(response.value)) def get_readings(self): record_count = self._get_reading_count() |