From 35a91f26a110e1f46b5f045f065e10bc2c8423cb Mon Sep 17 00:00:00 2001 From: Carl Laufer Date: Tue, 4 Jan 2022 04:24:02 +0000 Subject: tidy signal processor --- _signal_processing/krakenSDR_signal_processor.py | 177 +---------------------- 1 file changed, 7 insertions(+), 170 deletions(-) diff --git a/_signal_processing/krakenSDR_signal_processor.py b/_signal_processing/krakenSDR_signal_processor.py index 651fc39..d0b6dba 100755 --- a/_signal_processing/krakenSDR_signal_processor.py +++ b/_signal_processing/krakenSDR_signal_processor.py @@ -421,6 +421,9 @@ def pruned_correlation(ref_ch, surv_ch, clen, N): ypp = np.vstack([xp[1:, :], np.zeros(cols, dtype=c_dtype)]) #vstack appears to be faster than pad yp = np.concatenate([xp, ypp], axis=1) + print("pruned corr xp shape: " + str(xp.shape)) + print("pruned corr yp shape: " + str(yp.shape)) + # execute FFT on the matrices xpw = fft.fft(xp, n = 2*cols, axis=1, workers=4, overwrite_x=True) bpw = fft.fft(yp, axis=1, workers=4, overwrite_x=True) @@ -523,6 +526,10 @@ def cc_detector_ons(ref_ch, surv_ch, fs, fD_max, r_max): ref_ch_align, surv_ch_align = resize_and_align(no_sub_tasks, ref_ch, surv_ch, fs, fD_max, r_max) + print("ref_ch_align shape: " + str(ref_ch_align.shape)) + print("surv_ch_align shape: " + str(surv_ch_align.shape)) + + # row wise fft on both channels ref_fft = fft.fft(ref_ch_align, axis = 1, overwrite_x=True, workers=4) #pyfftw.interfaces.numpy_fft.fft(ref_ch_align_a, axis = 1, overwrite_input=True, threads=4) #fft.fft(ref_ch_align_a, axis = 1, overwrite_x=True, workers=4) surv_fft = fft.fft(surv_ch_align, axis = 1, overwrite_x=True, workers=4) #pyfftw.interfaces.numpy_fft.fft(surv_ch_align_a, axis = 1, overwrite_input=True, threads=4) #fft.fft(surv_ch_align_a, axis = 1, overwrite_x=True, workers=4) @@ -543,173 +550,3 @@ def cc_detector_ons(ref_ch, surv_ch, fs, fD_max, r_max): return mx - - - - - -#NUMBA optimized center tracking. Gives a mild speed boost ~25% faster. -@njit(fastmath=True, cache=True, parallel=True) -def center_max_signal(processed_signal, frequency, fft_spectrum, threshold, sample_freq): - - # Where is the max frequency? e.g. where is the signal? - max_index = np.argmax(fft_spectrum) - max_frequency = frequency[max_index] - - # Auto decimate down to exactly the max signal width - fft_signal_width = np.sum(fft_spectrum > threshold) + 25 - decimation_factor = max((sample_freq // fft_signal_width) // 2, 1) - - # Auto shift peak frequency center of spectrum, this frequency will be decimated: - # https://pysdr.org/content/filters.html - f0 = -max_frequency #+10 - Ts = 1.0/sample_freq - t = np.arange(0.0, Ts*len(processed_signal[0, :]), Ts) - exponential = np.exp(2j*np.pi*f0*t) # this is essentially a complex sine wave - - return processed_signal * exponential, decimation_factor, fft_signal_width, max_index - - -# NUMBA optimized MUSIC function. About 100x faster on the Pi 4 -@njit(fastmath=True, cache=True) -def DOA_MUSIC(R, scanning_vectors, signal_dimension, angle_resolution=1): - # --> Input check - if R[:,0].size != R[0,:].size: - print("ERROR: Correlation matrix is not quadratic") - return np.ones(1, dtype=nb.c16)*-1 #[(-1, -1j)] - - if R[:,0].size != scanning_vectors[:,0].size: - print("ERROR: Correlation matrix dimension does not match with the antenna array dimension") - return np.ones(1, dtype=nb.c16)*-2 - - #ADORT = np.zeros(scanning_vectors[0,:].size, dtype=np.complex) #CHANGE TO nb.c16 for NUMBA - ADORT = np.zeros(scanning_vectors[0,:].size, dtype=nb.c16) - M = R[:,0].size #np.size(R, 0) - - # --- Calculation --- - # Determine eigenvectors and eigenvalues - sigmai, vi = lin.eig(R) - sigmai = np.abs(sigmai) - - idx = sigmai.argsort()[::1] # Sort eigenvectors by eigenvalues, smallest to largest - #sigmai = sigmai[idx] # Eigenvalues not used again - vi = vi[:,idx] - - # Generate noise subspace matrix - noise_dimension = M - signal_dimension - #E = np.zeros((M, noise_dimension),dtype=np.complex) - E = np.zeros((M, noise_dimension),dtype=nb.c16) - for i in range(noise_dimension): - E[:,i] = vi[:,i] - - theta_index=0 - for i in range(scanning_vectors[0,:].size): - S_theta_ = scanning_vectors[:, i] - S_theta_ = S_theta_.T - ADORT[theta_index] = 1/np.abs(S_theta_.conj().T @ (E @ E.conj().T) @ S_theta_) - theta_index += 1 - - return ADORT - -# Numba optimized version of pyArgus corr_matrix_estimate with "fast". About 2x faster on Pi4 -@njit(fastmath=True, cache=True) #(nb.c8[:,:](nb.c16[:,:])) -def corr_matrix(X): - M = X[:,0].size - N = X[0,:].size - #R = np.zeros((M, M), dtype=nb.c8) - R = np.dot(X, X.conj().T) - R = np.divide(R, N) - return R - -# Numba optimized scanning vectors generation for UCA arrays. About 10x faster on Pi4 -# LRU cache memoize about 1000x faster. -@lru_cache(maxsize=8) -def uca_scanning_vectors(M, DOA_inter_elem_space): - - thetas = np.linspace(0,359,360) # Remember to change self.DOA_thetas too, we didn't include that in this function due to memoization cannot work with arrays - - x = DOA_inter_elem_space * np.cos(2*np.pi/M * np.arange(M)) - y = -DOA_inter_elem_space * np.sin(2*np.pi/M * np.arange(M)) # For this specific array only - - scanning_vectors = np.zeros((M, thetas.size), dtype=np.complex) - for i in range(thetas.size): - scanning_vectors[:,i] = np.exp(1j*2*np.pi* (x*np.cos(np.deg2rad(thetas[i])) + y*np.sin(np.deg2rad(thetas[i])))) - - return scanning_vectors - # scanning_vectors = de.gen_scanning_vectors(M, x, y, self.DOA_theta) - -@njit(fastmath=True, cache=True) -def DOA_plot_util(DOA_data, log_scale_min=-100): - """ - This function prepares the calulcated DoA estimation results for plotting. - - - Noramlize DoA estimation results - - Changes to log scale - """ - - DOA_data = np.divide(np.abs(DOA_data), np.max(np.abs(DOA_data))) # Normalization - DOA_data = 10*np.log10(DOA_data) # Change to logscale - - for i in range(len(DOA_data)): # Remove extremely low values - if DOA_data[i] < log_scale_min: - DOA_data[i] = log_scale_min - - return DOA_data - -@njit(fastmath=True, cache=True) -def calculate_doa_papr(DOA_data): - return 10*np.log10(np.max(np.abs(DOA_data))/np.mean(np.abs(DOA_data))) - -# Old time-domain squelch algorithm (Unused as freq domain FFT with overlaps gives significantly better sensitivity with acceptable time resolution expense -""" - K = 10 - self.filtered_signal = self.raw_signal_amplitude #convolve(np.abs(self.raw_signal_amplitude),np.ones(K), mode = 'same')/K - - # Burst is always started at the begining of the processed block, ensured by the squelch module in the DAQ FW - burst_stop_index = len(self.filtered_signal) # CARL FIX: Initialize this to the length of the signal, incase the signal is active the entire time - self.logger.info("Original burst stop index: {:d}".format(burst_stop_index)) - - min_burst_size = K - burst_stop_amp_val = 0 - for n in np.arange(K, len(self.filtered_signal), 1): - if self.filtered_signal[n] < self.squelch_threshold: - burst_stop_amp_val = self.filtered_signal[n] - burst_stop_index = n - burst_stop_index-=K # Correction with the length of filter - break - - #burst_stop_index-=K # Correction with the length of filter - - - self.logger.info("Burst stop index: {:d}".format(burst_stop_index)) - self.logger.info("Burst stop ampl val: {:f}".format(burst_stop_amp_val)) - self.logger.info("Processed signal length: {:d}".format(len(self.processed_signal[0,:]))) - - # If sign - if burst_stop_index < min_burst_size: - self.logger.debug("The length of the captured burst size is under the minimum: {:d}".format(burst_stop_index)) - burst_stop_index = 0 - - if burst_stop_index !=0: - self.logger.info("INSIDE burst_stop_index != 0") - - self.logger.debug("Burst stop index: {:d}".format(burst_stop_index)) - self.logger.debug("Burst stop ampl val: {:f}".format(burst_stop_amp_val)) - self.squelch_mask = np.zeros(len(self.filtered_signal)) - self.squelch_mask[0 : burst_stop_index] = np.ones(burst_stop_index)*self.squelch_threshold - # Next line removes the end parts of the samples after where the signal ended, truncating the array - self.processed_signal = self.module_receiver.iq_samples[: burst_stop_index, self.squelch_mask == self.squelch_threshold] - self.logger.info("Raw signal length when burst_stop_index!=0: {:d}".format(len(self.module_receiver.iq_samples[0,:]))) - self.logger.info("Processed signal length when burst_stop_index!=0: {:d}".format(len(self.processed_signal[0,:]))) - - #self.logger.info(' '.join(map(str, self.processed_signal))) - - self.data_ready=True - else: - self.logger.info("Signal burst is not found, try to adjust the threshold levels") - #self.data_ready=True - self.squelch_mask = np.ones(len(self.filtered_signal))*self.squelch_threshold - self.processed_signal = np.zeros([self.channel_number, len(self.filtered_signal)]) -""" - - -- cgit v1.2.3