summaryrefslogtreecommitdiffstats
path: root/util/_receiver
diff options
context:
space:
mode:
Diffstat (limited to 'util/_receiver')
-rwxr-xr-xutil/_receiver/iq_header.py152
-rwxr-xr-xutil/_receiver/krakenSDR_receiver.py375
-rwxr-xr-xutil/_receiver/shmemIface.py195
3 files changed, 0 insertions, 722 deletions
diff --git a/util/_receiver/iq_header.py b/util/_receiver/iq_header.py
deleted file mode 100755
index f439019..0000000
--- a/util/_receiver/iq_header.py
+++ /dev/null
@@ -1,152 +0,0 @@
-from struct import pack,unpack
-import logging
-import sys
-"""
- Desctiption: IQ Frame header definition
- For header field description check the corresponding documentation
- Total length: 1024 byte
- Project: HeIMDALL RTL
- Author: Tamás Pető
- Status: Finished
- Version history:
- 1 : Initial version (2019 04 23)
- 2 : Fixed 1024 byte length (2019 07 25)
- 3 : Noise source state (2019 10 01)
- 4 : IQ sync flag (2019 10 21)
- 5 : Sync state (2019 11 10)
- 6 : Unix Epoch timestamp (2019 12 17)
- 6a: Frame type defines (2020 03 19)
- 7 : Sync word (2020 05 03)
-"""
-class IQHeader():
-
- FRAME_TYPE_DATA = 0
- FRAME_TYPE_DUMMY = 1
- FRAME_TYPE_RAMP = 2
- FRAME_TYPE_CAL = 3
- FRAME_TYPE_TRIGW = 4
-
- SYNC_WORD = 0x2bf7b95a
-
- def __init__(self):
-
- self.logger = logging.getLogger(__name__)
- self.header_size = 1024 # size in bytes
- self.reserved_bytes = 192
-
- self.sync_word=self.SYNC_WORD # uint32_t
- self.frame_type=0 # uint32_t
- self.hardware_id="" # char [16]
- self.unit_id=0 # uint32_t
- self.active_ant_chs=0 # uint32_t
- self.ioo_type=0 # uint32_t
- self.rf_center_freq=0 # uint64_t
- self.adc_sampling_freq=0 # uint64_t
- self.sampling_freq=0 # uint64_t
- self.cpi_length=0 # uint32_t
- self.time_stamp=0 # uint64_t
- self.daq_block_index=0 # uint32_t
- self.cpi_index=0 # uint32_t
- self.ext_integration_cntr=0 # uint64_t
- self.data_type=0 # uint32_t
- self.sample_bit_depth=0 # uint32_t
- self.adc_overdrive_flags=0 # uint32_t
- self.if_gains=[0]*32 # uint32_t x 32
- self.delay_sync_flag=0 # uint32_t
- self.iq_sync_flag=0 # uint32_t
- self.sync_state=0 # uint32_t
- self.noise_source_state=0 # uint32_t
- self.reserved=[0]*self.reserved_bytes# uint32_t x reserverd_bytes
- self.header_version=0 # uint32_t
-
- def decode_header(self, iq_header_byte_array):
- """
- Unpack,decode and store the content of the iq header
- """
- iq_header_list = unpack("II16sIIIQQQIQIIQIII"+"I"*32+"IIII"+"I"*self.reserved_bytes+"I", iq_header_byte_array)
-
- self.sync_word = iq_header_list[0]
- self.frame_type = iq_header_list[1]
- self.hardware_id = iq_header_list[2].decode()
- self.unit_id = iq_header_list[3]
- self.active_ant_chs = iq_header_list[4]
- self.ioo_type = iq_header_list[5]
- self.rf_center_freq = iq_header_list[6]
- self.adc_sampling_freq = iq_header_list[7]
- self.sampling_freq = iq_header_list[8]
- self.cpi_length = iq_header_list[9]
- self.time_stamp = iq_header_list[10]
- self.daq_block_index = iq_header_list[11]
- self.cpi_index = iq_header_list[12]
- self.ext_integration_cntr = iq_header_list[13]
- self.data_type = iq_header_list[14]
- self.sample_bit_depth = iq_header_list[15]
- self.adc_overdrive_flags = iq_header_list[16]
- self.if_gains = iq_header_list[17:49]
- self.delay_sync_flag = iq_header_list[49]
- self.iq_sync_flag = iq_header_list[50]
- self.sync_state = iq_header_list[51]
- self.noise_source_state = iq_header_list[52]
- self.header_version = iq_header_list[52+self.reserved_bytes+1]
-
- def encode_header(self):
- """
- Pack the iq header information into a byte array
- """
- iq_header_byte_array=pack("II", self.sync_word, self.frame_type)
- iq_header_byte_array+=self.hardware_id.encode()+bytearray(16-len(self.hardware_id.encode()))
- iq_header_byte_array+=pack("IIIQQQIQIIQIII",
- self.unit_id, self.active_ant_chs, self.ioo_type, self.rf_center_freq, self.adc_sampling_freq,
- self.sampling_freq, self.cpi_length, self.time_stamp, self.daq_block_index, self.cpi_index,
- self.ext_integration_cntr, self.data_type, self.sample_bit_depth, self.adc_overdrive_flags)
- for m in range(32):
- iq_header_byte_array+=pack("I", self.if_gains[m])
-
- iq_header_byte_array+=pack("I", self.delay_sync_flag)
- iq_header_byte_array+=pack("I", self.iq_sync_flag)
- iq_header_byte_array+=pack("I", self.sync_state)
- iq_header_byte_array+=pack("I", self.noise_source_state)
-
- for m in range(self.reserved_bytes):
- iq_header_byte_array+=pack("I",0)
-
- iq_header_byte_array+=pack("I", self.header_version)
- return iq_header_byte_array
-
- def dump_header(self):
- """
- Prints out the content of the header in human readable format
- """
- self.logger.info("Sync word: {:d}".format(self.sync_word))
- self.logger.info("Header version: {:d}".format(self.header_version))
- self.logger.info("Frame type: {:d}".format(self.frame_type))
- self.logger.info("Hardware ID: {:16}".format(self.hardware_id))
- self.logger.info("Unit ID: {:d}".format(self.unit_id))
- self.logger.info("Active antenna channels: {:d}".format(self.active_ant_chs))
- self.logger.info("Illuminator type: {:d}".format(self.ioo_type))
- self.logger.info("RF center frequency: {:.2f} MHz".format(self.rf_center_freq/10**6))
- self.logger.info("ADC sampling frequency: {:.2f} MHz".format(self.adc_sampling_freq/10**6))
- self.logger.info("IQ sampling frequency {:.2f} MHz".format(self.sampling_freq/10**6))
- self.logger.info("CPI length: {:d}".format(self.cpi_length))
- self.logger.info("Unix Epoch timestamp: {:d}".format(self.time_stamp))
- self.logger.info("DAQ block index: {:d}".format(self.daq_block_index))
- self.logger.info("CPI index: {:d}".format(self.cpi_index))
- self.logger.info("Extended integration counter {:d}".format(self.ext_integration_cntr))
- self.logger.info("Data type: {:d}".format(self.data_type))
- self.logger.info("Sample bit depth: {:d}".format(self.sample_bit_depth))
- self.logger.info("ADC overdrive flags: {:d}".format(self.adc_overdrive_flags))
- for m in range(32):
- self.logger.info("Ch: {:d} IF gain: {:.1f} dB".format(m, self.if_gains[m]/10))
- self.logger.info("Delay sync flag: {:d}".format(self.delay_sync_flag))
- self.logger.info("IQ sync flag: {:d}".format(self.iq_sync_flag))
- self.logger.info("Sync state: {:d}".format(self.sync_state))
- self.logger.info("Noise source state: {:d}".format(self.noise_source_state))
-
- def check_sync_word(self):
- """
- Check the sync word of the header
- """
- if self.sync_word != self.SYNC_WORD:
- return -1
- else:
- return 0
diff --git a/util/_receiver/krakenSDR_receiver.py b/util/_receiver/krakenSDR_receiver.py
deleted file mode 100755
index a9e2d63..0000000
--- a/util/_receiver/krakenSDR_receiver.py
+++ /dev/null
@@ -1,375 +0,0 @@
-# KrakenSDR Receiver
-
-# Copyright (C) 2018-2021 Carl Laufer, Tamás Pető
-#
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <https://www.gnu.org/licenses/>.
-#
-
-# -*- coding: utf-8 -*-
-
-# Import built-in modules
-import sys
-import os
-import time
-from struct import pack, unpack
-import socket
-import _thread
-from threading import Lock
-import queue
-import logging
-#import copy
-
-# Import third party modules
-import numpy as np
-from scipy import signal
-from iq_header import IQHeader
-from shmemIface import inShmemIface
-class ReceiverRTLSDR():
-
- def __init__(self, data_que, data_interface = "eth", logging_level=10):
- """
- Parameter:
- ----------
- :param: data_que: Que to communicate with the UI (web iface/Qt GUI)
- :param: data_interface: This field is configured by the GUI during instantiation.
- Valid values are the followings:
- "eth" : The module will receiver IQ frames through an Ethernet connection
- "shmem": The module will receiver IQ frames through a shared memory interface
- :type : data_interface: string
- """
- self.logger = logging.getLogger(__name__)
- self.logger.setLevel(logging_level)
-
- # DAQ parameters
- # These values are used by default to configure the DAQ through the configuration interface
- # Values are configured externally upon configuration request
- self.daq_center_freq = 100 # MHz
- self.daq_rx_gain = 0 # [dB]
- self.daq_squelch_th_dB = 0
-
- # UI interface
- self.data_que = data_que
-
- # IQ data interface
- self.data_interface = data_interface
-
- # -> Ethernet
- self.receiver_connection_status = False
- self.port = 5000
- self.rec_ip_addr = "127.0.0.1" # Configured by the GUI prior to connection request
- self.socket_inst = socket.socket()
- self.receiverBufferSize = 2 ** 18 # Size of the Ethernet receiver buffer measured in bytes
-
- # -> Shared memory
- root_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
- daq_path = os.path.join(os.path.dirname(root_path),"heimdall_daq_fw")
- self.daq_shmem_control_path = os.path.join(os.path.join(daq_path,"Firmware"),"_data_control/")
- self.init_data_iface()
-
- # Control interface
- self.ctr_iface_socket = socket.socket()
- self.ctr_iface_port = 5001
- self.ctr_iface_thread_lock = Lock() # Used to synchronize the operation of the ctr_iface thread
-
- self.iq_frame_bytes = None
- self.iq_samples = None
- self.iq_header = IQHeader()
- self.M = 0 # Number of receiver channels, updated after establishing connection
-
- def init_data_iface(self):
- if self.data_interface == "shmem":
- # Open shared memory interface to capture the DAQ firmware output
- self.in_shmem_iface = inShmemIface("delay_sync_iq", self.daq_shmem_control_path)
- if not self.in_shmem_iface.init_ok:
- self.logger.critical("Shared memory initialization failed")
- self.in_shmem_iface.destory_sm_buffer()
- return -1
- return 0
-
-
- def eth_connect(self):
- """
- Compatible only with DAQ firmwares that has the IQ streaming mode.
- HeIMDALL DAQ Firmware version: 1.0 or later
- """
- try:
- if not self.receiver_connection_status:
- if self.data_interface == "eth":
- # Establlish IQ data interface connection
- self.socket_inst.connect((self.rec_ip_addr, self.port))
- self.socket_inst.sendall(str.encode('streaming'))
- test_iq = self.receive_iq_frame()
-
- self.M = self.iq_header.active_ant_chs
-
- # Establish control interface connection
- self.ctr_iface_socket.connect((self.rec_ip_addr, self.ctr_iface_port))
- self.receiver_connection_status = True
- self.ctr_iface_init()
- self.logger.info("CTR INIT Center freq: {0}".format(self.daq_center_freq))
- self.set_center_freq(self.daq_center_freq)
- self.set_if_gain(self.daq_rx_gain)
- self.set_squelch_threshold(self.daq_squelch_th_dB)
- except:
- errorMsg = sys.exc_info()[0]
- self.logger.error("Error message: "+str(errorMsg))
- self.receiver_connection_status = False
- self.logger.error("Unexpected error: {0}".format(sys.exc_info()[0]))
-
- # Re-instantiating sockets
- self.socket_inst = socket.socket()
- self.ctr_iface_socket = socket.socket()
- return -1
-
- self.logger.info("Connection established")
- que_data_packet = []
- que_data_packet.append(['conn-ok',])
- self.data_que.put(que_data_packet)
-
- def eth_close(self):
- """
- Close Ethernet conenctions including the IQ data and the control interfaces
- """
- try:
- if self.receiver_connection_status:
- if self.data_interface == "eth":
- self.socket_inst.sendall(str.encode('q')) # Send exit message
- self.socket_inst.close()
- self.socket_inst = socket.socket() # Re-instantiating socket
-
- # Close control interface connection
- exit_message_bytes=("EXIT".encode()+bytearray(124))
- self.ctr_iface_socket.send(exit_message_bytes)
- self.ctr_iface_socket.close()
- self.ctr_iface_socket = socket.socket()
-
- self.receiver_connection_status = False
- que_data_packet = []
- que_data_packet.append(['disconn-ok',])
- self.data_que.put(que_data_packet)
- except:
- errorMsg = sys.exc_info()[0]
- self.logger.error("Error message: {0}".format(errorMsg))
- return -1
-
- if self.data_interface == "shmem":
- self.in_shmem_iface.destory_sm_buffer()
-
- return 0
-
- def get_iq_online(self):
- """
- This function obtains a new IQ data frame through the Ethernet IQ data or the shared memory interface
- """
-
- # Check connection
- if not self.receiver_connection_status:
- fail = self.eth_connect()
- if fail:
- return -1
-
- if self.data_interface == "eth":
- self.socket_inst.sendall(str.encode("IQDownload")) # Send iq request command
- self.iq_samples = self.receive_iq_frame()
-
- elif self.data_interface == "shmem":
- active_buff_index = self.in_shmem_iface.wait_buff_free()
- if active_buff_index < 0 or active_buff_index > 1:
- self.logger.info("Terminating.., signal: {:d}".format(active_buff_index))
- return -1
-
- buffer = self.in_shmem_iface.buffers[active_buff_index]
- iq_header_bytes = buffer[0:1024].tobytes()
- self.iq_header.decode_header(iq_header_bytes)
-
- # Inititalization from header - Set channel numbers
- if self.M == 0:
- self.M = self.iq_header.active_ant_chs
-
- incoming_payload_size = self.iq_header.cpi_length*self.iq_header.active_ant_chs*2*int(self.iq_header.sample_bit_depth/8)
- if incoming_payload_size > 0:
- iq_samples_in = (buffer[1024:1024 + incoming_payload_size].view(dtype=np.complex64))\
- .reshape(self.iq_header.active_ant_chs, self.iq_header.cpi_length)
- self.iq_samples = iq_samples_in.copy() # Must be .copy
-
- self.in_shmem_iface.send_ctr_buff_ready(active_buff_index)
-
- def receive_iq_frame(self):
- """
- Called by the get_iq_online function. Receives IQ samples over the establed Ethernet connection
- """
- total_received_bytes = 0
- recv_bytes_count = 0
- iq_header_bytes = bytearray(self.iq_header.header_size) # allocate array
- view = memoryview(iq_header_bytes) # Get buffer
-
- self.logger.debug("Starting IQ header reception")
-
- while total_received_bytes < self.iq_header.header_size:
- # Receive into buffer
- recv_bytes_count = self.socket_inst.recv_into(view, self.iq_header.header_size-total_received_bytes)
- view = view[recv_bytes_count:] # reset memory region
- total_received_bytes += recv_bytes_count
-
- self.iq_header.decode_header(iq_header_bytes)
- # Uncomment to check the content of the IQ header
- #self.iq_header.dump_header()
-
- incoming_payload_size = self.iq_header.cpi_length*self.iq_header.active_ant_chs*2*int(self.iq_header.sample_bit_depth/8)
- if incoming_payload_size > 0:
- # Calculate total bytes to receive from the iq header data
- total_bytes_to_receive = incoming_payload_size
- receiver_buffer_size = 2**18
-
- self.logger.debug("Total bytes to receive: {:d}".format(total_bytes_to_receive))
-
- total_received_bytes = 0
- recv_bytes_count = 0
- iq_data_bytes = bytearray(total_bytes_to_receive + receiver_buffer_size) # allocate array
- view = memoryview(iq_data_bytes) # Get buffer
-
- while total_received_bytes < total_bytes_to_receive:
- # Receive into buffer
- recv_bytes_count = self.socket_inst.recv_into(view, receiver_buffer_size)
- view = view[recv_bytes_count:] # reset memory region
- total_received_bytes += recv_bytes_count
-
- self.logger.debug(" IQ data succesfully received")
-
- # Convert raw bytes to Complex float64 IQ samples
- self.iq_samples = np.frombuffer(iq_data_bytes[0:total_bytes_to_receive], dtype=np.complex64).reshape(self.iq_header.active_ant_chs, self.iq_header.cpi_length)
-
- self.iq_frame_bytes = bytearray()+iq_header_bytes+iq_data_bytes
- return self.iq_samples
- else:
- return 0
-
- def set_squelch_threshold(self, threshold_dB):
- """
- Configures the threshold level of the squelch module in the DAQ FW through the control interface
- """
- if self.receiver_connection_status: # Check connection
- self.daq_squelch_th_dB = threshold_dB
- if threshold_dB == -80: threshold = 0
- else: threshold = 10**(threshold_dB/20)
-
- # Assembling message
- cmd="STHU"
- th_bytes=pack("f",threshold)
- msg_bytes=(cmd.encode()+th_bytes+bytearray(120))
- try:
- _thread.start_new_thread(self.ctr_iface_communication, (msg_bytes,))
- except:
- errorMsg = sys.exc_info()[0]
- self.logger.error("Unable to start communication thread")
- self.logger.error("Error message: {:s}".format(errorMsg))
-
- def ctr_iface_init(self):
- """
- Initialize connection with the DAQ FW through the control interface
- """
- if self.receiver_connection_status: # Check connection
- # Assembling message
- cmd="INIT"
- msg_bytes=(cmd.encode()+bytearray(124))
- try:
- _thread.start_new_thread(self.ctr_iface_communication, (msg_bytes,))
- except:
- errorMsg = sys.exc_info()[0]
- self.logger.error("Unable to start communication thread")
- self.logger.error("Error message: {:s}".format(errorMsg))
-
- def ctr_iface_communication(self, msg_bytes):
- """
- Handles communication on the control interface with the DAQ FW
-
- Parameters:
- -----------
-
- :param: msg: Message bytes, that will be sent ont the control interface
- :type: msg: Byte array
- """
- self.ctr_iface_thread_lock.acquire()
- self.logger.debug("Sending control message")
- self.ctr_iface_socket.send(msg_bytes)
-
- # Waiting for the command to take effect
- reply_msg_bytes = self.ctr_iface_socket.recv(128)
-
- self.logger.debug("Control interface communication finished")
- self.ctr_iface_thread_lock.release()
-
- status = reply_msg_bytes[0:4].decode()
- if status == "FNSD":
- self.logger.info("Reconfiguration succesfully finished")
- que_data_packet = []
- que_data_packet.append(['config-ok',])
- self.data_que.put(que_data_packet)
-
- else:
- self.logger.error("Failed to set the requested parameter, reply: {0}".format(status))
-
- def set_center_freq(self, center_freq):
- """
- Configures the RF center frequency of the receiver through the control interface
-
- Paramters:
- ----------
- :param: center_freq: Required center frequency to set [Hz]
- :type: center_freq: float
- """
- if self.receiver_connection_status: # Check connection
- self.daq_center_freq = int(center_freq)
- # Set center frequency
- cmd="FREQ"
- freq_bytes=pack("Q",int(center_freq))
- msg_bytes=(cmd.encode()+freq_bytes+bytearray(116))
- try:
- _thread.start_new_thread(self.ctr_iface_communication, (msg_bytes,))
- except:
- errorMsg = sys.exc_info()[0]
- self.logger.error("Unable to start communication thread")
- self.logger.error("Error message: {:s}".format(errorMsg))
-
- def set_if_gain(self, gain):
- """
- Configures the IF gain of the receiver through the control interface
-
- Paramters:
- ----------
- :param: gain: IF gain value [dB]
- :type: gain: int
- """
- if self.receiver_connection_status: # Check connection
- self.daq_rx_gain = gain
-
- # Set center frequency
- cmd="GAIN"
- gain_list=[297, 37] #[int(gain*10)]*self.M
- gain_bytes=pack("I"*self.M, *gain_list)
- msg_bytes=(cmd.encode()+gain_bytes+bytearray(128-(self.M+1)*4))
- try:
- _thread.start_new_thread(self.ctr_iface_communication, (msg_bytes,))
- except:
- errorMsg = sys.exc_info()[0]
- self.logger.error("Unable to start communication thread")
- self.logger.error("Error message: {:s}".format(errorMsg))
-
- def close(self):
- """
- Disconnet the receiver module and the DAQ FW
- """
- self.eth_close()
-
diff --git a/util/_receiver/shmemIface.py b/util/_receiver/shmemIface.py
deleted file mode 100755
index fd301b1..0000000
--- a/util/_receiver/shmemIface.py
+++ /dev/null
@@ -1,195 +0,0 @@
-"""
- HeIMDALL DAQ Firmware
- Python based shared memory interface implementations
-
- Author: Tamás Pető
- License: GNU GPL V3
-
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program. If not, see <https://www.gnu.org/licenses/>.
-"""
-import logging
-from struct import pack, unpack
-from multiprocessing import shared_memory
-import numpy as np
-import os
-
-A_BUFF_READY = 1
-B_BUFF_READY = 2
-INIT_READY = 10
-TERMINATE = 255
-class outShmemIface():
-
-
- def __init__(self, shmem_name, shmem_size, drop_mode = False):
-
- self.init_ok = True
- logging.basicConfig(level=logging.INFO)
- self.logger = logging.getLogger(__name__)
- self.drop_mode = drop_mode
- self.dropped_frame_cntr = 0
-
- self.shmem_name = shmem_name
- self.buffer_free = [True, True]
-
- self.memories = []
- self.buffers = []
-
- # Try to remove shared memories if already exist
- try:
- shmem_A = shared_memory.SharedMemory(name=shmem_name+'_A',create=False, size=shmem_size)
- shmem_A.close()
- #shmem_A.unkink()
- except FileNotFoundError as err:
- self.logger.warning("Shared memory not exist")
- try:
- shmem_B = shared_memory.SharedMemory(name=shmem_name+'_B',create=False, size=shmem_size)
- shmem_B.close()
- #shmem_B.unkink()
- except FileNotFoundError as err:
- self.logger.warning("Shared memory not exist")
-
- # Create the shared memories
- self.memories.append(shared_memory.SharedMemory(name=shmem_name+'_A',create=True, size=shmem_size))
- self.memories.append(shared_memory.SharedMemory(name=shmem_name+'_B',create=True, size=shmem_size))
- self.buffers.append(np.ndarray((shmem_size,), dtype=np.uint8, buffer=self.memories[0].buf))
- self.buffers.append(np.ndarray((shmem_size,), dtype=np.uint8, buffer=self.memories[1].buf))
-
- # Opening control FIFOs
- if self.drop_mode:
- bw_fifo_flags = os.O_RDONLY | os.O_NONBLOCK
- else:
- bw_fifo_flags = os.O_RDONLY
- try:
- self.fw_ctr_fifo = os.open('_data_control/'+'fw_'+shmem_name, os.O_WRONLY)
- self.bw_ctr_fifo = os.open('_data_control/'+'bw_'+shmem_name, bw_fifo_flags)
- except OSError as err:
- self.logger.critical("OS error: {0}".format(err))
- self.logger.critical("Failed to open control fifos")
- self.bw_ctr_fifo = None
- self.fw_ctr_fifo = None
- self.init_ok = False
-
- # Send init ready signal
- if self.init_ok:
- os.write(self.fw_ctr_fifo, pack('B',INIT_READY))
-
- def send_ctr_buff_ready(self, active_buffer_index):
- # Send buffer ready signal on the forward FIFO
- if active_buffer_index == 0:
- os.write(self.fw_ctr_fifo, pack('B',A_BUFF_READY))
- elif active_buffer_index == 1:
- os.write(self.fw_ctr_fifo, pack('B',B_BUFF_READY))
-
- # Deassert buffer free flag
- self.buffer_free[active_buffer_index] = False
-
- def send_ctr_terminate(self):
- os.write(self.fw_ctr_fifo, pack('B',TERMINATE))
- self.logger.info("Terminate signal sent")
-
- def destory_sm_buffer(self):
- for memory in self.memories:
- memory.close()
- memory.unlink()
-
- if self.fw_ctr_fifo is not None:
- os.close(self.fw_ctr_fifo)
-
- if self.bw_ctr_fifo is not None:
- os.close(self.bw_ctr_fifo)
-
- def wait_buff_free(self):
- if self.buffer_free[0]:
- return 0
- elif self.buffer_free[1]:
- return 1
- else:
- try:
- buffer = os.read(self.bw_ctr_fifo, 1)
- signal = unpack('B', buffer )[0]
-
- if signal == A_BUFF_READY:
- self.buffer_free[0] = True
- return 0
- if signal == B_BUFF_READY:
- self.buffer_free[1] = True
- return 1
- except BlockingIOError as err:
- self.dropped_frame_cntr +=1
- self.logger.warning("Dropping frame.. Total: [{:d}] ".format(self.dropped_frame_cntr))
- return 3
- return -1
-
-
-class inShmemIface():
-
- def __init__(self, shmem_name, ctr_fifo_path="_data_control/"):
-
- self.init_ok = True
- logging.basicConfig(level=logging.INFO)
- self.logger = logging.getLogger(__name__)
- self.drop_mode = False
-
- self.shmem_name = shmem_name
-
- self.memories = []
- self.buffers = []
- try:
- self.fw_ctr_fifo = os.open(ctr_fifo_path+'fw_'+shmem_name, os.O_RDONLY)
- self.bw_ctr_fifo = os.open(ctr_fifo_path+'bw_'+shmem_name, os.O_WRONLY)
- except OSError as err:
- self.logger.critical("OS error: {0}".format(err))
- self.logger.critical("Failed to open control fifos")
- self.bw_ctr_fifo = None
- self.fw_ctr_fifo = None
- self.init_ok = False
-
- if self.fw_ctr_fifo is not None:
- if unpack('B', os.read(self.fw_ctr_fifo, 1))[0] == INIT_READY:
- self.memories.append(shared_memory.SharedMemory(name=shmem_name+'_A'))
- self.memories.append(shared_memory.SharedMemory(name=shmem_name+'_B'))
- self.buffers.append(np.ndarray((self.memories[0].size,),
- dtype=np.uint8,
- buffer=self.memories[0].buf))
- self.buffers.append(np.ndarray((self.memories[1].size,),
- dtype=np.uint8,
- buffer=self.memories[1].buf))
- else:
- self.init_ok = False
-
- def send_ctr_buff_ready(self, active_buffer_index):
- if active_buffer_index == 0:
- os.write(self.bw_ctr_fifo, pack('B',A_BUFF_READY))
- elif active_buffer_index == 1:
- os.write(self.bw_ctr_fifo, pack('B',B_BUFF_READY))
-
- def destory_sm_buffer(self):
- for memory in self.memories:
- memory.close()
-
- if self.fw_ctr_fifo is not None:
- os.close(self.fw_ctr_fifo)
-
- if self.bw_ctr_fifo is not None:
- os.close(self.bw_ctr_fifo)
-
- def wait_buff_free(self):
- signal = unpack('B', os.read(self.fw_ctr_fifo, 1))[0]
- if signal == A_BUFF_READY:
- return 0
- elif signal == B_BUFF_READY:
- return 1
- elif signal == TERMINATE:
- return TERMINATE
- return -1