summaryrefslogtreecommitdiffstats
path: root/glucometerutils/drivers/otverio2015.py
blob: 44f3573a703c0203dd355a51fb2a5509a34756b2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# -*- coding: utf-8 -*-
"""Driver for LifeScan OneTouch Verio 2015 devices.

Further information on the device protocol can be found at

https://github.com/Flameeyes/glucometer-protocols/blob/master/lifescan/onetouch-verio-2015.md
"""

__author__ = 'Diego Elio Pettenò'
__email__ = 'flameeyes@flameeyes.eu'
__copyright__ = 'Copyright © 2016, Diego Elio Pettenò'
__license__ = 'MIT'

import datetime
import struct

from pyscsi.pyscsi.scsi import SCSI
from pyscsi.pyscsi.scsi_device import SCSIDevice

from glucometerutils import common
from glucometerutils import exceptions
from glucometerutils.drivers import lifescan_common

# Match the same values in the otultraeasy driver.
_STX = 0x02
_ETX = 0x03

# This device uses SCSI blocks as registers.
_REGISTER_SIZE = 512

_STRUCT_PREAMBLE = struct.Struct('<BH')
_STRUCT_CODA = _STRUCT_PREAMBLE  # they are actually the same, mirrored.

_STRUCT_UINT16 = struct.Struct('<H')
_STRUCT_UINT32 = struct.Struct('<I')

_STRUCT_CHECKSUM = _STRUCT_UINT16
_STRUCT_TIMESTAMP = _STRUCT_UINT32
_STRUCT_RECORDID = _STRUCT_UINT16
_STRUCT_READING = _STRUCT_UINT32
_STRUCT_RECORD = struct.Struct('<BBHBHIIBBB')

_QUERY_REQUEST = b'\x04\xe6\x02'
_QUERY_KEY_SERIAL = b'\x00'
_QUERY_KEY_MODEL = b'\x01'
_QUERY_KEY_SOFTWARE = b'\x02'

_READ_RTC_REQUEST = b'\x04\x20\x02'
_WRITE_RTC_REQUEST = b'\x04\x20\x01'
# All timestamp reported by this device are seconds since this date.
_EPOCH_BASE = 946684800  # 2010-01-01 00:00

_READ_RECORD_COUNT_REQUEST = b'\x04\x27\x00'
_READ_RECORD_REQUEST_PREFIX = b'\x04\x31\x02'
_READ_RECORD_REQUEST_SUFFIX = b'\x00'

_MEMORY_ERASE_REQUEST = b'\x04\x1a'

def _extract_message(register):
  """Parse the message preamble and verify checksums."""
  stx, length = _STRUCT_PREAMBLE.unpack_from(register)
  if stx != _STX:
    raise lifescan_common.MalformedCommand(
      'invalid STX byte: %02x' % stx)
  if length > _REGISTER_SIZE:
    raise lifescan_common.MalformedCommand(
      'invalid length: %d > REGISTER_SIZE' % length)

  # 2 is the length of the checksum, so it should be ignored.
  calculated_checksum = lifescan_common.crc_ccitt(register[:(length-2)])

  coda_offset = length - _STRUCT_CODA.size
  etx, encoded_checksum = _STRUCT_CODA.unpack_from(register[coda_offset:])
  if etx != _ETX:
    raise lifescan_common.MalformedCommand(
      'invalid ETX byte: %02x' % etx)
  if encoded_checksum != calculated_checksum:
    raise lifescan_common.InvalidChecksum(
      encoded_checksum, calculated_checksum)

  response = register[_STRUCT_PREAMBLE.size:coda_offset]
  return response

def _encode_message(cmd):
  """Add message preamble and calculate checksum, add padding."""
  length = len(cmd) + _STRUCT_PREAMBLE.size + _STRUCT_CODA.size
  preamble = _STRUCT_PREAMBLE.pack(_STX, length)
  message = preamble + cmd + bytes((_ETX,))
  checksum = _STRUCT_CHECKSUM.pack(lifescan_common.crc_ccitt(message))

  # Pad the message to match the size of the register.
  return message + checksum + bytes(
    _REGISTER_SIZE - 2 - len(message))

def _convert_timestamp(timestamp):
  return datetime.datetime.fromtimestamp(timestamp + _EPOCH_BASE)

