# -*- coding: utf-8 -*- # # SPDX-FileCopyrightText: © 2014 The glucometerutils Authors # SPDX-License-Identifier: MIT """Driver for LifeScan OneTouch Ultra Easy devices. Also supports OneTouch Ultra Mini devices (different name, same device). Supported features: - get readings; - use the glucose unit preset on the device by default; - get and set date and time; - get serial number and software version; - memory reset (caution!) Expected device path: /dev/ttyUSB0 or similar serial port device. """ import binascii import datetime import logging from collections.abc import Generator from typing import Any, Optional import construct from glucometerutils import common, driver from glucometerutils.support import ( construct_extras, lifescan, lifescan_binary_protocol, serial, ) _PACKET = lifescan_binary_protocol.LifeScanPacket(True) _INVALID_RECORD = 501 _COMMAND_SUCCESS = construct.Const(b"\x05\x06") _VERSION_REQUEST = construct.Const(b"\x05\x0d\x02") _VERSION_RESPONSE = construct.Struct( success=_COMMAND_SUCCESS, version=construct.PascalString(construct.Byte, encoding="ascii"), ) _SERIAL_NUMBER_REQUEST = construct.Const( b"\x05\x0B\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00" ) _SERIAL_NUMBER_RESPONSE = construct.Struct( success=_COMMAND_SUCCESS, serial_number=construct.GreedyString(encoding="ascii"), ) _DATETIME_REQUEST = construct.Struct( const=construct.Const(b"\x05\x20"), # 0x20 is the datetime request_type=construct.Enum(construct.Byte, write=0x01, read=0x02), timestamp=construct.Default( construct_extras.Timestamp(construct.Int32ul), # type: ignore datetime.datetime(1970, 1, 1, 0, 0), ), ) _DATETIME_RESPONSE = construct.Struct( success=_COMMAND_SUCCESS, timestamp=construct_extras.Timestamp(construct.Int32ul), # type: ignore ) _GLUCOSE_UNIT_REQUEST = construct.Const(b"\x05\x09\x02\x09\x00\x00\x00\x00") _GLUCOSE_UNIT_RESPONSE = construct.Struct( success=_COMMAND_SUCCESS, unit=lifescan_binary_protocol.GLUCOSE_UNIT, padding=construct.Padding(3), ) _MEMORY_ERASE_REQUEST = construct.Const(b"\x05\x1A") _READING_COUNT_RESPONSE = construct.Struct( const=construct.Const(b"\x0f"), count=construct.Int16ul, ) _READ_RECORD_REQUEST = construct.Struct( const=construct.Const(b"\x05\x1f"), record_id=construct.Int16ul, ) _READING_RESPONSE = construct.Struct( success=_COMMAND_SUCCESS, timestamp=construct_extras.Timestamp(construct.Int32ul), # type: ignore value=construct.Int32ul, ) def _make_packet( message: bytes, sequence_number: int, expect_receive: bool, acknowledge: bool, disconnect: bool, ): return _PACKET.build( { "data": { "value": { "message": message, "link_control": { "sequence_number": sequence_number, "expect_receive": expect_receive, "acknowledge": acknowledge, "disconnect": disconnect, }, } } } ) class Device(serial.SerialDevice, driver.GlucometerDevice): BAUDRATE = 9600 DEFAULT_CABLE_ID = "067b:2303" # Generic PL2303 cable. TIMEOUT = 0.5 def __init__(self, device: Optional[str]) -> None: super().__init__(device) self.sent_counter_ = False self.expect_receive_ = False self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024) def connect(self) -> None: try: self._send_packet(b"", disconnect=True) self._read_ack() except construct.ConstructError as e: raise lifescan.MalformedCommand(str(e)) def disconnect(self) -> None: self.connect() def _send_packet( self, message: bytes, acknowledge: bool = False, disconnect: bool = False ) -> None: pkt = _make_packet( message, self.sent_counter_, self.expect_receive_, acknowledge, disconnect ) logging.debug("sending packet: %s", binascii.hexlify(pkt)) self.serial_.write(pkt) self.serial_.flush() def _read_packet(self) -> construct.Container: raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data logging.debug("received packet: %r", raw_pkt) # discard the checksum and copy pkt = raw_pkt.value if not pkt.link_control.disconnect and ( pkt.link_control.sequence_number != self.expect_receive_ ): raise lifescan.MalformedCommand( f"at position 2[0b] expected {self.expect_receive_:02x}, received {pkt.link_connect.sequence_count:02x}" ) return pkt def _send_ack(self) -> None: self._send_packet(b"", acknowledge=True, disconnect=False) def _read_ack(self) -> None: pkt = self._read_packet() assert pkt.link_control.acknowledge def _send_request( self, request_format: construct.Struct, request_obj: Optional[dict[str, Any]], response_format: construct.Struct, ) -> construct.Container: try: request = request_format.build(request_obj) self._send_packet(request, acknowledge=False, disconnect=False) self.sent_counter_ = not self.sent_counter_ self._read_ack() response_pkt = self._read_packet() assert not response_pkt.link_control.acknowledge self.expect_receive_ = not self.expect_receive_ self._send_ack() return response_format.parse(response_pkt.message) except construct.ConstructError as e: raise lifescan.MalformedCommand(str(e)) def get_meter_info(self) -> common.MeterInfo: return common.MeterInfo( "OneTouch Ultra Easy glucometer", serial_number=self.get_serial_number(), version_info=("Software version: " + self.get_version(),), native_unit=self.get_glucose_unit(), ) def get_version(self) -> str: response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE) return response.version def get_serial_number(self) -> str: response = self._send_request( _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE ) return response.serial_number def get_datetime(self) -> datetime.datetime: response = self._send_request( _DATETIME_REQUEST, {"request_type": "read"}, _DATETIME_RESPONSE ) return response.timestamp def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime: response = self._send_request( _DATETIME_REQUEST, {"request_type": "write", "timestamp": date}, _DATETIME_RESPONSE, ) return response.timestamp def zero_log(self) -> None: self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS) def get_glucose_unit(self) -> common.Unit: response = self._send_request( _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE ) return response.unit def _get_reading_count(self) -> int: response = self._send_request( _READ_RECORD_REQUEST, {"record_id": _INVALID_RECORD}, _READING_COUNT_RESPONSE, ) return response.count def _get_reading(self, record_id: int) -> common.GlucoseReading: response = self._send_request( _READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE ) return common.GlucoseReading(response.timestamp, float(response.value)) def get_readings(self) -> Generator[common.AnyReading, None, None]: record_count = self._get_reading_count() for record_id in range(record_count): yield self._get_reading(record_id)