# Repository web-view residue (navigation: summary | refs | log | blame | commit | diff | stats)
# path: root/util/_receiver/shmemIface.py
# blob: fd301b178ca73aa093a18c2610ddcd0ec30a869b (plain) (tree)


































































































































































































                                                                                                                   
"""
    HeIMDALL DAQ Firmware
    Python based shared memory interface implementations
    
    Author: Tamás Pető    
    License: GNU GPL V3
        
   This program is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation, either version 3 of the License, or
   any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""
import logging
from struct import pack, unpack
from multiprocessing import shared_memory
import numpy as np
import os

# Control codes; each is packed as one unsigned byte ('B') and exchanged over
# the forward/backward control FIFOs.
A_BUFF_READY = 1   # refers to buffer index 0 (the "_A" segment)
B_BUFF_READY = 2   # refers to buffer index 1 (the "_B" segment)
INIT_READY = 10    # written by outShmemIface once its FIFOs are open
TERMINATE = 255    # written by outShmemIface.send_ctr_terminate()
class outShmemIface():
    """Producer side of a double-buffered shared memory link.

    Two shared memory segments named ``<shmem_name>_A`` and
    ``<shmem_name>_B`` are created and exposed as ``uint8`` numpy arrays in
    ``self.buffers``. Buffer hand-over is coordinated with single-byte codes
    on two FIFOs located under ``_data_control/``: ``fw_<shmem_name>``
    (written by this class) and ``bw_<shmem_name>`` (read by this class).

    Attributes:
        init_ok: False when the control FIFOs could not be opened.
        buffer_free: Per-buffer flags; cleared by send_ctr_buff_ready() and
            set again by wait_buff_free() when the matching code is read
            from the backward FIFO.
        dropped_frame_cntr: Number of times wait_buff_free() found nothing
            to read on a non-blocking backward FIFO.
        memories: The two SharedMemory objects (index 0 = A, 1 = B).
        buffers: numpy views over ``memories``, in the same order.
    """

    def __init__(self, shmem_name, shmem_size, drop_mode = False):
        """Create both segments, open the control FIFOs and signal readiness.

        Args:
            shmem_name: Base name of the segments and of the FIFOs.
            shmem_size: Size of each segment in bytes.
            drop_mode: When True the backward FIFO is opened non-blocking, so
                wait_buff_free() returns 3 instead of waiting for a buffer.

        Raises:
            FileExistsError / OSError: propagated from SharedMemory creation.
        """
        self.init_ok = True
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        self.drop_mode = drop_mode
        self.dropped_frame_cntr = 0

        self.shmem_name = shmem_name
        self.buffer_free = [True, True]

        self.memories = []
        self.buffers = []

        # Defined up front so that the error path and destory_sm_buffer() can
        # always inspect them, even when the first os.open() fails.
        self.fw_ctr_fifo = None
        self.bw_ctr_fifo = None

        # Remove segments left behind by a previous run; otherwise the
        # create=True calls below fail with FileExistsError.
        for suffix in ('_A', '_B'):
            try:
                stale = shared_memory.SharedMemory(name=shmem_name+suffix, create=False)
            except FileNotFoundError:
                self.logger.warning("Shared memory not exist")
            else:
                stale.close()
                stale.unlink()

        # Create the shared memories and wrap each one in a byte array
        for suffix in ('_A', '_B'):
            memory = shared_memory.SharedMemory(name=shmem_name+suffix, create=True, size=shmem_size)
            self.memories.append(memory)
            self.buffers.append(np.ndarray((shmem_size,), dtype=np.uint8, buffer=memory.buf))

        # Opening control FIFOs
        bw_fifo_flags = os.O_RDONLY
        if self.drop_mode:
            bw_fifo_flags |= os.O_NONBLOCK
        try:
            self.fw_ctr_fifo = os.open('_data_control/'+'fw_'+shmem_name, os.O_WRONLY)
            self.bw_ctr_fifo = os.open('_data_control/'+'bw_'+shmem_name, bw_fifo_flags)
        except OSError as err:
            self.logger.critical("OS error: {0}".format(err))
            self.logger.critical("Failed to open control fifos")
            # The forward FIFO may already be open when the backward one
            # fails; close it instead of leaking the descriptor.
            if self.fw_ctr_fifo is not None:
                os.close(self.fw_ctr_fifo)
            self.bw_ctr_fifo = None
            self.fw_ctr_fifo = None
            self.init_ok = False

        # Send init ready signal
        if self.init_ok:
            os.write(self.fw_ctr_fifo, pack('B',INIT_READY))

    def send_ctr_buff_ready(self, active_buffer_index):
        """Announce on the forward FIFO that a buffer has been filled.

        Args:
            active_buffer_index: 0 for buffer A, 1 for buffer B. Any other
                value writes nothing to the FIFO.
        """
        # Send buffer ready signal on the forward FIFO
        if active_buffer_index == 0:
            os.write(self.fw_ctr_fifo, pack('B',A_BUFF_READY))
        elif active_buffer_index == 1:
            os.write(self.fw_ctr_fifo, pack('B',B_BUFF_READY))

        # Deassert buffer free flag
        self.buffer_free[active_buffer_index] = False

    def send_ctr_terminate(self):
        """Write the TERMINATE code to the forward FIFO."""
        os.write(self.fw_ctr_fifo, pack('B',TERMINATE))
        self.logger.info("Terminate signal sent")

    def destory_sm_buffer(self):
        """Release and remove both segments and close the control FIFOs.

        Safe to call more than once: released resources are forgotten so a
        repeated call neither unlinks nor closes anything twice.
        """
        # A mapping cannot be closed while a buffer export is alive, so drop
        # our own numpy views before closing.
        self.buffers.clear()
        for memory in self.memories:
            try:
                memory.close()
            except BufferError as err:
                # Somebody else still holds a view; removing the name below
                # is still possible and prevents a stale segment.
                self.logger.warning("Shared memory is still referenced, close skipped: {0}".format(err))
            try:
                memory.unlink()
            except FileNotFoundError:
                self.logger.warning("Shared memory was already removed")
        self.memories.clear()

        if self.fw_ctr_fifo is not None:
            os.close(self.fw_ctr_fifo)
            self.fw_ctr_fifo = None

        if self.bw_ctr_fifo is not None:
            os.close(self.bw_ctr_fifo)
            self.bw_ctr_fifo = None

    def wait_buff_free(self):
        """Return the index of a buffer that may be written.

        Returns:
            0 or 1: index of a free buffer. When both buffers are busy, one
                code is read from the backward FIFO and the buffer it names
                is marked free and returned.
            3: no buffer is available (nothing to read on a non-blocking
                FIFO, counted as a dropped frame, or an unrecognised code).
            -1: the backward FIFO reported end-of-file, i.e. its writing end
                is closed.
        """
        if self.buffer_free[0]:
            return 0
        if self.buffer_free[1]:
            return 1

        try:
            raw = os.read(self.bw_ctr_fifo, 1)
        except BlockingIOError:
            self.dropped_frame_cntr +=1
            self.logger.warning("Dropping frame.. Total: [{:d}] ".format(self.dropped_frame_cntr))
            return 3

        if not raw:
            # os.read() returns b'' at end-of-file; unpack() would raise
            # struct.error on it.
            self.logger.error("Backward control fifo closed by the peer")
            return -1

        signal = unpack('B', raw)[0]
        if signal == A_BUFF_READY:
            self.buffer_free[0] = True
            return 0
        if signal == B_BUFF_READY:
            self.buffer_free[1] = True
            return 1
        return 3


class inShmemIface():
    """Consumer side of a double-buffered shared memory link.

    Opens the control FIFOs ``fw_<shmem_name>`` (read here) and
    ``bw_<shmem_name>`` (written here), waits for the INIT_READY code and
    then attaches to the existing segments ``<shmem_name>_A`` and
    ``<shmem_name>_B``, exposing them as ``uint8`` numpy arrays in
    ``self.buffers``.

    Attributes:
        init_ok: False when the FIFOs could not be opened or the first byte
            read from the forward FIFO was not INIT_READY.
        memories: The attached SharedMemory objects (index 0 = A, 1 = B).
        buffers: numpy views over ``memories``, in the same order.
    """

    def __init__(self, shmem_name, ctr_fifo_path="_data_control/"):
        """Open the control FIFOs and attach to both segments.

        Args:
            shmem_name: Base name of the segments and of the FIFOs.
            ctr_fifo_path: Directory prefix of the FIFOs; it is concatenated
                directly with the file name, so it must end with a separator.

        Raises:
            FileNotFoundError: propagated from SharedMemory when INIT_READY
                was received but a segment does not exist.
        """
        self.init_ok = True
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        self.drop_mode = False

        self.shmem_name = shmem_name

        self.memories = []
        self.buffers = []

        # Defined up front so the error path can tell which FIFO is open.
        self.fw_ctr_fifo = None
        self.bw_ctr_fifo = None
        try:
            self.fw_ctr_fifo = os.open(ctr_fifo_path+'fw_'+shmem_name, os.O_RDONLY)
            self.bw_ctr_fifo = os.open(ctr_fifo_path+'bw_'+shmem_name, os.O_WRONLY)
        except OSError as err:
            self.logger.critical("OS error: {0}".format(err))
            self.logger.critical("Failed to open control fifos")
            # The forward FIFO may already be open when the backward one
            # fails; close it instead of leaking the descriptor.
            if self.fw_ctr_fifo is not None:
                os.close(self.fw_ctr_fifo)
            self.bw_ctr_fifo = None
            self.fw_ctr_fifo = None
            self.init_ok = False

        if self.fw_ctr_fifo is not None:
            raw = os.read(self.fw_ctr_fifo, 1)
            # An empty read means end-of-file; it must not reach unpack().
            if raw and unpack('B', raw)[0] == INIT_READY:
                for suffix in ('_A', '_B'):
                    memory = shared_memory.SharedMemory(name=shmem_name+suffix)
                    self.memories.append(memory)
                    self.buffers.append(np.ndarray((memory.size,),
                                                   dtype=np.uint8,
                                                   buffer=memory.buf))
            else:
                self.logger.critical("Init ready signal has not been received")
                self.init_ok = False

    def send_ctr_buff_ready(self, active_buffer_index):
        """Report on the backward FIFO that a buffer has been processed.

        Args:
            active_buffer_index: 0 for buffer A, 1 for buffer B. Any other
                value writes nothing.
        """
        if active_buffer_index == 0:
            os.write(self.bw_ctr_fifo, pack('B',A_BUFF_READY))
        elif active_buffer_index == 1:
            os.write(self.bw_ctr_fifo, pack('B',B_BUFF_READY))

    def destory_sm_buffer(self):
        """Detach from both segments and close the control FIFOs.

        The segments are only closed, not unlinked. Safe to call more than
        once: released resources are forgotten after the first call.
        """
        # A mapping cannot be closed while a buffer export is alive, so drop
        # our own numpy views before closing.
        self.buffers.clear()
        for memory in self.memories:
            try:
                memory.close()
            except BufferError as err:
                self.logger.warning("Shared memory is still referenced, close skipped: {0}".format(err))
        self.memories.clear()

        if self.fw_ctr_fifo is not None:
            os.close(self.fw_ctr_fifo)
            self.fw_ctr_fifo = None

        if self.bw_ctr_fifo is not None:
            os.close(self.bw_ctr_fifo)
            self.bw_ctr_fifo = None

    def wait_buff_free(self):
        """Read one control code from the forward FIFO and decode it.

        Returns:
            0 for A_BUFF_READY, 1 for B_BUFF_READY, TERMINATE for the
            terminate code, and -1 for any other code or when the FIFO
            reported end-of-file (its writing end is closed).
        """
        raw = os.read(self.fw_ctr_fifo, 1)
        if not raw:
            # os.read() returns b'' at end-of-file; unpack() would raise
            # struct.error on it.
            self.logger.error("Forward control fifo closed by the peer")
            return -1

        signal = unpack('B', raw)[0]
        if signal == A_BUFF_READY:
            return 0
        elif signal == B_BUFF_READY:
            return 1
        elif signal == TERMINATE:
            return TERMINATE
        return -1