summaryrefslogtreecommitdiffstats
path: root/_receiver/shmemIface.py
diff options
context:
space:
mode:
Diffstat (limited to '_receiver/shmemIface.py')
-rwxr-xr-x_receiver/shmemIface.py195
1 files changed, 195 insertions, 0 deletions
diff --git a/_receiver/shmemIface.py b/_receiver/shmemIface.py
new file mode 100755
index 0000000..fd301b1
--- /dev/null
+++ b/_receiver/shmemIface.py
@@ -0,0 +1,195 @@
+"""
+ HeIMDALL DAQ Firmware
+ Python based shared memory interface implementations
+
+ Author: Tamás Pető
+ License: GNU GPL V3
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+"""
+import logging
+from struct import pack, unpack
+from multiprocessing import shared_memory
+import numpy as np
+import os
+
+A_BUFF_READY = 1
+B_BUFF_READY = 2
+INIT_READY = 10
+TERMINATE = 255
+class outShmemIface():
+
+
+ def __init__(self, shmem_name, shmem_size, drop_mode = False):
+
+ self.init_ok = True
+ logging.basicConfig(level=logging.INFO)
+ self.logger = logging.getLogger(__name__)
+ self.drop_mode = drop_mode
+ self.dropped_frame_cntr = 0
+
+ self.shmem_name = shmem_name
+ self.buffer_free = [True, True]
+
+ self.memories = []
+ self.buffers = []
+
+ # Try to remove shared memories if already exist
+ try:
+ shmem_A = shared_memory.SharedMemory(name=shmem_name+'_A',create=False, size=shmem_size)
+ shmem_A.close()
+ #shmem_A.unkink()
+ except FileNotFoundError as err:
+ self.logger.warning("Shared memory not exist")
+ try:
+ shmem_B = shared_memory.SharedMemory(name=shmem_name+'_B',create=False, size=shmem_size)
+ shmem_B.close()
+ #shmem_B.unkink()
+ except FileNotFoundError as err:
+ self.logger.warning("Shared memory not exist")
+
+ # Create the shared memories
+ self.memories.append(shared_memory.SharedMemory(name=shmem_name+'_A',create=True, size=shmem_size))
+ self.memories.append(shared_memory.SharedMemory(name=shmem_name+'_B',create=True, size=shmem_size))
+ self.buffers.append(np.ndarray((shmem_size,), dtype=np.uint8, buffer=self.memories[0].buf))
+ self.buffers.append(np.ndarray((shmem_size,), dtype=np.uint8, buffer=self.memories[1].buf))
+
+ # Opening control FIFOs
+ if self.drop_mode:
+ bw_fifo_flags = os.O_RDONLY | os.O_NONBLOCK
+ else:
+ bw_fifo_flags = os.O_RDONLY
+ try:
+ self.fw_ctr_fifo = os.open('_data_control/'+'fw_'+shmem_name, os.O_WRONLY)
+ self.bw_ctr_fifo = os.open('_data_control/'+'bw_'+shmem_name, bw_fifo_flags)
+ except OSError as err:
+ self.logger.critical("OS error: {0}".format(err))
+ self.logger.critical("Failed to open control fifos")
+ self.bw_ctr_fifo = None
+ self.fw_ctr_fifo = None
+ self.init_ok = False
+
+ # Send init ready signal
+ if self.init_ok:
+ os.write(self.fw_ctr_fifo, pack('B',INIT_READY))
+
+ def send_ctr_buff_ready(self, active_buffer_index):
+ # Send buffer ready signal on the forward FIFO
+ if active_buffer_index == 0:
+ os.write(self.fw_ctr_fifo, pack('B',A_BUFF_READY))
+ elif active_buffer_index == 1:
+ os.write(self.fw_ctr_fifo, pack('B',B_BUFF_READY))
+
+ # Deassert buffer free flag
+ self.buffer_free[active_buffer_index] = False
+
+ def send_ctr_terminate(self):
+ os.write(self.fw_ctr_fifo, pack('B',TERMINATE))
+ self.logger.info("Terminate signal sent")
+
+ def destory_sm_buffer(self):
+ for memory in self.memories:
+ memory.close()
+ memory.unlink()
+
+ if self.fw_ctr_fifo is not None:
+ os.close(self.fw_ctr_fifo)
+
+ if self.bw_ctr_fifo is not None:
+ os.close(self.bw_ctr_fifo)
+
+ def wait_buff_free(self):
+ if self.buffer_free[0]:
+ return 0
+ elif self.buffer_free[1]:
+ return 1
+ else:
+ try:
+ buffer = os.read(self.bw_ctr_fifo, 1)
+ signal = unpack('B', buffer )[0]
+
+ if signal == A_BUFF_READY:
+ self.buffer_free[0] = True
+ return 0
+ if signal == B_BUFF_READY:
+ self.buffer_free[1] = True
+ return 1
+ except BlockingIOError as err:
+ self.dropped_frame_cntr +=1
+ self.logger.warning("Dropping frame.. Total: [{:d}] ".format(self.dropped_frame_cntr))
+ return 3
+ return -1
+
+
+class inShmemIface():
+
+ def __init__(self, shmem_name, ctr_fifo_path="_data_control/"):
+
+ self.init_ok = True
+ logging.basicConfig(level=logging.INFO)
+ self.logger = logging.getLogger(__name__)
+ self.drop_mode = False
+
+ self.shmem_name = shmem_name
+
+ self.memories = []
+ self.buffers = []
+ try:
+ self.fw_ctr_fifo = os.open(ctr_fifo_path+'fw_'+shmem_name, os.O_RDONLY)
+ self.bw_ctr_fifo = os.open(ctr_fifo_path+'bw_'+shmem_name, os.O_WRONLY)
+ except OSError as err:
+ self.logger.critical("OS error: {0}".format(err))
+ self.logger.critical("Failed to open control fifos")
+ self.bw_ctr_fifo = None
+ self.fw_ctr_fifo = None
+ self.init_ok = False
+
+ if self.fw_ctr_fifo is not None:
+ if unpack('B', os.read(self.fw_ctr_fifo, 1))[0] == INIT_READY:
+ self.memories.append(shared_memory.SharedMemory(name=shmem_name+'_A'))
+ self.memories.append(shared_memory.SharedMemory(name=shmem_name+'_B'))
+ self.buffers.append(np.ndarray((self.memories[0].size,),
+ dtype=np.uint8,
+ buffer=self.memories[0].buf))
+ self.buffers.append(np.ndarray((self.memories[1].size,),
+ dtype=np.uint8,
+ buffer=self.memories[1].buf))
+ else:
+ self.init_ok = False
+
+ def send_ctr_buff_ready(self, active_buffer_index):
+ if active_buffer_index == 0:
+ os.write(self.bw_ctr_fifo, pack('B',A_BUFF_READY))
+ elif active_buffer_index == 1:
+ os.write(self.bw_ctr_fifo, pack('B',B_BUFF_READY))
+
+ def destory_sm_buffer(self):
+ for memory in self.memories:
+ memory.close()
+
+ if self.fw_ctr_fifo is not None:
+ os.close(self.fw_ctr_fifo)
+
+ if self.bw_ctr_fifo is not None:
+ os.close(self.bw_ctr_fifo)
+
+ def wait_buff_free(self):
+ signal = unpack('B', os.read(self.fw_ctr_fifo, 1))[0]
+ if signal == A_BUFF_READY:
+ return 0
+ elif signal == B_BUFF_READY:
+ return 1
+ elif signal == TERMINATE:
+ return TERMINATE
+ return -1