summaryrefslogblamecommitdiffstats
path: root/glucometerutils/drivers/otultraeasy.py
blob: b0aa093da5dffbb20ea6806ce9d42c690462f0d5 (plain) (tree)
1
2
3
4
                       
 
                                                             
                              












                                                                        
 
               
               
              

                                     

                
 
                                          

                                     



                             
 
                                                       


                     
                                               
 
                                                   

                                     

                                                                     


                                         

                                                       

                                           

                                                           


                                     


                                                                       
                                                                      

                                            


                                      

                                                                            

 
                                                                            
 

                                          


                                               

 
                                                    

                                           

                                   


                                        

                                       


                                     


                                                                            
 
 
 






                         
                         













                                                           
 
 
                                                           
                   
                                                           
                 
 

                                                      
 

                                    
                                                                              
 
                              
            
                                                   


                                                   
 
                                 
                      
 


                                                                                 
                           


                                                                                      
 

                               
 
                                                  
                                                                       
                                                     
 

                                       
 
                                                

                                                                    
                                            
                                                                                                                        
             
 
                  
 
                                
                                                                  
 
                                
                                 
                                           
 


                                         
                                              

                                          
            
                                                       
                                                                           
 

                                                       
 
                                              
                                                            
 

                                                           
 
                                                              

                                                   
 
                                                 
                                
                                             
                                                   


                                                                      
 
                                 
                                                                                
 
                               
 
                                       
                                      

                                                                 
 
                                     
 
                                                
                                      

                                                                           
 
                                 
 
                                                                                 
                                      
                              
                                                         

                               
                                 
 
                               
                                                                         
 
                                              
                                      

                                                               
 
                            
 
                                        
                                      



                                           

                             
                                                                    
                                      

                                                                             
 
                                                                               
 
                                                                       


                                                
# -*- coding: utf-8 -*-
#
# SPDX-FileCopyrightText: © 2014 The glucometerutils Authors
# SPDX-License-Identifier: MIT
"""Driver for LifeScan OneTouch Ultra Easy devices.

Also supports OneTouch Ultra Mini devices (different name, same device).

Supported features:
    - get readings;
    - use the glucose unit preset on the device by default;
    - get and set date and time;
    - get serial number and software version;
    - memory reset (caution!)

Expected device path: /dev/ttyUSB0 or similar serial port device.
"""

import binascii
import datetime
import logging
from collections.abc import Generator
from typing import Any, Optional

import construct

from glucometerutils import common, driver
from glucometerutils.support import (
    construct_extras,
    lifescan,
    lifescan_binary_protocol,
    serial,
)

_PACKET = lifescan_binary_protocol.LifeScanPacket(True)

_INVALID_RECORD = 501

_COMMAND_SUCCESS = construct.Const(b"\x05\x06")

_VERSION_REQUEST = construct.Const(b"\x05\x0d\x02")

_VERSION_RESPONSE = construct.Struct(
    success=_COMMAND_SUCCESS,
    version=construct.PascalString(construct.Byte, encoding="ascii"),
)

_SERIAL_NUMBER_REQUEST = construct.Const(
    b"\x05\x0B\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00"
)

_SERIAL_NUMBER_RESPONSE = construct.Struct(
    success=_COMMAND_SUCCESS,
    serial_number=construct.GreedyString(encoding="ascii"),
)

_DATETIME_REQUEST = construct.Struct(
    const=construct.Const(b"\x05\x20"),  # 0x20 is the datetime
    request_type=construct.Enum(construct.Byte, write=0x01, read=0x02),
    timestamp=construct.Default(
        construct_extras.Timestamp(construct.Int32ul),  # type: ignore
        datetime.datetime(1970, 1, 1, 0, 0),
    ),
)

_DATETIME_RESPONSE = construct.Struct(
    success=_COMMAND_SUCCESS,
    timestamp=construct_extras.Timestamp(construct.Int32ul),  # type: ignore
)

_GLUCOSE_UNIT_REQUEST = construct.Const(b"\x05\x09\x02\x09\x00\x00\x00\x00")


_GLUCOSE_UNIT_RESPONSE = construct.Struct(
    success=_COMMAND_SUCCESS,
    unit=lifescan_binary_protocol.GLUCOSE_UNIT,
    padding=construct.Padding(3),
)

_MEMORY_ERASE_REQUEST = construct.Const(b"\x05\x1A")

_READING_COUNT_RESPONSE = construct.Struct(
    const=construct.Const(b"\x0f"),
    count=construct.Int16ul,
)

_READ_RECORD_REQUEST = construct.Struct(
    const=construct.Const(b"\x05\x1f"),
    record_id=construct.Int16ul,
)

