From d7ab5281015dbfe5c92d471894e6f63d693bad6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Elio=20Petten=C3=B2?= Date: Sat, 3 Aug 2019 14:29:14 +0100 Subject: td4277: new driver for rebranded TaiDoc TD-4277 glucometers. This driver supports the GlucoRx Nexus, Menarini GlucoMen Nexus, and Aktivmed GlucoCheck XL. --- glucometerutils/drivers/td4277.py | 234 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 234 insertions(+) create mode 100644 glucometerutils/drivers/td4277.py (limited to 'glucometerutils/drivers') diff --git a/glucometerutils/drivers/td4277.py b/glucometerutils/drivers/td4277.py new file mode 100644 index 0000000..3a5051b --- /dev/null +++ b/glucometerutils/drivers/td4277.py @@ -0,0 +1,234 @@ +# -*- coding: utf-8 -*- +# +# SPDX-License-Identifier: MIT +"""Driver for TaiDoc TD-4277 devices. + +Supported features: + - get readings, including pre-/post-meal notes; + - get and set date and time; + - get serial number (partial); + - memory reset (caution!) + +Expected device path: 0001:001c:00 (libusb), /dev/hidraw1 (Linux). +""" + +import binascii +import datetime +import enum +import functools +import logging +import operator + +import construct + +from glucometerutils import common +from glucometerutils import exceptions +from glucometerutils.support import serial + +class Direction(enum.Enum): + In = 0xa5 + Out = 0xa3 + + +def byte_checksum(data): + return functools.reduce(operator.add, data) & 0xFF + + +_PACKET = construct.Struct( + 'data' / construct.RawCopy( + construct.Struct( + construct.Const(b'\x51'), + 'command' / construct.Byte, + 'message' / construct.Bytes(4), + 'direction' / construct.Mapping( + construct.Byte, + {e: e.value for e in Direction}), + ), + ), + 'checksum' / construct.Checksum( + construct.Byte, byte_checksum, construct.this.data.data), +) + +_EMPTY_MESSAGE = 0 + +_CONNECT_REQUEST = 0x22 +_VALID_CONNECT_RESPONSE = {0x22, 0x24, 0x54} + +_GET_DATETIME = 0x23 +_SET_DATETIME = 0x33 + +_GET_MODEL = 0x24 + +_GET_READING_COUNT = 0x2b +_GET_READING_DATETIME = 0x25 +_GET_READING_VALUE = 0x26 + +_CLEAR_MEMORY = 0x52 + +_MODEL_STRUCT = construct.Struct( + construct.Const(b'\x77\x42'), + construct.Byte, + construct.Byte, +) + +_DATETIME_STRUCT = construct.Struct( + 'day' / construct.Int16ul, + 'minute' / construct.Byte, + 'hour' / construct.Byte, +) + +_DAY_BITSTRUCT = construct.BitStruct( + 'year' / construct.BitsInteger(7), + 'month' / construct.BitsInteger(4), + 'day' / construct.BitsInteger(5), +) + +_READING_COUNT_STRUCT = construct.Struct( + 'count' / construct.Int16ul, + construct.Int16ul, +) + +_READING_SELECTION_STRUCT = construct.Struct( + 'record_id' / construct.Int16ul, + construct.Const(b'\x00\x00'), +) + +_MEAL_FLAG = { + common.Meal.NONE: 0x00, + common.Meal.BEFORE: 0x40, + common.Meal.AFTER: 0x80, +} + +_READING_VALUE_STRUCT = construct.Struct( + 'value' / construct.Int16ul, + construct.Const(b'\x06'), + 'meal'/ construct.Mapping( + construct.Byte, _MEAL_FLAG), +) + +def _make_packet(command, message, direction=Direction.Out): + return _PACKET.build( + {'data': { + 'value': { + 'command': command, + 'message': message, + 'direction': direction, + }, + }}) + + +def _parse_datetime(message): + date = _DATETIME_STRUCT.parse(message) + # We can't parse the day properly with a single pass of Construct + # unfortunately. + day = _DAY_BITSTRUCT.parse(construct.Int16ub.build(date.day)) + return datetime.datetime( + 2000+day.year, day.month, day.day, date.hour, date.minute) + + +def _select_record(record_id): + return _READING_SELECTION_STRUCT.build({'record_id': record_id}) + + +class Device(serial.SerialDevice): + BAUDRATE = 19200 + TIMEOUT = 0.5 + + def __init__(self, device): + super(Device, self).__init__('cp2110://' + device) + self.buffered_reader_ = construct.Rebuffered( + _PACKET, tailcutoff=1024) + + def _send_command( + self, command, message=_EMPTY_MESSAGE, validate_response=True): + pkt = _make_packet(command, message) + logging.debug('sending packet: %s', binascii.hexlify(pkt)) + + self.serial_.write(pkt) + self.serial_.flush() + response = self.buffered_reader_.parse_stream(self.serial_) + logging.debug('received packet: %r', response) + + if validate_response and response.data.value.command != command: + raise InvalidResponse(response) + + return response.data.value.command, response.data.value.message + + def connect(self): + response_command, message = self._send_command( + _CONNECT_REQUEST, validate_response=False) + if response_command not in _VALID_CONNECT_RESPONSE: + raise exceptions.ConnectionFailed( + 'Invalid response received: %2x %r' % ( + response_command, message)) + + _, model_message = self._send_command(_GET_MODEL) + try: + _MODEL_STRUCT.parse(model_message) + except construct.ConstructError: + raise exceptions.ConnectionFailed( + 'Invalid model identified: %r' % model_message) + + def disconnect(self): + pass + + def get_meter_info(self): + return common.MeterInfo('TaiDoc TD-4277 glucometer') + + def get_version(self): # pylint: disable=no-self-use + raise NotImplementedError + + def get_serial_number(self): # pylint: disable=no-self-use + raise NotImplementedError + + def get_datetime(self): + _, message = self._send_command(_GET_DATETIME) + + return _parse_datetime(message) + + def set_datetime(self, date=datetime.datetime.now()): + assert date.year >= 2000 + + day_struct = _DAY_BITSTRUCT.build({ + 'year': date.year - 2000, + 'month': date.month, + 'day': date.day, + }) + + day_word = construct.Int16ub.parse(day_struct) + + date_message = _DATETIME_STRUCT.build({ + 'day': day_word, + 'minute': date.minute, + 'hour': date.hour}) + + _, message = self._send_command(_SET_DATETIME, message=date_message) + + return _parse_datetime(message) + + def _get_reading_count(self): + _, message = self._send_command(_GET_READING_COUNT) + + return _READING_COUNT_STRUCT.parse(message).count + + def _get_reading(self, record_id): + _, reading_date_message = self._send_command( + _GET_READING_DATETIME, + _select_record(record_id)) + reading_date = _parse_datetime(reading_date_message) + + _, reading_value_message = self._send_command( + _GET_READING_VALUE, + _select_record(record_id)) + reading_value = _READING_VALUE_STRUCT.parse(reading_value_message) + + return common.GlucoseReading( + reading_date, reading_value.value, meal=reading_value.meal) + + def get_readings(self): + record_count = self._get_reading_count() + for record_id in range(record_count): + yield self._get_reading(record_id) + + def zero_log(self): + self._send_command(_CLEAR_MEMORY) -- cgit v1.2.3