# summaryrefslogblamecommitdiffstats
# path: root/glucometerutils/drivers/sdcodefree.py
# blob: ec232ddb97c450d56823187fddc41c148ad463fa (plain) (tree)





























































































































































































                                                                                 
# -*- coding: utf-8 -*-
"""Driver for SD CodeFree devices by SD Biosensor"""

__author__ = 'Diego Elio Pettenò'
__email__ = 'flameeyes@flameeyes.eu'
__copyright__ = 'Copyright © 2016, Diego Elio Pettenò'
__license__ = 'MIT'

import array
import collections
import datetime
import functools
import operator
import struct
import time

import serial

from glucometerutils import common
from glucometerutils import exceptions

# Frame delimiters. Every frame starts with _STX and ends with _ETX (see
# Device.send_packet and Device.read_packet).
_STX = 0x53  # Not really 'STX'
_ETX = 0xAA  # Not really 'ETX'

# Second byte of a frame. send_packet() writes _DIR_OUT; frames read from the
# device are expected to carry 0x20 (see _RECV_PREAMBLE below).
_DIR_IN = 0x20
_DIR_OUT = 0x10

# Byte positions within a frame. The first three index the 3-byte preamble
# (start marker, direction, length). The negative ones are applied to the
# message read after the preamble, whose last two bytes are checksum and end
# marker.
_IDX_STX = 0
_IDX_DIRECTION = 1
_IDX_LENGTH = 2
_IDX_CHECKSUM = -2
_IDX_ETX = -1

# Start marker + direction byte that read_packet() requires on every frame it
# receives.
_RECV_PREAMBLE = b'\x53\x20'

# Complete 7-byte frame that wait_and_ready() expects to read from the device
# before answering with _RESPONSE_PACKET (payload only; send_packet() adds the
# framing).
_CHALLENGE_PACKET_FULL = b'\x53\x20\x04\x10\x30\x20\xAA'
_RESPONSE_PACKET = b'\x10\x40'

# Payload expected back from the device after the set-date command.
_DATE_SET_PACKET = b'\x10\x10'

# Payload sent by disconnect(), and the payload expected in reply.
_DISCONNECT_PACKET = b'\x10\x60'
_DISCONNECTED_PACKET = b'\x10\x70'

# Big-endian unsigned 16-bit integer; wait_and_ready() unpacks it at offset 1
# of the first packet to obtain the readings count.
_STRUCT_READINGS_COUNT = struct.Struct('>H')

# Payload sent by get_readings() before reading each record.
# NOTE(review): these are the same bytes as _DISCONNECT_PACKET -- confirm
# against the protocol that this is intentional.
_FETCH_PACKET = b'\x10\x60'

# One reading record as laid out by _STRUCT_READING: seven unsigned bytes
# (two of unknown meaning, then year, month, day, hour, minute), a big-endian
# unsigned 16-bit value, and one unsigned byte for the meal flag.
_ReadingRecord = collections.namedtuple(
  '_ReadingRecord',
  ('unknown1', 'unknown2', 'year', 'month', 'day', 'hour', 'minute',
   'value', 'meal_flag'))
_STRUCT_READING = struct.Struct('>BBBBBBBHB')

# Maps the meal_flag byte of a reading record to the common meal constants.
_MEAL_FLAG = {
  0x00: common.NO_MEAL,
  0x10: common.BEFORE_MEAL,
  0x20: common.AFTER_MEAL
}

def parse_reading(msgdata):
  """Decode the payload of a reading packet into a _ReadingRecord.

  Args:
    msgdata: Bytes-like payload; it is unpacked from offset 0 using
      _STRUCT_READING, and any trailing bytes are ignored.

  Returns:
    A _ReadingRecord holding the unpacked fields in declaration order.
  """
  unpacked_fields = _STRUCT_READING.unpack_from(msgdata)
  return _ReadingRecord._make(unpacked_fields)

def xor_checksum(msg):
  """Return the XOR of all the integers in msg.

  Args:
    msg: Iterable of integers, typically a bytes object.

  Returns:
    The bitwise XOR of every element, or 0 when msg is empty.
  """
  # Seeding the fold with 0 (the XOR identity) keeps an empty message from
  # raising TypeError and leaves the result unchanged for non-empty input.
  return functools.reduce(operator.xor, msg, 0)

