From 0af0315dba3590ff5a975783cf8f7bc13460a5f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Elio=20Petten=C3=B2?= Date: Mon, 1 Jan 2018 01:16:29 +0000 Subject: otultraeasy: rewrite using construct for parsing. This removes the wholly complicated _Packet() object and replace it with more readable construct. Unfortunately this appears to reduce performance because of the serial stream buffering, needed to calculate the checksum. It's unfortunate, but it at least avoids a significant amount of custom code. --- README | 4 +- glucometerutils/drivers/otultraeasy.py | 455 ++++++++++++++------------------- setup.py | 2 +- test/test_otultraeasy.py | 17 -- 4 files changed, 201 insertions(+), 277 deletions(-) diff --git a/README b/README index a5dcd32..947348c 100644 --- a/README +++ b/README @@ -35,8 +35,8 @@ supported. | Manufacturer | Model Name | Driver | Dependencies | | --- | --- | --- | --- | | LifeScan | OneTouch Ultra 2 | `otultra2` | [pyserial] | -| LifeScan | OneTouch Ultra Easy | `otultraeasy` | [pyserial] | -| LifeScan | OneTouch Ultra Mini | `otultraeasy` | [pyserial] | +| LifeScan | OneTouch Ultra Easy | `otultraeasy` | [construct] [pyserial] | +| LifeScan | OneTouch Ultra Mini | `otultraeasy` | [construct] [pyserial] | | LifeScan | OneTouch Verio (USB) | `otverio2015` | [python-scsi] | | LifeScan | OneTouch Select Plus | `otverio2015` | [python-scsi] | | Abbott | FreeStyle InsuLinx† | `fsinsulinx` | [construct] [hidapi]‡ | diff --git a/glucometerutils/drivers/otultraeasy.py b/glucometerutils/drivers/otultraeasy.py index 0538dac..579c07c 100644 --- a/glucometerutils/drivers/otultraeasy.py +++ b/glucometerutils/drivers/otultraeasy.py @@ -18,305 +18,246 @@ __email__ = 'flameeyes@flameeyes.eu' __copyright__ = 'Copyright © 2014-2017, Diego Elio Pettenò' __license__ = 'MIT' -import array +import binascii import datetime import logging -import re -import struct -import time + +import construct from glucometerutils import common -from glucometerutils import exceptions from glucometerutils.support import lifescan from glucometerutils.support import serial -_STX = 0x02 -_ETX = 0x03 - -_IDX_STX = 0 -_IDX_LENGTH = 1 -_IDX_CONTROL = 2 -_IDX_DATA = 3 -_IDX_ETX = -3 -_IDX_CHECKSUM = -2 - -_BIT_SENT_COUNTER = 0x01 -_BIT_EXPECT_RECEIVE = 0x02 -_BIT_ACK = 0x04 -_BIT_DISCONNECT = 0x08 -_BIT_MORE = 0x10 - -_READ_SERIAL_NUMBER = b'\x05\x0B\x02\x00\x00\x00\x00\x84\x6A\xE8\x73\x00' -_READ_VERSION = b'\x05\x0D\x02' -_READ_GLUCOSE_UNIT = b'\x05\x09\x02\x09\x00\x00\x00\x00' -_DELETE_RECORDS = b'\x05\x1A' -_READ_DATETIME = b'\x05\x20\x02\x00\x00\x00\x00' -_WRITE_DATETIME = b'\x05\x20\x01' -_READ_RECORD = b'\x05\x1F' _INVALID_RECORD = 501 -_STRUCT_TIMESTAMP = struct.Struct(' 6: - self.cmd.extend(serial.read(self.length - 6)) - - self.cmd.extend(serial.read(3)) - - if self.cmd[_IDX_ETX] != _ETX: - raise lifescan.MalformedCommand( - 'at position %s expected %02x, received %02x' % ( - _IDX_ETX, _ETX, self.cmd[_IDX_ETX])) - - def build_command(self, cmd_bytes): - self.cmd.append(_STX) - self.cmd.append(6 + len(cmd_bytes)) - self.cmd.append(0x00) # link control - self.cmd.extend(cmd_bytes) - self.cmd.extend([_ETX, 0x00, 0x00]) - - @property - def length(self): - if not self.cmd: - return None - - return self.cmd[_IDX_LENGTH] - - def __is_in_control(self, bitmask): - if not self.cmd: - return None - - return bool(self.cmd[_IDX_CONTROL] & bitmask) - - def __set_in_control(self, bitmask, value): - if not self.cmd: - return None - - if value: - self.cmd[_IDX_CONTROL] |= bitmask - else: - self.cmd[_IDX_CONTROL] &= (~bitmask) & 0xFF - - return value - - @property - def sent_counter(self): - return self.__is_in_control(_BIT_SENT_COUNTER) - - @sent_counter.setter - def sent_counter(self, value): - self.__set_in_control(_BIT_SENT_COUNTER, value) - - @property - def expect_receive(self): - return self.__is_in_control(_BIT_EXPECT_RECEIVE) - - @expect_receive.setter - def expect_receive(self, value): - self.__set_in_control(_BIT_EXPECT_RECEIVE, value) - - @property - def checksum(self): - return lifescan.crc_ccitt(self.cmd[:_IDX_CHECKSUM].tobytes()) - - @property - def acknowledge(self): - return self.__is_in_control(_BIT_ACK) - - @acknowledge.setter - def acknowledge(self, value): - self.__set_in_control(_BIT_ACK, value) - - @property - def disconnect(self): - return self.__is_in_control(_BIT_DISCONNECT) - - @disconnect.setter - def disconnect(self, value): - self.__set_in_control(_BIT_DISCONNECT, value) - - @property - def more(self): - return self.__is_in_control(_BIT_MORE) - - @more.setter - def more(self, value): - self.__set_in_control(_BIT_MORE, value) - - def validate_checksum(self): - expected_checksum = self.checksum - received_checksum = self._STRUCT.unpack(self.cmd[_IDX_CHECKSUM:])[0] - if received_checksum != expected_checksum: - raise exceptions.InvalidChecksum(expected_checksum, received_checksum) - - def update_checksum(self): - self._STRUCT.pack_into(self.cmd, _IDX_CHECKSUM, self.checksum) - - def tobytes(self): - return self.cmd.tobytes() - - @property - def data(self): - return self.cmd[_IDX_DATA:_IDX_ETX] - +_EPOCH = datetime.datetime.utcfromtimestamp(0) + +def datetime_to_timestamp(date): + delta = date - _EPOCH + return int(delta.total_seconds()) + + +_PACKET = construct.Struct( + construct.RawCopy( + construct.Embedded( + construct.Struct( + construct.Const(b'\x02'), # stx + 'length' / construct.Rebuild( + construct.Byte, lambda ctx: len(ctx.message) + 6), + construct.EmbeddedBitStruct( + construct.Padding(3), + 'more' / construct.Default(construct.Flag, False), + 'disconnect' / construct.Flag, + 'acknowledge' / construct.Flag, + 'expect_receive' / construct.Flag, + 'sequence_number' / construct.Flag, + ), + 'message' / construct.Bytes(length=lambda ctx: ctx.length - 6), + construct.Const(b'\x03'), # etx + ), + ), + ), + 'checksum' / construct.Checksum( + construct.Int16ul, lifescan.crc_ccitt, construct.this.data), +) + +_COMMAND_SUCCESS = construct.Const(b'\x05\x06') +_TIMESTAMP_ADAPTER = construct.ExprAdapter( + construct.Int32ul, + encoder=lambda obj, ctx: datetime_to_timestamp(obj), + decoder=lambda obj, ctx: datetime.datetime.fromtimestamp(obj)) + +_VERSION_REQUEST = construct.Const(b'\x05\x0d\x02') + +_VERSION_RESPONSE = construct.Struct( + _COMMAND_SUCCESS, + 'version' / construct.PascalString(construct.Byte, encoding='ascii'), +) + +_SERIAL_NUMBER_REQUEST = construct.Const( + b'\x05\x0B\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00') + +_SERIAL_NUMBER_RESPONSE = construct.Struct( + _COMMAND_SUCCESS, + 'serial_number' / construct.GreedyString(encoding='ascii'), +) + +_DATETIME_REQUEST = construct.Struct( + construct.Const(b'\x05\x20'), # 0x20 is the datetime + 'request_type' / construct.Enum(construct.Byte, write=0x01, read=0x02), + 'timestamp' / construct.Default(_TIMESTAMP_ADAPTER, _EPOCH), +) + +_DATETIME_RESPONSE = construct.Struct( + _COMMAND_SUCCESS, + 'timestamp' / _TIMESTAMP_ADAPTER, +) + +_GLUCOSE_UNIT_REQUEST = construct.Const( + b'\x05\x09\x02\x09\x00\x00\x00\x00') + +_GLUCOSE_MAPPING = { + common.Unit.MG_DL: 0x00, + common.Unit.MMOL_L: 0x01, +} + +_GLUCOSE_UNIT_RESPONSE = construct.Struct( + _COMMAND_SUCCESS, + 'unit' / construct.SymmetricMapping( + construct.Byte, _GLUCOSE_MAPPING), + construct.Padding(3), +) + +_ZERO_LOG_REQUEST = construct.Const(b'\x05\x1A') + +_READING_COUNT_RESPONSE = construct.Struct( + construct.Const(b'\x05\x0f'), + 'count' / construct.Int16ul, +) + +_READ_RECORD_REQUEST = construct.Struct( + construct.Const(b'\x05\x1f'), + 'record_id' / construct.Int16ul, +) + +_READING_RESPONSE = construct.Struct( + _COMMAND_SUCCESS, + 'timestamp' / _TIMESTAMP_ADAPTER, + 'value' / construct.Int32ul, +) class Device(serial.SerialDevice): - BAUDRATE = 9600 - DEFAULT_CABLE_ID = '067b:2303' # Generic PL2303 cable. - - def __init__(self, device): - super(Device, self).__init__(device) - - self.sent_counter_ = False - self.expect_receive_ = False - - def connect(self): - self._send_command('', disconnect=True) - - def disconnect(self): - self.connect() - - def _read_response(self): - response = _Packet() - - response.read_from(self.serial_) - - if not response.disconnect and response.sent_counter != self.expect_receive_: - raise lifescan.MalformedCommand( - 'at position 2[0b] expected %02x, received %02x' % ( - self.expect_receive_, response.expect_receive)) + BAUDRATE = 9600 + DEFAULT_CABLE_ID = '067b:2303' # Generic PL2303 cable. + TIMEOUT = 0.5 - if not response.acknowledge: - self.expect_receive_ = not self.expect_receive_ + def __init__(self, device): + super(Device, self).__init__(device) - response.validate_checksum() + self.sent_counter_ = False + self.expect_receive_ = False + self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024) - if not response.acknowledge: - self._send_command('', acknowledge=True) + def connect(self): + self._send_packet(b'', disconnect=True) + self._read_ack() - return response + def disconnect(self): + self.connect() - def _send_command(self, cmd_bytes, acknowledge=False, disconnect=False): - cmd = _Packet() + def _send_packet(self, message, acknowledge=False, disconnect=False): + pkt = _PACKET.build( + {'value': { + 'message': message, + 'sequence_number': self.sent_counter_, + 'expect_receive': self.expect_receive_, + 'acknowledge': acknowledge, + 'disconnect': disconnect, + }}) + logging.debug('sending packet: %s', binascii.hexlify(pkt)) - # set the proper expectations - cmd.build_command(cmd_bytes) - cmd.sent_counter = self.sent_counter_ - cmd.expect_receive = self.expect_receive_ - cmd.acknowledge = acknowledge - cmd.disconnect = disconnect + self.serial_.write(pkt) + self.serial_.flush() - cmd.update_checksum() + def _read_packet(self): + raw_pkt = self.buffered_reader_.parse_stream(self.serial_) + logging.debug('received packet: %r', raw_pkt) - self.serial_.write(cmd.tobytes()) - self.serial_.flush() + # discard the checksum and copy + pkt = raw_pkt.value - if not acknowledge: - self.sent_counter_ = not self.sent_counter_ - result = self._read_response() - return result + if not pkt.disconnect and pkt.sequence_number != self.expect_receive_: + raise lifescan.MalformedCommand( + 'at position 2[0b] expected %02x, received %02x' % ( + self.expect_receive_, pkt.sequence_count)) - def get_meter_info(self): - return common.MeterInfo( - 'OneTouch Ultra Easy glucometer', - serial_number=self.get_serial_number(), - version_info=( - 'Software version: ' + self.get_version(),), - native_unit=self.get_glucose_unit()) + return pkt - def get_version(self): - result = self._send_command(_READ_VERSION) + def _send_ack(self): + self._send_packet(b'', acknowledge=True, disconnect=False) - response = self._read_response() + def _read_ack(self): + pkt = self._read_packet() + assert pkt.acknowledge - return response.data[3:].tobytes().decode('ascii') + def _send_request(self, request_format, *args): + request = request_format.build(*args) + self._send_packet(request, acknowledge=False, disconnect=False) - def get_serial_number(self): - result = self._send_command(_READ_SERIAL_NUMBER) + self.sent_counter_ = not self.sent_counter_ + self._read_ack() - response = self._read_response() + def _read_response(self, response_format): + pkt = self._read_packet() + assert not pkt.acknowledge - return response.data[2:].tobytes().decode('ascii') + self.expect_receive_ = not self.expect_receive_ + self._send_ack() - def get_datetime(self): - result = self._send_command(_READ_DATETIME) - response = self._read_response() + return response_format.parse(pkt.message) - return _convert_timestamp(response.data[2:6]) + def get_meter_info(self): + return common.MeterInfo( + 'OneTouch Ultra Easy glucometer', + serial_number=self.get_serial_number(), + version_info=( + 'Software version: ' + self.get_version(),), + native_unit=self.get_glucose_unit()) - def set_datetime(self, date=datetime.datetime.now()): - epoch = datetime.datetime.utcfromtimestamp(0) - delta = date - epoch - timestamp = int(delta.total_seconds()) + def get_version(self): + self._send_request(_VERSION_REQUEST, None) - timestamp_bytes = _STRUCT_TIMESTAMP.pack(timestamp) + response = self._read_response(_VERSION_RESPONSE) - result = self._send_command(_WRITE_DATETIME + timestamp_bytes) + return response.version - response = self._read_response() - return _convert_timestamp(response.data[2:6]) + def get_serial_number(self): + self._send_request(_SERIAL_NUMBER_REQUEST, None) - def zero_log(self): - result = self._send_command(_DELETE_RECORDS) - response = self._read_response() + response = self._read_response(_SERIAL_NUMBER_RESPONSE) + return response.serial_number - if response.data.tobytes() != b'\x05\x06': - raise exceptions.InvalidResponse(response.data) + def get_datetime(self): + self._send_request( + _DATETIME_REQUEST, {'request_type': 'read'}) + response = self._read_response(_DATETIME_RESPONSE) + return response.timestamp - def get_glucose_unit(self): - result = self._send_command(_READ_GLUCOSE_UNIT) - response = self._read_response() + def set_datetime(self, date=datetime.datetime.now()): + self._send_request(_DATETIME_REQUEST, { + 'request_type': 'write', + 'timestamp': date, + }) - if response.data[2] == 0: - return common.Unit.MG_DL - elif response.data[2] == 1: - return common.Unit.MMOL_L - else: - raise lifescan.MalformedCommand( - 'at position PM1 invalid value %02x for unit' % response.data[2]) + response = self._read_response(_DATETIME_RESPONSE) + return response.timestamp - def _get_reading(self, record_id): - id_bytes = _STRUCT_RECORDID.pack(record_id) + def zero_log(self): + self._send_request(_ZERO_LOG_REQUEST, None) + self._read_response(_COMMAND_SUCCESS) - result = self._send_command(_READ_RECORD + id_bytes) - return self._read_response() + def get_glucose_unit(self): + self._send_request(_GLUCOSE_UNIT_REQUEST, None) + response = self._read_response(_GLUCOSE_UNIT_RESPONSE) - def get_readings(self): - count_response = self._get_reading(_INVALID_RECORD) + return response.unit - record_count, = _STRUCT_RECORDID.unpack_from(count_response.data, 2) + def _get_reading(self, record_id): + self._send_request( + _READ_RECORD_REQUEST, {'record_id': record_id}) + return self._read_response(_READING_RESPONSE) - for record_id in range(record_count): - record_response = self._get_reading(record_id) + def get_readings(self): + self._send_request( + _READ_RECORD_REQUEST, {'record_id': _INVALID_RECORD}) + count_response = self._read_response(_READING_COUNT_RESPONSE) - timestamp = _convert_timestamp(record_response.data[2:6]) - value, = _STRUCT_TIMESTAMP.unpack_from(record_response.data, 6) + for record_id in range(count_response.count): + self._send_request( + _READ_RECORD_REQUEST, {'record_id': record_id}) + reading = self._read_response(_READING_RESPONSE) - yield common.GlucoseReading(timestamp, float(value)) + yield common.GlucoseReading( + reading.timestamp, + float(reading.value)) diff --git a/setup.py b/setup.py index a9b7c18..3be4b94 100644 --- a/setup.py +++ b/setup.py @@ -49,7 +49,7 @@ setup( # These are all the drivers' dependencies. Optional dependencies are # listed as mandatory for the feature. 'otultra2': ['pyserial'], - 'otultraeasy': ['pyserial'], + 'otultraeasy': ['construct', 'pyserial'], 'otverio2015': ['python-scsi'], 'fsinsulinx': ['construct', 'hidapi'], 'fslibre': ['construct', 'hidapi'], diff --git a/test/test_otultraeasy.py b/test/test_otultraeasy.py index a1d4c02..52a98f1 100644 --- a/test/test_otultraeasy.py +++ b/test/test_otultraeasy.py @@ -38,23 +38,6 @@ class TestOTUltraMini(unittest.TestCase): 0x62C2, lifescan.crc_ccitt(cmd_array)) - def test_packet_update_checksum(self): - packet = otultraeasy._Packet() - - packet.build_command('') - packet.disconnect = True - - packet.update_checksum() - self.assertEqual( - b'\x02\x06\x08\x03\xC2\x62', - packet.tobytes()) - - packet.validate_checksum() - packet.disconnect = False - - with self.assertRaises(exceptions.InvalidChecksum): - packet.validate_checksum() - if __name__ == '__main__': unittest.main() -- cgit v1.2.3