lasp/cpp_src/device/lasp_streammgr.h
J.A. de Jong - Redu-Sone B.V., ASCEE V.O.F e973f14884
Some checks failed
Building, testing and releasing LASP if it has a tag / Build-Test-Ubuntu (push) Failing after 2m3s
Building, testing and releasing LASP if it has a tag / Release-Ubuntu (push) Has been skipped
Weak refs to Recording methods. Made the mutexes more simple for stream manager. Added extra guards and statements here and there. Code passes a severe stress test.
2024-03-13 12:19:24 +01:00

230 lines
5.8 KiB
C++

#pragma once
#include <atomic>
#include <cassert>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
#include "lasp_daq.h"
#include "lasp_siggen.h"
#include "lasp_thread.h"
/** \addtogroup device
* @{
*/
class StreamMgr;
class InDataHandler;
class SeriesBiquad;
/**
* @brief Stream manager. Used to manage the input and output streams.
* Implemented as a singleton: only one stream manager can be in existence for
* a given program. The thread that instantiates a stream manager is the only
* thread that should interact with the stream manager. If this is not true,
* undefined behavior can be expected. If LASP is compiled in debug mode, this
* fact is asserted.
*/
class StreamMgr {
mutable std::recursive_mutex _mtx;
/**
* @brief Storage for streams.
*/
std::unique_ptr<Daq> _inputStream, _outputStream;
std::atomic<bool> _scanningDevices{false};
GlobalThreadPool _pool;
/**
* @brief All indata handlers are called when input data is available. Note
* that they can be called from different threads and should take care of
* thread-safety.
*/
std::list<InDataHandler *> _inDataHandlers;
/**
* @brief Signal generator in use to generate output data. Currently
* implemented as to generate the same data for all output channels.
*/
std::shared_ptr<Siggen> _siggen;
/**
* @brief Filters on input stream. For example, a digital high pass filter.
*/
std::vector<std::unique_ptr<SeriesBiquad>> _inputFilters;
/**
* @brief Current storage for the device list
*/
DeviceInfoList _devices;
// Singleton, no public constructor. Can only be obtained using
// getInstance();
StreamMgr();
friend class InDataHandler;
friend class Siggen;
public:
~StreamMgr();
enum class StreamType : us {
/**
* @brief Input stream
*/
input = 1 << 0,
/**
* @brief Output stream
*/
output = 1 << 1,
};
StreamMgr(const StreamMgr &) = delete;
StreamMgr &operator=(const StreamMgr &) = delete;
/**
* @brief Get access to stream manager instance
*
* @return Reference to stream manager.
*/
static std::shared_ptr<StreamMgr> getInstance();
/**
* @brief Obtain a list of devices currently available. When the StreamMgr is
* first created, this
*
* @return A copy of the internal stored list of devices
*/
DeviceInfoList getDeviceInfo() const {
std::scoped_lock lck(_mtx);
DeviceInfoList d2;
for (const auto &dev : _devices) {
assert(dev != nullptr);
d2.push_back(dev->clone());
}
return d2;
}
/**
* @brief Triggers a background scan of the DAQ devices, which updates the
* internally stored list of devices. Throws a runtime error when a
* background thread is already scanning for devices, or if a stream is
* running.
*
* @param background Perform searching for DAQ devices in the background. If
* set to true, the function returns immediately.
* @param callback Function to call when complete.
*/
void rescanDAQDevices(
bool background = false,
std::function<void()> callback = std::function<void()>());
/**
* @brief Start a stream based on given configuration.
*
* @param config Configuration of a device
*/
void startStream(const DaqConfiguration &config);
/**
* @brief Check if a certain stream is running. If running with no errors, it
* returns true. If an error occured, or the stream is not running, it gives
* false.
*
* @param type The type of stream to check for.
*/
bool isStreamRunningOK(const StreamType type) const {
return getStreamStatus(type).runningOK();
}
bool isStreamRunning(const StreamType type) const {
switch (type) {
case (StreamType::input):
return bool(_inputStream);
break;
case (StreamType::output):
return bool(_outputStream);
break;
}
return false;
}
/**
* @brief Get the streamstatus object corresponding to a given stream.
*
* @param type Type of stream, input, inputType, etc.
*
* @return status object.
*/
Daq::StreamStatus getStreamStatus(const StreamType type) const;
/**
* @brief Get DAQ pointer for a given stream. Gives a nullptr if stream is
* not running.
*
* @param type The stream type to get a DAQ ptr for.
*
* @return Pointer to DAQ
*/
const Daq *getDaq(StreamType type) const;
/**
* @brief Stop stream of given type (input / output/ duplex);
*
* @param stype The stream type to stop.
*/
void stopStream(const StreamType stype);
/**
* @brief Stop and delete all streams. Also called on destruction of the
* StreamMgr.
*/
void stopAllStreams();
/**
* @brief Set active signal generator for output streams. Only one `Siggen'
* is active at the same time. Siggen controls its own data race protection
* using a mutex. If no Siggen is there, and an output stream is running, it
* will send a default signal of 0.
*
* @param s New Siggen pointer
*/
void setSiggen(std::shared_ptr<Siggen> s);
private:
void inCallback(const DaqData &data);
void outCallback(DaqData &data);
/**
* @brief Add an input data handler. The handler's inCallback() function is
* called with data when available. This function should *NOT* be called by
* the user, instead, an InDataHandler can only be created with the StreamMgr
* as argument.
*
* @param handler The handler to add.
*/
void addInDataHandler(InDataHandler *handler);
/**
* @brief Remove InDataHandler from the list.
*
* @param handler
*/
void removeInDataHandler(InDataHandler &handler);
/**
* @brief Do the actual rescanning.
*
* @param callback
*/
void rescanDAQDevices_impl(std::function<void()> callback);
#if LASP_DEBUG == 1
const std::thread::id main_thread_id;
void checkRightThread() const;
#else
void checkRightThread() const {}
#endif
};
/** @} */