class Device(object):
  """Driver for SD CodeFree glucometers, talking over a serial port.

  Frames exchanged with the meter have the layout
  start marker, direction, length, payload, checksum, end marker, where the
  length counts the payload plus the two trailing bytes and the checksum is
  the XOR of the payload bytes.
  """

  def __init__(self, device):
    """Open the serial port the meter is attached to.

    Args:
      device: Port identifier handed to serial.Serial as `port`.
    """
    # NOTE(review): the long read timeout presumably gives the user time to
    # plug in and switch on the meter after connect() prompts -- confirm.
    self.serial_ = serial.Serial(
      port=device, baudrate=38400, bytesize=serial.EIGHTBITS,
      parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE,
      timeout=300, xonxoff=False, rtscts=False, dsrdtr=False,
      writeTimeout=None)

  def read_packet(self):
    """Read one frame from the device and return its payload.

    Returns:
      The message bytes with the trailing checksum and end marker removed.

    Raises:
      exceptions.InvalidResponse: on a short read, an unexpected preamble, a
        length too small to hold checksum and end marker, or a wrong end
        marker.
      exceptions.InvalidChecksum: when the XOR of the payload does not match
        the checksum byte received.
    """
    preamble = self.serial_.read(3)
    if len(preamble) != 3:
      raise exceptions.InvalidResponse(
        response='Expected 3 bytes, received %d' % len(preamble))
    if preamble[0:_IDX_LENGTH] != _RECV_PREAMBLE:
      raise exceptions.InvalidResponse(
        response='Unexpected preamble %r' % preamble[0:_IDX_LENGTH])

    msglen = preamble[_IDX_LENGTH]
    # The length covers payload + checksum + end marker, so anything below 2
    # cannot be a valid frame and would make the negative indexing below fail.
    if msglen < 2:
      raise exceptions.InvalidResponse(
        response='Unexpected message length %d' % msglen)

    message = self.serial_.read(msglen)
    if len(message) != msglen:
      raise exceptions.InvalidResponse(
        response='Expected %d bytes, received %d' % (msglen, len(message)))
    if message[_IDX_ETX] != _ETX:
      raise exceptions.InvalidResponse(
        response='Unexpected end-of-transmission byte: %02x' % message[_IDX_ETX])

    # Calculate the checksum up until before the checksum itself.
    msgdata = message[:_IDX_CHECKSUM]

    cksum = xor_checksum(msgdata)
    if cksum != message[_IDX_CHECKSUM]:
      raise exceptions.InvalidChecksum(message[_IDX_CHECKSUM], cksum)

    return msgdata

  def wait_and_ready(self):
    """Wait for the device challenge, answer it, and read the readings count.

    Returns:
      The readings count unpacked from the first packet the device sends
      after the challenge has been answered.

    Raises:
      exceptions.ConnectionFailed: when the bytes received do not match the
        expected challenge frame.
    """
    challenge = self.serial_.read(1)

    # The first packet read may have a prefixed zero, it might be a bug in the
    # cp210x driver or device, but discard it if found.
    if challenge == b'\0':
      # Replace the stray zero with the next byte, which must be the start
      # marker of the challenge frame.
      challenge = self.serial_.read(1)
      if challenge != b'\x53':
        raise exceptions.ConnectionFailed(
          message='Unexpected starting bytes %r' % challenge)

    challenge += self.serial_.read(6)

    if challenge != _CHALLENGE_PACKET_FULL:
      raise exceptions.ConnectionFailed(
        message='Unexpected challenge %r' % challenge)

    self.send_packet(_RESPONSE_PACKET)

    # The first packet only contains the counter of how many readings are
    # available.
    first_packet = self.read_packet()

    count = _STRUCT_READINGS_COUNT.unpack_from(first_packet, 1)

    return count[0]

  def send_packet(self, msgdata):
    """Frame msgdata and write it to the serial port.

    Args:
      msgdata: Bytes payload; framing, length and checksum are added here.
    """
    packet = array.array('B')
    # The length byte counts the payload plus the checksum and end marker.
    packet.extend((_STX, _DIR_OUT, len(msgdata)+2))
    packet.extend(msgdata)
    packet.extend((xor_checksum(msgdata), _ETX))
    self.serial_.write(packet.tobytes())

  def connect(self):
    """Prompt the user to connect the device; no I/O happens here."""
    print("Please connect and turn on the device.")

  def disconnect(self):
    """Send the disconnect packet and check the device acknowledges it.

    Raises:
      exceptions.InvalidResponse: when the reply is not the expected
        disconnected packet.
    """
    self.send_packet(_DISCONNECT_PACKET)
    response = self.read_packet()
    if response != _DISCONNECTED_PACKET:
      raise exceptions.InvalidResponse(response=response)

  def get_information_string(self):
    """Return a fixed, human-readable description of the device."""
    return ('SD CodeFree glucometer\n')

  def get_version(self):
    """Not implemented by this driver."""
    raise NotImplementedError

  def get_serial_number(self):
    """Not implemented by this driver."""
    raise NotImplementedError

  def get_glucose_unit(self):
    """Return the glucose unit used for readings (always mg/dL)."""
    # Device does not provide information on glucose unit.
    return common.UNIT_MGDL

  def get_datetime(self):
    """Not implemented by this driver."""
    raise NotImplementedError

  def set_datetime(self, date=None):
    """Set the date and time of the device.

    Args:
      date: datetime.datetime to set; defaults to the current local time at
        the moment of the call.

    Returns:
      The datetime that was sent, truncated to minute resolution.

    Raises:
      exceptions.InvalidResponse: when the device does not acknowledge the
        command with the expected packet.
    """
    # Resolve the default at call time; a default of datetime.now() in the
    # signature would be evaluated once at import and go stale.
    if date is None:
      date = datetime.datetime.now()

    setdatecmd = date.strftime('ADATE%Y%m%d%H%M').encode('ascii')

    # Ignore the readings count.
    self.wait_and_ready()

    self.send_packet(setdatecmd)
    response = self.read_packet()
    if response != _DATE_SET_PACKET:
      raise exceptions.InvalidResponse(response=response)

    # The date we return should only include up to minute, unfortunately.
    return datetime.datetime(date.year, date.month, date.day,
                             date.hour, date.minute)

  def zero_log(self):
    """Not implemented by this driver."""
    raise NotImplementedError

  def get_readings(self):
    """Iterate over the readings stored on the device.

    Yields:
      common.Reading objects, one per record, in the order the device
      returns them.
    """
    count = self.wait_and_ready()

    for _ in range(count):
      # Each record has to be requested explicitly.
      self.send_packet(_FETCH_PACKET)
      rpkt = self.read_packet()

      r = parse_reading(rpkt)
      meal = _MEAL_FLAG[r.meal_flag]

      # The record stores the year as an offset from 2000.
      yield common.Reading(
        datetime.datetime(2000 + r.year, r.month, r.day, r.hour, r.minute),
        r.value, meal=meal)