sdi_toolBox
node.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2026 - SD-Innovation S.A.S. - FRANCE
3*/
4
5/*
6ver: 2.x.x - build: 2026-04-28
7*/
8
9/*
10The zlib License Copyright (c) 2026 SD-Innovation S.A.S. This software is provided ‘as-is’, without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of this software. Permission is granted to anyone to use this software for any purpose, including commercial applications, and to alter it and redistribute it freely, subject to the following restrictions: 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required. 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software. 3. This notice may not be removed or altered from any source distribution.
11*/
12
13#pragma once
14
#include "bus.h"
#include "inode.h"

#include <atomic>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>
20
22{
23//--------------------------------------------------------------
63class Node : public INode
64{
65 public:
68
69 Node() = delete;
70
79 virtual ~Node();
80
86 Node(const Node &obj) = delete;
87
93 Node(Node &&obj) noexcept = delete;
94
100 Node &operator=(const Node &obj) = delete;
101
107 Node &operator=(Node &&obj) noexcept = delete;
108
115 explicit Node(Bus &bus);
116
120
137 void syncWaitForMessage();
138
142
152 void subscribe(MessageTypeID eventType);
153
163 void unsubscribe(MessageTypeID eventType);
164
173 void unsubscribeFromAll();
174
178
193 template<class T, class... Args>
194 bool emit(Args &&...args);
195
207 bool post(const std::shared_ptr<Message> &message) const;
208
212
219 size_t getMessageCount() const;
220
231 std::shared_ptr<Message> popMessage(); // Pop a message from the node's message queue (remove and return the front message)
232
234
235 private:
245 void append(const std::shared_ptr<Message> &message) override;
246
254 void messageNotify();
255
256 Bus &m_bus;
257 std::atomic_bool m_nodeWaitingFlag{ false };
258
260 struct
261 {
262 mutable std::mutex mtx;
263 std::queue<std::shared_ptr<Message>> messageQueue;
264 } m_busMessages;
265};
266
267//--------------------------------------------------------------
268/* Constructor */
269inline Node::Node(Bus &bus)
270 : m_bus(bus)
271{
272 // Nothing to do here
273}
274//--------------------------------------------------------------
275/* Default destructor */
277{
278 // Unsubscribe from all event types when the node is destroyed
280
281 // Notify the bus that the node is being destroyed (in case it is waiting for a message)
282 messageNotify();
283}
284//--------------------------------------------------------------
285/* Synchronously wait for a message to be posted to the bus and received by the node (block the calling thread until a message is received) */
287{
288 // Wait until at least one message is received in the node's
289 // message queue
290 m_nodeWaitingFlag = false;
291 m_nodeWaitingFlag.wait(false);
292}
293//--------------------------------------------------------------
294/* Subscribe to receive messages of a specific event type */
295inline void Node::subscribe(const MessageTypeID eventType)
296{
297 m_bus.subscribe(this, eventType);
298}
299//--------------------------------------------------------------
300/* Unsubscribe from receiving messages of a specific event type */
301inline void Node::unsubscribe(const MessageTypeID eventType)
302{
303 m_bus.unsubscribe(this, eventType);
304}
305//--------------------------------------------------------------
306/* Unsubscribe from receiving messages of all event types */
308{
309 m_bus.unsubscribeFromAll(this);
310}
311//--------------------------------------------------------------
312/* Emit a message of type T with the given arguments (create a message and post it to the bus) */
313template<class T, class... Args>
314bool Node::emit(Args &&...args)
315{
316 return m_bus.emit<T>(std::forward<Args>(args)...);
317}
318//--------------------------------------------------------------
319/* Post a message to the bus */
320inline bool Node::post(const std::shared_ptr<Message> &message) const
321{
322 return m_bus.post(message);
323}
324//--------------------------------------------------------------
325/* Get the number of messages in the node's message queue */
326inline size_t Node::getMessageCount() const
327{
328 std::scoped_lock lock(m_busMessages.mtx);
329
330 return m_busMessages.messageQueue.size();
331}
332//--------------------------------------------------------------
333/* Pop a message from the node's message queue (remove and return the front message) */
334inline std::shared_ptr<Message> Node::popMessage()
335{
336 std::scoped_lock lock(m_busMessages.mtx);
337
338 if (m_busMessages.messageQueue.empty())
339 return nullptr; // No messages in the queue
340
341 auto message = m_busMessages.messageQueue.front(); // Get the front message
342 m_busMessages.messageQueue.pop(); // Remove the front message from the queue
343
344 return message; // Return the popped message
345}
346//--------------------------------------------------------------
347/* Insert a message into the node's message queue (called by the bus when a message is posted to the bus) */
348inline void Node::append(const std::shared_ptr<Message> &message)
349{
350 std::scoped_lock lock(m_busMessages.mtx);
351
352 m_busMessages.messageQueue.push(message);
353
354 // If the node is waiting for a message, notify it that a
355 // message has been received
356 messageNotify();
357}
358//--------------------------------------------------------------
359/* Notify the node that a message has been received (used for synchronous waiting) */
360inline void Node::messageNotify()
361{
362 if (!m_nodeWaitingFlag.load())
363 {
364 m_nodeWaitingFlag = true;
365 m_nodeWaitingFlag.notify_one();
366 }
367}
368//--------------------------------------------------------------
369} // namespace sdi_toolBox::desktop::eventBus
Central message dispatcher for the event bus system.
Definition bus.h:63
void unsubscribe(INode *node, MessageTypeID eventType)
Unsubscribe a node from a specific message type.
Definition bus.h:352
bool post(const std::shared_ptr< Message > &message) const
Post an already constructed message to the bus.
Definition bus.h:457
bool emit(Args &&...args)
Construct and emit a message of type T to the bus.
Definition bus.h:445
void subscribe(INode *node, MessageTypeID eventType)
Subscribe a node to a specific message type.
Definition bus.h:328
void unsubscribeFromAll(INode *node)
Unsubscribe a node from all message types and broadcast mode.
Definition bus.h:370
Abstract interface representing a subscriber node in the event bus system.
Definition inode.h:42
Concrete subscriber node in the event bus system.
Definition node.h:64
Node & operator=(Node &&obj) noexcept=delete
Move assignment operator - deleted.
Node(Node &&obj) noexcept=delete
Move constructor - deleted.
void subscribe(MessageTypeID eventType)
Subscribe this node to a specific message type.
Definition node.h:295
std::shared_ptr< Message > popMessage()
Remove and return the front message from the node's queue (FIFO).
Definition node.h:334
virtual ~Node()
Destructor.
Definition node.h:276
Node & operator=(const Node &obj)=delete
Copy assignment operator - deleted.
Node(const Node &obj)=delete
Copy constructor - deleted.
std::queue< std::shared_ptr< Message > > messageQueue
Queue of messages received by the node.
Definition node.h:263
void syncWaitForMessage()
Block the calling thread until a message is received.
Definition node.h:286
bool post(const std::shared_ptr< Message > &message) const
Post an already constructed message through the bus.
Definition node.h:320
bool emit(Args &&...args)
Construct and emit a message of type T through the bus.
Definition node.h:314
Node()=delete
Default constructor - deleted. A Bus reference must be provided.
void unsubscribe(MessageTypeID eventType)
Unsubscribe this node from a specific message type.
Definition node.h:301
size_t getMessageCount() const
Get the number of messages currently in the node's queue.
Definition node.h:326
std::mutex mtx
Mutex for thread-safe access to the message queue.
Definition node.h:262
void unsubscribeFromAll()
Unsubscribe this node from all message types and broadcast mode.
Definition node.h:307
void append(const std::shared_ptr< Message > &message) override
Append a message to the node's internal queue.
Definition node.h:348
Lightweight thread-safe event bus for decoupled message passing.
Definition bus.h:27
uint64_t MessageTypeID
Unique identifier for a message type.
Definition defs.h:21