summaryrefslogtreecommitdiffstats
path: root/glucometerutils/drivers/fslibre.py
blob: 4dda376ff306d618820e7b5515dcc7d171728f14 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# -*- coding: utf-8 -*-
"""Driver for FreeStyle Libre devices.

Supported features:
    - get readings (sensor, flash and blood glucose), including comments;
    - get and set date and time;
    - get serial number and software version.

Expected device path: /dev/hidraw9 or similar HID device. Optional when using
HIDAPI.

Further information on the device protocol can be found at

https://flameeyes.github.io/glucometer-protocols/abbott/freestyle-libre

"""

__author__ = 'Diego Elio Pettenò'
__email__ = 'flameeyes@flameeyes.eu'
__copyright__ = 'Copyright © 2017, Diego Elio Pettenò'
__license__ = 'MIT'

import datetime

from glucometerutils import common
from glucometerutils.support import freestyle

# Field maps used with _parse_record(). Each map is a tuple of
# (index, field name) pairs: _parse_record() reads record[index], converts it
# with int() and stores it under the given field name.

# Fields of the records returned by both $history and $arresult?
# Tuple of pairs of idx and field name
_BASE_ENTRY_MAP = (
    (1, 'type'),
    (2, 'month'),
    (3, 'day'),
    (4, 'year'),  # 2-digits; _extract_timestamp() adds 2000 to it
    (5, 'hour'),
    (6, 'minute'),
    (7, 'second'),
)

# Fields of the records returned by $history?
# Records with a non-zero 'errors' field are skipped by Device.get_readings().
_HISTORY_ENTRY_MAP = _BASE_ENTRY_MAP + (
    (13, 'value'),
    (15, 'errors'),
)

# Fields of the results returned by $arresult? where type = 2
_ARRESULT_TYPE2_ENTRY_MAP = (
    # As interpreted by _parse_arresult(): 0 = blood glucose, 2 = sensor scan;
    # any other value is skipped there (commented as a ketone reading).
    (9, 'reading-type'),
    (12, 'value'),
    (15, 'sport-flag'),
    (16, 'medication-flag'),
    (17, 'rapid-acting-flag'),  # see _ARRESULT_RAPID_INSULIN_ENTRY_MAP
    (18, 'long-acting-flag'),
    # Bit N set means the N-th of the six fields record[29:35] applies.
    (19, 'custom-comments-bitfield'),
    # Halved by _parse_arresult() before being reported.
    (23, 'double-long-acting-insulin'),
    (25, 'food-flag'),
    (26, 'food-carbs-grams'),
    (28, 'errors'),
)

# Fields only valid when rapid-acting-flag is "1"
# The field may be missing even then; _parse_arresult() copes with that.
_ARRESULT_RAPID_INSULIN_ENTRY_MAP = (
    # Halved by _parse_arresult() before being reported.
    (43, 'double-rapid-acting-insulin'),
)


def _parse_record(record, entry_map):
    """Convert selected fields of a raw record into a dictionary of integers.

    Args:
      record: sequence of string fields making up one record.
      entry_map: iterable of (index, field name) pairs selecting which
        positions of the record to read and the key to store each one under.

    Returns:
      A dictionary mapping each field name to int(record[index]). An empty
      dictionary is returned when the record is empty, or when it is too
      short for any of the requested indexes.

    Raises:
      ValueError: if a selected field cannot be converted to an integer.
    """

    parsed = {}

    if record:
        try:
            for position, name in entry_map:
                parsed[name] = int(record[position])
        except IndexError:
            # A truncated record yields no fields at all, rather than a
            # partially filled dictionary.
            parsed = {}

    return parsed


def _extract_timestamp(parsed_record):
    """Build a datetime from the date and time fields of a parsed record.

    All record types share the same base fields ('year', 'month', 'day',
    'hour', 'minute' and 'second'), so any parsed record can be passed in.
    The year is stored as two digits and is offset by 2000.
    """

    year, month, day, hour, minute, second = (
        parsed_record[field]
        for field in ('year', 'month', 'day', 'hour', 'minute', 'second'))

    return datetime.datetime(year + 2000, month, day, hour, minute, second)


