summaryrefslogblamecommitdiffstats
path: root/glucometerutils/drivers/sdcodefree.py
blob: 4a375bd37b64b8cfd6ec1a6850c785bf8395eb34 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
                       













                                                                                


                                    
                                                        





                   
              



               

                                      
                                          
 

                                 
























                                                        


                                                                      


                                             


                             


                           
                                                                

                      
                                              
 



                                                              



















































































                                                                              
                                
















                                                                             
                                                        













                                           
                                        


                                                                     
# -*- coding: utf-8 -*-
"""Driver for SD CodeFree devices by SD Biosensor.

For SD Biosensor glucometers using the serial interface.

Supported features:
    - get readings, including pre-/post-meal notes;
    - set date and time.

Expected device path: /dev/ttyUSB0 or similar serial port device.

IMPORTANT NOTE: the glucometer can be connected before starting the program, but
it has to be turned on when the program asks you to.
"""


__author__ = 'Diego Elio Pettenò'
__email__ = 'flameeyes@flameeyes.eu'
__copyright__ = 'Copyright © 2017, Diego Elio Pettenò'
__license__ = 'MIT'

import array
import collections
import datetime
import functools
import logging
import operator
import struct
import time

from glucometerutils import common
from glucometerutils import exceptions
from glucometerutils.support import serial

_STX = 0x53    # Not really 'STX'
_ETX = 0xAA    # Not really 'ETX'

_DIR_IN = 0x20
_DIR_OUT = 0x10

_IDX_STX = 0
_IDX_DIRECTION = 1
_IDX_LENGTH = 2
_IDX_CHECKSUM = -2
_IDX_ETX = -1

_RECV_PREAMBLE = b'\x53\x20'

_CHALLENGE_PACKET_FULL = b'\x53\x20\x04\x10\x30\x20\xAA'
_RESPONSE_PACKET = b'\x10\x40'

_DATE_SET_PACKET = b'\x10\x10'

_DISCONNECT_PACKET = b'\x10\x60'
_DISCONNECTED_PACKET = b'\x10\x70'

_STRUCT_READINGS_COUNT = struct.Struct('>H')

_FETCH_PACKET = b'\x10\x60'

_ReadingRecord = collections.namedtuple(
    '_ReadingRecord',
    ('unknown1', 'unknown2', 'year', 'month', 'day', 'hour', 'minute',
     'value', 'meal_flag'))
_STRUCT_READING = struct.Struct('>BBBBBBBHB')

_MEAL_FLAG = {
    0x00: common.Meal.NONE,
    0x10: common.Meal.BEFORE,
    0x20: common.Meal.AFTER,
}

def parse_reading(msgdata):
    return _ReadingRecord(*_STRUCT_READING.unpack_from(msgdata))

def xor_checksum(msg):
    return functools.reduce(operator.xor, msg)

class Device(serial.SerialDevice):
    BAUDRATE = 38400
    DEFAULT_CABLE_ID = '10c4:ea60'  # Generic cable.
    TIMEOUT = 300  # We need to wait for data from the device.

    def read_packet(self):
        preamble = self.serial_.read(3)
        if len(preamble) != 3:
            raise exceptione.InvalidResponse(
                response='Expected 3 bytes, received %d' % len(preamble))
        if preamble[0:_IDX_LENGTH] != _RECV_PREAMBLE:
            raise exceptions.InvalidResponse(
                response='Unexpected preamble %r' % pramble[0:_IDX_LENGTH])

        msglen = preamble[_IDX_LENGTH]
        message = self.serial_.read(msglen)
        if len(message) != msglen:
            raise exception.InvalidResponse(
                response='Expected %d bytes, received %d' %
                (msglen, len(message)))
        if message[_IDX_ETX] != _ETX:
            raise exception.InvalidResponse(
                response='Unexpected end-of-transmission byte: %02x' %
                message[_IDX_ETX])

        # Calculate the checksum up until before the checksum itself.
        msgdata = message[:_IDX_CHECKSUM]

        cksum = xor_checksum(msgdata)
        if cksum != message[_IDX_CHECKSUM]:
            raise exception.InvalidChecksum(message[_IDX_CHECKSUM], cksum)

        return msgdata

    def wait_and_ready(self):
        challenge = self.serial_.read(1)

        # The first packet read may have a prefixed zero, it might be a bug in
        # the cp210x driver or device, but discard it if found.
        if challenge == b'\0':
            challege = self.serial_.read(1)
            if challenge != b'\x53':
                raise exceptions.ConnectionFailed(
                    message='Unexpected starting bytes %r' % challenge)

        challenge += self.serial_.read(6)

        if challenge != _CHALLENGE_PACKET_FULL:
            raise exceptions.ConnectionFailed(
                message='Unexpected challenge %r' % challenge)

        self.send_packet(_RESPONSE_PACKET)

        # The first packet only contains the counter of how many readings are
        # available.
        first_packet = self.read_packet()

        count = _STRUCT_READINGS_COUNT.unpack_from(first_packet, 1)

        return count[0]

    def send_packet(self, msgdata):
        packet = array.array('B')
        packet.extend((_STX, _DIR_OUT, len(msgdata)+2))
        packet.extend(msgdata)
        packet.extend((xor_checksum(msgdata), _ETX))
        self.serial_.write(packet.tobytes())

    def connect(self):
        print("Please connect and turn on the device.")

    def disconnect(self):
        self.send_packet(_DISCONNECT_PACKET)
        response = self.read_packet()
        if response != _DISCONNECTED_PACKET:
            raise exceptions.InvalidResponse(response=response)

    def get_meter_info(self):
        return common.MeterInfo('SD CodeFree glucometer')

    def get_version(self):
        raise NotImplementedError

    def get_serial_number(self):
        raise NotImplementedError

    def get_glucose_unit(self):
        # Device does not provide information on glucose unit.
        return common.Unit.MG_DL

    def get_datetime(self):
        raise NotImplementedError

    def set_datetime(self, date=datetime.datetime.now()):
        setdatecmd = date.strftime('ADATE%Y%m%d%H%M').encode('ascii')

        # Ignore the readings count.
        self.wait_and_ready()

        self.send_packet(setdatecmd)
        response = self.read_packet()
        if response != _DATE_SET_PACKET:
            raise exceptions.InvalidResponse(response=response)

        # The date we return should only include up to minute, unfortunately.
        return datetime.datetime(date.year, date.month, date.day,
                                 date.hour, date.minute)

    def zero_log(self):
        raise NotmplementedError

    def get_readings(self):
        count = self.wait_and_ready()

        for _ in range(count):
            self.send_packet(_FETCH_PACKET)
            rpkt = self.read_packet()

            r = parse_reading(rpkt)
            meal = _MEAL_FLAG[r.meal_flag]

            yield common.GlucoseReading(
                datetime.datetime(
                    2000 + r.year, r.month, r.day, r.hour, r.minute),
                r.value, meal=meal)