Initial commit of source code
This commit is contained in:
219
src/dBus/api/api.h
Normal file
219
src/dBus/api/api.h
Normal file
@@ -0,0 +1,219 @@
|
||||
#pragma once
|
||||
|
||||
#include "logWrapper.h"
|
||||
|
||||
#include <chrono>
|
||||
#include <dBus/dBus.h>
|
||||
#include <dBus/defs.h>
|
||||
#include <expected>
|
||||
#include <format>
|
||||
#include <future>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <type_traits>
|
||||
#include <vector>
|
||||
|
||||
namespace dBus::api
|
||||
{
|
||||
// Define a type alias for the return status of post
|
||||
// operations, using std::expected to represent success
|
||||
// or failure with an error code
|
||||
using PostReturnStatus = std::expected<void, dBus::ReturnStatus>;
|
||||
|
||||
//--------------------------------------------------------------
|
||||
struct BaseDefaultData
|
||||
{
|
||||
};
|
||||
|
||||
template<HashID ID>
|
||||
struct DefaultData : BaseDefaultData
|
||||
{
|
||||
DefaultData() = default; // Default constructor
|
||||
virtual ~DefaultData() = default; // Default destructor
|
||||
DefaultData(const DefaultData &obj) = default; // Copy constructor
|
||||
DefaultData(DefaultData &&obj) noexcept = default; // Move constructor
|
||||
DefaultData &operator=(const DefaultData &obj) = default; // Copy assignment operator
|
||||
DefaultData &operator=(DefaultData &&obj) noexcept = default; // Move assignment operator
|
||||
|
||||
constexpr HashID getID() const
|
||||
{
|
||||
return ID;
|
||||
}
|
||||
|
||||
[[nodiscard]] virtual std::string toString() const = 0; // Convert the data to a string representation for logging or debugging
|
||||
};
|
||||
//--------------------------------------------------------------
|
||||
|
||||
/* --- */
|
||||
|
||||
//--------------------------------------------------------------
|
||||
// Generic message API template for simple data transfer (fire-and-forget)
|
||||
template<class T>
|
||||
class MessageApi : public EventMessage<T>
|
||||
{
|
||||
public:
|
||||
MessageApi() = delete; // Default constructor
|
||||
virtual ~MessageApi() = default; // Default destructor
|
||||
MessageApi(const MessageApi &obj) = delete; // Copy constructor
|
||||
MessageApi(MessageApi &&obj) noexcept = delete; // Move constructor
|
||||
MessageApi &operator=(const MessageApi &obj) = delete; // Copy assignment operator
|
||||
MessageApi &operator=(MessageApi &&obj) noexcept = delete; // Move assignment operator
|
||||
|
||||
explicit MessageApi(const HashID &messageTypeID, T data, const MessageCategory &category = MessageCategory::System) // Constructor
|
||||
: EventMessage<T>(messageTypeID, category, std::move(data))
|
||||
{
|
||||
}
|
||||
|
||||
// Serialize the message data to a string for logging or debugging
|
||||
[[nodiscard]] std::string serializeData() const override
|
||||
{
|
||||
if constexpr (std::is_base_of_v<BaseDefaultData, T>)
|
||||
{
|
||||
return this->value.toString();
|
||||
}
|
||||
else if constexpr (std::is_arithmetic_v<T>)
|
||||
{
|
||||
return std::to_string(this->value);
|
||||
}
|
||||
else
|
||||
{
|
||||
return "";
|
||||
}
|
||||
}
|
||||
};
|
||||
//--------------------------------------------------------------
|
||||
// Request message API template for data transfer with response
|
||||
template<class TRequest, class TResponse>
|
||||
class RequestMessageApi : public RequestMessage<TRequest, TResponse>
|
||||
{
|
||||
public:
|
||||
RequestMessageApi() = delete; // Default constructor
|
||||
virtual ~RequestMessageApi() = default; // Default destructor
|
||||
RequestMessageApi(const RequestMessageApi &obj) = delete; // Copy constructor
|
||||
RequestMessageApi(RequestMessageApi &&obj) noexcept = delete; // Move constructor
|
||||
RequestMessageApi &operator=(const RequestMessageApi &obj) = delete; // Copy assignment operator
|
||||
RequestMessageApi &operator=(RequestMessageApi &&obj) noexcept = delete; // Move assignment operator
|
||||
|
||||
explicit RequestMessageApi(const HashID &messageTypeID, TRequest request, const MessageCategory &category = MessageCategory::System)
|
||||
: RequestMessage<TRequest, TResponse>(messageTypeID, category, std::move(request), createResponse())
|
||||
{
|
||||
}
|
||||
|
||||
// Serialize the message data to a string for logging or debugging
|
||||
[[nodiscard]] std::string serializeData() const override
|
||||
{
|
||||
if constexpr (std::is_base_of_v<BaseDefaultData, TRequest>)
|
||||
{
|
||||
return this->value.toString();
|
||||
}
|
||||
else if constexpr (std::is_arithmetic_v<TRequest>)
|
||||
{
|
||||
return std::to_string(this->value);
|
||||
}
|
||||
else
|
||||
{
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
static auto createResponse()
|
||||
{
|
||||
if constexpr (std::is_void_v<TResponse>)
|
||||
return nullptr; // Use nullptr for void response type since there is no value to return
|
||||
else
|
||||
return std::make_shared<TResponse>();
|
||||
}
|
||||
};
|
||||
//--------------------------------------------------------------
|
||||
|
||||
/* --- */
|
||||
|
||||
//--------------------------------------------------------------
|
||||
// Post a simple data message (fire-and-forget)
|
||||
template<class T>
|
||||
bool postData(Bus &bus,
|
||||
const HashID &messageTypeID,
|
||||
T data,
|
||||
const MessageCategory &category)
|
||||
{
|
||||
auto msg = std::make_shared<MessageApi<T>>(messageTypeID, std::move(data), category);
|
||||
return bus.post(msg);
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
// Post a data message with subscriber check
|
||||
template<class T>
|
||||
std::expected<void, ReturnStatus> postDataChecked(Bus &bus,
|
||||
const HashID &messageTypeID,
|
||||
T data,
|
||||
const MessageCategory &category)
|
||||
{
|
||||
auto msg = std::make_shared<MessageApi<T>>(messageTypeID, std::move(data), category);
|
||||
const auto subscribersFound = bus.post(msg);
|
||||
|
||||
if (!subscribersFound)
|
||||
{
|
||||
return std::unexpected(ReturnStatus::NoSubscribers);
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
template<class TRequest, class TResponse>
|
||||
std::expected<TResponse, ReturnStatus> requestData(Bus &bus,
|
||||
const HashID &messageTypeID,
|
||||
TRequest request,
|
||||
const MessageCategory &category,
|
||||
const std::chrono::milliseconds timeout = DEFAULT_TIMEOUT)
|
||||
{
|
||||
auto msg = std::make_shared<RequestMessageApi<TRequest, TResponse>>(messageTypeID, std::move(request), category);
|
||||
auto future = msg->getFuture();
|
||||
|
||||
try
|
||||
{
|
||||
// Post the message to the bus and check if there are subscribers
|
||||
const auto subscribersFound = bus.post(msg);
|
||||
if (!subscribersFound)
|
||||
{
|
||||
::api::log::postLogWarning(bus, std::format("No subscribers found for message type: 0x{:08x}", msg->getMessageTypeID()));
|
||||
return std::unexpected(ReturnStatus::NoSubscribers);
|
||||
}
|
||||
|
||||
const auto status = future.wait_for(timeout);
|
||||
if (status == std::future_status::ready)
|
||||
{
|
||||
if constexpr (std::is_void_v<TResponse>)
|
||||
{
|
||||
// Wait for the future to be ready and check for exceptions,
|
||||
// but ignore the value since it's void
|
||||
future.get();
|
||||
|
||||
return {}; // Return success without a value
|
||||
}
|
||||
else
|
||||
{
|
||||
// Wait for the future to be ready and return the value,
|
||||
// or propagate any exceptions that occurred during the
|
||||
// request handling
|
||||
return future.get(); // Return success with value
|
||||
}
|
||||
}
|
||||
else if (status == std::future_status::timeout)
|
||||
{
|
||||
::api::log::postLogWarning(bus, std::format("Timeout while waiting for response to message type: 0x{:08x}", msg->getMessageTypeID()));
|
||||
return std::unexpected(ReturnStatus::Timeout);
|
||||
}
|
||||
else
|
||||
{
|
||||
::api::log::postLogError(bus, std::format("Unexpected future status while waiting for response to message type: 0x{:08x}", msg->getMessageTypeID()));
|
||||
return std::unexpected(ReturnStatus::Error);
|
||||
}
|
||||
}
|
||||
catch (const std::exception &e)
|
||||
{
|
||||
::api::log::postLogError(bus, std::format("Exception in requestData for message type: 0x{:08x}: {}", msg->getMessageTypeID(), e.what()));
|
||||
return std::unexpected(ReturnStatus::Exception);
|
||||
}
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
} // namespace dBus::api
|
||||
70
src/dBus/api/log.h
Normal file
70
src/dBus/api/log.h
Normal file
@@ -0,0 +1,70 @@
|
||||
#pragma once
|
||||
|
||||
#include "api.h"
|
||||
|
||||
#include <sdi_toolBox/dateTime/age.h>
|
||||
|
||||
namespace api::log
|
||||
{
|
||||
//--------------------------------------------------------------
|
||||
// Log levels
|
||||
//--------------------------------------------------------------
|
||||
enum class LogLevel : uint8_t
|
||||
{
|
||||
Message = 0,
|
||||
Debug,
|
||||
Info,
|
||||
Warning,
|
||||
Error,
|
||||
|
||||
System, // Reserved for system-level logs, not intended for user messages
|
||||
};
|
||||
//--------------------------------------------------------------
|
||||
|
||||
/* --- */
|
||||
|
||||
//--------------------------------------------------------------
|
||||
// Log message structure
|
||||
//--------------------------------------------------------------
|
||||
struct LogMessage : dBus::api::DefaultData<dBus::makeID("log.message")>
|
||||
{
|
||||
using Timestamp = std::chrono::steady_clock::time_point;
|
||||
|
||||
virtual ~LogMessage() = default;
|
||||
|
||||
Timestamp timestamp = std::chrono::steady_clock::now();
|
||||
LogLevel level = LogLevel::Message;
|
||||
std::string message;
|
||||
|
||||
[[nodiscard]] std::string toString() const override
|
||||
{
|
||||
const auto timestampFormatter = Age(std::chrono::duration_cast<std::chrono::nanoseconds>(timestamp.time_since_epoch()));
|
||||
const auto logEntry = std::format("{}{}",
|
||||
getLevelString(),
|
||||
message);
|
||||
return logEntry;
|
||||
}
|
||||
|
||||
private:
|
||||
[[nodiscard]] std::string getLevelString() const
|
||||
{
|
||||
switch (level)
|
||||
{
|
||||
using enum LogLevel;
|
||||
case Debug:
|
||||
return "[Debug] ";
|
||||
case Info:
|
||||
return "[Info] ";
|
||||
case Warning:
|
||||
return "[Warning] ";
|
||||
case Error:
|
||||
return "[Error] ";
|
||||
case System:
|
||||
return "[System] ";
|
||||
default:
|
||||
return "";
|
||||
}
|
||||
}
|
||||
};
|
||||
//--------------------------------------------------------------
|
||||
} // namespace api::log
|
||||
50
src/dBus/api/logWrapper.cpp
Normal file
50
src/dBus/api/logWrapper.cpp
Normal file
@@ -0,0 +1,50 @@
|
||||
#include "logWrapper.h"
|
||||
|
||||
#include "log.h"
|
||||
|
||||
namespace
|
||||
{
|
||||
//--------------------------------------------------------------
|
||||
void postMessage(dBus::Bus &bus, const api::log::LogLevel logLevel, const std::string &message)
|
||||
{
|
||||
api::log::LogMessage logMsg;
|
||||
logMsg.level = logLevel;
|
||||
logMsg.message = message;
|
||||
|
||||
// Post the log message to the bus with the appropriate category (Fire-and-forget, no response expected)
|
||||
(void)dBus::api::postData(bus, logMsg.getID(), logMsg, dBus::MessageCategory::Log);
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
} // namespace
|
||||
|
||||
//--------------------------------------------------------------
|
||||
void api::log::postLogMessage(dBus::Bus &bus, const std::string &message)
|
||||
{
|
||||
postMessage(bus, LogLevel::Message, message);
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
void api::log::postLogDebug(dBus::Bus &bus, const std::string &message)
|
||||
{
|
||||
postMessage(bus, LogLevel::Debug, message);
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
void api::log::postLogInfo(dBus::Bus &bus, const std::string &message)
|
||||
{
|
||||
postMessage(bus, LogLevel::Info, message);
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
void api::log::postLogWarning(dBus::Bus &bus, const std::string &message)
|
||||
{
|
||||
postMessage(bus, LogLevel::Warning, message);
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
void api::log::postLogError(dBus::Bus &bus, const std::string &message)
|
||||
{
|
||||
postMessage(bus, LogLevel::Error, message);
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
void api::log::postLogSystem(dBus::Bus &bus, const std::string &message)
|
||||
{
|
||||
postMessage(bus, LogLevel::System, message);
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
24
src/dBus/api/logWrapper.h
Normal file
24
src/dBus/api/logWrapper.h
Normal file
@@ -0,0 +1,24 @@
|
||||
#pragma once
|
||||
|
||||
#include "../bus.h"
|
||||
|
||||
#include <string>
|
||||
|
||||
namespace api::log
|
||||
{
|
||||
//--------------------------------------------------------------
|
||||
// Post event helpers
|
||||
//--------------------------------------------------------------
|
||||
void postLogMessage(dBus::Bus &bus, const std::string &message);
|
||||
//--------------------------------------------------------------
|
||||
void postLogDebug(dBus::Bus &bus, const std::string &message);
|
||||
//--------------------------------------------------------------
|
||||
void postLogInfo(dBus::Bus &bus, const std::string &message);
|
||||
//--------------------------------------------------------------
|
||||
void postLogWarning(dBus::Bus &bus, const std::string &message);
|
||||
//--------------------------------------------------------------
|
||||
void postLogError(dBus::Bus &bus, const std::string &message);
|
||||
//--------------------------------------------------------------
|
||||
void postLogSystem(dBus::Bus &bus, const std::string &message);
|
||||
//--------------------------------------------------------------
|
||||
} // namespace api::log
|
||||
274
src/dBus/bus.cpp
Normal file
274
src/dBus/bus.cpp
Normal file
@@ -0,0 +1,274 @@
|
||||
#include "bus.h"
|
||||
|
||||
#include "dbus.h"
|
||||
|
||||
#include <ranges>
|
||||
#include <set>
|
||||
|
||||
using namespace std;
|
||||
using namespace dBus;
|
||||
//--------------------------------------------------------------
|
||||
/* Default constructor */
|
||||
Bus::Bus()
|
||||
{
|
||||
// Initialization
|
||||
m_startTimestamp = std::chrono::steady_clock::now();
|
||||
|
||||
// Start the monitoring thread
|
||||
m_monitoringThread = jthread([&](const stop_token &st)
|
||||
{
|
||||
while (!st.stop_requested())
|
||||
{
|
||||
this_thread::sleep_for(100ms);
|
||||
updateMonitoringData();
|
||||
} });
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Get the timestamp of when the bus started */
|
||||
TimePoint Bus::getStartTimestamp() const
|
||||
{
|
||||
return m_startTimestamp;
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Subscribe a listener to a specific event type */
|
||||
void Bus::subscribe(const HashID &eventType, Node *node)
|
||||
{
|
||||
// Sanity check
|
||||
if (!node)
|
||||
throw runtime_error("invalid node handle");
|
||||
|
||||
std::scoped_lock lock(m_routingTable.mtx);
|
||||
|
||||
// Event type not registered yet, create a new entry with the node
|
||||
if (!m_routingTable.nodes.contains(eventType))
|
||||
{
|
||||
m_routingTable.nodes[eventType] = { node };
|
||||
m_routingTable.activeNodesCache.insert(node);
|
||||
return;
|
||||
}
|
||||
|
||||
// Event type already registered, add the node if not already subscribed
|
||||
auto &nodes = m_routingTable.nodes.at(eventType);
|
||||
if (std::ranges::find(nodes, node) == nodes.end())
|
||||
{
|
||||
nodes.push_back(node);
|
||||
m_routingTable.activeNodesCache.insert(node);
|
||||
}
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Unsubscribe a listener from a specific event type */
|
||||
void Bus::unsubscribe(const HashID &eventType, Node *node)
|
||||
{
|
||||
// Sanity check
|
||||
if (!node)
|
||||
throw runtime_error("invalid node handle");
|
||||
|
||||
std::scoped_lock lock(m_routingTable.mtx);
|
||||
|
||||
// Event type not recorded, nothing to do
|
||||
if (!m_routingTable.nodes.contains(eventType))
|
||||
return;
|
||||
|
||||
// Event type registered, remove the listener if subscribed
|
||||
auto &nodes = m_routingTable.nodes.at(eventType);
|
||||
std::erase(nodes, node);
|
||||
|
||||
// Check if the node is still subscribed to any message type before removing it from the active nodes cache
|
||||
if (!isSubscribedEverywhere(node))
|
||||
m_routingTable.activeNodesCache.erase(node);
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Unsubscribe a listener from all event types */
|
||||
void Bus::unsubscribeAll(Node *node)
|
||||
{
|
||||
// Sanity check
|
||||
if (!node)
|
||||
throw runtime_error("invalid node handle");
|
||||
|
||||
std::scoped_lock lock(m_routingTable.mtx);
|
||||
|
||||
// Iterate through all event types and remove the node from each list
|
||||
for (auto &nodeList : m_routingTable.nodes | std::views::values)
|
||||
{
|
||||
std::erase(nodeList, node);
|
||||
}
|
||||
|
||||
// Check if the node is still subscribed to any message type before removing it from the active nodes cache
|
||||
if (!isSubscribedEverywhere(node))
|
||||
m_routingTable.activeNodesCache.erase(node);
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Check if a listener is subscribed to a specific event type */
|
||||
bool Bus::isSubscribed(const HashID &eventType, const Node *node) const
|
||||
{
|
||||
// Sanity check
|
||||
if (!node)
|
||||
throw runtime_error("invalid node handle");
|
||||
|
||||
std::scoped_lock lock(m_routingTable.mtx);
|
||||
|
||||
// Event type not recorded, node not subscribed
|
||||
if (!m_routingTable.nodes.contains(eventType))
|
||||
return false;
|
||||
|
||||
// Event type recorded, check if the node is in the list
|
||||
const auto &nodes = m_routingTable.nodes.at(eventType);
|
||||
return std::ranges::find(nodes, node) != nodes.end();
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Subscribe a node to receive all messages (broadcast mode) */
|
||||
void Bus::subscribeToBroadcast(Node *node)
|
||||
{
|
||||
// Sanity check
|
||||
if (!node)
|
||||
throw runtime_error("invalid node handle");
|
||||
|
||||
std::scoped_lock lock(m_routingTable.mtx);
|
||||
|
||||
// Add the node to the broadcast nodes set
|
||||
m_routingTable.broadcastNodes.insert(node);
|
||||
m_routingTable.activeNodesCache.insert(node);
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Unsubscribe a node from broadcast mode */
|
||||
void Bus::unsubscribeFromBroadcast(Node *node)
|
||||
{
|
||||
// Sanity check
|
||||
if (!node)
|
||||
throw runtime_error("invalid node handle");
|
||||
|
||||
std::scoped_lock lock(m_routingTable.mtx);
|
||||
|
||||
// Remove the node from the broadcast nodes set
|
||||
m_routingTable.broadcastNodes.erase(node);
|
||||
|
||||
// Check if the node is still subscribed to any message type before removing it from the active nodes cache
|
||||
if (!isSubscribedEverywhere(node))
|
||||
m_routingTable.activeNodesCache.erase(node);
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Check if a node is subscribed to broadcast mode */
|
||||
bool Bus::isSubscribedToBroadcast(Node *node) const
|
||||
{
|
||||
// Sanity check
|
||||
if (!node)
|
||||
throw runtime_error("invalid node handle");
|
||||
|
||||
std::scoped_lock lock(m_routingTable.mtx);
|
||||
|
||||
// Check if the node is in the broadcast nodes set
|
||||
return m_routingTable.broadcastNodes.contains(node);
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
bool Bus::post(const std::shared_ptr<Message> &message)
|
||||
{
|
||||
// Link the message to the bus (set the posted information for the message)
|
||||
message->linkToBus(this);
|
||||
|
||||
// Record the incoming message for monitoring purposes
|
||||
recordIncoming();
|
||||
|
||||
std::scoped_lock lock(m_routingTable.mtx);
|
||||
|
||||
// Get the list of nodes subscribed to the message type
|
||||
set<Node *> recipients;
|
||||
|
||||
// Add nodes subscribed to the specific message type
|
||||
if (m_routingTable.nodes.contains(message->getMessageTypeID()))
|
||||
{
|
||||
const auto &nodeList = m_routingTable.nodes.at(message->getMessageTypeID());
|
||||
for (const auto &node : nodeList)
|
||||
recipients.insert(node);
|
||||
}
|
||||
const bool subscribersFound = !recipients.empty();
|
||||
|
||||
// Add broadcast nodes
|
||||
recipients.insert(m_routingTable.broadcastNodes.begin(),
|
||||
m_routingTable.broadcastNodes.end());
|
||||
|
||||
// Dispatch the message to all recipients
|
||||
for (const auto &node : recipients)
|
||||
node->receiveMessageFromBus(message);
|
||||
|
||||
return subscribersFound;
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Get the current monitoring data of the bus */
|
||||
Bus::MonitoringData Bus::getMonitoringData() const
|
||||
{
|
||||
std::scoped_lock lock(m_monitoring.mtx);
|
||||
return m_monitoring.data;
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Set the age of the message (to be called by the message when it is destroyed) */
|
||||
void Bus::setMessageAge(std::chrono::microseconds age)
|
||||
{
|
||||
scoped_lock lock(m_monitoring.mtx);
|
||||
|
||||
m_monitoring.data.maxAge = std::max(m_monitoring.data.maxAge, age);
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Check if a node is subscribed to any message type (internal, assumes lock is held) */
|
||||
bool Bus::isSubscribedEverywhere(Node *node) const
|
||||
{
|
||||
// Sanity check
|
||||
if (!node)
|
||||
throw runtime_error("invalid node handle");
|
||||
|
||||
// Check if the node is still subscribed to any message type
|
||||
for (const auto &nodeList : m_routingTable.nodes | std::views::values)
|
||||
{
|
||||
if (std::ranges::find(nodeList, node) != nodeList.end())
|
||||
return true;
|
||||
}
|
||||
|
||||
// Check if the node is subscribed to broadcast mode
|
||||
if (m_routingTable.broadcastNodes.contains(node))
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Incoming messages for monitoring purposes */
|
||||
void Bus::recordIncoming()
|
||||
{
|
||||
m_monitoring.messageCounter++;
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Update the monitoring data of the bus */
|
||||
void Bus::updateMonitoringData()
|
||||
{
|
||||
if (!m_monitoring.frequencyTimer.isElapsed(10s))
|
||||
return;
|
||||
|
||||
// Update monitoring data
|
||||
MonitoringData monitoringData;
|
||||
{
|
||||
std::scoped_lock lock(m_routingTable.mtx, m_monitoring.mtx);
|
||||
|
||||
// Update the global message count with the number of messages received in the last interval
|
||||
m_monitoring.data.globalMessageCount += m_monitoring.messageCounter;
|
||||
|
||||
// Calculate the frequency of messages being posted to the bus (messages per second)
|
||||
const auto duration_s = static_cast<double>(chrono::duration_cast<chrono::milliseconds>(m_monitoring.frequencyTimer.getElapsed()).count()) / 1000.0;
|
||||
m_monitoring.data.frequencyHz = static_cast<double>(m_monitoring.messageCounter) / duration_s;
|
||||
|
||||
// Update the current message count and maximum age of messages on the bus
|
||||
m_monitoring.data.currentMessageCount = 0;
|
||||
for (const auto &node : m_routingTable.activeNodesCache)
|
||||
m_monitoring.data.currentMessageCount += node->getMessageCount();
|
||||
|
||||
// Copy the monitoring data to a local variable for posting the monitoring message after releasing the locks
|
||||
monitoringData = m_monitoring.data;
|
||||
|
||||
// Reset all monitoring counters and timers for the next interval
|
||||
m_monitoring.data.maxAge = {};
|
||||
m_monitoring.messageCounter = 0;
|
||||
m_monitoring.frequencyTimer.reset();
|
||||
}
|
||||
|
||||
// Post monitoring message to the bus
|
||||
// TODO
|
||||
//postMonitoringMsg(*this, monitoringData);
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
83
src/dBus/bus.h
Normal file
83
src/dBus/bus.h
Normal file
@@ -0,0 +1,83 @@
|
||||
#pragma once
|
||||
|
||||
#include "Message.h"
|
||||
#include "Node.h"
|
||||
#include "defs.h"
|
||||
|
||||
#include <expected>
|
||||
#include <mutex>
|
||||
#include <sdi_toolBox/dateTime/timer.h>
|
||||
#include <set>
|
||||
#include <unordered_map>
|
||||
|
||||
namespace dBus
|
||||
{
|
||||
//--------------------------------------------------------------
|
||||
class Bus
|
||||
{
|
||||
public:
|
||||
struct MonitoringData
|
||||
{
|
||||
size_t globalMessageCount{ 0 }; // Total count of messages posted to the bus
|
||||
double frequencyHz = { 0 }; // Current frequency of messages being posted to the bus (calculated as messages per second)
|
||||
|
||||
size_t currentMessageCount{ 0 }; // Current count of messages on the bus
|
||||
std::chrono::microseconds maxAge{}; // Maximum age of messages on the bus
|
||||
};
|
||||
|
||||
public:
|
||||
Bus(); // Default constructor
|
||||
virtual ~Bus() = default; // Default destructor
|
||||
Bus(const Bus &obj) = delete; // Copy constructor
|
||||
Bus(Bus &&obj) noexcept = delete; // Move constructor
|
||||
Bus &operator=(const Bus &obj) = delete; // Copy assignment operator
|
||||
Bus &operator=(Bus &&obj) noexcept = delete; // Move assignment operator
|
||||
|
||||
[[nodiscard]] TimePoint getStartTimestamp() const; // Get the timestamp of when the bus started
|
||||
|
||||
// Registration and unregistration of listeners
|
||||
void subscribe(const HashID &eventType, Node *node); // Subscribe a listener to a specific event type
|
||||
void unsubscribe(const HashID &eventType, Node *node); // Unsubscribe a listener from a specific event type
|
||||
void unsubscribeAll(Node *node); // Unsubscribe a listener from all event types
|
||||
[[nodiscard]] bool isSubscribed(const HashID &eventType, const Node *node) const; // Check if a listener is subscribed to a specific event type
|
||||
|
||||
// Broadcast management
|
||||
void subscribeToBroadcast(Node *node); // Subscribe a node to receive all messages (broadcast mode)
|
||||
void unsubscribeFromBroadcast(Node *node); // Unsubscribe a node from broadcast mode
|
||||
[[nodiscard]] bool isSubscribedToBroadcast(Node *node) const; // Check if a node is subscribed to broadcast mode
|
||||
|
||||
// Data transmission
|
||||
bool post(const std::shared_ptr<Message> &message); // Post a message to the bus
|
||||
|
||||
// Monitoring
|
||||
MonitoringData getMonitoringData() const; // Get the current monitoring data of the bus (message count, frequency, etc.)
|
||||
void setMessageAge(std::chrono::microseconds age); // Set the age of the message
|
||||
|
||||
protected:
|
||||
TimePoint m_startTimestamp; // Timestamp of when the bus started
|
||||
std::jthread m_monitoringThread; // Thread for monitoring the performance of the bus
|
||||
|
||||
struct
|
||||
{
|
||||
mutable std::mutex mtx; // Mutex for thread-safe access to the nodes map
|
||||
std::unordered_map<HashID, std::vector<Node *>> nodes; // Map of message type IDs to lists of subscribed nodes
|
||||
std::set<Node *> broadcastNodes; // Set of nodes subscribed to receive all messages (broadcast mode)
|
||||
std::set<Node *> activeNodesCache; // Cache of active nodes for quick access during message routing
|
||||
} m_routingTable;
|
||||
|
||||
struct
|
||||
{
|
||||
mutable std::mutex mtx; // Mutex for thread-safe access to the monitoring data
|
||||
MonitoringData data;
|
||||
|
||||
sdi_toolBox::dateTime::Timer frequencyTimer; // Timer for calculating message frequency
|
||||
size_t messageCounter = { 0 }; // Counter for calculating message frequency
|
||||
} m_monitoring;
|
||||
|
||||
private:
|
||||
bool isSubscribedEverywhere(Node *node) const; // Check if a node is subscribed to any message type (used for managing the active nodes cache)
|
||||
void recordIncoming(); // Incoming messages for monitoring purposes
|
||||
void updateMonitoringData(); // Update the monitoring data of the bus (message count, frequency, etc.)
|
||||
};
|
||||
//--------------------------------------------------------------
|
||||
} // namespace dBus
|
||||
8
src/dBus/dBus.h
Normal file
8
src/dBus/dBus.h
Normal file
@@ -0,0 +1,8 @@
|
||||
#pragma once
|
||||
|
||||
//--------------------------------------------------------------
|
||||
#include "bus.h"
|
||||
#include "defs.h"
|
||||
#include "message.h"
|
||||
#include "node.h"
|
||||
//--------------------------------------------------------------
|
||||
105
src/dBus/defs.h
Normal file
105
src/dBus/defs.h
Normal file
@@ -0,0 +1,105 @@
|
||||
#pragma once
|
||||
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
#include <format>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
#include <type_traits>
|
||||
#include <unordered_map>
|
||||
|
||||
namespace dBus
|
||||
{
|
||||
using HashID = uint32_t;
|
||||
using TimePoint = std::chrono::steady_clock::time_point;
|
||||
|
||||
constexpr auto DEFAULT_TIMEOUT = std::chrono::milliseconds(100);
|
||||
|
||||
//--------------------------------------------------------------
|
||||
// Hash function for IDs using FNV-1a algorithm
|
||||
constexpr HashID makeID(const std::string_view id) noexcept
|
||||
{
|
||||
constexpr uint32_t FNV_offset_basis = 2166136261u;
|
||||
constexpr uint32_t FNV_prime = 16777619u;
|
||||
|
||||
uint32_t hash_value = FNV_offset_basis;
|
||||
for (const char c : id)
|
||||
{
|
||||
hash_value ^= static_cast<uint32_t>(static_cast<uint8_t>(c));
|
||||
hash_value *= FNV_prime;
|
||||
}
|
||||
return hash_value;
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
template<typename T, typename = void>
|
||||
struct has_to_string : std::false_type
|
||||
{
|
||||
};
|
||||
template<typename T>
|
||||
struct has_to_string<T, std::void_t<decltype(std::to_string(std::declval<T>()))>>
|
||||
: std::true_type
|
||||
{
|
||||
};
|
||||
template<typename T>
|
||||
constexpr bool has_to_string_v = has_to_string<T>::value;
|
||||
//--------------------------------------------------------------
|
||||
enum class ReturnStatus : uint8_t
|
||||
{
|
||||
Success = 0,
|
||||
Timeout,
|
||||
NoSubscribers,
|
||||
Error,
|
||||
Exception,
|
||||
};
|
||||
//--------------------------------------------------------------
|
||||
enum class MessageCategory : uint32_t
|
||||
{
|
||||
None = 0,
|
||||
Log = 1 << 0,
|
||||
Hardware = 1 << 1,
|
||||
Dataset = 1 << 2,
|
||||
Database = 1 << 3,
|
||||
System = 1 << 4,
|
||||
UI = 1 << 5,
|
||||
All = 0xFFFFFFFF
|
||||
};
|
||||
|
||||
// Convert LogCategory to a string representation
|
||||
inline std::string logCategoryToString(const MessageCategory &cat)
|
||||
{
|
||||
static const std::unordered_map<MessageCategory, std::string> names = {
|
||||
{ MessageCategory::None, "None" },
|
||||
{ MessageCategory::Log, "Log" },
|
||||
{ MessageCategory::Hardware, "Hardware" },
|
||||
{ MessageCategory::Dataset, "Dataset" },
|
||||
{ MessageCategory::Database, "Database" },
|
||||
{ MessageCategory::System, "System" },
|
||||
{ MessageCategory::UI, "UI" },
|
||||
{ MessageCategory::All, "All" }
|
||||
};
|
||||
|
||||
const auto it = names.find(cat);
|
||||
return (it != names.end()) ? it->second : "Composite/Unknown";
|
||||
}
|
||||
|
||||
// Bitwise OR operator for LogCategory to allow combining categories
|
||||
inline MessageCategory operator|(const MessageCategory lhs, const MessageCategory rhs)
|
||||
{
|
||||
return static_cast<MessageCategory>(
|
||||
static_cast<uint32_t>(lhs) | static_cast<uint32_t>(rhs));
|
||||
}
|
||||
|
||||
// Compound assignment operator for LogCategory
|
||||
inline MessageCategory &operator|=(MessageCategory &lhs, const MessageCategory rhs)
|
||||
{
|
||||
lhs = lhs | rhs;
|
||||
return lhs;
|
||||
}
|
||||
|
||||
// Bitwise AND operator for LogCategory to check if a category is included in a combination
|
||||
inline bool operator&(const MessageCategory lhs, const MessageCategory rhs)
|
||||
{
|
||||
return (static_cast<uint32_t>(lhs) & static_cast<uint32_t>(rhs)) != 0;
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
} // namespace dBus
|
||||
125
src/dBus/message.cpp
Normal file
125
src/dBus/message.cpp
Normal file
@@ -0,0 +1,125 @@
|
||||
#include "Message.h"
|
||||
|
||||
#include "bus.h"
|
||||
#include "defs.h"
|
||||
|
||||
#include <sdi_toolBox/dateTime/age.h>
|
||||
|
||||
using namespace std;
|
||||
using namespace dBus;
|
||||
//--------------------------------------------------------------
|
||||
/* Constructor */
|
||||
Message::Message(const HashID &messageTypeID, const MessageCategory &category)
|
||||
{
|
||||
// Default initialization
|
||||
m_postedAt = std::chrono::steady_clock::now();
|
||||
|
||||
// Set the message parameters
|
||||
m_messageTypeID = messageTypeID;
|
||||
m_category = category;
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Default destructor */
|
||||
Message::~Message()
|
||||
{
|
||||
// Update the age of the message (to be used when logging the message in the bus)
|
||||
if (m_bus)
|
||||
{
|
||||
const auto finalAge = getAge();
|
||||
m_bus->setMessageAge(finalAge);
|
||||
}
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Get the timestamp of when the message was posted */
|
||||
TimePoint Message::getPostedAt() const
|
||||
{
|
||||
return m_postedAt;
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Set the message parameters when the message is posted to the bus (to be called by the bus when the message is posted) */
|
||||
void Message::linkToBus(Bus *bus)
|
||||
{
|
||||
m_bus = bus;
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Get the age of the message in microseconds */
|
||||
std::chrono::microseconds Message::getAge() const
|
||||
{
|
||||
const auto now = std::chrono::steady_clock::now();
|
||||
const auto age = now - m_postedAt;
|
||||
|
||||
return std::chrono::duration_cast<std::chrono::microseconds>(age);
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Check if the message is of a specific type */
|
||||
bool Message::isType(const HashID &eventType) const
|
||||
{
|
||||
return m_messageTypeID == eventType;
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Get the unique identifier for the message type */
|
||||
HashID Message::getMessageTypeID() const
|
||||
{
|
||||
return m_messageTypeID;
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Get the log category of the message */
|
||||
MessageCategory Message::getCategory() const
|
||||
{
|
||||
return m_category;
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Serialize the message data to a string */
|
||||
std::string Message::serializeData() const
|
||||
{
|
||||
return "";
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Get a string representation of the message for logging purposes */
|
||||
std::string Message::toString() const
|
||||
{
|
||||
const auto serializedData = serializeData();
|
||||
|
||||
// Determine the age of the message for logging purposes
|
||||
string age = "N/A";
|
||||
if (m_bus)
|
||||
{
|
||||
const auto &busStartTimestamp = m_bus->getStartTimestamp();
|
||||
const auto dt = m_postedAt - busStartTimestamp;
|
||||
age = Age(dt).toString();
|
||||
}
|
||||
|
||||
// Format message output
|
||||
auto str = std::format("[{:}] - 0x{:08x} ({:})",
|
||||
age,
|
||||
m_messageTypeID,
|
||||
logCategoryToString(m_category));
|
||||
if (!serializedData.empty())
|
||||
str += " - " + serializedData;
|
||||
return str;
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Get a formatted log line representation of the message for logging purposes */
|
||||
std::string Message::toLogLine() const
|
||||
{
|
||||
const auto serializedData = serializeData();
|
||||
|
||||
// Determine the age of the message for logging purposes
|
||||
std::chrono::microseconds age{ 0 };
|
||||
if (m_bus)
|
||||
{
|
||||
const auto &busStartTimestamp = m_bus->getStartTimestamp();
|
||||
const auto dt = m_postedAt - busStartTimestamp;
|
||||
age = Age(dt).getMicroseconds();
|
||||
}
|
||||
|
||||
// Format message output
|
||||
auto str = std::format("[{:}] - 0x{:08x} ({:})",
|
||||
age.count(),
|
||||
m_messageTypeID,
|
||||
logCategoryToString(m_category));
|
||||
if (!serializedData.empty())
|
||||
str += " - " + serializedData;
|
||||
return str;
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
152
src/dBus/message.h
Normal file
152
src/dBus/message.h
Normal file
@@ -0,0 +1,152 @@
|
||||
#pragma once
|
||||
|
||||
#include "defs.h"
|
||||
|
||||
#include <future>
|
||||
|
||||
namespace dBus
|
||||
{
|
||||
//--------------------------------------------------------------
|
||||
class Message : public std::enable_shared_from_this<Message>
|
||||
{
|
||||
friend class Bus;
|
||||
|
||||
public:
|
||||
Message() = default; // Default constructor
|
||||
virtual ~Message(); // Default destructor
|
||||
Message(const Message &obj) = delete; // Copy constructor
|
||||
Message(Message &&obj) noexcept = delete; // Move constructor
|
||||
Message &operator=(const Message &obj) = delete; // Copy assignment operator
|
||||
Message &operator=(Message &&obj) noexcept = delete; // Move assignment operator
|
||||
|
||||
explicit Message(const HashID &messageTypeID, // Constructor
|
||||
const MessageCategory &category);
|
||||
|
||||
// Message management
|
||||
TimePoint getPostedAt() const; // Get the timestamp of when the message was posted
|
||||
[[nodiscard]] std::chrono::microseconds getAge() const; // Get the age of the message in microseconds
|
||||
[[nodiscard]] bool isType(const HashID &eventType) const; // Check if the message is of a specific type
|
||||
|
||||
// Trace information
|
||||
[[nodiscard]] HashID getMessageTypeID() const; // Get the unique identifier for the message type
|
||||
[[nodiscard]] MessageCategory getCategory() const; // Get the log category of the message
|
||||
|
||||
[[nodiscard]] virtual std::string serializeData() const; // Serialize the message data to a string
|
||||
[[nodiscard]] virtual std::string toString() const; // Get a string representation of the message for logging purposes
|
||||
[[nodiscard]] virtual std::string toLogLine() const; // Get a formatted log line representation of the message for logging purposes
|
||||
|
||||
template<class Derived>
|
||||
[[nodiscard]] std::shared_ptr<Derived> as(); // Template method to cast the message to a specific derived type
|
||||
template<class Derived>
|
||||
[[nodiscard]] std::shared_ptr<const Derived> as() const; // Template method to cast the message to a specific derived type
|
||||
|
||||
protected:
|
||||
HashID m_messageTypeID = 0; // Unique identifier for the message type
|
||||
MessageCategory m_category = MessageCategory::None; // Log category of the message
|
||||
|
||||
Bus *m_bus = nullptr; // Pointer to the bus this message is posted to (to be set by the bus when the message is posted)
|
||||
TimePoint m_postedAt; // Timestamp of when the message was posted
|
||||
|
||||
private:
|
||||
void linkToBus(Bus *bus); // Set the message parameters when the message is posted to the bus
|
||||
};
|
||||
//--------------------------------------------------------------
|
||||
/* Template method to cast the message to a specific derived type */
|
||||
template<class Derived>
|
||||
std::shared_ptr<Derived> Message::as()
|
||||
{
|
||||
return std::dynamic_pointer_cast<Derived>(shared_from_this());
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Template method to cast the message to a specific derived type */
|
||||
template<class Derived>
|
||||
std::shared_ptr<const Derived> Message::as() const
|
||||
{
|
||||
return std::dynamic_pointer_cast<const Derived>(shared_from_this());
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
|
||||
/* --- */
|
||||
|
||||
//--------------------------------------------------------------
|
||||
template<class T>
|
||||
class EventMessage : public Message
|
||||
{
|
||||
public:
|
||||
T value; // Value associated with the event message
|
||||
|
||||
EventMessage() = delete; // Default constructor
|
||||
virtual ~EventMessage() = default; // Default destructor
|
||||
EventMessage(const EventMessage &obj) = delete; // Copy constructor
|
||||
EventMessage(EventMessage &&obj) noexcept = delete; // Move constructor
|
||||
EventMessage &operator=(const EventMessage &obj) = delete; // Copy assignment operator
|
||||
EventMessage &operator=(EventMessage &&obj) noexcept = delete; // Move assignment operator
|
||||
|
||||
explicit EventMessage(const HashID &messageTypeID, // Constructor
|
||||
const MessageCategory &category,
|
||||
T v);
|
||||
|
||||
[[nodiscard]] std::string serializeData() const override; // Serialize the message data to a string
|
||||
};
|
||||
|
||||
//--------------------------------------------------------------
|
||||
/* Constructor */
|
||||
template<class T>
|
||||
EventMessage<T>::EventMessage(const HashID &messageTypeID, const MessageCategory &category, T v)
|
||||
: Message(messageTypeID, category)
|
||||
, value(std::move(v))
|
||||
{
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Serialize the message data to a string */
|
||||
template<class T>
|
||||
std::string EventMessage<T>::serializeData() const
|
||||
{
|
||||
if constexpr (std::is_arithmetic_v<T>)
|
||||
return std::to_string(value);
|
||||
return "";
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
|
||||
/* --- */
|
||||
|
||||
//--------------------------------------------------------------
|
||||
template<class T, class U>
|
||||
class RequestMessage : public Message
|
||||
{
|
||||
public:
|
||||
T value; // Value associated with the event message
|
||||
std::promise<U> responsePromise; // Promise to be fulfilled with the response data once the request is processed
|
||||
|
||||
RequestMessage() = delete; // Default constructor
|
||||
virtual ~RequestMessage() = default; // Default destructor
|
||||
RequestMessage(const RequestMessage &obj) = delete; // Copy constructor
|
||||
RequestMessage(RequestMessage &&obj) noexcept = delete; // Move constructor
|
||||
RequestMessage &operator=(const RequestMessage &obj) = delete; // Copy assignment operator
|
||||
RequestMessage &operator=(RequestMessage &&obj) noexcept = delete; // Move assignment operator
|
||||
|
||||
explicit RequestMessage(const HashID &messageTypeID, // Constructor
|
||||
const MessageCategory &category,
|
||||
T v,
|
||||
std::shared_ptr<U> p);
|
||||
|
||||
std::future<U> getFuture(); // Get the future associated with the response promise
|
||||
};
|
||||
//--------------------------------------------------------------
|
||||
/* Constructor */
|
||||
template<class T, class U>
|
||||
RequestMessage<T, U>::RequestMessage(const HashID &messageTypeID, const MessageCategory &category, T v, std::shared_ptr<U> p)
|
||||
: Message(messageTypeID, category)
|
||||
, value(std::move(v))
|
||||
, responsePromise(std::promise<U>{})
|
||||
{
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Get the future associated with the response promise */
|
||||
template<class T, class U>
|
||||
std::future<U> RequestMessage<T, U>::getFuture()
|
||||
{
|
||||
return std::move(responsePromise.get_future());
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
} // namespace dBus
|
||||
121
src/dBus/node.cpp
Normal file
121
src/dBus/node.cpp
Normal file
@@ -0,0 +1,121 @@
|
||||
#include "node.h"
|
||||
|
||||
#include "bus.h"
|
||||
|
||||
using namespace std;
|
||||
using namespace dBus;
|
||||
//--------------------------------------------------------------
|
||||
/* Constructor */
|
||||
Node::Node(Bus &bus)
|
||||
: m_bus(bus)
|
||||
{
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Default destructor */
|
||||
Node::~Node()
|
||||
{
|
||||
unsubscribeAll();
|
||||
|
||||
// Notify the node thread to stop waiting for messages and exit the loop (if applicable)
|
||||
notifyMessageQueue();
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Subscribe to a specific message type */
|
||||
void Node::subscribe(const HashID &eventType)
|
||||
{
|
||||
m_bus.subscribe(eventType, this);
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Unsubscribe from a specific message type */
|
||||
void Node::unsubscribe(const HashID &eventType)
|
||||
{
|
||||
m_bus.unsubscribe(eventType, this);
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Unsubscribe from all message types */
|
||||
void Node::unsubscribeAll()
|
||||
{
|
||||
m_bus.unsubscribeAll(this);
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Check if is subscribed to a specific message type */
|
||||
bool Node::isSubscribed(const HashID &eventType) const
|
||||
{
|
||||
return m_bus.isSubscribed(eventType, this);
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Wait for a message to be received */
|
||||
void Node::syncWaitForMessage()
|
||||
{
|
||||
// Check if there are already messages in the queue before waiting
|
||||
// to avoid missing notifications if a message is received between
|
||||
// the last check and the wait call
|
||||
{
|
||||
std::scoped_lock lock(m_dBusMessages.mtx);
|
||||
if (!m_dBusMessages.messageQueue.empty())
|
||||
return; // Messages already in the queue, no need to wait
|
||||
}
|
||||
|
||||
// Wait until a message is received (the bus will call notifyMessageFromBus()
|
||||
// to wake up the node thread when a message is posted)
|
||||
m_nodeWaitFlag = false;
|
||||
m_nodeWaitFlag.wait(false);
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Post a message to the bus */
|
||||
void Node::postMessage(const std::shared_ptr<Message> &message) const
|
||||
{
|
||||
m_bus.post(message);
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Get the maximum age of messages in the queue */
|
||||
std::chrono::microseconds Node::getMessageMaxAge() const
|
||||
{
|
||||
std::scoped_lock lock(m_dBusMessages.mtx);
|
||||
|
||||
// If the queue is empty, return a default value (e.g., zero)
|
||||
if (m_dBusMessages.messageQueue.empty())
|
||||
return {};
|
||||
|
||||
// Get the age of the oldest message in the queue
|
||||
// We can assume that the messages are ordered by age, so we can check the front of the queue
|
||||
const auto &oldestMessage = m_dBusMessages.messageQueue.front();
|
||||
return oldestMessage->getAge();
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Get the number of received messages */
|
||||
size_t Node::getMessageCount() const
|
||||
{
|
||||
std::scoped_lock lock(m_dBusMessages.mtx);
|
||||
return m_dBusMessages.messageQueue.size();
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Get the next message from the queue */
|
||||
std::shared_ptr<Message> Node::popNextMessage()
|
||||
{
|
||||
std::scoped_lock lock(m_dBusMessages.mtx);
|
||||
if (m_dBusMessages.messageQueue.empty())
|
||||
return nullptr;
|
||||
|
||||
auto event = m_dBusMessages.messageQueue.front();
|
||||
m_dBusMessages.messageQueue.pop();
|
||||
|
||||
return event;
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Receive a message from the bus */
|
||||
void Node::receiveMessageFromBus(std::shared_ptr<Message> message)
|
||||
{
|
||||
std::scoped_lock lock(m_dBusMessages.mtx);
|
||||
m_dBusMessages.messageQueue.push(std::move(message));
|
||||
|
||||
notifyMessageQueue();
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
/* Notify the node of a new message from the bus */
|
||||
void Node::notifyMessageQueue()
|
||||
{
|
||||
m_nodeWaitFlag.store(true);
|
||||
m_nodeWaitFlag.notify_one();
|
||||
}
|
||||
//--------------------------------------------------------------
|
||||
57
src/dBus/node.h
Normal file
57
src/dBus/node.h
Normal file
@@ -0,0 +1,57 @@
|
||||
#pragma once
|
||||
|
||||
#include "Message.h"
|
||||
#include "defs.h"
|
||||
|
||||
#include <mutex>
|
||||
#include <queue>
|
||||
|
||||
namespace dBus
|
||||
{
|
||||
class Bus;
|
||||
//--------------------------------------------------------------
|
||||
class Node
|
||||
{
|
||||
friend class Bus;
|
||||
|
||||
public:
|
||||
Node() = delete; // Default constructor
|
||||
virtual ~Node(); // Default destructor
|
||||
Node(const Node &obj) = delete; // Copy constructor
|
||||
Node(Node &&obj) noexcept = delete; // Move constructor
|
||||
Node &operator=(const Node &obj) = delete; // Copy assignment operator
|
||||
Node &operator=(Node &&obj) noexcept = delete; // Move assignment operator
|
||||
|
||||
explicit Node(Bus &bus); // Constructor
|
||||
|
||||
// Event subscription management
|
||||
void subscribe(const HashID &eventType); // Subscribe to a specific message type
|
||||
void unsubscribe(const HashID &eventType); // Unsubscribe from a specific message type
|
||||
void unsubscribeAll(); // Unsubscribe from all message types
|
||||
[[nodiscard]] bool isSubscribed(const HashID &eventType) const; // Check if is subscribed to a specific message type
|
||||
|
||||
void syncWaitForMessage(); // Wait for a message to be received
|
||||
|
||||
// Data transmission
|
||||
void postMessage(const std::shared_ptr<Message> &message) const; // Post a message to the bus
|
||||
|
||||
[[nodiscard]] std::chrono::microseconds getMessageMaxAge() const; // Get the maximum age of messages in the queue
|
||||
[[nodiscard]] size_t getMessageCount() const; // Get the number of received messages
|
||||
[[nodiscard]] std::shared_ptr<Message> popNextMessage(); // Get the next message from the queue
|
||||
|
||||
protected:
|
||||
virtual void receiveMessageFromBus(std::shared_ptr<Message> message); // Receive a message from the bus (to be called by the bus when a message is posted)
|
||||
void notifyMessageQueue(); // Notify the node of a new message from the bus
|
||||
|
||||
Bus &m_bus; // Reference to the bus this node is connected to
|
||||
|
||||
std::atomic_bool m_nodeWaitFlag = false; // Flag to control the node thread loop
|
||||
|
||||
struct
|
||||
{
|
||||
mutable std::mutex mtx; // Mutex for thread-safe access to the event queue
|
||||
std::queue<std::shared_ptr<Message>> messageQueue; // Queue of received events
|
||||
} m_dBusMessages;
|
||||
};
|
||||
//--------------------------------------------------------------
|
||||
} // namespace dBus
|
||||
Reference in New Issue
Block a user