summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDiego Elio Pettenò <flameeyes@flameeyes.eu>2016-02-14 18:14:07 +0100
committerDiego Elio Pettenò <flameeyes@flameeyes.eu>2016-02-14 18:14:07 +0100
commitd4322fc94a02261ebabbca2f24c03e87095720de (patch)
tree8f8a64cf39212358bc68deb57ca520060807ee66
parentMake the ConnectionFailed exception more generic. (diff)
downloadglucometerutils-d4322fc94a02261ebabbca2f24c03e87095720de.tar
glucometerutils-d4322fc94a02261ebabbca2f24c03e87095720de.tar.gz
glucometerutils-d4322fc94a02261ebabbca2f24c03e87095720de.tar.bz2
glucometerutils-d4322fc94a02261ebabbca2f24c03e87095720de.tar.lz
glucometerutils-d4322fc94a02261ebabbca2f24c03e87095720de.tar.xz
glucometerutils-d4322fc94a02261ebabbca2f24c03e87095720de.tar.zst
glucometerutils-d4322fc94a02261ebabbca2f24c03e87095720de.zip
-rw-r--r--README5
-rw-r--r--glucometerutils/drivers/otverio2015.py226
2 files changed, 231 insertions, 0 deletions
diff --git a/README b/README
index bde999f..0f51b4c 100644
--- a/README
+++ b/README
@@ -23,9 +23,14 @@ Supported devices
* **LifeScan OneTouch Ultra Easy** (also known as **Ultra Mini**):
get information, get and set time, dump of readings in native
units, memory reset.
+ * **LifeScan OneTouch Verio** (USB version), **LifeScan OneTouch
+ Select Plus**: get information, get and set time, dump of
+ readings. Requires [python-scsi][1].
* **Abbott FreeStyle Optium**: get information, get and set time,
dump of readings in native units.
+[1] https://github.com/rosjat/python-scsi
+
Dump format
-----------
diff --git a/glucometerutils/drivers/otverio2015.py b/glucometerutils/drivers/otverio2015.py
new file mode 100644
index 0000000..44f3573
--- /dev/null
+++ b/glucometerutils/drivers/otverio2015.py
@@ -0,0 +1,226 @@
+# -*- coding: utf-8 -*-
+"""Driver for LifeScan OneTouch Verio 2015 devices.
+
+Further information on the device protocol can be found at
+
+https://github.com/Flameeyes/glucometer-protocols/blob/master/lifescan/onetouch-verio-2015.md
+"""
+
+__author__ = 'Diego Elio Pettenò'
+__email__ = 'flameeyes@flameeyes.eu'
+__copyright__ = 'Copyright © 2016, Diego Elio Pettenò'
+__license__ = 'MIT'
+
+import datetime
+import struct
+
+from pyscsi.pyscsi.scsi import SCSI
+from pyscsi.pyscsi.scsi_device import SCSIDevice
+
+from glucometerutils import common
+from glucometerutils import exceptions
+from glucometerutils.drivers import lifescan_common
+
+# Match the same values in the otultraeasy driver.
+_STX = 0x02
+_ETX = 0x03
+
+# This device uses SCSI blocks as registers.
+_REGISTER_SIZE = 512
+
+_STRUCT_PREAMBLE = struct.Struct('<BH')
+_STRUCT_CODA = _STRUCT_PREAMBLE # they are actually the same, mirrored.
+
+_STRUCT_UINT16 = struct.Struct('<H')
+_STRUCT_UINT32 = struct.Struct('<I')
+
+_STRUCT_CHECKSUM = _STRUCT_UINT16
+_STRUCT_TIMESTAMP = _STRUCT_UINT32
+_STRUCT_RECORDID = _STRUCT_UINT16
+_STRUCT_READING = _STRUCT_UINT32
+_STRUCT_RECORD = struct.Struct('<BBHBHIIBBB')
+
+_QUERY_REQUEST = b'\x04\xe6\x02'
+_QUERY_KEY_SERIAL = b'\x00'
+_QUERY_KEY_MODEL = b'\x01'
+_QUERY_KEY_SOFTWARE = b'\x02'
+
+_READ_RTC_REQUEST = b'\x04\x20\x02'
+_WRITE_RTC_REQUEST = b'\x04\x20\x01'
+# All timestamp reported by this device are seconds since this date.
+_EPOCH_BASE = 946684800 # 2010-01-01 00:00
+
+_READ_RECORD_COUNT_REQUEST = b'\x04\x27\x00'
+_READ_RECORD_REQUEST_PREFIX = b'\x04\x31\x02'
+_READ_RECORD_REQUEST_SUFFIX = b'\x00'
+
+_MEMORY_ERASE_REQUEST = b'\x04\x1a'
+
+def _extract_message(register):
+ """Parse the message preamble and verify checksums."""
+ stx, length = _STRUCT_PREAMBLE.unpack_from(register)
+ if stx != _STX:
+ raise lifescan_common.MalformedCommand(
+ 'invalid STX byte: %02x' % stx)
+ if length > _REGISTER_SIZE:
+ raise lifescan_common.MalformedCommand(
+ 'invalid length: %d > REGISTER_SIZE' % length)
+
+ # 2 is the length of the checksum, so it should be ignored.
+ calculated_checksum = lifescan_common.crc_ccitt(register[:(length-2)])
+
+ coda_offset = length - _STRUCT_CODA.size
+ etx, encoded_checksum = _STRUCT_CODA.unpack_from(register[coda_offset:])
+ if etx != _ETX:
+ raise lifescan_common.MalformedCommand(
+ 'invalid ETX byte: %02x' % etx)
+ if encoded_checksum != calculated_checksum:
+ raise lifescan_common.InvalidChecksum(
+ encoded_checksum, calculated_checksum)
+
+ response = register[_STRUCT_PREAMBLE.size:coda_offset]
+ return response
+
+def _encode_message(cmd):
+ """Add message preamble and calculate checksum, add padding."""
+ length = len(cmd) + _STRUCT_PREAMBLE.size + _STRUCT_CODA.size
+ preamble = _STRUCT_PREAMBLE.pack(_STX, length)
+ message = preamble + cmd + bytes((_ETX,))
+ checksum = _STRUCT_CHECKSUM.pack(lifescan_common.crc_ccitt(message))
+
+ # Pad the message to match the size of the register.
+ return message + checksum + bytes(
+ _REGISTER_SIZE - 2 - len(message))
+
+def _convert_timestamp(timestamp):
+ return datetime.datetime.fromtimestamp(timestamp + _EPOCH_BASE)
+
+class Device(object):
+ def __init__(self, device):
+ self.device_name_ = device
+ self.scsi_device_ = SCSIDevice(device)
+ self.scsi_ = SCSI(self.scsi_device_)
+ self.scsi_.blocksize = _REGISTER_SIZE
+
+ def _send_message(self, cmd, lba):
+ """Send a request to the meter, and read its response.
+
+ Args:
+ cmd: (bytes) the raw command to send the device, without
+ preamble or checksum.
+ lba: (int) the address of the block register to use, known
+ valid addresses are 3, 4 and 5.
+
+ Returns:
+ (bytes) The raw response from the meter. No preamble or coda is
+ present, and the checksum has already been validated.
+ """
+ self.scsi_.write10(lba, 1, _encode_message(cmd))
+ response = self.scsi_.read10(lba, 1)
+ # TODO: validate that the response is valid.
+ return _extract_message(response.datain)
+
+ def connect(self):
+ inq = self.scsi_.inquiry()
+ vendor = inq.result['t10_vendor_identification'][:32]
+ if vendor != b'LifeScan':
+ raise exceptions.ConnectionFailed(
+ 'Device %s is not a LifeScan glucometer.' % self.device_name_)
+
+ def disconnect(self):
+ return
+
+ def get_information_string(self):
+ return ('OneTouch %s glucometer\n'
+ 'Serial number: %s\n'
+ 'Software version: %s\n'
+ 'Time: %s\n'
+ 'Default unit: unknown\n' % (
+ self._query_string(_QUERY_KEY_MODEL),
+ self.get_serial_number(),
+ self.get_version(),
+ self.get_datetime()))
+
+ def _query_string(self, query_key):
+ response = self._send_message(_QUERY_REQUEST + query_key, 3)
+ if response[0:2] != b'\x04\06':
+ raise lifescan_common.MalformedCommand(
+ 'invalid response, expected 04 06, received %02x %02x' % (
+ response[0], response[1]))
+ # Strings are encoded in wide characters (LE), but they should
+ # only contain ASCII characters. Note that the string is
+ # null-terminated, so the last character should be dropped.
+ return response[2:].decode('utf-16-le')[:-1]
+
+ def get_serial_number(self):
+ return self._query_string(_QUERY_KEY_SERIAL)
+
+ def get_version(self):
+ return self._query_string(_QUERY_KEY_SOFTWARE)
+
+ def get_datetime(self):
+ response = self._send_message(_READ_RTC_REQUEST, 3)
+ if response[0:2] != b'\x04\06':
+ raise lifescan_common.MalformedCommand(
+ 'invalid response, expected 04 06, received %02x %02x' % (
+ response[0], response[1]))
+ (timestamp,) = _STRUCT_TIMESTAMP.unpack(response[2:])
+ return _convert_timestamp(timestamp)
+
+ def set_datetime(self, date=datetime.datetime.now()):
+ epoch = datetime.datetime.utcfromtimestamp(_EPOCH_BASE)
+ delta = date - epoch
+ timestamp = int(delta.total_seconds())
+
+ timestamp_bytes = _STRUCT_TIMESTAMP.pack(timestamp)
+ response = self._send_message(_WRITE_RTC_REQUEST + timestamp_bytes, 3)
+
+ if response[0:2] != b'\x04\06':
+ raise lifescan_common.MalformedCommand(
+ 'invalid response, expected 04 06, received %02x %02x' % (
+ response[0], response[1]))
+
+ # The device does not return the new datetime, so confirm by
+ # calling READ RTC again.
+ return self.get_datetime()
+
+ def zero_log(self):
+ response = self._send_message(_MEMORY_ERASE_REQUEST, 3)
+ if response[0:2] != b'\x04\06':
+ raise lifescan_common.MalformedCommand(
+ 'invalid response, expected 04 06, received %02x %02x' % (
+ response[0], response[1]))
+
+ def _get_reading_count(self):
+ response = self._send_message(_READ_RECORD_COUNT_REQUEST, 3)
+ if response[0:2] != b'\x04\06':
+ raise lifescan_common.MalformedCommand(
+ 'invalid response, expected 04 06, received %02x %02x' % (
+ response[0], response[1]))
+
+ (record_count,) = _STRUCT_RECORDID.unpack(response[2:])
+ return record_count
+
+ def get_glucose_unit(self):
+ return common.UNIT_MGDL
+
+ def _get_reading(self, record_number):
+ request = (_READ_RECORD_REQUEST_PREFIX +
+ _STRUCT_RECORDID.pack(record_number) +
+ _READ_RECORD_REQUEST_SUFFIX)
+ response = self._send_message(request, 3)
+ if response[0:2] != b'\x04\06':
+ raise lifescan_common.MalformedCommand(
+ 'invalid response, expected 04 06, received %02x %02x' % (
+ response[0], response[1]))
+
+ (unused_const1, unused_const2, unused_counter, unused_const3,
+ unused_counter2, timestamp, value, unused_flags, unused_const4,
+ unused_const5) = _STRUCT_RECORD.unpack(response)
+
+ return common.Reading(_convert_timestamp(timestamp), float(value))
+
+ def get_readings(self):
+ record_count = self._get_reading_count()
+ for record_number in range(record_count):
+ yield self._get_reading(record_number)