summaryrefslogtreecommitdiffstats
path: root/_receiver
diff options
context:
space:
mode:
authorCarl <Kraken.rf.inc@gmail.com>2021-12-23 03:10:19 +0100
committerCarl <Kraken.rf.inc@gmail.com>2021-12-23 03:10:19 +0100
commitefaaf39f79d81cd740a9f741e6cf4815c49c3093 (patch)
tree5a4fb7caaddef5f228f6833260ba7e7b44345051 /_receiver
parentInitial commit (diff)
downloadkrakensdr_pr-efaaf39f79d81cd740a9f741e6cf4815c49c3093.tar
krakensdr_pr-efaaf39f79d81cd740a9f741e6cf4815c49c3093.tar.gz
krakensdr_pr-efaaf39f79d81cd740a9f741e6cf4815c49c3093.tar.bz2
krakensdr_pr-efaaf39f79d81cd740a9f741e6cf4815c49c3093.tar.lz
krakensdr_pr-efaaf39f79d81cd740a9f741e6cf4815c49c3093.tar.xz
krakensdr_pr-efaaf39f79d81cd740a9f741e6cf4815c49c3093.tar.zst
krakensdr_pr-efaaf39f79d81cd740a9f741e6cf4815c49c3093.zip
Diffstat (limited to '')
-rwxr-xr-x_receiver/iq_header.py152
-rwxr-xr-x_receiver/krakenSDR_receiver.py375
-rwxr-xr-x_receiver/shmemIface.py195
3 files changed, 722 insertions, 0 deletions
diff --git a/_receiver/iq_header.py b/_receiver/iq_header.py
new file mode 100755
index 0000000..f439019
--- /dev/null
+++ b/_receiver/iq_header.py
@@ -0,0 +1,152 @@
+from struct import pack,unpack
+import logging
+import sys
+"""
+ Desctiption: IQ Frame header definition
+ For header field description check the corresponding documentation
+ Total length: 1024 byte
+ Project: HeIMDALL RTL
+ Author: Tamás Pető
+ Status: Finished
+ Version history:
+ 1 : Initial version (2019 04 23)
+ 2 : Fixed 1024 byte length (2019 07 25)
+ 3 : Noise source state (2019 10 01)
+ 4 : IQ sync flag (2019 10 21)
+ 5 : Sync state (2019 11 10)
+ 6 : Unix Epoch timestamp (2019 12 17)
+ 6a: Frame type defines (2020 03 19)
+ 7 : Sync word (2020 05 03)
+"""
+class IQHeader():
+
+ FRAME_TYPE_DATA = 0
+ FRAME_TYPE_DUMMY = 1
+ FRAME_TYPE_RAMP = 2
+ FRAME_TYPE_CAL = 3
+ FRAME_TYPE_TRIGW = 4
+
+ SYNC_WORD = 0x2bf7b95a
+
+ def __init__(self):
+
+ self.logger = logging.getLogger(__name__)
+ self.header_size = 1024 # size in bytes
+ self.reserved_bytes = 192
+
+ self.sync_word=self.SYNC_WORD # uint32_t
+ self.frame_type=0 # uint32_t
+ self.hardware_id="" # char [16]
+ self.unit_id=0 # uint32_t
+ self.active_ant_chs=0 # uint32_t
+ self.ioo_type=0 # uint32_t
+ self.rf_center_freq=0 # uint64_t
+ self.adc_sampling_freq=0 # uint64_t
+ self.sampling_freq=0 # uint64_t
+ self.cpi_length=0 # uint32_t
+ self.time_stamp=0 # uint64_t
+ self.daq_block_index=0 # uint32_t
+ self.cpi_index=0 # uint32_t
+ self.ext_integration_cntr=0 # uint64_t
+ self.data_type=0 # uint32_t
+ self.sample_bit_depth=0 # uint32_t
+ self.adc_overdrive_flags=0 # uint32_t
+ self.if_gains=[0]*32 # uint32_t x 32
+ self.delay_sync_flag=0 # uint32_t
+ self.iq_sync_flag=0 # uint32_t
+ self.sync_state=0 # uint32_t
+ self.noise_source_state=0 # uint32_t
+ self.reserved=[0]*self.reserved_bytes# uint32_t x reserverd_bytes
+ self.header_version=0 # uint32_t
+
+ def decode_header(self, iq_header_byte_array):
+ """
+ Unpack,decode and store the content of the iq header
+ """
+ iq_header_list = unpack("II16sIIIQQQIQIIQIII"+"I"*32+"IIII"+"I"*self.reserved_bytes+"I", iq_header_byte_array)
+
+ self.sync_word = iq_header_list[0]
+ self.frame_type = iq_header_list[1]
+ self.hardware_id = iq_header_list[2].decode()
+ self.unit_id = iq_header_list[3]
+ self.active_ant_chs = iq_header_list[4]
+ self.ioo_type = iq_header_list[5]
+ self.rf_center_freq = iq_header_list[6]
+ self.adc_sampling_freq = iq_header_list[7]
+ self.sampling_freq = iq_header_list[8]
+ self.cpi_length = iq_header_list[9]
+ self.time_stamp = iq_header_list[10]
+ self.daq_block_index = iq_header_list[11]
+ self.cpi_index = iq_header_list[12]
+ self.ext_integration_cntr = iq_header_list[13]
+ self.data_type = iq_header_list[14]
+ self.sample_bit_depth = iq_header_list[15]
+ self.adc_overdrive_flags = iq_header_list[16]
+ self.if_gains = iq_header_list[17:49]
+ self.delay_sync_flag = iq_header_list[49]
+ self.iq_sync_flag = iq_header_list[50]
+ self.sync_state = iq_header_list[51]
+ self.noise_source_state = iq_header_list[52]
+ self.header_version = iq_header_list[52+self.reserved_bytes+1]
+
+ def encode_header(self):
+ """
+ Pack the iq header information into a byte array
+ """
+ iq_header_byte_array=pack("II", self.sync_word, self.frame_type)
+ iq_header_byte_array+=self.hardware_id.encode()+bytearray(16-len(self.hardware_id.encode()))
+ iq_header_byte_array+=pack("IIIQQQIQIIQIII",
+ self.unit_id, self.active_ant_chs, self.ioo_type, self.rf_center_freq, self.adc_sampling_freq,
+ self.sampling_freq, self.cpi_length, self.time_stamp, self.daq_block_index, self.cpi_index,
+ self.ext_integration_cntr, self.data_type, self.sample_bit_depth, self.adc_overdrive_flags)
+ for m in range(32):
+ iq_header_byte_array+=pack("I", self.if_gains[m])
+
+ iq_header_byte_array+=pack("I", self.delay_sync_flag)
+ iq_header_byte_array+=pack("I", self.iq_sync_flag)
+ iq_header_byte_array+=pack("I", self.sync_state)
+ iq_header_byte_array+=pack("I", self.noise_source_state)
+
+ for m in range(self.reserved_bytes):
+ iq_header_byte_array+=pack("I",0)
+
+ iq_header_byte_array+=pack("I", self.header_version)
+ return iq_header_byte_array
+
+ def dump_header(self):
+ """
+ Prints out the content of the header in human readable format
+ """
+ self.logger.info("Sync word: {:d}".format(self.sync_word))
+ self.logger.info("Header version: {:d}".format(self.header_version))
+ self.logger.info("Frame type: {:d}".format(self.frame_type))
+ self.logger.info("Hardware ID: {:16}".format(self.hardware_id))
+ self.logger.info("Unit ID: {:d}".format(self.unit_id))
+ self.logger.info("Active antenna channels: {:d}".format(self.active_ant_chs))
+ self.logger.info("Illuminator type: {:d}".format(self.ioo_type))
+ self.logger.info("RF center frequency: {:.2f} MHz".format(self.rf_center_freq/10**6))
+ self.logger.info("ADC sampling frequency: {:.2f} MHz".format(self.adc_sampling_freq/10**6))
+ self.logger.info("IQ sampling frequency {:.2f} MHz".format(self.sampling_freq/10**6))
+ self.logger.info("CPI length: {:d}".format(self.cpi_length))
+ self.logger.info("Unix Epoch timestamp: {:d}".format(self.time_stamp))
+ self.logger.info("DAQ block index: {:d}".format(self.daq_block_index))
+ self.logger.info("CPI index: {:d}".format(self.cpi_index))
+ self.logger.info("Extended integration counter {:d}".format(self.ext_integration_cntr))
+ self.logger.info("Data type: {:d}".format(self.data_type))
+ self.logger.info("Sample bit depth: {:d}".format(self.sample_bit_depth))
+ self.logger.info("ADC overdrive flags: {:d}".format(self.adc_overdrive_flags))
+ for m in range(32):
+ self.logger.info("Ch: {:d} IF gain: {:.1f} dB".format(m, self.if_gains[m]/10))
+ self.logger.info("Delay sync flag: {:d}".format(self.delay_sync_flag))
+ self.logger.info("IQ sync flag: {:d}".format(self.iq_sync_flag))
+ self.logger.info("Sync state: {:d}".format(self.sync_state))
+ self.logger.info("Noise source state: {:d}".format(self.noise_source_state))
+
+ def check_sync_word(self):
+ """
+ Check the sync word of the header
+ """
+ if self.sync_word != self.SYNC_WORD:
+ return -1
+ else:
+ return 0
diff --git a/_receiver/krakenSDR_receiver.py b/_receiver/krakenSDR_receiver.py
new file mode 100755
index 0000000..a9e2d63
--- /dev/null
+++ b/_receiver/krakenSDR_receiver.py
@@ -0,0 +1,375 @@
+# KrakenSDR Receiver
+
+# Copyright (C) 2018-2021 Carl Laufer, Tamás Pető
+#
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+
+# -*- coding: utf-8 -*-
+
+# Import built-in modules
+import sys
+import os
+import time
+from struct import pack, unpack
+import socket
+import _thread
+from threading import Lock
+import queue
+import logging
+#import copy
+
+# Import third party modules
+import numpy as np
+from scipy import signal
+from iq_header import IQHeader
+from shmemIface import inShmemIface
+class ReceiverRTLSDR():
+
+ def __init__(self, data_que, data_interface = "eth", logging_level=10):
+ """
+ Parameter:
+ ----------
+ :param: data_que: Que to communicate with the UI (web iface/Qt GUI)
+ :param: data_interface: This field is configured by the GUI during instantiation.
+ Valid values are the followings:
+ "eth" : The module will receiver IQ frames through an Ethernet connection
+ "shmem": The module will receiver IQ frames through a shared memory interface
+ :type : data_interface: string
+ """
+ self.logger = logging.getLogger(__name__)
+ self.logger.setLevel(logging_level)
+
+ # DAQ parameters
+ # These values are used by default to configure the DAQ through the configuration interface
+ # Values are configured externally upon configuration request
+ self.daq_center_freq = 100 # MHz
+ self.daq_rx_gain = 0 # [dB]
+ self.daq_squelch_th_dB = 0
+
+ # UI interface
+ self.data_que = data_que
+
+ # IQ data interface
+ self.data_interface = data_interface
+
+ # -> Ethernet
+ self.receiver_connection_status = False
+ self.port = 5000
+ self.rec_ip_addr = "127.0.0.1" # Configured by the GUI prior to connection request
+ self.socket_inst = socket.socket()
+ self.receiverBufferSize = 2 ** 18 # Size of the Ethernet receiver buffer measured in bytes
+
+ # -> Shared memory
+ root_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
+ daq_path = os.path.join(os.path.dirname(root_path),"heimdall_daq_fw")
+ self.daq_shmem_control_path = os.path.join(os.path.join(daq_path,"Firmware"),"_data_control/")
+ self.init_data_iface()
+
+ # Control interface
+ self.ctr_iface_socket = socket.socket()
+ self.ctr_iface_port = 5001
+ self.ctr_iface_thread_lock = Lock() # Used to synchronize the operation of the ctr_iface thread
+
+ self.iq_frame_bytes = None
+ self.iq_samples = None
+ self.iq_header = IQHeader()
+ self.M = 0 # Number of receiver channels, updated after establishing connection
+
+ def init_data_iface(self):
+ if self.data_interface == "shmem":
+ # Open shared memory interface to capture the DAQ firmware output
+ self.in_shmem_iface = inShmemIface("delay_sync_iq", self.daq_shmem_control_path)
+ if not self.in_shmem_iface.init_ok:
+ self.logger.critical("Shared memory initialization failed")
+ self.in_shmem_iface.destory_sm_buffer()
+ return -1
+ return 0
+
+
+ def eth_connect(self):
+ """
+ Compatible only with DAQ firmwares that has the IQ streaming mode.
+ HeIMDALL DAQ Firmware version: 1.0 or later
+ """
+ try:
+ if not self.receiver_connection_status:
+ if self.data_interface == "eth":
+ # Establlish IQ data interface connection
+ self.socket_inst.connect((self.rec_ip_addr, self.port))
+ self.socket_inst.sendall(str.encode('streaming'))
+ test_iq = self.receive_iq_frame()
+
+ self.M = self.iq_header.active_ant_chs
+
+ # Establish control interface connection
+ self.ctr_iface_socket.connect((self.rec_ip_addr, self.ctr_iface_port))
+ self.receiver_connection_status = True
+ self.ctr_iface_init()
+ self.logger.info("CTR INIT Center freq: {0}".format(self.daq_center_freq))
+ self.set_center_freq(self.daq_center_freq)
+ self.set_if_gain(self.daq_rx_gain)
+ self.set_squelch_threshold(self.daq_squelch_th_dB)
+ except:
+ errorMsg = sys.exc_info()[0]
+ self.logger.error("Error message: "+str(errorMsg))
+ self.receiver_connection_status = False
+ self.logger.error("Unexpected error: {0}".format(sys.exc_info()[0]))
+
+ # Re-instantiating sockets
+ self.socket_inst = socket.socket()
+ self.ctr_iface_socket = socket.socket()
+ return -1
+
+ self.logger.info("Connection established")
+ que_data_packet = []
+ que_data_packet.append(['conn-ok',])
+ self.data_que.put(que_data_packet)
+
+ def eth_close(self):
+ """
+ Close Ethernet conenctions including the IQ data and the control interfaces
+ """
+ try:
+ if self.receiver_connection_status:
+ if self.data_interface == "eth":
+ self.socket_inst.sendall(str.encode('q')) # Send exit message
+ self.socket_inst.close()
+ self.socket_inst = socket.socket() # Re-instantiating socket
+
+ # Close control interface connection
+ exit_message_bytes=("EXIT".encode()+bytearray(124))
+ self.ctr_iface_socket.send(exit_message_bytes)
+ self.ctr_iface_socket.close()
+ self.ctr_iface_socket = socket.socket()
+
+ self.receiver_connection_status = False
+ que_data_packet = []
+ que_data_packet.append(['disconn-ok',])
+ self.data_que.put(que_data_packet)
+ except:
+ errorMsg = sys.exc_info()[0]
+ self.logger.error("Error message: {0}".format(errorMsg))
+ return -1
+
+ if self.data_interface == "shmem":
+ self.in_shmem_iface.destory_sm_buffer()
+
+ return 0
+
+ def get_iq_online(self):
+ """
+ This function obtains a new IQ data frame through the Ethernet IQ data or the shared memory interface
+ """
+
+ # Check connection
+ if not self.receiver_connection_status:
+ fail = self.eth_connect()
+ if fail:
+ return -1
+
+ if self.data_interface == "eth":
+ self.socket_inst.sendall(str.encode("IQDownload")) # Send iq request command
+ self.iq_samples = self.receive_iq_frame()
+
+ elif self.data_interface == "shmem":
+ active_buff_index = self.in_shmem_iface.wait_buff_free()
+ if active_buff_index < 0 or active_buff_index > 1:
+ self.logger.info("Terminating.., signal: {:d}".format(active_buff_index))
+ return -1
+
+ buffer = self.in_shmem_iface.buffers[active_buff_index]
+ iq_header_bytes = buffer[0:1024].tobytes()
+ self.iq_header.decode_header(iq_header_bytes)
+
+ # Inititalization from header - Set channel numbers
+ if self.M == 0:
+ self.M = self.iq_header.active_ant_chs
+
+ incoming_payload_size = self.iq_header.cpi_length*self.iq_header.active_ant_chs*2*int(self.iq_header.sample_bit_depth/8)
+ if incoming_payload_size > 0:
+ iq_samples_in = (buffer[1024:1024 + incoming_payload_size].view(dtype=np.complex64))\
+ .reshape(self.iq_header.active_ant_chs, self.iq_header.cpi_length)
+ self.iq_samples = iq_samples_in.copy() # Must be .copy
+
+ self.in_shmem_iface.send_ctr_buff_ready(active_buff_index)
+
+ def receive_iq_frame(self):
+ """
+ Called by the get_iq_online function. Receives IQ samples over the establed Ethernet connection
+ """
+ total_received_bytes = 0
+ recv_bytes_count = 0
+ iq_header_bytes = bytearray(self.iq_header.header_size) # allocate array
+ view = memoryview(iq_header_bytes) # Get buffer
+
+ self.logger.debug("Starting IQ header reception")
+
+ while total_received_bytes < self.iq_header.header_size:
+ # Receive into buffer
+ recv_bytes_count = self.socket_inst.recv_into(view, self.iq_header.header_size-total_received_bytes)
+ view = view[recv_bytes_count:] # reset memory region
+ total_received_bytes += recv_bytes_count
+
+ self.iq_header.decode_header(iq_header_bytes)
+ # Uncomment to check the content of the IQ header
+ #self.iq_header.dump_header()
+
+ incoming_payload_size = self.iq_header.cpi_length*self.iq_header.active_ant_chs*2*int(self.iq_header.sample_bit_depth/8)
+ if incoming_payload_size > 0:
+ # Calculate total bytes to receive from the iq header data
+ total_bytes_to_receive = incoming_payload_size
+ receiver_buffer_size = 2**18
+
+ self.logger.debug("Total bytes to receive: {:d}".format(total_bytes_to_receive))
+
+ total_received_bytes = 0
+ recv_bytes_count = 0
+ iq_data_bytes = bytearray(total_bytes_to_receive + receiver_buffer_size) # allocate array
+ view = memoryview(iq_data_bytes) # Get buffer
+
+ while total_received_bytes < total_bytes_to_receive:
+ # Receive into buffer
+ recv_bytes_count = self.socket_inst.recv_into(view, receiver_buffer_size)
+ view = view[recv_bytes_count:] # reset memory region
+ total_received_bytes += recv_bytes_count
+
+ self.logger.debug(" IQ data succesfully received")
+
+ # Convert raw bytes to Complex float64 IQ samples
+ self.iq_samples = np.frombuffer(iq_data_bytes[0:total_bytes_to_receive], dtype=np.complex64).reshape(self.iq_header.active_ant_chs, self.iq_header.cpi_length)
+
+ self.iq_frame_bytes = bytearray()+iq_header_bytes+iq_data_bytes
+ return self.iq_samples
+ else:
+ return 0
+
+ def set_squelch_threshold(self, threshold_dB):
+ """
+ Configures the threshold level of the squelch module in the DAQ FW through the control interface
+ """
+ if self.receiver_connection_status: # Check connection
+ self.daq_squelch_th_dB = threshold_dB
+ if threshold_dB == -80: threshold = 0
+ else: threshold = 10**(threshold_dB/20)
+
+ # Assembling message
+ cmd="STHU"
+ th_bytes=pack("f",threshold)
+ msg_bytes=(cmd.encode()+th_bytes+bytearray(120))
+ try:
+ _thread.start_new_thread(self.ctr_iface_communication, (msg_bytes,))
+ except:
+ errorMsg = sys.exc_info()[0]
+ self.logger.error("Unable to start communication thread")
+ self.logger.error("Error message: {:s}".format(errorMsg))
+
+ def ctr_iface_init(self):
+ """
+ Initialize connection with the DAQ FW through the control interface
+ """
+ if self.receiver_connection_status: # Check connection
+ # Assembling message
+ cmd="INIT"
+ msg_bytes=(cmd.encode()+bytearray(124))
+ try:
+ _thread.start_new_thread(self.ctr_iface_communication, (msg_bytes,))
+ except:
+ errorMsg = sys.exc_info()[0]
+ self.logger.error("Unable to start communication thread")
+ self.logger.error("Error message: {:s}".format(errorMsg))
+
+ def ctr_iface_communication(self, msg_bytes):
+ """
+ Handles communication on the control interface with the DAQ FW
+
+ Parameters:
+ -----------
+
+ :param: msg: Message bytes, that will be sent ont the control interface
+ :type: msg: Byte array
+ """
+ self.ctr_iface_thread_lock.acquire()
+ self.logger.debug("Sending control message")
+ self.ctr_iface_socket.send(msg_bytes)
+
+ # Waiting for the command to take effect
+ reply_msg_bytes = self.ctr_iface_socket.recv(128)
+
+ self.logger.debug("Control interface communication finished")
+ self.ctr_iface_thread_lock.release()
+
+ status = reply_msg_bytes[0:4].decode()
+ if status == "FNSD":
+ self.logger.info("Reconfiguration succesfully finished")
+ que_data_packet = []
+ que_data_packet.append(['config-ok',])
+ self.data_que.put(que_data_packet)
+
+ else:
+ self.logger.error("Failed to set the requested parameter, reply: {0}".format(status))
+
+ def set_center_freq(self, center_freq):
+ """
+ Configures the RF center frequency of the receiver through the control interface
+
+ Paramters:
+ ----------
+ :param: center_freq: Required center frequency to set [Hz]
+ :type: center_freq: float
+ """
+ if self.receiver_connection_status: # Check connection
+ self.daq_center_freq = int(center_freq)
+ # Set center frequency
+ cmd="FREQ"
+ freq_bytes=pack("Q",int(center_freq))
+ msg_bytes=(cmd.encode()+freq_bytes+bytearray(116))
+ try:
+ _thread.start_new_thread(self.ctr_iface_communication, (msg_bytes,))
+ except:
+ errorMsg = sys.exc_info()[0]
+ self.logger.error("Unable to start communication thread")
+ self.logger.error("Error message: {:s}".format(errorMsg))
+
+ def set_if_gain(self, gain):
+ """
+ Configures the IF gain of the receiver through the control interface
+
+ Paramters:
+ ----------
+ :param: gain: IF gain value [dB]
+ :type: gain: int
+ """
+ if self.receiver_connection_status: # Check connection
+ self.daq_rx_gain = gain
+
+ # Set center frequency
+ cmd="GAIN"
+ gain_list=[297, 37] #[int(gain*10)]*self.M
+ gain_bytes=pack("I"*self.M, *gain_list)
+ msg_bytes=(cmd.encode()+gain_bytes+bytearray(128-(self.M+1)*4))
+ try:
+ _thread.start_new_thread(self.ctr_iface_communication, (msg_bytes,))
+ except:
+ errorMsg = sys.exc_info()[0]
+ self.logger.error("Unable to start communication thread")
+ self.logger.error("Error message: {:s}".format(errorMsg))
+
+ def close(self):
+ """
+ Disconnet the receiver module and the DAQ FW
+ """
+ self.eth_close()
+
diff --git a/_receiver/shmemIface.py b/_receiver/shmemIface.py
new file mode 100755
index 0000000..fd301b1
--- /dev/null
+++ b/_receiver/shmemIface.py
@@ -0,0 +1,195 @@
+"""
+ HeIMDALL DAQ Firmware
+ Python based shared memory interface implementations
+
+ Author: Tamás Pető
+ License: GNU GPL V3
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+"""
+import logging
+from struct import pack, unpack
+from multiprocessing import shared_memory
+import numpy as np
+import os
+
+A_BUFF_READY = 1
+B_BUFF_READY = 2
+INIT_READY = 10
+TERMINATE = 255
+class outShmemIface():
+
+
+ def __init__(self, shmem_name, shmem_size, drop_mode = False):
+
+ self.init_ok = True
+ logging.basicConfig(level=logging.INFO)
+ self.logger = logging.getLogger(__name__)
+ self.drop_mode = drop_mode
+ self.dropped_frame_cntr = 0
+
+ self.shmem_name = shmem_name
+ self.buffer_free = [True, True]
+
+ self.memories = []
+ self.buffers = []
+
+ # Try to remove shared memories if already exist
+ try:
+ shmem_A = shared_memory.SharedMemory(name=shmem_name+'_A',create=False, size=shmem_size)
+ shmem_A.close()
+ #shmem_A.unkink()
+ except FileNotFoundError as err:
+ self.logger.warning("Shared memory not exist")
+ try:
+ shmem_B = shared_memory.SharedMemory(name=shmem_name+'_B',create=False, size=shmem_size)
+ shmem_B.close()
+ #shmem_B.unkink()
+ except FileNotFoundError as err:
+ self.logger.warning("Shared memory not exist")
+
+ # Create the shared memories
+ self.memories.append(shared_memory.SharedMemory(name=shmem_name+'_A',create=True, size=shmem_size))
+ self.memories.append(shared_memory.SharedMemory(name=shmem_name+'_B',create=True, size=shmem_size))
+ self.buffers.append(np.ndarray((shmem_size,), dtype=np.uint8, buffer=self.memories[0].buf))
+ self.buffers.append(np.ndarray((shmem_size,), dtype=np.uint8, buffer=self.memories[1].buf))
+
+ # Opening control FIFOs
+ if self.drop_mode:
+ bw_fifo_flags = os.O_RDONLY | os.O_NONBLOCK
+ else:
+ bw_fifo_flags = os.O_RDONLY
+ try:
+ self.fw_ctr_fifo = os.open('_data_control/'+'fw_'+shmem_name, os.O_WRONLY)
+ self.bw_ctr_fifo = os.open('_data_control/'+'bw_'+shmem_name, bw_fifo_flags)
+ except OSError as err:
+ self.logger.critical("OS error: {0}".format(err))
+ self.logger.critical("Failed to open control fifos")
+ self.bw_ctr_fifo = None
+ self.fw_ctr_fifo = None
+ self.init_ok = False
+
+ # Send init ready signal
+ if self.init_ok:
+ os.write(self.fw_ctr_fifo, pack('B',INIT_READY))
+
+ def send_ctr_buff_ready(self, active_buffer_index):
+ # Send buffer ready signal on the forward FIFO
+ if active_buffer_index == 0:
+ os.write(self.fw_ctr_fifo, pack('B',A_BUFF_READY))
+ elif active_buffer_index == 1:
+ os.write(self.fw_ctr_fifo, pack('B',B_BUFF_READY))
+
+ # Deassert buffer free flag
+ self.buffer_free[active_buffer_index] = False
+
+ def send_ctr_terminate(self):
+ os.write(self.fw_ctr_fifo, pack('B',TERMINATE))
+ self.logger.info("Terminate signal sent")
+
+ def destory_sm_buffer(self):
+ for memory in self.memories:
+ memory.close()
+ memory.unlink()
+
+ if self.fw_ctr_fifo is not None:
+ os.close(self.fw_ctr_fifo)
+
+ if self.bw_ctr_fifo is not None:
+ os.close(self.bw_ctr_fifo)
+
+ def wait_buff_free(self):
+ if self.buffer_free[0]:
+ return 0
+ elif self.buffer_free[1]:
+ return 1
+ else:
+ try:
+ buffer = os.read(self.bw_ctr_fifo, 1)
+ signal = unpack('B', buffer )[0]
+
+ if signal == A_BUFF_READY:
+ self.buffer_free[0] = True
+ return 0
+ if signal == B_BUFF_READY:
+ self.buffer_free[1] = True
+ return 1
+ except BlockingIOError as err:
+ self.dropped_frame_cntr +=1
+ self.logger.warning("Dropping frame.. Total: [{:d}] ".format(self.dropped_frame_cntr))
+ return 3
+ return -1
+
+
+class inShmemIface():
+
+ def __init__(self, shmem_name, ctr_fifo_path="_data_control/"):
+
+ self.init_ok = True
+ logging.basicConfig(level=logging.INFO)
+ self.logger = logging.getLogger(__name__)
+ self.drop_mode = False
+
+ self.shmem_name = shmem_name
+
+ self.memories = []
+ self.buffers = []
+ try:
+ self.fw_ctr_fifo = os.open(ctr_fifo_path+'fw_'+shmem_name, os.O_RDONLY)
+ self.bw_ctr_fifo = os.open(ctr_fifo_path+'bw_'+shmem_name, os.O_WRONLY)
+ except OSError as err:
+ self.logger.critical("OS error: {0}".format(err))
+ self.logger.critical("Failed to open control fifos")
+ self.bw_ctr_fifo = None
+ self.fw_ctr_fifo = None
+ self.init_ok = False
+
+ if self.fw_ctr_fifo is not None:
+ if unpack('B', os.read(self.fw_ctr_fifo, 1))[0] == INIT_READY:
+ self.memories.append(shared_memory.SharedMemory(name=shmem_name+'_A'))
+ self.memories.append(shared_memory.SharedMemory(name=shmem_name+'_B'))
+ self.buffers.append(np.ndarray((self.memories[0].size,),
+ dtype=np.uint8,
+ buffer=self.memories[0].buf))
+ self.buffers.append(np.ndarray((self.memories[1].size,),
+ dtype=np.uint8,
+ buffer=self.memories[1].buf))
+ else:
+ self.init_ok = False
+
+ def send_ctr_buff_ready(self, active_buffer_index):
+ if active_buffer_index == 0:
+ os.write(self.bw_ctr_fifo, pack('B',A_BUFF_READY))
+ elif active_buffer_index == 1:
+ os.write(self.bw_ctr_fifo, pack('B',B_BUFF_READY))
+
+ def destory_sm_buffer(self):
+ for memory in self.memories:
+ memory.close()
+
+ if self.fw_ctr_fifo is not None:
+ os.close(self.fw_ctr_fifo)
+
+ if self.bw_ctr_fifo is not None:
+ os.close(self.bw_ctr_fifo)
+
+ def wait_buff_free(self):
+ signal = unpack('B', os.read(self.fw_ctr_fifo, 1))[0]
+ if signal == A_BUFF_READY:
+ return 0
+ elif signal == B_BUFF_READY:
+ return 1
+ elif signal == TERMINATE:
+ return TERMINATE
+ return -1