diff --git a/simgear/emesary/IReceiver.hxx b/simgear/emesary/IReceiver.hxx index f21a8a7c..bd629087 100644 --- a/simgear/emesary/IReceiver.hxx +++ b/simgear/emesary/IReceiver.hxx @@ -19,25 +19,28 @@ * Copyright (C)2019 Richard Harrison Licenced under GPL2 or later. * *---------------------------------------------------------------------------*/ + +#include + namespace simgear { namespace Emesary { /// Interface (base class) for a recipeint. - class IReceiver + class IReceiver : public SGReferenced { public: /// Receive notification - must be implemented virtual ReceiptStatus Receive(INotification& message) = 0; /// Called when registered at a transmitter - virtual void OnRegisteredAtTransmitter(class Transmitter *p) + virtual void OnRegisteredAtTransmitter(class Transmitter* p) { } /// Called when de-registered at a transmitter - virtual void OnDeRegisteredAtTransmitter(class Transmitter *p) + virtual void OnDeRegisteredAtTransmitter(class Transmitter* p) { } }; diff --git a/simgear/emesary/ITransmitter.hxx b/simgear/emesary/ITransmitter.hxx index cd4f750c..0b4c6386 100644 --- a/simgear/emesary/ITransmitter.hxx +++ b/simgear/emesary/ITransmitter.hxx @@ -20,10 +20,12 @@ * *---------------------------------------------------------------------------*/ +#include namespace simgear { namespace Emesary { + typedef SGSharedPtr IReceiverPtr; /// Interface (base clasee) for a transmitter. /// Transmits Message derived objects. Each instance of this class provides a /// event/databus to which any number of receivers can attach to. @@ -31,9 +33,9 @@ namespace simgear { public: // Registers a recipient to receive message from this transmitter - virtual void Register(IReceiver& R) = 0; + virtual void Register(IReceiverPtr R) = 0; // Removes a recipient from from this transmitter - virtual void DeRegister(IReceiver& R) = 0; + virtual void DeRegister(IReceiverPtr R) = 0; //Notify all registered recipients. Stop when receipt status of abort or finished are received. @@ -45,7 +47,7 @@ namespace simgear virtual ReceiptStatus NotifyAll(INotification& M) = 0; /// number of recipients - virtual int Count() = 0; + virtual size_t Count() const = 0; }; } } diff --git a/simgear/emesary/ReceiptStatus.hxx b/simgear/emesary/ReceiptStatus.hxx index 4620e00e..e4582e62 100644 --- a/simgear/emesary/ReceiptStatus.hxx +++ b/simgear/emesary/ReceiptStatus.hxx @@ -24,30 +24,30 @@ namespace simgear { namespace Emesary { - enum ReceiptStatus + enum class ReceiptStatus { /// Processing completed successfully - ReceiptStatusOK = 0, + OK = 0, /// Individual item failure - ReceiptStatusFail = 1, + Fail = 1, /// Fatal error; stop processing any further recipieints of this message. Implicitly fail - ReceiptStatusAbort = 2, + Abort = 2, /// Definitive completion - do not send message to any further recipieints - ReceiptStatusFinished = 3, + Finished = 3, /// Return value when method doesn't process a message. - ReceiptStatusNotProcessed = 4, + NotProcessed = 4, /// Message has been sent but the return status cannot be determined as it has not been processed by the recipient. /// e.g. a queue or outgoing bridge - ReceiptStatusPending = 5, + Pending = 5, /// Message has been definitively handled but the return value cannot be determined. The message will not be sent any further /// e.g. a point to point forwarding bridge - ReceiptStatusPendingFinished = 6, + PendingFinished = 6, }; } } diff --git a/simgear/emesary/Transmitter.hxx b/simgear/emesary/Transmitter.hxx index dfc6aad1..1cd1beaa 100644 --- a/simgear/emesary/Transmitter.hxx +++ b/simgear/emesary/Transmitter.hxx @@ -23,7 +23,7 @@ #include #include -#include +#include #include #include #include @@ -37,16 +37,25 @@ namespace simgear class Transmitter : public ITransmitter { protected: - typedef std::list RecipientList; + typedef std::vector RecipientList; RecipientList recipient_list; - RecipientList deleted_recipients; - int CurrentRecipientIndex = 0; + RecipientList new_recipient_list; + RecipientList deleted_recipient_list; + std::mutex _lock; - std::atomic receiveDepth; - std::atomic sentMessageCount; + std::atomic receiveDepth; + std::atomic sentMessageCount; + std::atomic recipientCount; + std::atomic pendingDeletions; + std::atomic pendingAdditions; public: - Transmitter() : receiveDepth(0), sentMessageCount(0) + Transmitter() : + receiveDepth(0), + sentMessageCount(0), + recipientCount(0), + pendingDeletions(0), + pendingAdditions(0) { } @@ -59,31 +68,97 @@ namespace simgear // the sequence of registration and message receipt can influence the way messages are processing // when ReceiptStatus of Abort or Finished are encountered. So it was a deliberate decision that the // most recently registered recipients should process the messages/events first. - virtual void Register(IReceiver& r) + virtual void Register(IReceiverPtr r) { std::lock_guard scopeLock(_lock); - recipient_list.push_back(&r); - r.OnRegisteredAtTransmitter(this); - if (std::find(deleted_recipients.begin(), deleted_recipients.end(), &r) != deleted_recipients.end()) - deleted_recipients.remove(&r); + + RecipientList::iterator deleted_location = std::find(deleted_recipient_list.begin(), deleted_recipient_list.end(), r); + if (deleted_location != deleted_recipient_list.end()) + deleted_recipient_list.erase(deleted_location); + + RecipientList::iterator location = std::find(recipient_list.begin(), recipient_list.end(), r); + if (location == recipient_list.end()) + { + RecipientList::iterator location = std::find(new_recipient_list.begin(), new_recipient_list.end(), r); + if (location == new_recipient_list.end()) { + new_recipient_list.insert(new_recipient_list.begin(), r); + pendingAdditions++; + } + } } // Removes an object from receving message from this transmitter - virtual void DeRegister(IReceiver& R) + virtual void DeRegister(IReceiverPtr r) { std::lock_guard scopeLock(_lock); - if (recipient_list.size()) - { - if (std::find(recipient_list.begin(), recipient_list.end(), &R) != recipient_list.end()) - { - recipient_list.remove(&R); - R.OnDeRegisteredAtTransmitter(this); - if (std::find(deleted_recipients.begin(), deleted_recipients.end(), &R) == deleted_recipients.end()) - deleted_recipients.push_back(&R); - } - } - } + if (new_recipient_list.size()) + { + RecipientList::iterator location = std::find(new_recipient_list.begin(), new_recipient_list.end(), r); + + if (location != new_recipient_list.end()) + new_recipient_list.erase(location); + } + deleted_recipient_list.push_back(r); + pendingDeletions++; + } + + // this will purge the recipients that are marked as deleted + // it will only do this when the receive depth is zero - i.e. this + // notification is being sent out from outside a recipient notify.(because it is + // fine for a recipient to retransmit another notification as a result of receiving a notification) + // + // also we can quickly check to see if we have any pending deletions before doing anything. + void AddRemoveIFAppropriate() + { + std::lock_guard scopeLock(_lock); + + /// handle pending deletions first. + if (pendingDeletions > 0) { + + /// + /// remove deleted recipients from the main list. + std::for_each(deleted_recipient_list.begin(), deleted_recipient_list.end(), + [this](IReceiverPtr r) { + + RecipientList::iterator location = std::find(recipient_list.begin(), recipient_list.end(), r); + if (location != recipient_list.end()) { + r->OnDeRegisteredAtTransmitter(this); + recipient_list.erase(location); + } + }); + recipientCount -= pendingDeletions; + deleted_recipient_list.erase(deleted_recipient_list.begin(), deleted_recipient_list.end()); + pendingDeletions = 0; // can do this because we are guarded + } + + if (pendingAdditions) { + /// firstly remove items from the new list that are already in the list + std::for_each(recipient_list.begin(), recipient_list.end(), + [this](IReceiverPtr r) { + + RecipientList::iterator location = std::find(new_recipient_list.begin(), new_recipient_list.end(), r); + if (location != new_recipient_list.end()) { + new_recipient_list.erase(location); + pendingAdditions--; + } + }); + + + std::for_each(new_recipient_list.begin(), new_recipient_list.end(), + [this](IReceiverPtr r) { + r->OnRegisteredAtTransmitter(this); + }); + recipient_list.insert(recipient_list.begin(), + std::make_move_iterator(new_recipient_list.begin()), + std::make_move_iterator(new_recipient_list.end())); + + new_recipient_list.erase(new_recipient_list.begin(), new_recipient_list.end()); + recipientCount += pendingAdditions; + pendingAdditions = 0; + + } + } // Notify all registered recipients. Stop when receipt status of abort or finished are received. // The receipt status from this method will be // - OK > message handled @@ -92,95 +167,90 @@ namespace simgear // allows for usages such as access controls. virtual ReceiptStatus NotifyAll(INotification& M) { - ReceiptStatus return_status = ReceiptStatusNotProcessed; + ReceiptStatus return_status = ReceiptStatus::NotProcessed; + + auto v = receiveDepth.fetch_add(1, std::memory_order_relaxed); + if (v == 0) + AddRemoveIFAppropriate(); sentMessageCount++; - std::vector temp; - { - std::lock_guard scopeLock(_lock); - if (receiveDepth == 0) - deleted_recipients.clear(); - receiveDepth++; + bool finished = false; - for (RecipientList::iterator i = recipient_list.begin(); i != recipient_list.end(); i++) - { - temp.push_back(*i); - } - } - int tempSize = temp.size(); - for (int index = 0; index < tempSize; index++) + size_t idx = 0; + do { + + if (idx < recipient_list.size()) { + IReceiverPtr R = recipient_list[idx++]; + + if (R != nullptr) { - IReceiver* R = temp[index]; - { - std::lock_guard scopeLock(_lock); - if (deleted_recipients.size()) - { - if (std::find(deleted_recipients.begin(), deleted_recipients.end(), R) != deleted_recipients.end()) - { - continue; - } - } - } - if (R) - { - ReceiptStatus rstat = R->Receive(M); + Emesary::ReceiptStatus rstat = R->Receive(M); switch (rstat) { - case ReceiptStatusFail: - return_status = ReceiptStatusFail; + case ReceiptStatus::Fail: + return_status = ReceiptStatus::Fail; break; - case ReceiptStatusPending: - return_status = ReceiptStatusPending; + case ReceiptStatus::Pending: + return_status = ReceiptStatus::Pending; break; - case ReceiptStatusPendingFinished: - return rstat; + case ReceiptStatus::PendingFinished: + return_status = rstat; + finished = true; + break; - case ReceiptStatusNotProcessed: + case ReceiptStatus::NotProcessed: break; - case ReceiptStatusOK: - if (return_status == ReceiptStatusNotProcessed) + case ReceiptStatus::OK: + if (return_status == ReceiptStatus::NotProcessed) return_status = rstat; break; - case ReceiptStatusAbort: - return ReceiptStatusAbort; + case ReceiptStatus::Abort: + finished = true; + return_status = ReceiptStatus::Abort; + break; - case ReceiptStatusFinished: - return ReceiptStatusOK; + case ReceiptStatus::Finished: + finished = true; + return_status = ReceiptStatus::OK;; + break; } } } + else + break; + } while (!finished); receiveDepth--; return return_status; } - // number of currently registered recipients - virtual int Count() - { - std::lock_guard scopeLock(_lock); - return recipient_list.size(); - } - // number of sent messages. - int SentMessageCount() + int SentMessageCount() const { return sentMessageCount; } + // number of currently registered recipients + // better to avoid using the size members on the list directly as + // using the atomics is lock free and threadsafe. + virtual size_t Count() const + { + return (recipientCount + pendingAdditions) - pendingDeletions; + } // ascertain if a receipt status can be interpreted as failure. static bool Failed(ReceiptStatus receiptStatus) { // // failed is either Fail or Abort. // NotProcessed isn't a failure because it hasn't been processed. - return receiptStatus == ReceiptStatusFail - || receiptStatus == ReceiptStatusAbort; + return receiptStatus == ReceiptStatus::Fail + || receiptStatus == ReceiptStatus::Abort; } }; } diff --git a/simgear/nasal/cppbind/NasalEmesaryInterface.hxx b/simgear/nasal/cppbind/NasalEmesaryInterface.hxx index a25ab7bf..d9411fa1 100644 --- a/simgear/nasal/cppbind/NasalEmesaryInterface.hxx +++ b/simgear/nasal/cppbind/NasalEmesaryInterface.hxx @@ -64,10 +64,10 @@ namespace nasal class NasalMainLoopRecipient : public simgear::Emesary::IReceiver { public: NasalMainLoopRecipient() : receiveCount(0), CanWait(false), Active(false) { - simgear::Emesary::GlobalTransmitter::instance()->Register(*this); + simgear::Emesary::GlobalTransmitter::instance()->Register(this); } virtual ~NasalMainLoopRecipient() { - simgear::Emesary::GlobalTransmitter::instance()->DeRegister(*this); + simgear::Emesary::GlobalTransmitter::instance()->DeRegister(this); } std::atomic receiveCount; @@ -99,16 +99,16 @@ namespace nasal gct.terminate(); break; } - return simgear::Emesary::ReceiptStatusOK; + return simgear::Emesary::ReceiptStatus::OK; } auto *gccn = dynamic_cast(&n); if (gccn) { CanWait = gccn->GetCanWait(); Active = gccn->GetActive(); - return simgear::Emesary::ReceiptStatusOK; + return simgear::Emesary::ReceiptStatus::OK; } - return simgear::Emesary::ReceiptStatusNotProcessed; + return simgear::Emesary::ReceiptStatus::NotProcessed; } protected: bool CanWait;