# -*- coding: utf-8 -*-
"""Driver for LifeScan OneTouch Ultra Easy devices.
Also supports OneTouch Ultra Mini devices (different name, same device).
Supported features:
- get readings;
- use the glucose unit preset on the device by default;
- get and set date and time;
- get serial number and software version;
- memory reset (caution!)
Expected device path: /dev/ttyUSB0 or similar serial port device.
"""
__author__ = 'Diego Elio Pettenò'
__email__ = 'flameeyes@flameeyes.eu'
__copyright__ = 'Copyright © 2014-2017, Diego Elio Pettenò'
__license__ = 'MIT'
import binascii
import datetime
import logging
import construct
from glucometerutils import common
from glucometerutils.support import lifescan
from glucometerutils.support import serial
_INVALID_RECORD = 501
_EPOCH = datetime.datetime.utcfromtimestamp(0)
def datetime_to_timestamp(date):
delta = date - _EPOCH
return int(delta.total_seconds())
_PACKET = construct.Struct(
construct.RawCopy(
construct.Embedded(
construct.Struct(
construct.Const(b'\x02'), # stx
'length' / construct.Rebuild(
construct.Byte, lambda ctx: len(ctx.message) + 6),
construct.EmbeddedBitStruct(
construct.Padding(3),
'more' / construct.Default(construct.Flag, False),
'disconnect' / construct.Flag,
'acknowledge' / construct.Flag,
'expect_receive' / construct.Flag,
'sequence_number' / construct.Flag,
),
'message' / construct.Bytes(length=lambda ctx: ctx.length - 6),
construct.Const(b'\x03'), # etx
),
),
),
'checksum' / construct.Checksum(
construct.Int16ul, lifescan.crc_ccitt, construct.this.data),
)
_COMMAND_SUCCESS = construct.Const(b'\x05\x06')
_TIMESTAMP_ADAPTER = construct.ExprAdapter(
construct.Int32ul,
encoder=lambda obj, ctx: datetime_to_timestamp(obj),
decoder=lambda obj, ctx: datetime.datetime.fromtimestamp(obj))
_VERSION_REQUEST = construct.Const(b'\x05\x0d\x02')
_VERSION_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
'version' / construct.PascalString(construct.Byte, encoding='ascii'),
)
_SERIAL_NUMBER_REQUEST = construct.Const(
b'\x05\x0B\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00')
_SERIAL_NUMBER_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
'serial_number' / construct.GreedyString(encoding='ascii'),
)
_DATETIME_REQUEST = construct.Struct(
construct.Const(b'\x05\x20'), # 0x20 is the datetime
'request_type' / construct.Enum(construct.Byte, write=0x01, read=0x02),
'timestamp' / construct.Default(_TIMESTAMP_ADAPTER, _EPOCH),
)
_DATETIME_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
'timestamp' / _TIMESTAMP_ADAPTER,
)
_GLUCOSE_UNIT_REQUEST = construct.Const(
b'\x05\x09\x02\x09\x00\x00\x00\x00')
_GLUCOSE_MAPPING = {
common.Unit.MG_DL: 0x00,
common.Unit.MMOL_L: 0x01,
}
_GLUCOSE_UNIT_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
'unit' / construct.SymmetricMapping(
construct.Byte, _GLUCOSE_MAPPING),
construct.Padding(3),
)
_ZERO_LOG_REQUEST = construct.Const(b'\x05\x1A')
_READING_COUNT_RESPONSE = construct.Struct(
construct.Const(b'\x05\x0f'),
'count' / construct.Int16ul,
)
_READ_RECORD_REQUEST = construct.Struct(
construct.Const(b'\x05\x1f'),
'record_id' / construct.Int16ul,
)
_READING_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
'timestamp' / _TIMESTAMP_ADAPTER,
'value' / construct.Int32ul,
)
class Device(serial.SerialDevice):
BAUDRATE = 9600
DEFAULT_CABLE_ID = '067b:2303' # Generic PL2303 cable.
TIMEOUT = 0.5
def __init__(self, device):
super(Device, self).__init__(device)
self.sent_counter_ = False
self.expect_receive_ = False
self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024)
def connect(self):
self._send_packet(b'', disconnect=True)
self._read_ack()
def disconnect(self):
self.connect()
def _send_packet(self, message, acknowledge=False, disconnect=False):
pkt = _PACKET.build(
{'value': {
'message': message,
'sequence_number': self.sent_counter_,
'expect_receive': self.expect_receive_,
'acknowledge': acknowledge,
'disconnect': disconnect,
}})
logging.debug('sending packet: %s', binascii.hexlify(pkt))
self.serial_.write(pkt)
self.serial_.flush()
def _read_packet(self):
raw_pkt = self.buffered_reader_.parse_stream(self.serial_)
logging.debug('received packet: %r', raw_pkt)
# discard the checksum and copy
pkt = raw_pkt.value
if not pkt.disconnect and pkt.sequence_number != self.expect_receive_:
raise lifescan.MalformedCommand(
'at position 2[0b] expected %02x, received %02x' % (
self.expect_receive_, pkt.sequence_count))
return pkt
def _send_ack(self):
self._send_packet(b'', acknowledge=True, disconnect=False)
def _read_ack(self):
pkt = self._read_packet()
assert pkt.acknowledge
def _send_request(self, request_format, *args):
request = request_format.build(*args)
self._send_packet(request, acknowledge=False, disconnect=False)
self.sent_counter_ = not self.sent_counter_
self._read_ack()
def _read_response(self, response_format):
pkt = self._read_packet()
assert not pkt.acknowledge
self.expect_receive_ = not self.expect_receive_
self._send_ack()
return response_format.parse(pkt.message)
def get_meter_info(self):
return common.MeterInfo(
'OneTouch Ultra Easy glucometer',
serial_number=self.get_serial_number(),
version_info=(
'Software version: ' + self.get_version(),),
native_unit=self.get_glucose_unit())
def get_version(self):
self._send_request(_VERSION_REQUEST, None)
response = self._read_response(_VERSION_RESPONSE)
return response.version
def get_serial_number(self):
self._send_request(_SERIAL_NUMBER_REQUEST, None)
response = self._read_response(_SERIAL_NUMBER_RESPONSE)
return response.serial_number
def get_datetime(self):
self._send_request(
_DATETIME_REQUEST, {'request_type': 'read'})
response = self._read_response(_DATETIME_RESPONSE)
return response.timestamp
def set_datetime(self, date=datetime.datetime.now()):
self._send_request(_DATETIME_REQUEST, {
'request_type': 'write',
'timestamp': date,
})
response = self._read_response(_DATETIME_RESPONSE)
return response.timestamp
def zero_log(self):
self._send_request(_ZERO_LOG_REQUEST, None)
self._read_response(_COMMAND_SUCCESS)
def get_glucose_unit(self):
self._send_request(_GLUCOSE_UNIT_REQUEST, None)
response = self._read_response(_GLUCOSE_UNIT_RESPONSE)
return response.unit
def _get_reading(self, record_id):
self._send_request(
_READ_RECORD_REQUEST, {'record_id': record_id})
return self._read_response(_READING_RESPONSE)
def get_readings(self):
self._send_request(
_READ_RECORD_REQUEST, {'record_id': _INVALID_RECORD})
count_response = self._read_response(_READING_COUNT_RESPONSE)
for record_id in range(count_response.count):
self._send_request(
_READ_RECORD_REQUEST, {'record_id': record_id})
reading = self._read_response(_READING_RESPONSE)
yield common.GlucoseReading(
reading.timestamp,
float(reading.value))