def _parse_arresult(record):
    """Takes an array of string fields as input and parses it into a Reading.

    Args:
      record: sequence of string fields of one $arresult? record.

    Returns:
      A common.Reading carrying the timestamp, the value and a comment that
      lists the reading kind plus any notes attached to it. None is returned
      for records that are not of type 2, are too short to be parsed, have a
      non-zero 'errors' field, or have a reading type other than 0 or 2.
    """

    parsed_record = _parse_record(record, _BASE_ENTRY_MAP)

    # There are other record types, but we don't currently need to expose these.
    if not parsed_record or parsed_record['type'] != 2:
        return None

    type2_fields = _parse_record(record, _ARRESULT_TYPE2_ENTRY_MAP)
    if not type2_fields:
        # _parse_record() returns an empty dictionary for a truncated record;
        # skip it like any other unusable record instead of failing with a
        # KeyError on the first field access below.
        return None
    parsed_record.update(type2_fields)

    # Check right away if we have rapid insulin
    if parsed_record['rapid-acting-flag']:
        parsed_record.update(
            _parse_record(record, _ARRESULT_RAPID_INSULIN_ENTRY_MAP))

    if parsed_record['errors']:
        return None

    comment_parts = []

    if parsed_record['reading-type'] == 2:
        comment_parts.append('(Scan)')
    elif parsed_record['reading-type'] == 0:
        comment_parts.append('(Blood)')
    else:
        # ketone reading
        return None

    # Up to six custom comments follow the fixed fields; bit N of the bitfield
    # says whether the N-th one applies. Iterating over the slice (rather than
    # over range(6)) tolerates records carrying fewer than six of them.
    for comment_index, custom_comment in enumerate(record[29:35]):
        if parsed_record['custom-comments-bitfield'] & (1 << comment_index):
            # Drop the first and last character, which wrap the comment text.
            comment_parts.append(custom_comment[1:-1])

    if parsed_record['sport-flag']:
        comment_parts.append('Sport')

    if parsed_record['medication-flag']:
        comment_parts.append('Medication')

    if parsed_record['food-flag']:
        if parsed_record['food-carbs-grams']:
            comment_parts.append(
                'Food (%d g)' % parsed_record['food-carbs-grams'])
        else:
            comment_parts.append('Food')

    # Insulin doses are stored doubled, so that half units fit in an integer.
    # %g keeps the half unit (3 -> "1.5") while still printing whole doses
    # without a fractional part (4 -> "2"); %d would truncate 1.5 to "1".
    if parsed_record['long-acting-flag']:
        long_acting_units = parsed_record['double-long-acting-insulin'] / 2
        if long_acting_units:
            comment_parts.append(
                'Long-acting insulin (%g)' % long_acting_units)
        else:
            comment_parts.append('Long-acting insulin')

    if parsed_record['rapid-acting-flag']:
        # provide default value, as this record does not always exist
        # (even if rapid-acting-flag is set)
        rapid_acting_units = (
            parsed_record.get('double-rapid-acting-insulin', 0) / 2)
        if rapid_acting_units:
            comment_parts.append(
                'Rapid-acting insulin (%g)' % rapid_acting_units)
        else:
            comment_parts.append('Rapid-acting insulin')

    return common.Reading(
        _extract_timestamp(parsed_record),
        parsed_record['value'],
        comment='; '.join(comment_parts))

class Device(freestyle.FreeStyleHidDevice):
    """FreeStyle Libre driver built on the shared FreeStyle HID support."""

    USB_PRODUCT_ID = 0x3650

    def get_meter_info(self):
        """Collect model, serial number, software version and native unit.

        Returns:
          A common.MeterInfo describing the connected device.
        """
        # The device is queried in this order: serial number, software
        # version, glucose unit.
        serial_number = self.get_serial_number()
        software_version = 'Software version: ' + self._get_version()
        native_unit = self.get_glucose_unit()

        return common.MeterInfo(
            'FreeStyle Libre',
            serial_number=serial_number,
            version_info=(software_version,),
            native_unit=native_unit)

    def get_serial_number(self):
        """Return the serial number, queried with the $sn? text command.

        This overrides the inherited implementation, whose command is not
        compatible with this device.
        """
        reply = self._send_text_command(b'$sn?')
        return reply.rstrip('\r\n')

    def get_glucose_unit(self):
        """Return the glucose unit of the device; currently always mg/dL."""
        # TODO(Flameeyes): figure out how to identify the actual unit on the
        # device.
        return common.UNIT_MGDL

    def get_readings(self):
        """Iterate over the readings stored on the device.

        Yields:
          common.Reading objects: first the entries of $history? with the
          comment '(Sensor)', then the entries of $arresult? that
          _parse_arresult() turns into a reading.
        """

        # The history is usually the longer of the two lists; entries that
        # cannot be parsed or that report errors are considered invalid and
        # are dropped.
        for fields in self._get_multirecord(b'$history?'):
            entry = _parse_record(fields, _HISTORY_ENTRY_MAP)

            if entry and entry['errors'] == 0:
                yield common.Reading(
                    _extract_timestamp(entry),
                    entry['value'],
                    comment='(Sensor)')

        # Explicit scans and blood tests come next; other kinds of event are
        # filtered out by _parse_arresult().
        for fields in self._get_multirecord(b'$arresult?'):
            result = _parse_arresult(fields)
            if result:
                yield result