summaryrefslogblamecommitdiffstats
path: root/glucometerutils/drivers/fslibre.py
blob: 478668d0238468133506f18f201b0b57b8bbb350 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
                       

                              




                                                                         

                                             
 

                                                                             


                                                          
                                                         
 
   
 




                                             
                                                                
                                      
                   
                     
                





                            



                                             



                   

                                                             


                                                    











                                                                       
                                       







                       











                                                                       





                                                           

 
                                                 





                                                                              





                                              







                                                                                




                                                                              
                                                                                        

                                              
                                                      

                                                                 
         

                   









                                                                     
                         

                


                                          
                                                     

                                      

                                            
                                                              



                                            
                                                              

                                                       
                                                                     
         
                         






















                                                                            

                                                                 



                                                       


                                                                     
                                 

                                                                  


                                                        

               
                                          
              
                                         
                                      

                                                             



                                                        

                           






                                                               

                                                 




                                                                   
                                                              


                                                                            
                                

                           
 



                                                                          
 
                                                                 


                                                                  
                                        

                                                  
                                   
                                                            
                                                                     
             
 





                                                                           
# -*- coding: utf-8 -*-
#
# SPDX-License-Identifier: MIT
"""Driver for FreeStyle Libre devices.

Supported features:
    - get readings (sensor, flash and blood glucose), including comments;
    - get and set date and time;
    - get serial number and software version;
    - get and set patient name.

Expected device path: /dev/hidraw9 or similar HID device. Optional when using
HIDAPI.

Further information on the device protocol can be found at

https://protocols.glucometers.tech/abbott/freestyle-libre

"""

import datetime

from glucometerutils import common
from glucometerutils.support import freestyle

# Leading fields shared by the records returned by both $history? and
# $arresult?, as a tuple of (index, field name) pairs.
_BASE_ENTRY_MAP = tuple(enumerate((
    'device_id',
    'type',
    'month',
    'day',
    'year',  # 2-digits
    'hour',
    'minute',
    'second',
)))

# Fields of the records returned by $history? (base fields included).
_HISTORY_ENTRY_MAP = _BASE_ENTRY_MAP + (
    (13, 'value'),
    (15, 'errors'),
)

# Fields of the results returned by $arresult? where type = 2.
#
# The 'reading-type' field takes the following values:
#   0 = glucose blood strip,
#   1 = ketone blood strip,
#   2 = glucose sensor.
_ARRESULT_TYPE2_ENTRY_MAP = (
    (9, 'reading-type'),
    (12, 'value'),
    (15, 'sport-flag'),
    (16, 'medication-flag'),
    (17, 'rapid-acting-flag'),  # see _ARRESULT_RAPID_INSULIN_ENTRY_MAP
    (18, 'long-acting-flag'),
    (19, 'custom-comments-bitfield'),
    (23, 'double-long-acting-insulin'),
    (25, 'food-flag'),
    (26, 'food-carbs-grams'),
    (28, 'errors'),
)

# Fields holding the previous date and time in the time adjustment results
# returned by $arresult?; they occupy consecutive indexes starting at 9.
_ARRESULT_TIME_ADJUSTMENT_ENTRY_MAP = tuple(enumerate((
    'old_month',
    'old_day',
    'old_year',
    'old_hour',
    'old_minute',
    'old_second',
), start=9))

# Fields only valid when rapid-acting-flag is "1"
_ARRESULT_RAPID_INSULIN_ENTRY_MAP = ((43, 'double-rapid-acting-insulin'),)


def _parse_record(record, entry_map):
    """Convert selected string fields of a record into integers.

    Args:
      record: sequence of string fields.
      entry_map: iterable of (index, name) pairs selecting the fields to
        convert.

    Returns:
      A dictionary mapping each name to the integer value of the field found
      at the paired index. The dictionary is empty when the record is empty or
      lacks any of the requested indexes.
    """

    if not record:
        return {}

    parsed = {}
    try:
        for position, name in entry_map:
            parsed[name] = int(record[position])
    except IndexError:
        # The record is shorter than the map expects: report nothing at all
        # rather than a partially filled dictionary.
        return {}

    return parsed


def _extract_timestamp(parsed_record, prefix=''):
    """Build a datetime out of the date and time fields of a parsed record.

    All the records share the same field names for their timestamp, so the
    same lookup works for each of them.

    Args:
      parsed_record: dictionary of integer fields, keyed by field name.
      prefix: string prepended to each field name before looking it up.

    Returns:
      A naive datetime.datetime; 2000 is added to the stored year.
    """

    year, month, day, hour, minute, second = (
        parsed_record[prefix + name]
        for name in ('year', 'month', 'day', 'hour', 'minute', 'second'))

    return datetime.datetime(2000 + year, month, day, hour, minute, second)


