summaryrefslogblamecommitdiffstats
path: root/glucometerutils/drivers/td4277.py
blob: 038529980b87d3f2c0d83031241e59eb5d3501f7 (plain) (tree)






















                                                                  

                                                       
 

                           

              






                                                      

                        
                         




                                                                                 

          

                                                                                  











                                            
                         





                                 
                                                                 


                                    
                                                                                  


                                     


                                       


                                         
                                                   


                                             
                                                                  








                                         


                                                           

 
 

                                                            









                                           







                                                                     

                                                                   


                              
                                                                    

 

                                                                



                               

                                                                              
 
                                                                                     
                                            
                                                                  



                                                                   
                                                      







                                                                        

                                                     

                                                           

                                                                                 





                                                         

                                                              




                             
                                                            











                                                               
                                         

                                


                                                                             


                                                      


                                                                       











                                                                            

                                                            


                                                            

                                                         


                                                                          

                                                                      







                                                



                                                                             
# -*- coding: utf-8 -*-
#
# SPDX-License-Identifier: MIT
"""Driver for TaiDoc TD-4277 devices.

Supported features:
    - get readings, including pre-/post-meal notes;
    - get and set date and time;
    - get serial number (partial);
    - memory reset (caution!)

Expected device path: 0001:001c:00 (libusb), /dev/hidraw1 (Linux).
"""

import binascii
import datetime
import enum
import functools
import logging
import operator

import construct

from glucometerutils import common, exceptions
from glucometerutils.support import driver_base, serial


class Direction(enum.Enum):
    In = 0xA5
    Out = 0xA3


def byte_checksum(data):
    return functools.reduce(operator.add, data) & 0xFF


_PACKET = construct.Struct(
    "data"
    / construct.RawCopy(
        construct.Struct(
            construct.Const(b"\x51"),
            "command" / construct.Byte,
            "message" / construct.Bytes(4),
            "direction"
            / construct.Mapping(construct.Byte, {e: e.value for e in Direction}),
        ),
    ),
    "checksum"
    / construct.Checksum(construct.Byte, byte_checksum, construct.this.data.data),
)

_EMPTY_MESSAGE = 0

_CONNECT_REQUEST = 0x22
_VALID_CONNECT_RESPONSE = {0x22, 0x24, 0x54}

_GET_DATETIME = 0x23
_SET_DATETIME = 0x33

_GET_MODEL = 0x24

_GET_READING_COUNT = 0x2B
_GET_READING_DATETIME = 0x25
_GET_READING_VALUE = 0x26

_CLEAR_MEMORY = 0x52

_MODEL_STRUCT = construct.Struct(
    construct.Const(b"\x77\x42"), construct.Byte, construct.Byte,
)

_DATETIME_STRUCT = construct.Struct(
    "day" / construct.Int16ul, "minute" / construct.Byte, "hour" / construct.Byte,
)

_DAY_BITSTRUCT = construct.BitStruct(
    "year" / construct.BitsInteger(7),
    "month" / construct.BitsInteger(4),
    "day" / construct.BitsInteger(5),
)

_READING_COUNT_STRUCT = construct.Struct(
    "count" / construct.Int16ul, construct.Int16ul,
)

_READING_SELECTION_STRUCT = construct.Struct(
    "record_id" / construct.Int16ul, construct.Const(b"\x00\x00"),
)

_MEAL_FLAG = {
    common.Meal.NONE: 0x00,
    common.Meal.BEFORE: 0x40,
    common.Meal.AFTER: 0x80,
}

_READING_VALUE_STRUCT = construct.Struct(
    "value" / construct.Int16ul,
    construct.Const(b"\x06"),
    "meal" / construct.Mapping(construct.Byte, _MEAL_FLAG),
)


def _make_packet(command, message, direction=Direction.Out):
    return _PACKET.build(
        {
            "data": {
                "value": {
                    "command": command,
                    "message": message,
                    "direction": direction,
                },
            }
        }
    )


def _parse_datetime(message):
    date = _DATETIME_STRUCT.parse(message)
    # We can't parse the day properly with a single pass of Construct
    # unfortunately.
    day = _DAY_BITSTRUCT.parse(construct.Int16ub.build(date.day))
    return datetime.datetime(
        2000 + day.year, day.month, day.day, date.hour, date.minute
    )


def _select_record(record_id):
    return _READING_SELECTION_STRUCT.build({"record_id": record_id})


class Device(serial.SerialDevice, driver_base.GlucometerDriver):

    BAUDRATE = 19200
    TIMEOUT = 0.5

    def __init__(self, device):
        super(Device, self).__init__("cp2110://" + device)
        self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024)

    def _send_command(self, command, message=_EMPTY_MESSAGE, validate_response=True):
        pkt = _make_packet(command, message)
        logging.debug("sending packet: %s", binascii.hexlify(pkt))

        self.serial_.write(pkt)
        self.serial_.flush()
        response = self.buffered_reader_.parse_stream(self.serial_)
        logging.debug("received packet: %r", response)

        if validate_response and response.data.value.command != command:
            raise InvalidResponse(response)

        return response.data.value.command, response.data.value.message

    def connect(self):
        response_command, message = self._send_command(
            _CONNECT_REQUEST, validate_response=False
        )
        if response_command not in _VALID_CONNECT_RESPONSE:
            raise exceptions.ConnectionFailed(
                "Invalid response received: %2x %r" % (response_command, message)
            )

        _, model_message = self._send_command(_GET_MODEL)
        try:
            _MODEL_STRUCT.parse(model_message)
        except construct.ConstructError:
            raise exceptions.ConnectionFailed(
                "Invalid model identified: %r" % model_message
            )

    def disconnect(self):
        pass

    def get_meter_info(self):
        return common.MeterInfo("TaiDoc TD-4277 glucometer")

    def get_version(self):  # pylint: disable=no-self-use
        raise NotImplementedError

    def get_serial_number(self):  # pylint: disable=no-self-use
        raise NotImplementedError

    def get_datetime(self):
        _, message = self._send_command(_GET_DATETIME)

        return _parse_datetime(message)

    def _set_device_datetime(self, date):
        assert date.year >= 2000

        day_struct = _DAY_BITSTRUCT.build(
            {"year": date.year - 2000, "month": date.month, "day": date.day,}
        )

        day_word = construct.Int16ub.parse(day_struct)

        date_message = _DATETIME_STRUCT.build(
            {"day": day_word, "minute": date.minute, "hour": date.hour}
        )

        _, message = self._send_command(_SET_DATETIME, message=date_message)

        return _parse_datetime(message)

    def _get_reading_count(self):
        _, message = self._send_command(_GET_READING_COUNT)

        return _READING_COUNT_STRUCT.parse(message).count

    def _get_reading(self, record_id):
        _, reading_date_message = self._send_command(
            _GET_READING_DATETIME, _select_record(record_id)
        )
        reading_date = _parse_datetime(reading_date_message)

        _, reading_value_message = self._send_command(
            _GET_READING_VALUE, _select_record(record_id)
        )
        reading_value = _READING_VALUE_STRUCT.parse(reading_value_message)

        return common.GlucoseReading(
            reading_date, reading_value.value, meal=reading_value.meal
        )

    def get_readings(self):
        record_count = self._get_reading_count()
        for record_id in range(record_count):
            yield self._get_reading(record_id)

    def zero_log(self):
        self._send_command(_CLEAR_MEMORY)

    def get_glucose_unit(self):
        """Maybe this could be implemented by someone who knows the device"""
        raise NotImplementedError