class Device(object):
  def __init__(self, device):
    self.device_name_ = device
    self.scsi_device_ = SCSIDevice(device)
    self.scsi_ = SCSI(self.scsi_device_)
    self.scsi_.blocksize = _REGISTER_SIZE

  def _send_message(self, cmd, lba):
    """Send a request to the meter, and read its response.

    Args:
      cmd: (bytes) the raw command to send the device, without
        preamble or checksum.
      lba: (int) the address of the block register to use, known
        valid addresses are 3, 4 and 5.

    Returns:
      (bytes) The raw response from the meter. No preamble or coda is
      present, and the checksum has already been validated.
    """
    self.scsi_.write10(lba, 1, _encode_message(cmd))
    response = self.scsi_.read10(lba, 1)
    # TODO: validate that the response is valid.
    return _extract_message(response.datain)

  def connect(self):
    inq = self.scsi_.inquiry()
    vendor = inq.result['t10_vendor_identification'][:32]
    if vendor != b'LifeScan':
      raise exceptions.ConnectionFailed(
        'Device %s is not a LifeScan glucometer.' % self.device_name_)

  def disconnect(self):
    return

  def get_information_string(self):
    return ('OneTouch %s glucometer\n'
            'Serial number: %s\n'
            'Software version: %s\n'
            'Time: %s\n'
            'Default unit: unknown\n' % (
              self._query_string(_QUERY_KEY_MODEL),
              self.get_serial_number(),
              self.get_version(),
              self.get_datetime()))

  def _query_string(self, query_key):
    response = self._send_message(_QUERY_REQUEST + query_key, 3)
    if response[0:2] != b'\x04\06':
      raise lifescan_common.MalformedCommand(
        'invalid response, expected 04 06, received %02x %02x' % (
          response[0], response[1]))
    # Strings are encoded in wide characters (LE), but they should
    # only contain ASCII characters. Note that the string is
    # null-terminated, so the last character should be dropped.
    return response[2:].decode('utf-16-le')[:-1]

  def get_serial_number(self):
    return self._query_string(_QUERY_KEY_SERIAL)

  def get_version(self):
    return self._query_string(_QUERY_KEY_SOFTWARE)

  def get_datetime(self):
    response = self._send_message(_READ_RTC_REQUEST, 3)
    if response[0:2] != b'\x04\06':
      raise lifescan_common.MalformedCommand(
        'invalid response, expected 04 06, received %02x %02x' % (
          response[0], response[1]))
    (timestamp,) = _STRUCT_TIMESTAMP.unpack(response[2:])
    return _convert_timestamp(timestamp)

  def set_datetime(self, date=datetime.datetime.now()):
    epoch = datetime.datetime.utcfromtimestamp(_EPOCH_BASE)
    delta = date - epoch
    timestamp = int(delta.total_seconds())

    timestamp_bytes = _STRUCT_TIMESTAMP.pack(timestamp)
    response = self._send_message(_WRITE_RTC_REQUEST + timestamp_bytes, 3)

    if response[0:2] != b'\x04\06':
      raise lifescan_common.MalformedCommand(
        'invalid response, expected 04 06, received %02x %02x' % (
          response[0], response[1]))

    # The device does not return the new datetime, so confirm by
    # calling READ RTC again.
    return self.get_datetime()

  def zero_log(self):
    response = self._send_message(_MEMORY_ERASE_REQUEST, 3)
    if response[0:2] != b'\x04\06':
      raise lifescan_common.MalformedCommand(
        'invalid response, expected 04 06, received %02x %02x' % (
          response[0], response[1]))    

  def _get_reading_count(self):
    response = self._send_message(_READ_RECORD_COUNT_REQUEST, 3)
    if response[0:2] != b'\x04\06':
      raise lifescan_common.MalformedCommand(
        'invalid response, expected 04 06, received %02x %02x' % (
          response[0], response[1]))

    (record_count,) = _STRUCT_RECORDID.unpack(response[2:])
    return record_count

  def get_glucose_unit(self):
    return common.UNIT_MGDL

  def _get_reading(self, record_number):
    request = (_READ_RECORD_REQUEST_PREFIX +
               _STRUCT_RECORDID.pack(record_number) +
               _READ_RECORD_REQUEST_SUFFIX)
    response = self._send_message(request, 3)
    if response[0:2] != b'\x04\06':
      raise lifescan_common.MalformedCommand(
        'invalid response, expected 04 06, received %02x %02x' % (
          response[0], response[1]))

    (unused_const1, unused_const2, unused_counter, unused_const3,
     unused_counter2, timestamp, value, unused_flags, unused_const4,
     unused_const5) = _STRUCT_RECORD.unpack(response)

    return common.Reading(_convert_timestamp(timestamp), float(value))

  def get_readings(self):
    record_count = self._get_reading_count()
    for record_number in range(record_count):
      yield self._get_reading(record_number)