def _parse_arresult(record):
    """Parse a single $arresult? record into a reading or time adjustment.

    Args:
      record: list of string fields making up one record.

    Returns:
      A common.TimeAdjustment for records of type 5, a common.GlucoseReading or
      common.KetoneReading for records of type 2, or None when the record is of
      any other type, reports errors, has an unknown reading type, or is too
      short to hold the fields of its type.
    """

    parsed_record = _parse_record(record, _BASE_ENTRY_MAP)

    # There are other record types, but we don't currently need to expose these.
    if not parsed_record:
        return None
    elif parsed_record['type'] == 2:
        type_entry_map = _ARRESULT_TYPE2_ENTRY_MAP
    elif parsed_record['type'] == 5:
        type_entry_map = _ARRESULT_TIME_ADJUSTMENT_ENTRY_MAP
    else:
        return None

    # _parse_record() returns an empty dictionary when the record is too short
    # for the requested fields. Skip such truncated records, the same way an
    # unparseable base record is skipped, instead of failing later with a
    # KeyError on the missing fields.
    type_fields = _parse_record(record, type_entry_map)
    if not type_fields:
        return None
    parsed_record.update(type_fields)

    if parsed_record['type'] == 5:
        return common.TimeAdjustment(
            _extract_timestamp(parsed_record),
            _extract_timestamp(parsed_record, 'old_'),
            extra_data={'device_id': parsed_record['device_id']},
        )

    # Check right away if we have rapid insulin
    if parsed_record['rapid-acting-flag']:
        parsed_record.update(
            _parse_record(record, _ARRESULT_RAPID_INSULIN_ENTRY_MAP))

    if parsed_record['errors']:
        return None

    comment_parts = []
    measure_method = None
    cls = None
    value = None

    if parsed_record['reading-type'] == 2:
        comment_parts.append('(Scan)')
        measure_method = common.MeasurementMethod.CGM
        cls = common.GlucoseReading
        value = parsed_record['value']
    elif parsed_record['reading-type'] == 0:
        comment_parts.append('(Blood)')
        measure_method = common.MeasurementMethod.BLOOD_SAMPLE
        cls = common.GlucoseReading
        value = parsed_record['value']
    elif parsed_record['reading-type'] == 1:
        comment_parts.append('(Ketone)')
        measure_method = common.MeasurementMethod.BLOOD_SAMPLE
        cls = common.KetoneReading
        # automatically convert the raw value in mmol/L
        value = freestyle.convert_ketone_unit(parsed_record['value'])
    else:
        # unknown reading
        return None

    # Fields 29 to 34 hold the six custom comments; each bit of the bitfield
    # selects the comment at the matching position. The first and last
    # character of a selected comment are dropped (presumably the surrounding
    # quotes -- to be confirmed against the protocol documentation).
    custom_comments = record[29:35]
    for comment_index in range(6):
        if parsed_record['custom-comments-bitfield'] & (1 << comment_index):
            comment_parts.append(custom_comments[comment_index][1:-1])

    if parsed_record['sport-flag']:
        comment_parts.append('Sport')

    if parsed_record['medication-flag']:
        comment_parts.append('Medication')

    if parsed_record['food-flag']:
        if parsed_record['food-carbs-grams']:
            comment_parts.append(
                'Food (%d g)' % parsed_record['food-carbs-grams'])
        else:
            comment_parts.append('Food')

    if parsed_record['long-acting-flag']:
        # The field stores twice the amount, hence the division.
        if parsed_record['double-long-acting-insulin']:
            comment_parts.append(
                'Long-acting insulin (%.1f)' %
                (parsed_record['double-long-acting-insulin']/2.))
        else:
            comment_parts.append('Long-acting insulin')

    if parsed_record['rapid-acting-flag']:
        # provide default value, as this record does not always exist
        # (even if rapid-acting-flag is set)
        if parsed_record.get('double-rapid-acting-insulin', 0):
            comment_parts.append(
                'Rapid-acting insulin (%.1f)' %
                (parsed_record['double-rapid-acting-insulin']/2.))
        else:
            comment_parts.append('Rapid-acting insulin')

    return cls(
        _extract_timestamp(parsed_record),
        value,
        comment='; '.join(comment_parts),
        measure_method=measure_method,
        extra_data={'device_id': parsed_record['device_id']},
    )

class Device(freestyle.FreeStyleHidDevice):
    """Glucometer driver for FreeStyle Libre devices."""

    USB_PRODUCT_ID = 0x3650

    def get_meter_info(self):
        """Gather the device information into a common.MeterInfo object."""
        # The queries are issued in the same order as the fields are listed:
        # serial number, software version, glucose unit, patient name.
        serial_number = self.get_serial_number()
        version_info = ('Software version: ' + self._get_version(),)
        native_unit = self.get_glucose_unit()
        patient_name = self.get_patient_name()

        return common.MeterInfo(
            'FreeStyle Libre',
            serial_number=serial_number,
            version_info=version_info,
            native_unit=native_unit,
            patient_name=patient_name)

    def get_serial_number(self):
        """Return the serial number, using the $sn? command.

        Overridden, as the command used by the parent class is not compatible.
        """
        reply = self._send_text_command(b'$sn?')
        return reply.rstrip('\r\n')

    def get_glucose_unit(self):  # pylint: disable=no-self-use
        """Return the glucose unit of the device (always mg/dL for now)."""
        # TODO(Flameeyes): figure out how to identify the actual unit on the
        # device.
        return common.Unit.MG_DL

    def get_readings(self):
        """Iterate over the readings reported by the device.

        Yields:
          First the sensor readings from $history?, as common.GlucoseReading
          objects, then whatever _parse_arresult() produces out of the
          $arresult? records (explicit scans, blood tests and other events).
          Invalid and unsupported records are skipped.
        """

        # The sensor history comes first; it is usually the longer list.
        for fields in self._get_multirecord(b'$history?'):
            entry = _parse_record(fields, _HISTORY_ENTRY_MAP)

            # Only records that parsed fully and report no errors are valid.
            if entry and entry['errors'] == 0:
                yield common.GlucoseReading(
                    _extract_timestamp(entry),
                    entry['value'],
                    comment='(Sensor)',
                    measure_method=common.MeasurementMethod.CGM,
                    extra_data={'device_id': entry['device_id']},
                )

        # Explicit scans, blood tests and other events follow.
        for fields in self._get_multirecord(b'$arresult?'):
            result = _parse_arresult(fields)
            if not result:
                continue
            yield result