sdi_toolBox
bus.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2026 - SD-Innovation S.A.S. - FRANCE
3*/
4
5/*
6ver: 2.x.x - build: 2026-04-28
7*/
8
9/*
10The zlib License Copyright (c) 2026 SD-Innovation S.A.S. This software is provided ‘as-is’, without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of this software. Permission is granted to anyone to use this software for any purpose, including commercial applications, and to alter it and redistribute it freely, subject to the following restrictions: 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required. 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software. 3. This notice may not be removed or altered from any source distribution.
11*/
12
13#pragma once
14
15#include "defs.h"
16#include "inode.h"
17#include "message.h"
18
19#include <chrono>
20#include <memory>
21#include <mutex>
22#include <ranges>
23#include <set>
24#include <unordered_map>
25
27{
28//--------------------------------------------------------------
62class Bus final
63{
64 friend class Node;
65
66 using VectorNode = std::vector<INode *>;
67
68 public:
71
78 Bus();
79
83 ~Bus() = default;
84
90 Bus(const Bus &obj) = delete;
91
97 Bus(Bus &&obj) noexcept = delete;
98
104 Bus &operator=(const Bus &obj) = delete;
105
111 Bus &operator=(Bus &&obj) noexcept = delete;
112
116
127
141 void subscribe(INode *node, MessageTypeID eventType);
142
156 void unsubscribe(INode *node, MessageTypeID eventType);
157
171 void unsubscribeFromAll(INode *node);
172
184 [[nodiscard]] bool isSubscribed(const INode *node, MessageTypeID eventType) const;
185
189
203 void subscribeToBroadcast(INode *node);
204
217
228 [[nodiscard]] bool isSubscribedToBroadcast(INode *node) const; // Check if a node is subscribed to broadcast mode
229
233
254 template<class T, class... Args>
255 bool emit(Args &&...args);
256
270 bool post(const std::shared_ptr<Message> &message) const; // Post a message to the bus
271
273
274 private:
281 size_t postMessageToSubscribers(MessageTypeID eventType, const std::shared_ptr<Message> &message) const;
282
288 size_t postMessageToBroadcastSubscribers(const std::shared_ptr<Message> &message) const;
289
295 const VectorNode *getSubscribersForMessageType(MessageTypeID messageType) const; // Get the list of subscribers for a specific message type
296
298 TimePoint m_busStartTimestamp;
299
301 struct
302 {
303 mutable std::mutex mtx;
304 std::unordered_map<MessageTypeID, VectorNode> nodeList;
305 std::set<INode *> broadcastNodeList;
306 } m_routingTable;
307};
308//--------------------------------------------------------------
309
310//--------------------------------------------------------------
311/* Default constructor */
312inline Bus::Bus()
313{
314 // Initialization
315 m_busStartTimestamp = std::chrono::steady_clock::now();
316}
317//--------------------------------------------------------------
318/* Clear all subscriptions from the bus (remove all nodes from the routing table) */
320{
321 std::scoped_lock lock(m_routingTable.mtx);
322
323 m_routingTable.nodeList.clear(); // Clear all event subscriptions
324 m_routingTable.broadcastNodeList.clear(); // Clear all broadcast subscriptions
325}
326//--------------------------------------------------------------
327/* Subscribe a listener to a specific event type */
328inline void Bus::subscribe(INode *node, MessageTypeID eventType)
329{
330 // Sanity check
331 if (!node)
332 throw std::runtime_error("invalid node handle");
333
334 std::scoped_lock lock(m_routingTable.mtx);
335
336 if (!m_routingTable.nodeList.contains(eventType)) // Event type not registered yet, create a new entry with the node
337 {
338 m_routingTable.nodeList[eventType] = { node };
339 }
340 else // Event type already registered, add the node if not already subscribed
341 {
342 auto &nodes = m_routingTable.nodeList.at(eventType);
343 if (std::ranges::find(nodes, node) == nodes.end())
344 {
345 // Node not already subscribed, add it to the list
346 nodes.push_back(node);
347 }
348 }
349}
350//--------------------------------------------------------------
351/* Unsubscribe a listener from a specific event type */
352inline void Bus::unsubscribe(INode *node, MessageTypeID eventType)
353{
354 // Sanity check
355 if (!node)
356 throw std::runtime_error("invalid node handle");
357
358 std::scoped_lock lock(m_routingTable.mtx);
359
360 // Event type not recorded, nothing to do
361 if (!m_routingTable.nodeList.contains(eventType))
362 return;
363
364 // Event type registered, remove the listener if subscribed
365 auto &nodes = m_routingTable.nodeList.at(eventType);
366 std::erase(nodes, node);
367}
368//--------------------------------------------------------------
369/* Unsubscribe a listener from a specific event type */
371{
372 // Sanity check
373 if (!node)
374 throw std::runtime_error("invalid node handle");
375
376 std::scoped_lock lock(m_routingTable.mtx);
377
378 // Iterate through all event types and remove the node from each list
379 for (auto &nodeList : m_routingTable.nodeList | std::views::values)
380 std::erase(nodeList, node);
381
382 // Remove from broadcast subscriptions as well
383 m_routingTable.broadcastNodeList.erase(node);
384}
385//--------------------------------------------------------------
386/* Check if a listener is subscribed to a specific event type */
387inline bool Bus::isSubscribed(const INode *node, MessageTypeID eventType) const
388{
389 // Sanity check
390 if (!node)
391 throw std::runtime_error("invalid node handle");
392
393 std::scoped_lock lock(m_routingTable.mtx);
394
395 // Event type not recorded, node not subscribed
396 if (!m_routingTable.nodeList.contains(eventType))
397 return false;
398
399 // Event type recorded, check if the node is in the list
400 const auto &nodes = m_routingTable.nodeList.at(eventType);
401 return std::ranges::find(nodes, node) != nodes.end();
402}
403//--------------------------------------------------------------
404/* Subscribe a node to receive all messages (broadcast mode) */
406{
407 // Sanity check
408 if (!node)
409 throw std::runtime_error("invalid node handle");
410
411 std::scoped_lock lock(m_routingTable.mtx);
412
413 // Add the node to the broadcast nodes set
414 m_routingTable.broadcastNodeList.insert(node);
415}
416//--------------------------------------------------------------
417/* Unsubscribe a node from broadcast mode */
419{
420 // Sanity check
421 if (!node)
422 throw std::runtime_error("invalid node handle");
423
424 std::scoped_lock lock(m_routingTable.mtx);
425
426 // Remove the node from the broadcast nodes set
427 m_routingTable.broadcastNodeList.erase(node);
428}
429//--------------------------------------------------------------
430/* Check if a node is subscribed to broadcast mode */
431inline bool Bus::isSubscribedToBroadcast(INode *node) const
432{
433 // Sanity check
434 if (!node)
435 throw std::runtime_error("invalid node handle");
436
437 std::scoped_lock lock(m_routingTable.mtx);
438
439 // Check if the node is in the broadcast nodes set
440 return m_routingTable.broadcastNodeList.contains(node);
441}
442//--------------------------------------------------------------
443/* Emit a message of type T with the given arguments(create a message and post it to the bus) */
444template<class T, class... Args>
445bool Bus::emit(Args &&...args)
446{
447 static_assert(std::derived_from<T, Message>, "T must be derived from IMessage");
448
449 // Create a message of type T with the given arguments
450 auto message = std::make_shared<T>(std::forward<Args>(args)...);
451
452 // Post the message to the bus
453 return post(message);
454}
455//--------------------------------------------------------------
456/* Post a message to the bus */
457inline bool Bus::post(const std::shared_ptr<Message> &message) const
458{
459 std::scoped_lock lock(m_routingTable.mtx);
460
461 // Update message state
462 message->updateTimestamp(); // Update the timestamp of when the message was posted to the bus
463
464 // Post the message to all subscribers of the specific
465 // message type and get the number of subscribers that
466 // received the message
467 const auto subscriberCount = postMessageToSubscribers(message->getMessageTypeID(), message);
468
469 // Post the message to all broadcast subscribers and
470 // get the number of subscribers that received the
471 // message
472 (void)postMessageToBroadcastSubscribers(message);
473
474 return subscriberCount > 0;
475}
476//--------------------------------------------------------------
477/* Post a message to all subscribers of a specific event type and return the number of subscribers that received the message */
478inline size_t Bus::postMessageToSubscribers(const MessageTypeID eventType, const std::shared_ptr<Message> &message) const
479{
480 const auto subscriberList = getSubscribersForMessageType(eventType);
481 if (!subscriberList)
482 return 0;
483
484 for (const auto &node : *subscriberList)
485 node->append(message);
486
487 return subscriberList->size();
488}
489//--------------------------------------------------------------
490/* Post a message to all broadcast subscribers and return the number of subscribers that received the message */
491inline size_t Bus::postMessageToBroadcastSubscribers(const std::shared_ptr<Message> &message) const
492{
493 for (const auto &node : m_routingTable.broadcastNodeList)
494 node->append(message);
495
496 return m_routingTable.broadcastNodeList.size();
497}
498//--------------------------------------------------------------
499/* Get the list of subscribers for a specific message type */
500inline const Bus::VectorNode *Bus::getSubscribersForMessageType(const MessageTypeID messageType) const
501{
502 if (!m_routingTable.nodeList.contains(messageType))
503 return nullptr; // No subscribers for this message type
504
505 return &m_routingTable.nodeList.at(messageType);
506}
507//--------------------------------------------------------------
508} // namespace sdi_toolBox::desktop::eventBus
Central message dispatcher for the event bus system.
Definition bus.h:63
void unsubscribe(INode *node, MessageTypeID eventType)
Unsubscribe a node from a specific message type.
Definition bus.h:352
Bus()
Default constructor.
Definition bus.h:312
void subscribeToBroadcast(INode *node)
Subscribe a node to broadcast mode.
Definition bus.h:405
std::set< INode * > broadcastNodeList
Set of nodes subscribed to receive all messages (broadcast mode)
Definition bus.h:305
Bus(Bus &&obj) noexcept=delete
Move constructor - deleted.
Bus(const Bus &obj)=delete
Copy constructor - deleted.
std::unordered_map< MessageTypeID, VectorNode > nodeList
Map of message type IDs to lists of subscribed nodes.
Definition bus.h:304
bool isSubscribedToBroadcast(INode *node) const
Check whether a node is subscribed to broadcast mode.
Definition bus.h:431
bool isSubscribed(const INode *node, MessageTypeID eventType) const
Check whether a node is subscribed to a specific message type.
Definition bus.h:387
bool post(const std::shared_ptr< Message > &message) const
Post an already constructed message to the bus.
Definition bus.h:457
bool emit(Args &&...args)
Construct and emit a message of type T to the bus.
Definition bus.h:445
void subscribe(INode *node, MessageTypeID eventType)
Subscribe a node to a specific message type.
Definition bus.h:328
~Bus()=default
Default destructor.
void clearAllSubscriptions()
Remove all subscriptions from the routing table.
Definition bus.h:319
Bus & operator=(Bus &&obj) noexcept=delete
Move assignment operator - deleted.
std::mutex mtx
Mutex for thread-safe access to the nodes map.
Definition bus.h:303
void unsubscribeFromBroadcast(INode *node)
Unsubscribe a node from broadcast mode.
Definition bus.h:418
void unsubscribeFromAll(INode *node)
Unsubscribe a node from all message types and broadcast mode.
Definition bus.h:370
Bus & operator=(const Bus &obj)=delete
Copy assignment operator - deleted.
Abstract interface representing a subscriber node in the event bus system.
Definition inode.h:42
Concrete subscriber node in the event bus system.
Definition node.h:64
Lightweight thread-safe event bus for decoupled message passing.
Definition bus.h:27
uint64_t MessageTypeID
Unique identifier for a message type.
Definition defs.h:21
std::chrono::steady_clock::time_point TimePoint
Monotonic timestamp type used throughout the event bus.
Definition defs.h:22