summaryrefslogblamecommitdiffstats
path: root/_receiver/krakenSDR_receiver.py
blob: 59938b7a7198d58b10cf6a37707cc41c51b0a744 (plain) (tree)

























































                                                                                                                 
                                                   






























































                                                                                                       










































































                                                                                                                 
                                               


























































                                                                                                                                                                          













































































                                                                                                 
 

                                  





                                                       














                                                                                                
# KrakenSDR Receiver

# Copyright (C) 2018-2021  Carl Laufer, Tamás Pető
#
#
#   This program is free software: you can redistribute it and/or modify
#   it under the terms of the GNU General Public License as published by
#   the Free Software Foundation, either version 3 of the License, or
#   any later version.
#
#   This program is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#   GNU General Public License for more details.
#
#   You should have received a copy of the GNU General Public License
#   along with this program.  If not, see <https://www.gnu.org/licenses/>.
#

# -*- coding: utf-8 -*-

# Import built-in modules
import sys
import os
import time
from struct import pack, unpack
import socket
import _thread
from threading import Lock
import queue 
import logging
#import copy

# Import third party modules
import numpy as np
from scipy import signal
from iq_header import IQHeader
from shmemIface import inShmemIface
class ReceiverRTLSDR():
    
    def __init__(self, data_que, data_interface = "eth", logging_level=10):     
        """
        Parameter:
        ----------        
            :param: data_que: Que to communicate with the UI (web iface/Qt GUI)        
            :param: data_interface: This field is configured by the GUI during instantiation. 
                                    Valid values are the followings:
                                    "eth"  : The module will receiver IQ frames through an Ethernet connection
                                    "shmem": The module will receiver IQ frames through a shared memory interface
            :type : data_interface: string
        """
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging_level)

        # DAQ parameters
        # These values are used by default to configure the DAQ through the configuration interface
        # Values are configured externally upon configuration request
        self.daq_center_freq   = 100 # MHz
        self.daq_rx_gain       = [0] * 100   # [dB]

        # UI interface
        self.data_que = data_que

        # IQ data interface
        self.data_interface = data_interface

        # -> Ethernet
        self.receiver_connection_status = False
        self.port = 5000
        self.rec_ip_addr = "127.0.0.1" # Configured by the GUI prior to connection request
        self.socket_inst = socket.socket()
        self.receiverBufferSize = 2 ** 18  # Size of the Ethernet receiver buffer measured in bytes

        # -> Shared memory
        root_path          = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
        daq_path           = os.path.join(os.path.dirname(root_path),"heimdall_daq_fw")
        self.daq_shmem_control_path = os.path.join(os.path.join(daq_path,"Firmware"),"_data_control/")
        self.init_data_iface()

        # Control interface
        self.ctr_iface_socket = socket.socket()
        self.ctr_iface_port = 5001
        self.ctr_iface_thread_lock = Lock() # Used to synchronize the operation of the ctr_iface thread

        self.iq_frame_bytes = None
        self.iq_samples = None
        self.iq_header = IQHeader()
        self.M = 0 # Number of receiver channels, updated after establishing connection

    def init_data_iface(self):
        if self.data_interface == "shmem":
            # Open shared memory interface to capture the DAQ firmware output
            self.in_shmem_iface = inShmemIface("delay_sync_iq", self.daq_shmem_control_path)
            if not self.in_shmem_iface.init_ok:
                self.logger.critical("Shared memory initialization failed")
                self.in_shmem_iface.destory_sm_buffer()
                return -1
        return 0


    def eth_connect(self):
        """
            Compatible only with DAQ firmwares that has the IQ streaming mode.
            HeIMDALL DAQ Firmware version: 1.0 or later
        """
        try:
            if not self.receiver_connection_status:
                if self.data_interface == "eth":
                    # Establlish IQ data interface connection
                    self.socket_inst.connect((self.rec_ip_addr, self.port))
                    self.socket_inst.sendall(str.encode('streaming'))
                    test_iq = self.receive_iq_frame()

                    self.M = self.iq_header.active_ant_chs

                # Establish control interface connection
                self.ctr_iface_socket.connect((self.rec_ip_addr, self.ctr_iface_port))
                self.receiver_connection_status = True
                self.ctr_iface_init()
                self.logger.info("CTR INIT Center freq: {0}".format(self.daq_center_freq))
                self.set_center_freq(self.daq_center_freq)
                self.set_if_gain(self.daq_rx_gain)
        except:
            errorMsg = sys.exc_info()[0]
            self.logger.error("Error message: "+str(errorMsg))
            self.receiver_connection_status = False
            self.logger.error("Unexpected error: {0}".format(sys.exc_info()[0]))

            # Re-instantiating sockets
            self.socket_inst = socket.socket()
            self.ctr_iface_socket = socket.socket()
            return -1

        self.logger.info("Connection established")
        que_data_packet = []
        que_data_packet.append(['conn-ok',])
        self.data_que.put(que_data_packet)

    def eth_close(self):
        """
            Close Ethernet conenctions including the IQ data and the control interfaces
        """
        try:
            if self.receiver_connection_status:
                if self.data_interface == "eth":
                    self.socket_inst.sendall(str.encode('q')) # Send exit message
                    self.socket_inst.close()
                    self.socket_inst = socket.socket() # Re-instantiating socket

                # Close control interface connection
                exit_message_bytes=("EXIT".encode()+bytearray(124))
                self.ctr_iface_socket.send(exit_message_bytes)
                self.ctr_iface_socket.close()
                self.ctr_iface_socket = socket.socket()

            self.receiver_connection_status = False
            que_data_packet = []
            que_data_packet.append(['disconn-ok',])
            self.data_que.put(que_data_packet)
        except:
            errorMsg = sys.exc_info()[0]
            self.logger.error("Error message: {0}".format(errorMsg))
            return -1

        if self.data_interface == "shmem":
            self.in_shmem_iface.destory_sm_buffer()

        return 0

    def get_iq_online(self):
        """
            This function obtains a new IQ data frame through the Ethernet IQ data or the shared memory interface
        """

        # Check connection
        if not self.receiver_connection_status:
            fail = self.eth_connect()
            if fail:
                return -1

        if self.data_interface == "eth":
            self.socket_inst.sendall(str.encode("IQDownload")) # Send iq request command
            self.iq_samples = self.receive_iq_frame()

        elif self.data_interface == "shmem":
            active_buff_index = self.in_shmem_iface.wait_buff_free()
            if active_buff_index < 0 or active_buff_index > 1:
                self.logger.info("Terminating.., signal: {:d}".format(active_buff_index))
                return -1

            buffer = self.in_shmem_iface.buffers[active_buff_index]
            iq_header_bytes = buffer[0:1024].tobytes()
            self.iq_header.decode_header(iq_header_bytes)

            # Inititalization from header - Set channel numbers
            if self.M == 0:
                self.M = self.iq_header.active_ant_chs
                self.daq_rx_gain = [0] * self.M

            incoming_payload_size = self.iq_header.cpi_length*self.iq_header.active_ant_chs*2*int(self.iq_header.sample_bit_depth/8)
            if incoming_payload_size > 0:
                iq_samples_in = (buffer[1024:1024 + incoming_payload_size].view(dtype=np.complex64))\
                                .reshape(self.iq_header.active_ant_chs, self.iq_header.cpi_length)
                self.iq_samples = iq_samples_in.copy() # Must be .copy

            self.in_shmem_iface.send_ctr_buff_ready(active_buff_index)

    def receive_iq_frame(self):
        """
            Called by the get_iq_online function. Receives IQ samples over the establed Ethernet connection
        """
        total_received_bytes = 0
        recv_bytes_count = 0
        iq_header_bytes = bytearray(self.iq_header.header_size)  # allocate array
        view = memoryview(iq_header_bytes)  # Get buffer

        self.logger.debug("Starting IQ header reception")

        while total_received_bytes < self.iq_header.header_size:
            # Receive into buffer
            recv_bytes_count = self.socket_inst.recv_into(view, self.iq_header.header_size-total_received_bytes)
            view = view[recv_bytes_count:]  # reset memory region
            total_received_bytes += recv_bytes_count

        self.iq_header.decode_header(iq_header_bytes)
        # Uncomment to check the content of the IQ header
        #self.iq_header.dump_header()

        incoming_payload_size = self.iq_header.cpi_length*self.iq_header.active_ant_chs*2*int(self.iq_header.sample_bit_depth/8)
        if incoming_payload_size > 0:
            # Calculate total bytes to receive from the iq header data
            total_bytes_to_receive = incoming_payload_size
            receiver_buffer_size = 2**18

            self.logger.debug("Total bytes to receive: {:d}".format(total_bytes_to_receive))

            total_received_bytes = 0
            recv_bytes_count = 0
            iq_data_bytes = bytearray(total_bytes_to_receive + receiver_buffer_size)  # allocate array
            view = memoryview(iq_data_bytes)  # Get buffer

            while total_received_bytes < total_bytes_to_receive:
                # Receive into buffer
                recv_bytes_count = self.socket_inst.recv_into(view, receiver_buffer_size)
                view = view[recv_bytes_count:]  # reset memory region
                total_received_bytes += recv_bytes_count

            self.logger.debug(" IQ data succesfully received")

            # Convert raw bytes to Complex float64 IQ samples
            self.iq_samples = np.frombuffer(iq_data_bytes[0:total_bytes_to_receive], dtype=np.complex64).reshape(self.iq_header.active_ant_chs, self.iq_header.cpi_length)

            self.iq_frame_bytes =  bytearray()+iq_header_bytes+iq_data_bytes
            return self.iq_samples
        else:
             return 0

    def ctr_iface_init(self):
        """
            Initialize connection with the DAQ FW through the control interface
        """
        if self.receiver_connection_status: # Check connection
            # Assembling message            
            cmd="INIT"            
            msg_bytes=(cmd.encode()+bytearray(124))           
            try:                
                _thread.start_new_thread(self.ctr_iface_communication, (msg_bytes,))
            except:                
                errorMsg = sys.exc_info()[0]        
                self.logger.error("Unable to start communication thread")
                self.logger.error("Error message: {:s}".format(errorMsg))

    def ctr_iface_communication(self, msg_bytes):
        """
            Handles communication on the control interface with the DAQ FW

            Parameters:
            -----------

                :param: msg: Message bytes, that will be sent ont the control interface
                :type:  msg: Byte array                
        """ 
        self.ctr_iface_thread_lock.acquire()
        self.logger.debug("Sending control message")        
        self.ctr_iface_socket.send(msg_bytes)
        
        # Waiting for the command to take effect
        reply_msg_bytes = self.ctr_iface_socket.recv(128)    

        self.logger.debug("Control interface communication finished")
        self.ctr_iface_thread_lock.release()
        
        status = reply_msg_bytes[0:4].decode()
        if status == "FNSD":
            self.logger.info("Reconfiguration succesfully finished")
            que_data_packet = []
            que_data_packet.append(['config-ok',])
            self.data_que.put(que_data_packet)
            
        else:
            self.logger.error("Failed to set the requested parameter, reply: {0}".format(status))
        
    def set_center_freq(self, center_freq):
        """
            Configures the RF center frequency of the receiver through the control interface
        
            Paramters:
            ----------
                :param: center_freq: Required center frequency to set [Hz]
                :type:  center_freq: float
        """
        if self.receiver_connection_status: # Check connection
            self.daq_center_freq = int(center_freq)
            # Set center frequency
            cmd="FREQ"
            freq_bytes=pack("Q",int(center_freq))
            msg_bytes=(cmd.encode()+freq_bytes+bytearray(116))           
            try:                
                _thread.start_new_thread(self.ctr_iface_communication, (msg_bytes,))            
            except:                
                errorMsg = sys.exc_info()[0]        
                self.logger.error("Unable to start communication thread")
                self.logger.error("Error message: {:s}".format(errorMsg))
    
    def set_if_gain(self, gain):
        """
            Configures the IF gain of the receiver through the control interface
        
            Paramters:
            ----------
                :param: gain: IF gain value [dB]
                :type:  gain: int
        """                
        if self.receiver_connection_status: # Check connection
            self.daq_rx_gain     = gain

            # Set center frequency
            cmd="GAIN"

            gain_list = []
            for i in range(0, self.M):
                gain_list.append(int(gain[i]*10))

            #gain_list=[297, 37] #[int(gain*10)]*self.M
            gain_bytes=pack("I"*self.M, *gain_list)
            msg_bytes=(cmd.encode()+gain_bytes+bytearray(128-(self.M+1)*4))  
            try:                
                _thread.start_new_thread(self.ctr_iface_communication, (msg_bytes,))            
            except:                
                errorMsg = sys.exc_info()[0]        
                self.logger.error("Unable to start communication thread")
                self.logger.error("Error message: {:s}".format(errorMsg))

    def close(self):
        """
            Disconnet the receiver module and the DAQ FW
        """
        self.eth_close()