summaryrefslogtreecommitdiffstats
path: root/_receiver/iq_header.py
blob: f439019d02d8f87281171a99545f82e4102120ab (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
from struct import pack,unpack
import logging
import sys
"""
    Description: IQ Frame header definition
    For a description of the header fields, check the corresponding documentation
    Total length: 1024 bytes
    Project: HeIMDALL RTL
    Author: Tamás Pető
    Status: Finished
    Version history:
            1 : Initial version (2019 04 23)
            2 : Fixed 1024 byte length (2019 07 25)
            3 : Noise source state (2019 10 01)
            4 : IQ sync flag (2019 10 21)
            5 : Sync state (2019 11 10)
            6 : Unix Epoch timestamp (2019 12 17) 
            6a: Frame type defines (2020 03 19)
            7 : Sync word (2020 05 03)
"""
class IQHeader():
    """
        In-memory representation of the IQ frame header.

        The header is serialised and deserialised with the struct module
        using a format string without a byte order prefix, i.e. with the
        native byte order, size and alignment of the running platform.

        NOTE(review): the 1024 byte total length relies on the 64 bit
        fields being 8 byte aligned (8 padding bytes are inserted by
        struct) - confirm this for every target platform.
    """

    # Frame type identifiers (presumably the values carried in frame_type)
    FRAME_TYPE_DATA  = 0
    FRAME_TYPE_DUMMY = 1
    FRAME_TYPE_RAMP  = 2
    FRAME_TYPE_CAL   = 3
    FRAME_TYPE_TRIGW = 4

    # Expected content of the sync_word field, see check_sync_word()
    SYNC_WORD = 0x2bf7b95a

    # Sizes of the fixed length array fields of the header
    _HARDWARE_ID_LEN = 16  # char [16]
    _IF_GAIN_COUNT   = 32  # uint32_t x 32

    def __init__(self):
        """
            Create a header with the sync word set and all other fields zeroed
        """
        self.logger = logging.getLogger(__name__)
        self.header_size = 1024 # size in bytes
        # Despite its name this is the number of reserved uint32_t words
        # (each one is packed with the "I" format), not a byte count
        self.reserved_bytes = 192

        self.sync_word=self.SYNC_WORD        # uint32_t
        self.frame_type=0                    # uint32_t
        self.hardware_id=""                  # char [16]
        self.unit_id=0                       # uint32_t
        self.active_ant_chs=0                # uint32_t
        self.ioo_type=0                      # uint32_t
        self.rf_center_freq=0                # uint64_t
        self.adc_sampling_freq=0             # uint64_t
        self.sampling_freq=0                 # uint64_t
        self.cpi_length=0                    # uint32_t
        self.time_stamp=0                    # uint64_t
        self.daq_block_index=0               # uint32_t
        self.cpi_index=0                     # uint32_t
        self.ext_integration_cntr=0          # uint64_t
        self.data_type=0                     # uint32_t
        self.sample_bit_depth=0              # uint32_t
        self.adc_overdrive_flags=0           # uint32_t
        self.if_gains=[0]*self._IF_GAIN_COUNT# uint32_t x 32
        self.delay_sync_flag=0               # uint32_t
        self.iq_sync_flag=0                  # uint32_t
        self.sync_state=0                    # uint32_t
        self.noise_source_state=0            # uint32_t
        self.reserved=[0]*self.reserved_bytes# uint32_t x reserved_bytes
        self.header_version=0                # uint32_t

    def _header_format(self):
        """
            Build the struct format string of the complete header.

            Used by both decode_header and encode_header so that the two
            directions can never disagree about the layout.
        """
        return ("II{:d}s".format(self._HARDWARE_ID_LEN)  # sync word, frame type, hardware id
                + "IIIQQQIQIIQIII"                         # unit id ... adc overdrive flags
                + "I"*self._IF_GAIN_COUNT                  # if gains
                + "IIII"                                   # delay sync, iq sync, sync state, noise source
                + "I"*self.reserved_bytes                  # reserved words
                + "I")                                     # header version

    def decode_header(self, iq_header_byte_array):
        """
            Unpack, decode and store the content of the iq header

            Parameters:
                iq_header_byte_array: bytes-like object holding exactly one
                                      packed header

            Raises:
                struct.error: the buffer size does not match the header format
                UnicodeDecodeError: the hardware id bytes can not be decoded
                                    with the default codec of bytes.decode()

            The reserved words are unpacked but not stored.
        """
        iq_header_list = unpack(self._header_format(), iq_header_byte_array)

        # The scalar fields preceding the gain array occupy indices 0..16
        gains_start = 17
        gains_end   = gains_start + self._IF_GAIN_COUNT

        self.sync_word            = iq_header_list[0]
        self.frame_type           = iq_header_list[1]
        # The NUL padding of the 16 byte field is kept in the decoded string
        self.hardware_id          = iq_header_list[2].decode()
        (self.unit_id,
         self.active_ant_chs,
         self.ioo_type,
         self.rf_center_freq,
         self.adc_sampling_freq,
         self.sampling_freq,
         self.cpi_length,
         self.time_stamp,
         self.daq_block_index,
         self.cpi_index,
         self.ext_integration_cntr,
         self.data_type,
         self.sample_bit_depth,
         self.adc_overdrive_flags) = iq_header_list[3:gains_start]
        # Stored as a list (as in __init__) so that single gains stay assignable
        self.if_gains             = list(iq_header_list[gains_start:gains_end])
        (self.delay_sync_flag,
         self.iq_sync_flag,
         self.sync_state,
         self.noise_source_state) = iq_header_list[gains_end:gains_end+4]
        # The header version is the very last word, after the reserved area
        self.header_version       = iq_header_list[-1]

    def encode_header(self):
        """
            Pack the iq header information into a byte array

            Returns:
                bytes object with the packed header. The reserved words are
                always written as zeros, self.reserved is not consulted.

            Raises:
                ValueError: the encoded hardware_id is longer than 16 bytes
                IndexError: if_gains has less than 32 entries
                struct.error: a field value does not fit into its integer type
        """
        hardware_id_bytes = self.hardware_id.encode()
        if len(hardware_id_bytes) > self._HARDWARE_ID_LEN:
            raise ValueError("hardware_id does not fit into {:d} bytes: {!r}".format(
                                self._HARDWARE_ID_LEN, self.hardware_id))

        # Shorter ids are padded with NUL bytes by the "16s" format itself
        values = [self.sync_word, self.frame_type, hardware_id_bytes,
                  self.unit_id, self.active_ant_chs, self.ioo_type, self.rf_center_freq, self.adc_sampling_freq,
                  self.sampling_freq, self.cpi_length, self.time_stamp, self.daq_block_index, self.cpi_index,
                  self.ext_integration_cntr, self.data_type, self.sample_bit_depth, self.adc_overdrive_flags]
        values.extend(self.if_gains[m] for m in range(self._IF_GAIN_COUNT))
        values.extend((self.delay_sync_flag, self.iq_sync_flag, self.sync_state, self.noise_source_state))
        values.extend([0]*self.reserved_bytes)
        values.append(self.header_version)

        return pack(self._header_format(), *values)

    def dump_header(self):
        """
            Prints out the content of the header in human readable format

            Every field is written as a separate INFO level record to the
            logger of this object.
        """
        log = self.logger.info
        log("Sync word: {:d}".format(self.sync_word))
        log("Header version: {:d}".format(self.header_version))
        log("Frame type: {:d}".format(self.frame_type))
        log("Hardware ID: {:16}".format(self.hardware_id))
        log("Unit ID: {:d}".format(self.unit_id))
        log("Active antenna channels: {:d}".format(self.active_ant_chs))
        log("Illuminator type: {:d}".format(self.ioo_type))
        # Frequencies are divided by 10**6 for the MHz display
        log("RF center frequency: {:.2f} MHz".format(self.rf_center_freq/10**6))
        log("ADC sampling frequency: {:.2f} MHz".format(self.adc_sampling_freq/10**6))
        log("IQ sampling frequency {:.2f} MHz".format(self.sampling_freq/10**6))
        log("CPI length: {:d}".format(self.cpi_length))
        log("Unix Epoch timestamp: {:d}".format(self.time_stamp))
        log("DAQ block index: {:d}".format(self.daq_block_index))
        log("CPI index: {:d}".format(self.cpi_index))
        log("Extended integration counter {:d}".format(self.ext_integration_cntr))
        log("Data type: {:d}".format(self.data_type))
        log("Sample bit depth: {:d}".format(self.sample_bit_depth))
        log("ADC overdrive flags: {:d}".format(self.adc_overdrive_flags))
        for m in range(self._IF_GAIN_COUNT):
            # The stored gain is divided by 10 for the dB display
            log("Ch: {:d} IF gain: {:.1f} dB".format(m, self.if_gains[m]/10))
        log("Delay sync  flag: {:d}".format(self.delay_sync_flag))
        log("IQ sync  flag: {:d}".format(self.iq_sync_flag))
        log("Sync state: {:d}".format(self.sync_state))
        log("Noise source state: {:d}".format(self.noise_source_state))

    def check_sync_word(self):
        """
            Check the sync word of the header

            Returns:
                0 when sync_word equals SYNC_WORD, -1 otherwise
        """
        return 0 if self.sync_word == self.SYNC_WORD else -1