_READING_RESPONSE = construct.Struct(
    success=_COMMAND_SUCCESS,
    timestamp=construct_extras.Timestamp(construct.Int32ul),  # type: ignore
    value=construct.Int32ul,
)


def _make_packet(
    message: bytes,
    sequence_number: int,
    expect_receive: bool,
    acknowledge: bool,
    disconnect: bool,
):
    return _PACKET.build(
        {
            "data": {
                "value": {
                    "message": message,
                    "link_control": {
                        "sequence_number": sequence_number,
                        "expect_receive": expect_receive,
                        "acknowledge": acknowledge,
                        "disconnect": disconnect,
                    },
                }
            }
        }
    )


class Device(serial.SerialDevice, driver.GlucometerDevice):
    BAUDRATE = 9600
    DEFAULT_CABLE_ID = "067b:2303"  # Generic PL2303 cable.
    TIMEOUT = 0.5

    def __init__(self, device: Optional[str]) -> None:
        super().__init__(device)

        self.sent_counter_ = False
        self.expect_receive_ = False
        self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024)

    def connect(self) -> None:
        try:
            self._send_packet(b"", disconnect=True)
            self._read_ack()
        except construct.ConstructError as e:
            raise lifescan.MalformedCommand(str(e))

    def disconnect(self) -> None:
        self.connect()

    def _send_packet(
        self, message: bytes, acknowledge: bool = False, disconnect: bool = False
    ) -> None:
        pkt = _make_packet(
            message, self.sent_counter_, self.expect_receive_, acknowledge, disconnect
        )
        logging.debug("sending packet: %s", binascii.hexlify(pkt))

        self.serial_.write(pkt)
        self.serial_.flush()

    def _read_packet(self) -> construct.Container:
        raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data
        logging.debug("received packet: %r", raw_pkt)

        # discard the checksum and copy
        pkt = raw_pkt.value

        if not pkt.link_control.disconnect and (
            pkt.link_control.sequence_number != self.expect_receive_
        ):
            raise lifescan.MalformedCommand(
                f"at position 2[0b] expected {self.expect_receive_:02x}, received {pkt.link_connect.sequence_count:02x}"
            )

        return pkt

    def _send_ack(self) -> None:
        self._send_packet(b"", acknowledge=True, disconnect=False)

    def _read_ack(self) -> None:
        pkt = self._read_packet()
        assert pkt.link_control.acknowledge

    def _send_request(
        self,
        request_format: construct.Struct,
        request_obj: Optional[dict[str, Any]],
        response_format: construct.Struct,
    ) -> construct.Container:
        try:
            request = request_format.build(request_obj)
            self._send_packet(request, acknowledge=False, disconnect=False)

            self.sent_counter_ = not self.sent_counter_
            self._read_ack()

            response_pkt = self._read_packet()
            assert not response_pkt.link_control.acknowledge

            self.expect_receive_ = not self.expect_receive_
            self._send_ack()

            return response_format.parse(response_pkt.message)
        except construct.ConstructError as e:
            raise lifescan.MalformedCommand(str(e))

    def get_meter_info(self) -> common.MeterInfo:
        return common.MeterInfo(
            "OneTouch Ultra Easy glucometer",
            serial_number=self.get_serial_number(),
            version_info=("Software version: " + self.get_version(),),
            native_unit=self.get_glucose_unit(),
        )

    def get_version(self) -> str:
        response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE)

        return response.version

    def get_serial_number(self) -> str:
        response = self._send_request(
            _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE
        )

        return response.serial_number

    def get_datetime(self) -> datetime.datetime:
        response = self._send_request(
            _DATETIME_REQUEST, {"request_type": "read"}, _DATETIME_RESPONSE
        )

        return response.timestamp

    def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime:
        response = self._send_request(
            _DATETIME_REQUEST,
            {"request_type": "write", "timestamp": date},
            _DATETIME_RESPONSE,
        )
        return response.timestamp

    def zero_log(self) -> None:
        self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS)

    def get_glucose_unit(self) -> common.Unit:
        response = self._send_request(
            _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE
        )

        return response.unit

    def _get_reading_count(self) -> int:
        response = self._send_request(
            _READ_RECORD_REQUEST,
            {"record_id": _INVALID_RECORD},
            _READING_COUNT_RESPONSE,
        )
        return response.count

    def _get_reading(self, record_id: int) -> common.GlucoseReading:
        response = self._send_request(
            _READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE
        )

        return common.GlucoseReading(response.timestamp, float(response.value))

    def get_readings(self) -> Generator[common.AnyReading, None, None]:
        record_count = self._get_reading_count()
        for record_id in range(record_count):
            yield self._get_reading(record_id)