diff options
Diffstat (limited to 'util/_receiver')
-rwxr-xr-x | util/_receiver/iq_header.py | 152 | ||||
-rwxr-xr-x | util/_receiver/krakenSDR_receiver.py | 375 | ||||
-rwxr-xr-x | util/_receiver/shmemIface.py | 195 |
3 files changed, 0 insertions, 722 deletions
diff --git a/util/_receiver/iq_header.py b/util/_receiver/iq_header.py deleted file mode 100755 index f439019..0000000 --- a/util/_receiver/iq_header.py +++ /dev/null @@ -1,152 +0,0 @@ -from struct import pack,unpack -import logging -import sys -""" - Desctiption: IQ Frame header definition - For header field description check the corresponding documentation - Total length: 1024 byte - Project: HeIMDALL RTL - Author: Tamás Pető - Status: Finished - Version history: - 1 : Initial version (2019 04 23) - 2 : Fixed 1024 byte length (2019 07 25) - 3 : Noise source state (2019 10 01) - 4 : IQ sync flag (2019 10 21) - 5 : Sync state (2019 11 10) - 6 : Unix Epoch timestamp (2019 12 17) - 6a: Frame type defines (2020 03 19) - 7 : Sync word (2020 05 03) -""" -class IQHeader(): - - FRAME_TYPE_DATA = 0 - FRAME_TYPE_DUMMY = 1 - FRAME_TYPE_RAMP = 2 - FRAME_TYPE_CAL = 3 - FRAME_TYPE_TRIGW = 4 - - SYNC_WORD = 0x2bf7b95a - - def __init__(self): - - self.logger = logging.getLogger(__name__) - self.header_size = 1024 # size in bytes - self.reserved_bytes = 192 - - self.sync_word=self.SYNC_WORD # uint32_t - self.frame_type=0 # uint32_t - self.hardware_id="" # char [16] - self.unit_id=0 # uint32_t - self.active_ant_chs=0 # uint32_t - self.ioo_type=0 # uint32_t - self.rf_center_freq=0 # uint64_t - self.adc_sampling_freq=0 # uint64_t - self.sampling_freq=0 # uint64_t - self.cpi_length=0 # uint32_t - self.time_stamp=0 # uint64_t - self.daq_block_index=0 # uint32_t - self.cpi_index=0 # uint32_t - self.ext_integration_cntr=0 # uint64_t - self.data_type=0 # uint32_t - self.sample_bit_depth=0 # uint32_t - self.adc_overdrive_flags=0 # uint32_t - self.if_gains=[0]*32 # uint32_t x 32 - self.delay_sync_flag=0 # uint32_t - self.iq_sync_flag=0 # uint32_t - self.sync_state=0 # uint32_t - self.noise_source_state=0 # uint32_t - self.reserved=[0]*self.reserved_bytes# uint32_t x reserverd_bytes - self.header_version=0 # uint32_t - - def decode_header(self, iq_header_byte_array): - """ - Unpack,decode and store the content of the iq header - """ - iq_header_list = unpack("II16sIIIQQQIQIIQIII"+"I"*32+"IIII"+"I"*self.reserved_bytes+"I", iq_header_byte_array) - - self.sync_word = iq_header_list[0] - self.frame_type = iq_header_list[1] - self.hardware_id = iq_header_list[2].decode() - self.unit_id = iq_header_list[3] - self.active_ant_chs = iq_header_list[4] - self.ioo_type = iq_header_list[5] - self.rf_center_freq = iq_header_list[6] - self.adc_sampling_freq = iq_header_list[7] - self.sampling_freq = iq_header_list[8] - self.cpi_length = iq_header_list[9] - self.time_stamp = iq_header_list[10] - self.daq_block_index = iq_header_list[11] - self.cpi_index = iq_header_list[12] - self.ext_integration_cntr = iq_header_list[13] - self.data_type = iq_header_list[14] - self.sample_bit_depth = iq_header_list[15] - self.adc_overdrive_flags = iq_header_list[16] - self.if_gains = iq_header_list[17:49] - self.delay_sync_flag = iq_header_list[49] - self.iq_sync_flag = iq_header_list[50] - self.sync_state = iq_header_list[51] - self.noise_source_state = iq_header_list[52] - self.header_version = iq_header_list[52+self.reserved_bytes+1] - - def encode_header(self): - """ - Pack the iq header information into a byte array - """ - iq_header_byte_array=pack("II", self.sync_word, self.frame_type) - iq_header_byte_array+=self.hardware_id.encode()+bytearray(16-len(self.hardware_id.encode())) - iq_header_byte_array+=pack("IIIQQQIQIIQIII", - self.unit_id, self.active_ant_chs, self.ioo_type, self.rf_center_freq, self.adc_sampling_freq, - self.sampling_freq, self.cpi_length, self.time_stamp, self.daq_block_index, self.cpi_index, - self.ext_integration_cntr, self.data_type, self.sample_bit_depth, self.adc_overdrive_flags) - for m in range(32): - iq_header_byte_array+=pack("I", self.if_gains[m]) - - iq_header_byte_array+=pack("I", self.delay_sync_flag) - iq_header_byte_array+=pack("I", self.iq_sync_flag) - iq_header_byte_array+=pack("I", self.sync_state) - iq_header_byte_array+=pack("I", self.noise_source_state) - - for m in range(self.reserved_bytes): - iq_header_byte_array+=pack("I",0) - - iq_header_byte_array+=pack("I", self.header_version) - return iq_header_byte_array - - def dump_header(self): - """ - Prints out the content of the header in human readable format - """ - self.logger.info("Sync word: {:d}".format(self.sync_word)) - self.logger.info("Header version: {:d}".format(self.header_version)) - self.logger.info("Frame type: {:d}".format(self.frame_type)) - self.logger.info("Hardware ID: {:16}".format(self.hardware_id)) - self.logger.info("Unit ID: {:d}".format(self.unit_id)) - self.logger.info("Active antenna channels: {:d}".format(self.active_ant_chs)) - self.logger.info("Illuminator type: {:d}".format(self.ioo_type)) - self.logger.info("RF center frequency: {:.2f} MHz".format(self.rf_center_freq/10**6)) - self.logger.info("ADC sampling frequency: {:.2f} MHz".format(self.adc_sampling_freq/10**6)) - self.logger.info("IQ sampling frequency {:.2f} MHz".format(self.sampling_freq/10**6)) - self.logger.info("CPI length: {:d}".format(self.cpi_length)) - self.logger.info("Unix Epoch timestamp: {:d}".format(self.time_stamp)) - self.logger.info("DAQ block index: {:d}".format(self.daq_block_index)) - self.logger.info("CPI index: {:d}".format(self.cpi_index)) - self.logger.info("Extended integration counter {:d}".format(self.ext_integration_cntr)) - self.logger.info("Data type: {:d}".format(self.data_type)) - self.logger.info("Sample bit depth: {:d}".format(self.sample_bit_depth)) - self.logger.info("ADC overdrive flags: {:d}".format(self.adc_overdrive_flags)) - for m in range(32): - self.logger.info("Ch: {:d} IF gain: {:.1f} dB".format(m, self.if_gains[m]/10)) - self.logger.info("Delay sync flag: {:d}".format(self.delay_sync_flag)) - self.logger.info("IQ sync flag: {:d}".format(self.iq_sync_flag)) - self.logger.info("Sync state: {:d}".format(self.sync_state)) - self.logger.info("Noise source state: {:d}".format(self.noise_source_state)) - - def check_sync_word(self): - """ - Check the sync word of the header - """ - if self.sync_word != self.SYNC_WORD: - return -1 - else: - return 0 diff --git a/util/_receiver/krakenSDR_receiver.py b/util/_receiver/krakenSDR_receiver.py deleted file mode 100755 index a9e2d63..0000000 --- a/util/_receiver/krakenSDR_receiver.py +++ /dev/null @@ -1,375 +0,0 @@ -# KrakenSDR Receiver - -# Copyright (C) 2018-2021 Carl Laufer, Tamás Pető -# -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <https://www.gnu.org/licenses/>. -# - -# -*- coding: utf-8 -*- - -# Import built-in modules -import sys -import os -import time -from struct import pack, unpack -import socket -import _thread -from threading import Lock -import queue -import logging -#import copy - -# Import third party modules -import numpy as np -from scipy import signal -from iq_header import IQHeader -from shmemIface import inShmemIface -class ReceiverRTLSDR(): - - def __init__(self, data_que, data_interface = "eth", logging_level=10): - """ - Parameter: - ---------- - :param: data_que: Que to communicate with the UI (web iface/Qt GUI) - :param: data_interface: This field is configured by the GUI during instantiation. - Valid values are the followings: - "eth" : The module will receiver IQ frames through an Ethernet connection - "shmem": The module will receiver IQ frames through a shared memory interface - :type : data_interface: string - """ - self.logger = logging.getLogger(__name__) - self.logger.setLevel(logging_level) - - # DAQ parameters - # These values are used by default to configure the DAQ through the configuration interface - # Values are configured externally upon configuration request - self.daq_center_freq = 100 # MHz - self.daq_rx_gain = 0 # [dB] - self.daq_squelch_th_dB = 0 - - # UI interface - self.data_que = data_que - - # IQ data interface - self.data_interface = data_interface - - # -> Ethernet - self.receiver_connection_status = False - self.port = 5000 - self.rec_ip_addr = "127.0.0.1" # Configured by the GUI prior to connection request - self.socket_inst = socket.socket() - self.receiverBufferSize = 2 ** 18 # Size of the Ethernet receiver buffer measured in bytes - - # -> Shared memory - root_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) - daq_path = os.path.join(os.path.dirname(root_path),"heimdall_daq_fw") - self.daq_shmem_control_path = os.path.join(os.path.join(daq_path,"Firmware"),"_data_control/") - self.init_data_iface() - - # Control interface - self.ctr_iface_socket = socket.socket() - self.ctr_iface_port = 5001 - self.ctr_iface_thread_lock = Lock() # Used to synchronize the operation of the ctr_iface thread - - self.iq_frame_bytes = None - self.iq_samples = None - self.iq_header = IQHeader() - self.M = 0 # Number of receiver channels, updated after establishing connection - - def init_data_iface(self): - if self.data_interface == "shmem": - # Open shared memory interface to capture the DAQ firmware output - self.in_shmem_iface = inShmemIface("delay_sync_iq", self.daq_shmem_control_path) - if not self.in_shmem_iface.init_ok: - self.logger.critical("Shared memory initialization failed") - self.in_shmem_iface.destory_sm_buffer() - return -1 - return 0 - - - def eth_connect(self): - """ - Compatible only with DAQ firmwares that has the IQ streaming mode. - HeIMDALL DAQ Firmware version: 1.0 or later - """ - try: - if not self.receiver_connection_status: - if self.data_interface == "eth": - # Establlish IQ data interface connection - self.socket_inst.connect((self.rec_ip_addr, self.port)) - self.socket_inst.sendall(str.encode('streaming')) - test_iq = self.receive_iq_frame() - - self.M = self.iq_header.active_ant_chs - - # Establish control interface connection - self.ctr_iface_socket.connect((self.rec_ip_addr, self.ctr_iface_port)) - self.receiver_connection_status = True - self.ctr_iface_init() - self.logger.info("CTR INIT Center freq: {0}".format(self.daq_center_freq)) - self.set_center_freq(self.daq_center_freq) - self.set_if_gain(self.daq_rx_gain) - self.set_squelch_threshold(self.daq_squelch_th_dB) - except: - errorMsg = sys.exc_info()[0] - self.logger.error("Error message: "+str(errorMsg)) - self.receiver_connection_status = False - self.logger.error("Unexpected error: {0}".format(sys.exc_info()[0])) - - # Re-instantiating sockets - self.socket_inst = socket.socket() - self.ctr_iface_socket = socket.socket() - return -1 - - self.logger.info("Connection established") - que_data_packet = [] - que_data_packet.append(['conn-ok',]) - self.data_que.put(que_data_packet) - - def eth_close(self): - """ - Close Ethernet conenctions including the IQ data and the control interfaces - """ - try: - if self.receiver_connection_status: - if self.data_interface == "eth": - self.socket_inst.sendall(str.encode('q')) # Send exit message - self.socket_inst.close() - self.socket_inst = socket.socket() # Re-instantiating socket - - # Close control interface connection - exit_message_bytes=("EXIT".encode()+bytearray(124)) - self.ctr_iface_socket.send(exit_message_bytes) - self.ctr_iface_socket.close() - self.ctr_iface_socket = socket.socket() - - self.receiver_connection_status = False - que_data_packet = [] - que_data_packet.append(['disconn-ok',]) - self.data_que.put(que_data_packet) - except: - errorMsg = sys.exc_info()[0] - self.logger.error("Error message: {0}".format(errorMsg)) - return -1 - - if self.data_interface == "shmem": - self.in_shmem_iface.destory_sm_buffer() - - return 0 - - def get_iq_online(self): - """ - This function obtains a new IQ data frame through the Ethernet IQ data or the shared memory interface - """ - - # Check connection - if not self.receiver_connection_status: - fail = self.eth_connect() - if fail: - return -1 - - if self.data_interface == "eth": - self.socket_inst.sendall(str.encode("IQDownload")) # Send iq request command - self.iq_samples = self.receive_iq_frame() - - elif self.data_interface == "shmem": - active_buff_index = self.in_shmem_iface.wait_buff_free() - if active_buff_index < 0 or active_buff_index > 1: - self.logger.info("Terminating.., signal: {:d}".format(active_buff_index)) - return -1 - - buffer = self.in_shmem_iface.buffers[active_buff_index] - iq_header_bytes = buffer[0:1024].tobytes() - self.iq_header.decode_header(iq_header_bytes) - - # Inititalization from header - Set channel numbers - if self.M == 0: - self.M = self.iq_header.active_ant_chs - - incoming_payload_size = self.iq_header.cpi_length*self.iq_header.active_ant_chs*2*int(self.iq_header.sample_bit_depth/8) - if incoming_payload_size > 0: - iq_samples_in = (buffer[1024:1024 + incoming_payload_size].view(dtype=np.complex64))\ - .reshape(self.iq_header.active_ant_chs, self.iq_header.cpi_length) - self.iq_samples = iq_samples_in.copy() # Must be .copy - - self.in_shmem_iface.send_ctr_buff_ready(active_buff_index) - - def receive_iq_frame(self): - """ - Called by the get_iq_online function. Receives IQ samples over the establed Ethernet connection - """ - total_received_bytes = 0 - recv_bytes_count = 0 - iq_header_bytes = bytearray(self.iq_header.header_size) # allocate array - view = memoryview(iq_header_bytes) # Get buffer - - self.logger.debug("Starting IQ header reception") - - while total_received_bytes < self.iq_header.header_size: - # Receive into buffer - recv_bytes_count = self.socket_inst.recv_into(view, self.iq_header.header_size-total_received_bytes) - view = view[recv_bytes_count:] # reset memory region - total_received_bytes += recv_bytes_count - - self.iq_header.decode_header(iq_header_bytes) - # Uncomment to check the content of the IQ header - #self.iq_header.dump_header() - - incoming_payload_size = self.iq_header.cpi_length*self.iq_header.active_ant_chs*2*int(self.iq_header.sample_bit_depth/8) - if incoming_payload_size > 0: - # Calculate total bytes to receive from the iq header data - total_bytes_to_receive = incoming_payload_size - receiver_buffer_size = 2**18 - - self.logger.debug("Total bytes to receive: {:d}".format(total_bytes_to_receive)) - - total_received_bytes = 0 - recv_bytes_count = 0 - iq_data_bytes = bytearray(total_bytes_to_receive + receiver_buffer_size) # allocate array - view = memoryview(iq_data_bytes) # Get buffer - - while total_received_bytes < total_bytes_to_receive: - # Receive into buffer - recv_bytes_count = self.socket_inst.recv_into(view, receiver_buffer_size) - view = view[recv_bytes_count:] # reset memory region - total_received_bytes += recv_bytes_count - - self.logger.debug(" IQ data succesfully received") - - # Convert raw bytes to Complex float64 IQ samples - self.iq_samples = np.frombuffer(iq_data_bytes[0:total_bytes_to_receive], dtype=np.complex64).reshape(self.iq_header.active_ant_chs, self.iq_header.cpi_length) - - self.iq_frame_bytes = bytearray()+iq_header_bytes+iq_data_bytes - return self.iq_samples - else: - return 0 - - def set_squelch_threshold(self, threshold_dB): - """ - Configures the threshold level of the squelch module in the DAQ FW through the control interface - """ - if self.receiver_connection_status: # Check connection - self.daq_squelch_th_dB = threshold_dB - if threshold_dB == -80: threshold = 0 - else: threshold = 10**(threshold_dB/20) - - # Assembling message - cmd="STHU" - th_bytes=pack("f",threshold) - msg_bytes=(cmd.encode()+th_bytes+bytearray(120)) - try: - _thread.start_new_thread(self.ctr_iface_communication, (msg_bytes,)) - except: - errorMsg = sys.exc_info()[0] - self.logger.error("Unable to start communication thread") - self.logger.error("Error message: {:s}".format(errorMsg)) - - def ctr_iface_init(self): - """ - Initialize connection with the DAQ FW through the control interface - """ - if self.receiver_connection_status: # Check connection - # Assembling message - cmd="INIT" - msg_bytes=(cmd.encode()+bytearray(124)) - try: - _thread.start_new_thread(self.ctr_iface_communication, (msg_bytes,)) - except: - errorMsg = sys.exc_info()[0] - self.logger.error("Unable to start communication thread") - self.logger.error("Error message: {:s}".format(errorMsg)) - - def ctr_iface_communication(self, msg_bytes): - """ - Handles communication on the control interface with the DAQ FW - - Parameters: - ----------- - - :param: msg: Message bytes, that will be sent ont the control interface - :type: msg: Byte array - """ - self.ctr_iface_thread_lock.acquire() - self.logger.debug("Sending control message") - self.ctr_iface_socket.send(msg_bytes) - - # Waiting for the command to take effect - reply_msg_bytes = self.ctr_iface_socket.recv(128) - - self.logger.debug("Control interface communication finished") - self.ctr_iface_thread_lock.release() - - status = reply_msg_bytes[0:4].decode() - if status == "FNSD": - self.logger.info("Reconfiguration succesfully finished") - que_data_packet = [] - que_data_packet.append(['config-ok',]) - self.data_que.put(que_data_packet) - - else: - self.logger.error("Failed to set the requested parameter, reply: {0}".format(status)) - - def set_center_freq(self, center_freq): - """ - Configures the RF center frequency of the receiver through the control interface - - Paramters: - ---------- - :param: center_freq: Required center frequency to set [Hz] - :type: center_freq: float - """ - if self.receiver_connection_status: # Check connection - self.daq_center_freq = int(center_freq) - # Set center frequency - cmd="FREQ" - freq_bytes=pack("Q",int(center_freq)) - msg_bytes=(cmd.encode()+freq_bytes+bytearray(116)) - try: - _thread.start_new_thread(self.ctr_iface_communication, (msg_bytes,)) - except: - errorMsg = sys.exc_info()[0] - self.logger.error("Unable to start communication thread") - self.logger.error("Error message: {:s}".format(errorMsg)) - - def set_if_gain(self, gain): - """ - Configures the IF gain of the receiver through the control interface - - Paramters: - ---------- - :param: gain: IF gain value [dB] - :type: gain: int - """ - if self.receiver_connection_status: # Check connection - self.daq_rx_gain = gain - - # Set center frequency - cmd="GAIN" - gain_list=[297, 37] #[int(gain*10)]*self.M - gain_bytes=pack("I"*self.M, *gain_list) - msg_bytes=(cmd.encode()+gain_bytes+bytearray(128-(self.M+1)*4)) - try: - _thread.start_new_thread(self.ctr_iface_communication, (msg_bytes,)) - except: - errorMsg = sys.exc_info()[0] - self.logger.error("Unable to start communication thread") - self.logger.error("Error message: {:s}".format(errorMsg)) - - def close(self): - """ - Disconnet the receiver module and the DAQ FW - """ - self.eth_close() - diff --git a/util/_receiver/shmemIface.py b/util/_receiver/shmemIface.py deleted file mode 100755 index fd301b1..0000000 --- a/util/_receiver/shmemIface.py +++ /dev/null @@ -1,195 +0,0 @@ -""" - HeIMDALL DAQ Firmware - Python based shared memory interface implementations - - Author: Tamás Pető - License: GNU GPL V3 - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see <https://www.gnu.org/licenses/>. -""" -import logging -from struct import pack, unpack -from multiprocessing import shared_memory -import numpy as np -import os - -A_BUFF_READY = 1 -B_BUFF_READY = 2 -INIT_READY = 10 -TERMINATE = 255 -class outShmemIface(): - - - def __init__(self, shmem_name, shmem_size, drop_mode = False): - - self.init_ok = True - logging.basicConfig(level=logging.INFO) - self.logger = logging.getLogger(__name__) - self.drop_mode = drop_mode - self.dropped_frame_cntr = 0 - - self.shmem_name = shmem_name - self.buffer_free = [True, True] - - self.memories = [] - self.buffers = [] - - # Try to remove shared memories if already exist - try: - shmem_A = shared_memory.SharedMemory(name=shmem_name+'_A',create=False, size=shmem_size) - shmem_A.close() - #shmem_A.unkink() - except FileNotFoundError as err: - self.logger.warning("Shared memory not exist") - try: - shmem_B = shared_memory.SharedMemory(name=shmem_name+'_B',create=False, size=shmem_size) - shmem_B.close() - #shmem_B.unkink() - except FileNotFoundError as err: - self.logger.warning("Shared memory not exist") - - # Create the shared memories - self.memories.append(shared_memory.SharedMemory(name=shmem_name+'_A',create=True, size=shmem_size)) - self.memories.append(shared_memory.SharedMemory(name=shmem_name+'_B',create=True, size=shmem_size)) - self.buffers.append(np.ndarray((shmem_size,), dtype=np.uint8, buffer=self.memories[0].buf)) - self.buffers.append(np.ndarray((shmem_size,), dtype=np.uint8, buffer=self.memories[1].buf)) - - # Opening control FIFOs - if self.drop_mode: - bw_fifo_flags = os.O_RDONLY | os.O_NONBLOCK - else: - bw_fifo_flags = os.O_RDONLY - try: - self.fw_ctr_fifo = os.open('_data_control/'+'fw_'+shmem_name, os.O_WRONLY) - self.bw_ctr_fifo = os.open('_data_control/'+'bw_'+shmem_name, bw_fifo_flags) - except OSError as err: - self.logger.critical("OS error: {0}".format(err)) - self.logger.critical("Failed to open control fifos") - self.bw_ctr_fifo = None - self.fw_ctr_fifo = None - self.init_ok = False - - # Send init ready signal - if self.init_ok: - os.write(self.fw_ctr_fifo, pack('B',INIT_READY)) - - def send_ctr_buff_ready(self, active_buffer_index): - # Send buffer ready signal on the forward FIFO - if active_buffer_index == 0: - os.write(self.fw_ctr_fifo, pack('B',A_BUFF_READY)) - elif active_buffer_index == 1: - os.write(self.fw_ctr_fifo, pack('B',B_BUFF_READY)) - - # Deassert buffer free flag - self.buffer_free[active_buffer_index] = False - - def send_ctr_terminate(self): - os.write(self.fw_ctr_fifo, pack('B',TERMINATE)) - self.logger.info("Terminate signal sent") - - def destory_sm_buffer(self): - for memory in self.memories: - memory.close() - memory.unlink() - - if self.fw_ctr_fifo is not None: - os.close(self.fw_ctr_fifo) - - if self.bw_ctr_fifo is not None: - os.close(self.bw_ctr_fifo) - - def wait_buff_free(self): - if self.buffer_free[0]: - return 0 - elif self.buffer_free[1]: - return 1 - else: - try: - buffer = os.read(self.bw_ctr_fifo, 1) - signal = unpack('B', buffer )[0] - - if signal == A_BUFF_READY: - self.buffer_free[0] = True - return 0 - if signal == B_BUFF_READY: - self.buffer_free[1] = True - return 1 - except BlockingIOError as err: - self.dropped_frame_cntr +=1 - self.logger.warning("Dropping frame.. Total: [{:d}] ".format(self.dropped_frame_cntr)) - return 3 - return -1 - - -class inShmemIface(): - - def __init__(self, shmem_name, ctr_fifo_path="_data_control/"): - - self.init_ok = True - logging.basicConfig(level=logging.INFO) - self.logger = logging.getLogger(__name__) - self.drop_mode = False - - self.shmem_name = shmem_name - - self.memories = [] - self.buffers = [] - try: - self.fw_ctr_fifo = os.open(ctr_fifo_path+'fw_'+shmem_name, os.O_RDONLY) - self.bw_ctr_fifo = os.open(ctr_fifo_path+'bw_'+shmem_name, os.O_WRONLY) - except OSError as err: - self.logger.critical("OS error: {0}".format(err)) - self.logger.critical("Failed to open control fifos") - self.bw_ctr_fifo = None - self.fw_ctr_fifo = None - self.init_ok = False - - if self.fw_ctr_fifo is not None: - if unpack('B', os.read(self.fw_ctr_fifo, 1))[0] == INIT_READY: - self.memories.append(shared_memory.SharedMemory(name=shmem_name+'_A')) - self.memories.append(shared_memory.SharedMemory(name=shmem_name+'_B')) - self.buffers.append(np.ndarray((self.memories[0].size,), - dtype=np.uint8, - buffer=self.memories[0].buf)) - self.buffers.append(np.ndarray((self.memories[1].size,), - dtype=np.uint8, - buffer=self.memories[1].buf)) - else: - self.init_ok = False - - def send_ctr_buff_ready(self, active_buffer_index): - if active_buffer_index == 0: - os.write(self.bw_ctr_fifo, pack('B',A_BUFF_READY)) - elif active_buffer_index == 1: - os.write(self.bw_ctr_fifo, pack('B',B_BUFF_READY)) - - def destory_sm_buffer(self): - for memory in self.memories: - memory.close() - - if self.fw_ctr_fifo is not None: - os.close(self.fw_ctr_fifo) - - if self.bw_ctr_fifo is not None: - os.close(self.bw_ctr_fifo) - - def wait_buff_free(self): - signal = unpack('B', os.read(self.fw_ctr_fifo, 1))[0] - if signal == A_BUFF_READY: - return 0 - elif signal == B_BUFF_READY: - return 1 - elif signal == TERMINATE: - return TERMINATE - return -1 |