diff options
Diffstat (limited to '')
-rw-r--r-- | glucometerutils/drivers/accuchek_reports.py | 174 | ||||
-rw-r--r-- | glucometerutils/drivers/fsoptium.py | 368 | ||||
-rw-r--r-- | glucometerutils/drivers/otultra2.py | 386 | ||||
-rw-r--r-- | glucometerutils/exceptions.py | 44 | ||||
-rwxr-xr-x | glucometerutils/glucometer.py | 236 | ||||
-rw-r--r-- | glucometerutils/support/lifescan.py | 52 |
6 files changed, 630 insertions, 630 deletions
diff --git a/glucometerutils/drivers/accuchek_reports.py b/glucometerutils/drivers/accuchek_reports.py index 404ed9b..49c698d 100644 --- a/glucometerutils/drivers/accuchek_reports.py +++ b/glucometerutils/drivers/accuchek_reports.py @@ -48,90 +48,90 @@ _TIME_FORMAT = '%H:%M' _DATETIME_FORMAT = ' '.join((_DATE_FORMAT, _TIME_FORMAT)) class Device(object): - def __init__(self, device): - if not device or not os.path.isdir(device): - raise exceptions.CommandLineError( - '--device parameter is required, should point to mount path for the ' - 'meter.') - - report_files = glob.glob(os.path.join(device, '*', 'Reports', '*.csv')) - if not report_files: - raise exceptions.ConnectionFailed( - 'No report file found in path "%s".' % reports_path) - - self.report_file = report_files[0] - - def _get_records_reader(self): - self.report.seek(0) - # Skip the first two lines - next(self.report) - next(self.report) - - return csv.DictReader( - self.report, delimiter=';', skipinitialspace=True, quoting=csv.QUOTE_NONE) - - def connect(self): - self.report = open(self.report_file, 'r', newline='\r\n', encoding='utf-8') - - def disconnect(self): - self.report.close() - - def get_meter_info(self): - return common.MeterInfo( - '%s glucometer' % self.get_model(), - serial_number=self.get_serial_number(), - native_unit=self.get_glucose_unit()) - - def get_model(self): - # $device/MODEL/Reports/*.csv - return os.path.basename(os.path.dirname(os.path.dirname(self.report_file))) - - def get_serial_number(self): - self.report.seek(0) - # ignore the first line. - next(self.report) - # The second line of the CSV is serial-no;report-date;report-time;;;;;;; - return next(self.report).split(';')[0] - - def get_glucose_unit(self): - # Get the first record available and parse that. - record = next(self._get_records_reader()) - return _UNIT_MAP[record[_UNIT_CSV_KEY]] - - def get_datetime(self): - raise NotImplemented - - def set_datetime(self, date=None): - raise NotImplemented - - def zero_log(self): - raise NotImplemented - - def _extract_datetime(self, record): - # Date and time are in separate column, but we want to parse them - # together. - date_and_time = ' '.join((record[_DATE_CSV_KEY], record[_TIME_CSV_KEY])) - return datetime.datetime.strptime(date_and_time, _DATETIME_FORMAT) - - def _extract_meal(self, record): - if record[_AFTER_MEAL_CSV_KEY] and record[_BEFORE_MEAL_CSV_KEY]: - raise InvalidResponse('Reading cannot be before and after meal.') - elif record[_AFTER_MEAL_CSV_KEY]: - return common.Meal.AFTER - elif record[_BEFORE_MEAL_CSV_KEY]: - return common.Meal.BEFORE - else: - return common.Meal.NONE - - def get_readings(self): - for record in self._get_records_reader(): - if record[_RESULT_CSV_KEY] is None: - continue - - yield common.GlucoseReading( - self._extract_datetime(record), - common.convert_glucose_unit( - float(record[_RESULT_CSV_KEY]), - _UNIT_MAP[record[_UNIT_CSV_KEY]], - common.Unit.MG_DL), - meal=self._extract_meal(record)) + def __init__(self, device): + if not device or not os.path.isdir(device): + raise exceptions.CommandLineError( + '--device parameter is required, should point to mount path for the ' + 'meter.') + + report_files = glob.glob(os.path.join(device, '*', 'Reports', '*.csv')) + if not report_files: + raise exceptions.ConnectionFailed( + 'No report file found in path "%s".' % reports_path) + + self.report_file = report_files[0] + + def _get_records_reader(self): + self.report.seek(0) + # Skip the first two lines + next(self.report) + next(self.report) + + return csv.DictReader( + self.report, delimiter=';', skipinitialspace=True, quoting=csv.QUOTE_NONE) + + def connect(self): + self.report = open(self.report_file, 'r', newline='\r\n', encoding='utf-8') + + def disconnect(self): + self.report.close() + + def get_meter_info(self): + return common.MeterInfo( + '%s glucometer' % self.get_model(), + serial_number=self.get_serial_number(), + native_unit=self.get_glucose_unit()) + + def get_model(self): + # $device/MODEL/Reports/*.csv + return os.path.basename(os.path.dirname(os.path.dirname(self.report_file))) + + def get_serial_number(self): + self.report.seek(0) + # ignore the first line. + next(self.report) + # The second line of the CSV is serial-no;report-date;report-time;;;;;;; + return next(self.report).split(';')[0] + + def get_glucose_unit(self): + # Get the first record available and parse that. + record = next(self._get_records_reader()) + return _UNIT_MAP[record[_UNIT_CSV_KEY]] + + def get_datetime(self): + raise NotImplemented + + def set_datetime(self, date=None): + raise NotImplemented + + def zero_log(self): + raise NotImplemented + + def _extract_datetime(self, record): + # Date and time are in separate column, but we want to parse them + # together. + date_and_time = ' '.join((record[_DATE_CSV_KEY], record[_TIME_CSV_KEY])) + return datetime.datetime.strptime(date_and_time, _DATETIME_FORMAT) + + def _extract_meal(self, record): + if record[_AFTER_MEAL_CSV_KEY] and record[_BEFORE_MEAL_CSV_KEY]: + raise InvalidResponse('Reading cannot be before and after meal.') + elif record[_AFTER_MEAL_CSV_KEY]: + return common.Meal.AFTER + elif record[_BEFORE_MEAL_CSV_KEY]: + return common.Meal.BEFORE + else: + return common.Meal.NONE + + def get_readings(self): + for record in self._get_records_reader(): + if record[_RESULT_CSV_KEY] is None: + continue + + yield common.GlucoseReading( + self._extract_datetime(record), + common.convert_glucose_unit( + float(record[_RESULT_CSV_KEY]), + _UNIT_MAP[record[_UNIT_CSV_KEY]], + common.Unit.MG_DL), + meal=self._extract_meal(record)) diff --git a/glucometerutils/drivers/fsoptium.py b/glucometerutils/drivers/fsoptium.py index 1e1a319..395494c 100644 --- a/glucometerutils/drivers/fsoptium.py +++ b/glucometerutils/drivers/fsoptium.py @@ -65,214 +65,214 @@ _MONTH_MATCHES = { def _parse_clock(datestr): - """Convert the date/time string used by the the device into a datetime. + """Convert the date/time string used by the the device into a datetime. - Args: - datestr: a string as returned by the device during information handling. - """ - match = _CLOCK_RE.match(datestr) - if not match: - raise exceptions.InvalidResponse(datestr) - - # int() parses numbers in decimal, so we don't have to worry about '08' - day = int(match.group('day')) - month = _MONTH_MATCHES[match.group('month')] - year = int(match.group('year')) - - hour, minute, second = map(match.group('time').split(':'), int) - - return datetime.datetime(year, month, day, hour, minute, second) - - -class Device(serial.SerialDevice): - BAUDRATE = 19200 - DEFAULT_CABLE_ID = '1a61:3420' - - def _send_command(self, command): - cmd_bytes = bytes('$%s\r\n' % command, 'ascii') - logging.debug('Sending command: %r', cmd_bytes) - - self.serial_.write(cmd_bytes) - self.serial_.flush() - - response = self.serial_.readlines() - - logging.debug('Received response: %r', response) - - # We always want to decode the output, and remove stray \r\n. Any failure in - # decoding means the output is invalid anyway. - decoded_response = [line.decode('ascii').rstrip('\r\n') - for line in response] - return decoded_response - - def connect(self): - self._send_command('xmem') # ignore output this time - self._fetch_device_information() - - def disconnect(self): - return - - def _fetch_device_information(self): - data = self._send_command('colq') - - for line in data: - parsed_line = line.split('\t') - - if parsed_line[0] == 'S/N:': - self.device_serialno_ = parsed_line[1] - elif parsed_line[0] == 'Ver:': - self.device_version_ = parsed_line[1] - if parsed_line[2] == 'MMOL': - self.device_glucose_unit_ = common.Unit.MMOL_L - else: # I only have a mmol/l device, so I can't be sure. - self.device_glucose_unit_ = common.Unit.MG_DL - # There are more entries: Clock, Market, ROM and Usage, but we don't care - # for those here. - elif parsed_line[0] == 'CMD OK': - return - - # I have not figured out why this happens, but sometimes it's echoing back - # the commands and not replying to them. - raise exceptions.ConnectionFailed() - - def get_meter_info(self): - """Fetch and parses the device information. - - Returns: - A common.MeterInfo object. - """ - return common.MeterInfo( - 'Freestyle Optium glucometer', - serial_number=self.get_serial_number(), - version_info=( - 'Software version: ' + self.get_version(),), - native_unit=self.get_glucose_unit()) - - def get_version(self): - """Returns an identifier of the firmware version of the glucometer. - - Returns: - The software version returned by the glucometer, such as "0.22" - """ - return self.device_version_ - - def get_serial_number(self): - """Retrieve the serial number of the device. - - Returns: - A string representing the serial number of the device. - """ - return self.device_serialno_ - - def get_glucose_unit(self): - """Returns a constant representing the unit displayed by the meter. - - Returns: - common.Unit.MG_DL: if the glucometer displays in mg/dL - common.Unit.MMOL_L: if the glucometer displays in mmol/L - """ - return self.device_glucose_unit_ - - def get_datetime(self): - """Returns the current date and time for the glucometer. - - Returns: - A datetime object built according to the returned response. + Args: + datestr: a string as returned by the device during information handling. """ - data = self._send_command('colq') + match = _CLOCK_RE.match(datestr) + if not match: + raise exceptions.InvalidResponse(datestr) - for line in data: - if not line.startswith('Clock:'): - continue + # int() parses numbers in decimal, so we don't have to worry about '08' + day = int(match.group('day')) + month = _MONTH_MATCHES[match.group('month')] + year = int(match.group('year')) - return _parse_clock(line) + hour, minute, second = map(match.group('time').split(':'), int) - raise exceptions.InvalidResponse('\n'.join(data)) + return datetime.datetime(year, month, day, hour, minute, second) - def set_datetime(self, date=datetime.datetime.now()): - """Sets the date and time of the glucometer. - Args: - date: The value to set the date/time of the glucometer to. If none is - given, the current date and time of the computer is used. +class Device(serial.SerialDevice): + BAUDRATE = 19200 + DEFAULT_CABLE_ID = '1a61:3420' - Returns: - A datetime object built according to the returned response. - """ - data = self._send_command(date.strftime('tim,%m,%d,%y,%H,%M')) + def _send_command(self, command): + cmd_bytes = bytes('$%s\r\n' % command, 'ascii') + logging.debug('Sending command: %r', cmd_bytes) - parsed_data = ''.join(data) - if parsed_data != 'CMD OK': - raise exceptions.InvalidResponse(parsed_data) + self.serial_.write(cmd_bytes) + self.serial_.flush() - return self.get_datetime() + response = self.serial_.readlines() - def zero_log(self): - """Zeros out the data log of the device. + logging.debug('Received response: %r', response) - This function will clear the memory of the device deleting all the readings - in an irrecoverable way. - """ - raise NotImplementedError + # We always want to decode the output, and remove stray \r\n. Any failure in + # decoding means the output is invalid anyway. + decoded_response = [line.decode('ascii').rstrip('\r\n') + for line in response] + return decoded_response - def get_readings(self): - """Iterates over the reading values stored in the glucometer. + def connect(self): + self._send_command('xmem') # ignore output this time + self._fetch_device_information() - Args: - unit: The glucose unit to use for the output. + def disconnect(self): + return - Yields: - A tuple (date, value) of the readings in the glucometer. The value is a - floating point in the unit specified; if no unit is specified, the default - unit in the glucometer will be used. + def _fetch_device_information(self): + data = self._send_command('colq') + + for line in data: + parsed_line = line.split('\t') - Raises: - exceptions.InvalidResponse: if the response does not match what expected. - """ - data = self._send_command('xmem') + if parsed_line[0] == 'S/N:': + self.device_serialno_ = parsed_line[1] + elif parsed_line[0] == 'Ver:': + self.device_version_ = parsed_line[1] + if parsed_line[2] == 'MMOL': + self.device_glucose_unit_ = common.Unit.MMOL_L + else: # I only have a mmol/l device, so I can't be sure. + self.device_glucose_unit_ = common.Unit.MG_DL + # There are more entries: Clock, Market, ROM and Usage, but we don't care + # for those here. + elif parsed_line[0] == 'CMD OK': + return + + # I have not figured out why this happens, but sometimes it's echoing back + # the commands and not replying to them. + raise exceptions.ConnectionFailed() + + def get_meter_info(self): + """Fetch and parses the device information. + + Returns: + A common.MeterInfo object. + """ + return common.MeterInfo( + 'Freestyle Optium glucometer', + serial_number=self.get_serial_number(), + version_info=( + 'Software version: ' + self.get_version(),), + native_unit=self.get_glucose_unit()) + + def get_version(self): + """Returns an identifier of the firmware version of the glucometer. + + Returns: + The software version returned by the glucometer, such as "0.22" + """ + return self.device_version_ + + def get_serial_number(self): + """Retrieve the serial number of the device. + + Returns: + A string representing the serial number of the device. + """ + return self.device_serialno_ + + def get_glucose_unit(self): + """Returns a constant representing the unit displayed by the meter. + + Returns: + common.Unit.MG_DL: if the glucometer displays in mg/dL + common.Unit.MMOL_L: if the glucometer displays in mmol/L + """ + return self.device_glucose_unit_ + + def get_datetime(self): + """Returns the current date and time for the glucometer. + + Returns: + A datetime object built according to the returned response. + """ + data = self._send_command('colq') + + for line in data: + if not line.startswith('Clock:'): + continue + + return _parse_clock(line) + + raise exceptions.InvalidResponse('\n'.join(data)) + + def set_datetime(self, date=datetime.datetime.now()): + """Sets the date and time of the glucometer. + + Args: + date: The value to set the date/time of the glucometer to. If none is + given, the current date and time of the computer is used. + + Returns: + A datetime object built according to the returned response. + """ + data = self._send_command(date.strftime('tim,%m,%d,%y,%H,%M')) + + parsed_data = ''.join(data) + if parsed_data != 'CMD OK': + raise exceptions.InvalidResponse(parsed_data) + + return self.get_datetime() + + def zero_log(self): + """Zeros out the data log of the device. + + This function will clear the memory of the device deleting all the readings + in an irrecoverable way. + """ + raise NotImplementedError + + def get_readings(self): + """Iterates over the reading values stored in the glucometer. + + Args: + unit: The glucose unit to use for the output. + + Yields: + A tuple (date, value) of the readings in the glucometer. The value is a + floating point in the unit specified; if no unit is specified, the default + unit in the glucometer will be used. + + Raises: + exceptions.InvalidResponse: if the response does not match what expected. + """ + data = self._send_command('xmem') - # The first line is empty, the second is the serial number, the third the - # version, the fourth the current time, and the fifth the record count.. The - # last line has a checksum and the end. - count = int(data[4]) - if count != (len(data) - 6): - raise exceptions.InvalidResponse('\n'.join(data)) + # The first line is empty, the second is the serial number, the third the + # version, the fourth the current time, and the fifth the record count.. The + # last line has a checksum and the end. + count = int(data[4]) + if count != (len(data) - 6): + raise exceptions.InvalidResponse('\n'.join(data)) - # Extract the checksum from the last line. - checksum_match = _CHECKSUM_RE.match(data[-1]) - if not checksum_match: - raise exceptions.InvalidResponse('\n'.join(data)) + # Extract the checksum from the last line. + checksum_match = _CHECKSUM_RE.match(data[-1]) + if not checksum_match: + raise exceptions.InvalidResponse('\n'.join(data)) - expected_checksum = int(checksum_match.group('checksum'), 16) - # exclude the last line in the checksum calculation, as that's the checksum - # itself. The final \r\n is added separately. - calculated_checksum = sum(ord(c) for c in '\r\n'.join(data[:-1])) + 0xd + 0xa + expected_checksum = int(checksum_match.group('checksum'), 16) + # exclude the last line in the checksum calculation, as that's the checksum + # itself. The final \r\n is added separately. + calculated_checksum = sum(ord(c) for c in '\r\n'.join(data[:-1])) + 0xd + 0xa - if expected_checksum != calculated_checksum: - raise exceptions.InvalidChecksum(expected_checksum, calculated_checksum) + if expected_checksum != calculated_checksum: + raise exceptions.InvalidChecksum(expected_checksum, calculated_checksum) - for line in data[5:-1]: - match = _READING_RE.match(line) - if not match: - raise exceptions.InvalidResponse(line) + for line in data[5:-1]: + match = _READING_RE.match(line) + if not match: + raise exceptions.InvalidResponse(line) - if match.group('type') != 'G': - logging.warning('Non-glucose readings are not supported, ignoring.') - continue + if match.group('type') != 'G': + logging.warning('Non-glucose readings are not supported, ignoring.') + continue - if match.group('reading') == 'HI ': - value = float("inf") - else: - value = float(match.group('reading')) + if match.group('reading') == 'HI ': + value = float("inf") + else: + value = float(match.group('reading')) - day = int(match.group('day')) - month = _MONTH_MATCHES[match.group('month')] - year = int(match.group('year')) + day = int(match.group('day')) + month = _MONTH_MATCHES[match.group('month')] + year = int(match.group('year')) - hour, minute = map(int, match.group('time').split(':')) + hour, minute = map(int, match.group('time').split(':')) - timestamp = datetime.datetime(year, month, day, hour, minute) + timestamp = datetime.datetime(year, month, day, hour, minute) - # The reading, if present, is always in mg/dL even if the glucometer is - # set to mmol/L. - yield common.GlucoseReading(timestamp, value) + # The reading, if present, is always in mg/dL even if the glucometer is + # set to mmol/L. + yield common.GlucoseReading(timestamp, value) diff --git a/glucometerutils/drivers/otultra2.py b/glucometerutils/drivers/otultra2.py index 762991e..a27d333 100644 --- a/glucometerutils/drivers/otultra2.py +++ b/glucometerutils/drivers/otultra2.py @@ -57,273 +57,273 @@ _DUMP_LINE_RE = re.compile( _RESPONSE_MATCH = re.compile(r'^(.+) ([0-9A-F]{4})\r$') def _calculate_checksum(bytestring): - """Calculate the checksum used by OneTouch Ultra and Ultra2 devices + """Calculate the checksum used by OneTouch Ultra and Ultra2 devices - Args: - bytestring: the string of which the checksum has to be calculated. + Args: + bytestring: the string of which the checksum has to be calculated. - Returns: - A string with the hexdecimal representation of the checksum for the input. + Returns: + A string with the hexdecimal representation of the checksum for the input. - The checksum is a very stupid one: it just sums all the bytes, - modulo 16-bit, without any parity. - """ - checksum = 0 + The checksum is a very stupid one: it just sums all the bytes, + modulo 16-bit, without any parity. + """ + checksum = 0 - for byte in bytestring: - checksum = (checksum + byte) & 0xffff + for byte in bytestring: + checksum = (checksum + byte) & 0xffff - return checksum + return checksum def _validate_and_strip_checksum(line): - """Verify the simple 16-bit checksum and remove it from the line. + """Verify the simple 16-bit checksum and remove it from the line. - Args: - line: the line to check the checksum of. + Args: + line: the line to check the checksum of. - Returns: - A copy of the line with the checksum stripped out. - """ - match = _RESPONSE_MATCH.match(line) + Returns: + A copy of the line with the checksum stripped out. + """ + match = _RESPONSE_MATCH.match(line) - if not match: - raise lifescan.MissingChecksum(line) + if not match: + raise lifescan.MissingChecksum(line) - response, checksum_string = match.groups() + response, checksum_string = match.groups() - try: - checksum_given = int(checksum_string, 16) - checksum_calculated = _calculate_checksum( - bytes(response, 'ascii')) + try: + checksum_given = int(checksum_string, 16) + checksum_calculated = _calculate_checksum( + bytes(response, 'ascii')) - if checksum_given != checksum_calculated: - raise exceptions.InvalidChecksum(checksum_given, - checksum_calculated) - except ValueError: - raise exceptions.InvalidChecksum(checksum_given, None) + if checksum_given != checksum_calculated: + raise exceptions.InvalidChecksum(checksum_given, + checksum_calculated) + except ValueError: + raise exceptions.InvalidChecksum(checksum_given, None) - return response + return response _DATETIME_RE = re.compile( r'^"[A-Z]{3}","([0-9]{2}/[0-9]{2}/[0-9]{2})","([0-9]{2}:[0-9]{2}:[0-9]{2}) "$') def _parse_datetime(response): - """Convert a response with date and time from the meter into a datetime. + """Convert a response with date and time from the meter into a datetime. - Args: - response: the response coming from a DMF or DMT command + Args: + response: the response coming from a DMF or DMT command - Returns: - A datetime object built according to the returned response. + Returns: + A datetime object built according to the returned response. - Raises: - InvalidResponse if the string cannot be matched by _DATETIME_RE. - """ - match = _DATETIME_RE.match(response) - if not match: - raise exceptions.InvalidResponse(response) + Raises: + InvalidResponse if the string cannot be matched by _DATETIME_RE. + """ + match = _DATETIME_RE.match(response) + if not match: + raise exceptions.InvalidResponse(response) - date, time = match.groups() - month, day, year = map(int, date.split('/')) - hour, minute, second = map(int, time.split(':')) + date, time = match.groups() + month, day, year = map(int, date.split('/')) + hour, minute, second = map(int, time.split(':')) - # Yes, OneTouch2's firmware is not Y2K safe. - return datetime.datetime(2000 + year, month, day, hour, minute, second) + # Yes, OneTouch2's firmware is not Y2K safe. + return datetime.datetime(2000 + year, month, day, hour, minute, second) class Device(serial.SerialDevice): - BAUDRATE = 9600 - DEFAULT_CABLE_ID = '067b:2303' # Generic PL2303 cable. + BAUDRATE = 9600 + DEFAULT_CABLE_ID = '067b:2303' # Generic PL2303 cable. - def connect(self): - return + def connect(self): + return - def disconnect(self): - return + def disconnect(self): + return - def _send_command(self, cmd): - """Send command interface. + def _send_command(self, cmd): + """Send command interface. - Args: - cmd: command and parameters to send (without newline) - """ - cmdstring = bytes('\x11\r' + cmd + '\r', 'ascii') - self.serial_.write(cmdstring) - self.serial_.flush() + Args: + cmd: command and parameters to send (without newline) + """ + cmdstring = bytes('\x11\r' + cmd + '\r', 'ascii') + self.serial_.write(cmdstring) + self.serial_.flush() - def _send_oneliner_command(self, cmd): - """Send command and read a one-line response. + def _send_oneliner_command(self, cmd): + """Send command and read a one-line response. - Args: - cmd: command and parameters to send (without newline) + Args: + cmd: command and parameters to send (without newline) - Returns: - A single line of text that the glucometer responds, without the checksum. - """ - self._send_command(cmd) + Returns: + A single line of text that the glucometer responds, without the checksum. + """ + self._send_command(cmd) - line = self.serial_.readline().decode('ascii') - return _validate_and_strip_checksum(line) + line = self.serial_.readline().decode('ascii') + return _validate_and_strip_checksum(line) - def get_meter_info(self): - """Fetch and parses the device information. + def get_meter_info(self): + """Fetch and parses the device information. - Returns: - A common.MeterInfo object. - """ - return common.MeterInfo( - 'OneTouch Ultra 2 glucometer', - serial_number=self.get_serial_number(), - version_info=( - 'Software version: ' + self.get_version(),), - native_unit=self.get_glucose_unit()) + Returns: + A common.MeterInfo object. + """ + return common.MeterInfo( + 'OneTouch Ultra 2 glucometer', + serial_number=self.get_serial_number(), + version_info=( + 'Software version: ' + self.get_version(),), + native_unit=self.get_glucose_unit()) - def get_version(self): - """Returns an identifier of the firmware version of the glucometer. + def get_version(self): + """Returns an identifier of the firmware version of the glucometer. - Returns: - The software version returned by the glucometer, such as - "P02.00.00 30/08/06". - """ - response = self._send_oneliner_command('DM?') + Returns: + The software version returned by the glucometer, such as + "P02.00.00 30/08/06". + """ + response = self._send_oneliner_command('DM?') - if response[0] != '?': - raise exceptions.InvalidResponse(response) + if response[0] != '?': + raise exceptions.InvalidResponse(response) - return response[1:] + return response[1:] - _SERIAL_NUMBER_RE = re.compile('^@ "([A-Z0-9]{9})"$') + _SERIAL_NUMBER_RE = re.compile('^@ "([A-Z0-9]{9})"$') - def get_serial_number(self): - """Retrieve the serial number of the device. + def get_serial_number(self): + """Retrieve the serial number of the device. - Returns: - A string representing the serial number of the device. + Returns: + A string representing the serial number of the device. - Raises: - exceptions.InvalidResponse: if the DM@ command returns a string not - matching _SERIAL_NUMBER_RE. - InvalidSerialNumber: if the returned serial number does not match - the OneTouch2 device as per specs. - """ - response = self._send_oneliner_command('DM@') + Raises: + exceptions.InvalidResponse: if the DM@ command returns a string not + matching _SERIAL_NUMBER_RE. + InvalidSerialNumber: if the returned serial number does not match + the OneTouch2 device as per specs. + """ + response = self._send_oneliner_command('DM@') - match = self._SERIAL_NUMBER_RE.match(response) - if not match: - raise exceptions.InvalidResponse(response) + match = self._SERIAL_NUMBER_RE.match(response) + if not match: + raise exceptions.InvalidResponse(response) - serial_number = match.group(1) + serial_number = match.group(1) - # 'Y' at the far right of the serial number is the indication of a OneTouch - # Ultra2 device, as per specs. - if serial_number[-1] != 'Y': - raise lifescan.InvalidSerialNumber(serial_number) + # 'Y' at the far right of the serial number is the indication of a OneTouch + # Ultra2 device, as per specs. + if serial_number[-1] != 'Y': + raise lifescan.InvalidSerialNumber(serial_number) - return serial_number + return serial_number - def get_datetime(self): - """Returns the current date and time for the glucometer. + def get_datetime(self): + """Returns the current date and time for the glucometer. - Returns: - A datetime object built according to the returned response. - """ - response = self._send_oneliner_command('DMF') - return _parse_datetime(response[2:]) + Returns: + A datetime object built according to the returned response. + """ + response = self._send_oneliner_command('DMF') + return _parse_datetime(response[2:]) - def set_datetime(self, date=datetime.datetime.now()): - """Sets the date and time of the glucometer. + def set_datetime(self, date=datetime.datetime.now()): + """Sets the date and time of the glucometer. - Args: - date: The value to set the date/time of the glucometer to. If none is - given, the current date and time of the computer is used. + Args: + date: The value to set the date/time of the glucometer to. If none is + given, the current date and time of the computer is used. - Returns: - A datetime object built according to the returned response. - """ - response = self._send_oneliner_command( - 'DMT' + date.strftime('%m/%d/%y %H:%M:%S')) + Returns: + A datetime object built according to the returned response. + """ + response = self._send_oneliner_command( + 'DMT' + date.strftime('%m/%d/%y %H:%M:%S')) - return _parse_datetime(response[2:]) + return _parse_datetime(response[2:]) - def zero_log(self): - """Zeros out the data log of the device. + def zero_log(self): + """Zeros out the data log of the device. - This function will clear the memory of the device deleting all the readings - in an irrecoverable way. - """ - response = self._send_oneliner_command('DMZ') - if response != 'Z': - raise exceptions.InvalidResponse(response) + This function will clear the memory of the device deleting all the readings + in an irrecoverable way. + """ + response = self._send_oneliner_command('DMZ') + if response != 'Z': + raise exceptions.InvalidResponse(response) - _GLUCOSE_UNIT_RE = re.compile(r'^SU\?,"(MG/DL |MMOL/L)"') + _GLUCOSE_UNIT_RE = re.compile(r'^SU\?,"(MG/DL |MMOL/L)"') - def get_glucose_unit(self): - """Returns a constant representing the unit displayed by the meter. + def get_glucose_unit(self): + """Returns a constant representing the unit displayed by the meter. - Returns: - common.Unit.MG_DL: if the glucometer displays in mg/dL - common.Unit.MMOL_L: if the glucometer displays in mmol/L + Returns: + common.Unit.MG_DL: if the glucometer displays in mg/dL + common.Unit.MMOL_L: if the glucometer displays in mmol/L - Raises: - exceptions.InvalidGlucoseUnit: if the unit is not recognized + Raises: + exceptions.InvalidGlucoseUnit: if the unit is not recognized - OneTouch meters will always dump data in mg/dL because that's their internal - storage. They will then provide a separate method to read the unit used for - display. This is not settable by the user in all modern meters. + OneTouch meters will always dump data in mg/dL because that's their internal + storage. They will then provide a separate method to read the unit used for + display. This is not settable by the user in all modern meters. - """ - response = self._send_oneliner_command('DMSU?') + """ + response = self._send_oneliner_command('DMSU?') - match = self._GLUCOSE_UNIT_RE.match(response) - unit = match.group(1) + match = self._GLUCOSE_UNIT_RE.match(response) + unit = match.group(1) - if unit == 'MG/DL ': - return common.Unit.MG_DL - elif unit == 'MMOL/L': - return common.Unit.MMOL_L - else: - raise exceptions.InvalidGlucoseUnit(string) + if unit == 'MG/DL ': + return common.Unit.MG_DL + elif unit == 'MMOL/L': + return common.Unit.MMOL_L + else: + raise exceptions.InvalidGlucoseUnit(string) - def get_readings(self): - """Iterates over the reading values stored in the glucometer. + def get_readings(self): + """Iterates over the reading values stored in the glucometer. - Args: - unit: The glucose unit to use for the output. + Args: + unit: The glucose unit to use for the output. - Yields: - A tuple (date, value) of the readings in the glucometer. The value is a - floating point in the unit specified; if no unit is specified, the default - unit in the glucometer will be used. + Yields: + A tuple (date, value) of the readings in the glucometer. The value is a + floating point in the unit specified; if no unit is specified, the default + unit in the glucometer will be used. - Raises: - exceptions.InvalidResponse: if the response does not match what expected. - """ - self._send_command('DMP') - data = self.serial_.readlines() + Raises: + exceptions.InvalidResponse: if the response does not match what expected. + """ + self._send_command('DMP') + data = self.serial_.readlines() - header = data.pop(0).decode('ascii') - match = _DUMP_HEADER_RE.match(header) - if not match: - raise exceptions.InvalidResponse(header) + header = data.pop(0).decode('ascii') + match = _DUMP_HEADER_RE.match(header) + if not match: + raise exceptions.InvalidResponse(header) - count = int(match.group(1)) - assert count == len(data) + count = int(match.group(1)) + assert count == len(data) - for line in data: - line = _validate_and_strip_checksum(line.decode('ascii')) + for line in data: + line = _validate_and_strip_checksum(line.decode('ascii')) - match = _DUMP_LINE_RE.match(line) - if not match: - raise exceptions.InvalidResponse(line) + match = _DUMP_LINE_RE.match(line) + if not match: + raise exceptions.InvalidResponse(line) - line_data = match.groupdict() + line_data = match.groupdict() - date = _parse_datetime(line_data['datetime']) - meal = _MEAL_CODES[line_data['meal']] - comment = _COMMENT_CODES[line_data['comment']] + date = _parse_datetime(line_data['datetime']) + meal = _MEAL_CODES[line_data['meal']] + comment = _COMMENT_CODES[line_data['comment']] - # OneTouch2 always returns the data in mg/dL even if the glucometer is set - # to mmol/L, so there is no conversion required. - yield common.GlucoseReading( - date, float(line_data['value']), meal=meal, comment=comment) + # OneTouch2 always returns the data in mg/dL even if the glucometer is set + # to mmol/L, so there is no conversion required. + yield common.GlucoseReading( + date, float(line_data['value']), meal=meal, comment=comment) diff --git a/glucometerutils/exceptions.py b/glucometerutils/exceptions.py index e1d9200..9a0f1fd 100644 --- a/glucometerutils/exceptions.py +++ b/glucometerutils/exceptions.py @@ -7,49 +7,49 @@ __copyright__ = 'Copyright © 2013, Diego Elio Pettenò' __license__ = 'MIT' class Error(Exception): - """Base class for the errors.""" + """Base class for the errors.""" - def __str__(self): - return self.message + def __str__(self): + return self.message class CommandLineError(Error): - """Error with commandline parameters provided.""" + """Error with commandline parameters provided.""" - def __init__(self, message=''): - self.message = message + def __init__(self, message=''): + self.message = message class ConnectionFailed(Error): - """It was not possible to connect to the meter.""" + """It was not possible to connect to the meter.""" - def __init__(self, message='Unable to connect to the meter.'): - self.message = message + def __init__(self, message='Unable to connect to the meter.'): + self.message = message class CommandError(Error): - """It was not possible to send a command to the device.""" + """It was not possible to send a command to the device.""" - def __init__(self, message="Unable to send command to device."): - self.message = message + def __init__(self, message="Unable to send command to device."): + self.message = message class InvalidResponse(Error): - """The response received from the meter was not understood""" + """The response received from the meter was not understood""" - def __init__(self, response): - self.message = 'Invalid response received:\n%s' % response + def __init__(self, response): + self.message = 'Invalid response received:\n%s' % response class InvalidChecksum(InvalidResponse): - def __init__(self, expected, gotten): - self.message = ( - 'Response checksum not matching: %08x expected, %08x gotten' % - (expected, gotten)) + def __init__(self, expected, gotten): + self.message = ( + 'Response checksum not matching: %08x expected, %08x gotten' % + (expected, gotten)) class InvalidGlucoseUnit(Error): - """Unable to parse the given glucose unit""" + """Unable to parse the given glucose unit""" - def __init__(self, unit): - self.message = 'Invalid glucose unit received:\n%s' % unit + def __init__(self, unit): + self.message = 'Invalid glucose unit received:\n%s' % unit diff --git a/glucometerutils/glucometer.py b/glucometerutils/glucometer.py index 30efc83..97fe80e 100755 --- a/glucometerutils/glucometer.py +++ b/glucometerutils/glucometer.py @@ -17,123 +17,123 @@ from glucometerutils import common from glucometerutils import exceptions def main(): - if sys.version_info < (3, 4): - raise Exception( - 'Unsupported Python version, please use at least Python 3.4') - - parser = argparse.ArgumentParser() - subparsers = parser.add_subparsers(dest="action") - - parser.add_argument( - '--driver', action='store', required=True, - help='Select the driver to use for connecting to the glucometer.') - parser.add_argument( - '--device', action='store', required=False, - help=('Select the path to the glucometer device. Some devices require this ' - 'argument, others will try autodetection.')) - - parser.add_argument( - '--vlog', action='store', required=False, type=int, - help=('Python logging level. See the levels at ' - 'https://docs.python.org/3/library/logging.html#logging-levels')) - - subparsers.add_parser( - 'help', help=('Display a description of the driver, including supported ' - 'features and known quirks.')) - subparsers.add_parser( - 'info', help='Display information about the meter.') - subparsers.add_parser( - 'zero', help='Zero out the data log of the meter.') - - parser_dump = subparsers.add_parser( - 'dump', help='Dump the readings stored in the device.') - parser_dump.add_argument( - '--unit', action='store', - choices=[unit.value for unit in common.Unit], - help='Select the unit to use for the dumped data.') - parser_dump.add_argument( - '--with-ketone', action='store_true', default=False, - help='Enable ketone reading if available on the glucometer.') - - parser_date = subparsers.add_parser( - 'datetime', help='Reads or sets the date and time of the glucometer.') - parser_date.add_argument( - '--set', action='store', nargs='?', const='now', default=None, - help='Set the date rather than just reading it from the device.') - - args = parser.parse_args() - - logging.basicConfig(level=args.vlog) - - try: - driver = importlib.import_module('glucometerutils.drivers.' + args.driver) - except ImportError as e: - logging.error( - 'Error importing driver "%s", please check your --driver parameter:\n%s', - args.driver, e) - return 1 - - # This check needs to happen before we try to initialize the device, as the - # help action does not require a --device at all. - if args.action == 'help': - print(inspect.getdoc(driver)) - return 0 - - device = driver.Device(args.device) - - device.connect() - device_info = device.get_meter_info() - - try: - if args.action == 'info': - try: - time_str = device.get_datetime() - except NotImplementedError: - time_str = 'N/A' - print("{device_info}Time: {time}".format( - device_info=str(device_info), time=time_str)) - elif args.action == 'dump': - unit = args.unit - if unit is None: - unit = device_info.native_unit - - readings = device.get_readings() - - if not args.with_ketone: - readings = (reading for reading in readings - if not isinstance(reading, common.KetoneReading)) - - for reading in sorted(readings, key=lambda r: r.timestamp): - print(reading.as_csv(unit)) - elif args.action == 'datetime': - if args.set == 'now': - print(device.set_datetime()) - elif args.set: - try: - from dateutil import parser as date_parser - new_date = date_parser.parse(args.set) - except ImportError: - logging.error( - 'Unable to import module "dateutil", please install it.') - return 1 - except ValueError: - logging.error('%s: not a valid date', args.set) - return 1 - print(device.set_datetime(new_date)) - else: - print(device.get_datetime()) - elif args.action == 'zero': - confirm = input('Delete the device data log? (y/N) ') - if confirm.lower() in ['y', 'ye', 'yes']: - device.zero_log() - print('\nDevice data log zeroed.') - else: - print('\nDevice data log not zeroed.') + if sys.version_info < (3, 4): + raise Exception( + 'Unsupported Python version, please use at least Python 3.4') + + parser = argparse.ArgumentParser() + subparsers = parser.add_subparsers(dest="action") + + parser.add_argument( + '--driver', action='store', required=True, + help='Select the driver to use for connecting to the glucometer.') + parser.add_argument( + '--device', action='store', required=False, + help=('Select the path to the glucometer device. Some devices require this ' + 'argument, others will try autodetection.')) + + parser.add_argument( + '--vlog', action='store', required=False, type=int, + help=('Python logging level. See the levels at ' + 'https://docs.python.org/3/library/logging.html#logging-levels')) + + subparsers.add_parser( + 'help', help=('Display a description of the driver, including supported ' + 'features and known quirks.')) + subparsers.add_parser( + 'info', help='Display information about the meter.') + subparsers.add_parser( + 'zero', help='Zero out the data log of the meter.') + + parser_dump = subparsers.add_parser( + 'dump', help='Dump the readings stored in the device.') + parser_dump.add_argument( + '--unit', action='store', + choices=[unit.value for unit in common.Unit], + help='Select the unit to use for the dumped data.') + parser_dump.add_argument( + '--with-ketone', action='store_true', default=False, + help='Enable ketone reading if available on the glucometer.') + + parser_date = subparsers.add_parser( + 'datetime', help='Reads or sets the date and time of the glucometer.') + parser_date.add_argument( + '--set', action='store', nargs='?', const='now', default=None, + help='Set the date rather than just reading it from the device.') + + args = parser.parse_args() + + logging.basicConfig(level=args.vlog) + + try: + driver = importlib.import_module('glucometerutils.drivers.' + args.driver) + except ImportError as e: + logging.error( + 'Error importing driver "%s", please check your --driver parameter:\n%s', + args.driver, e) return 1 - else: - return 1 - except exceptions.Error as err: - print('Error while executing \'%s\': %s' % (args.action, str(err))) - return 1 - device.disconnect() + # This check needs to happen before we try to initialize the device, as the + # help action does not require a --device at all. + if args.action == 'help': + print(inspect.getdoc(driver)) + return 0 + + device = driver.Device(args.device) + + device.connect() + device_info = device.get_meter_info() + + try: + if args.action == 'info': + try: + time_str = device.get_datetime() + except NotImplementedError: + time_str = 'N/A' + print("{device_info}Time: {time}".format( + device_info=str(device_info), time=time_str)) + elif args.action == 'dump': + unit = args.unit + if unit is None: + unit = device_info.native_unit + + readings = device.get_readings() + + if not args.with_ketone: + readings = (reading for reading in readings + if not isinstance(reading, common.KetoneReading)) + + for reading in sorted(readings, key=lambda r: r.timestamp): + print(reading.as_csv(unit)) + elif args.action == 'datetime': + if args.set == 'now': + print(device.set_datetime()) + elif args.set: + try: + from dateutil import parser as date_parser + new_date = date_parser.parse(args.set) + except ImportError: + logging.error( + 'Unable to import module "dateutil", please install it.') + return 1 + except ValueError: + logging.error('%s: not a valid date', args.set) + return 1 + print(device.set_datetime(new_date)) + else: + print(device.get_datetime()) + elif args.action == 'zero': + confirm = input('Delete the device data log? (y/N) ') + if confirm.lower() in ['y', 'ye', 'yes']: + device.zero_log() + print('\nDevice data log zeroed.') + else: + print('\nDevice data log not zeroed.') + return 1 + else: + return 1 + except exceptions.Error as err: + print('Error while executing \'%s\': %s' % (args.action, str(err))) + return 1 + + device.disconnect() diff --git a/glucometerutils/support/lifescan.py b/glucometerutils/support/lifescan.py index 4df0b8f..13529ec 100644 --- a/glucometerutils/support/lifescan.py +++ b/glucometerutils/support/lifescan.py @@ -10,43 +10,43 @@ from glucometerutils import exceptions class MissingChecksum(exceptions.InvalidResponse): - """The response misses the expected 4-digits checksum.""" - def __init__(self, response): - self.message = 'Response is missing checksum: %s' % response + """The response misses the expected 4-digits checksum.""" + def __init__(self, response): + self.message = 'Response is missing checksum: %s' % response class InvalidSerialNumber(exceptions.Error): - """The serial number is not as expected.""" - def __init__(self, serial_number): - self.message = 'Serial number %s is invalid.' % serial_number + """The serial number is not as expected.""" + def __init__(self, serial_number): + self.message = 'Serial number %s is invalid.' % serial_number class MalformedCommand(exceptions.InvalidResponse): - def __init__(self, message): - exceptions.InvalidResponse.__init__( - self, 'Malformed command: %s' % message) + def __init__(self, message): + exceptions.InvalidResponse.__init__( + self, 'Malformed command: %s' % message) def crc_ccitt(data): - # type: (bytes) -> int - """Calculate the CRC-16-CCITT with LifeScan's common seed. + # type: (bytes) -> int + """Calculate the CRC-16-CCITT with LifeScan's common seed. - Args: - data: (bytes) the data to calculate the checksum of + Args: + data: (bytes) the data to calculate the checksum of - Returns: - (int) The 16-bit integer value of the CRC-CCITT calculated. + Returns: + (int) The 16-bit integer value of the CRC-CCITT calculated. - This function uses the non-default 0xFFFF seed as used by multiple - LifeScan meters. - """ - crc = 0xffff + This function uses the non-default 0xFFFF seed as used by multiple + LifeScan meters. + """ + crc = 0xffff - for byte in data: - crc = (crc >> 8) & 0xffff | (crc << 8) & 0xffff - crc ^= byte - crc ^= (crc & 0xff) >> 4 - crc ^= (((crc << 8) & 0xffff) << 4) & 0xffff - crc ^= (crc & 0xff) << 5 + for byte in data: + crc = (crc >> 8) & 0xffff | (crc << 8) & 0xffff + crc ^= byte + crc ^= (crc & 0xff) >> 4 + crc ^= (((crc << 8) & 0xffff) << 4) & 0xffff + crc ^= (crc & 0xff) << 5 - return (crc & 0xffff) + return (crc & 0xffff) |