Emesary: lock free performance improvements

After testing with multithreading there were still problems related to
scope and transmitters - because a transmitter that was on the stack
as an object could go out of scope before a notify all had finished
and lead to invalid data and a crash

The solution is to always have Recipients allocated via new() and to
use a shared pointer to manage the scope and garbage collect when the
last reference goes out of scope.

To achieve the threadsafe minimal locking the rules are as follows;

* All recipients must be allocated via new()
* Register and DeRegister will only happen when on the outermost
  level of NotifyAll.
 - all other add or delete will be put into the pending queue and
   added at the start of the next outer notify all
 - at outer level notify all a lock will be taken to process pending
   items
* during the main notify processing there will be no locks - however
  it is also assumed that the recipient list will not be changed
This commit is contained in:
Richard Harrison 2021-04-11 00:42:49 +02:00
parent 739c76e862
commit 9b3265c46c
5 changed files with 169 additions and 94 deletions

View File

@ -19,13 +19,16 @@
* Copyright (C)2019 Richard Harrison Licenced under GPL2 or later.
*
*---------------------------------------------------------------------------*/
#include <simgear/structure/SGReferenced.hxx>
namespace simgear
{
namespace Emesary
{
/// Interface (base class) for a recipeint.
class IReceiver
class IReceiver : public SGReferenced
{
public:
/// Receive notification - must be implemented

View File

@ -20,10 +20,12 @@
*
*---------------------------------------------------------------------------*/
#include <simgear/structure/SGSharedPtr.hxx>
namespace simgear
{
namespace Emesary
{
typedef SGSharedPtr<IReceiver> IReceiverPtr;
/// Interface (base clasee) for a transmitter.
/// Transmits Message derived objects. Each instance of this class provides a
/// event/databus to which any number of receivers can attach to.
@ -31,9 +33,9 @@ namespace simgear
{
public:
// Registers a recipient to receive message from this transmitter
virtual void Register(IReceiver& R) = 0;
virtual void Register(IReceiverPtr R) = 0;
// Removes a recipient from from this transmitter
virtual void DeRegister(IReceiver& R) = 0;
virtual void DeRegister(IReceiverPtr R) = 0;
//Notify all registered recipients. Stop when receipt status of abort or finished are received.
@ -45,7 +47,7 @@ namespace simgear
virtual ReceiptStatus NotifyAll(INotification& M) = 0;
/// number of recipients
virtual int Count() = 0;
virtual size_t Count() const = 0;
};
}
}

View File

@ -24,30 +24,30 @@ namespace simgear
{
namespace Emesary
{
enum ReceiptStatus
enum class ReceiptStatus
{
/// Processing completed successfully
ReceiptStatusOK = 0,
OK = 0,
/// Individual item failure
ReceiptStatusFail = 1,
Fail = 1,
/// Fatal error; stop processing any further recipieints of this message. Implicitly fail
ReceiptStatusAbort = 2,
Abort = 2,
/// Definitive completion - do not send message to any further recipieints
ReceiptStatusFinished = 3,
Finished = 3,
/// Return value when method doesn't process a message.
ReceiptStatusNotProcessed = 4,
NotProcessed = 4,
/// Message has been sent but the return status cannot be determined as it has not been processed by the recipient.
/// e.g. a queue or outgoing bridge
ReceiptStatusPending = 5,
Pending = 5,
/// Message has been definitively handled but the return value cannot be determined. The message will not be sent any further
/// e.g. a point to point forwarding bridge
ReceiptStatusPendingFinished = 6,
PendingFinished = 6,
};
}
}

View File

@ -23,7 +23,7 @@
#include <algorithm>
#include <string>
#include <list>
#include <vector>
#include <set>
#include <vector>
#include <atomic>
@ -37,16 +37,25 @@ namespace simgear
class Transmitter : public ITransmitter
{
protected:
typedef std::list<IReceiver *> RecipientList;
typedef std::vector<IReceiverPtr> RecipientList;
RecipientList recipient_list;
RecipientList deleted_recipients;
int CurrentRecipientIndex = 0;
RecipientList new_recipient_list;
RecipientList deleted_recipient_list;
std::mutex _lock;
std::atomic<int> receiveDepth;
std::atomic<int> sentMessageCount;
std::atomic<size_t> receiveDepth;
std::atomic<size_t> sentMessageCount;
std::atomic<size_t> recipientCount;
std::atomic<size_t> pendingDeletions;
std::atomic<size_t> pendingAdditions;
public:
Transmitter() : receiveDepth(0), sentMessageCount(0)
Transmitter() :
receiveDepth(0),
sentMessageCount(0),
recipientCount(0),
pendingDeletions(0),
pendingAdditions(0)
{
}
@ -59,31 +68,97 @@ namespace simgear
// the sequence of registration and message receipt can influence the way messages are processing
// when ReceiptStatus of Abort or Finished are encountered. So it was a deliberate decision that the
// most recently registered recipients should process the messages/events first.
virtual void Register(IReceiver& r)
virtual void Register(IReceiverPtr r)
{
std::lock_guard<std::mutex> scopeLock(_lock);
recipient_list.push_back(&r);
r.OnRegisteredAtTransmitter(this);
if (std::find(deleted_recipients.begin(), deleted_recipients.end(), &r) != deleted_recipients.end())
deleted_recipients.remove(&r);
}
// Removes an object from receving message from this transmitter
virtual void DeRegister(IReceiver& R)
RecipientList::iterator deleted_location = std::find(deleted_recipient_list.begin(), deleted_recipient_list.end(), r);
if (deleted_location != deleted_recipient_list.end())
deleted_recipient_list.erase(deleted_location);
RecipientList::iterator location = std::find(recipient_list.begin(), recipient_list.end(), r);
if (location == recipient_list.end())
{
std::lock_guard<std::mutex> scopeLock(_lock);
if (recipient_list.size())
{
if (std::find(recipient_list.begin(), recipient_list.end(), &R) != recipient_list.end())
{
recipient_list.remove(&R);
R.OnDeRegisteredAtTransmitter(this);
if (std::find(deleted_recipients.begin(), deleted_recipients.end(), &R) == deleted_recipients.end())
deleted_recipients.push_back(&R);
RecipientList::iterator location = std::find(new_recipient_list.begin(), new_recipient_list.end(), r);
if (location == new_recipient_list.end()) {
new_recipient_list.insert(new_recipient_list.begin(), r);
pendingAdditions++;
}
}
}
// Removes an object from receving message from this transmitter
virtual void DeRegister(IReceiverPtr r)
{
std::lock_guard<std::mutex> scopeLock(_lock);
if (new_recipient_list.size())
{
RecipientList::iterator location = std::find(new_recipient_list.begin(), new_recipient_list.end(), r);
if (location != new_recipient_list.end())
new_recipient_list.erase(location);
}
deleted_recipient_list.push_back(r);
pendingDeletions++;
}
// this will purge the recipients that are marked as deleted
// it will only do this when the receive depth is zero - i.e. this
// notification is being sent out from outside a recipient notify.(because it is
// fine for a recipient to retransmit another notification as a result of receiving a notification)
//
// also we can quickly check to see if we have any pending deletions before doing anything.
void AddRemoveIFAppropriate()
{
std::lock_guard<std::mutex> scopeLock(_lock);
/// handle pending deletions first.
if (pendingDeletions > 0) {
///
/// remove deleted recipients from the main list.
std::for_each(deleted_recipient_list.begin(), deleted_recipient_list.end(),
[this](IReceiverPtr r) {
RecipientList::iterator location = std::find(recipient_list.begin(), recipient_list.end(), r);
if (location != recipient_list.end()) {
r->OnDeRegisteredAtTransmitter(this);
recipient_list.erase(location);
}
});
recipientCount -= pendingDeletions;
deleted_recipient_list.erase(deleted_recipient_list.begin(), deleted_recipient_list.end());
pendingDeletions = 0; // can do this because we are guarded
}
if (pendingAdditions) {
/// firstly remove items from the new list that are already in the list
std::for_each(recipient_list.begin(), recipient_list.end(),
[this](IReceiverPtr r) {
RecipientList::iterator location = std::find(new_recipient_list.begin(), new_recipient_list.end(), r);
if (location != new_recipient_list.end()) {
new_recipient_list.erase(location);
pendingAdditions--;
}
});
std::for_each(new_recipient_list.begin(), new_recipient_list.end(),
[this](IReceiverPtr r) {
r->OnRegisteredAtTransmitter(this);
});
recipient_list.insert(recipient_list.begin(),
std::make_move_iterator(new_recipient_list.begin()),
std::make_move_iterator(new_recipient_list.end()));
new_recipient_list.erase(new_recipient_list.begin(), new_recipient_list.end());
recipientCount += pendingAdditions;
pendingAdditions = 0;
}
}
// Notify all registered recipients. Stop when receipt status of abort or finished are received.
// The receipt status from this method will be
// - OK > message handled
@ -92,95 +167,90 @@ namespace simgear
// allows for usages such as access controls.
virtual ReceiptStatus NotifyAll(INotification& M)
{
ReceiptStatus return_status = ReceiptStatusNotProcessed;
ReceiptStatus return_status = ReceiptStatus::NotProcessed;
auto v = receiveDepth.fetch_add(1, std::memory_order_relaxed);
if (v == 0)
AddRemoveIFAppropriate();
sentMessageCount++;
std::vector<IReceiver*> temp;
{
std::lock_guard<std::mutex> scopeLock(_lock);
if (receiveDepth == 0)
deleted_recipients.clear();
receiveDepth++;
bool finished = false;
for (RecipientList::iterator i = recipient_list.begin(); i != recipient_list.end(); i++)
size_t idx = 0;
do {
if (idx < recipient_list.size()) {
IReceiverPtr R = recipient_list[idx++];
if (R != nullptr)
{
temp.push_back(*i);
}
}
int tempSize = temp.size();
for (int index = 0; index < tempSize; index++)
{
IReceiver* R = temp[index];
{
std::lock_guard<std::mutex> scopeLock(_lock);
if (deleted_recipients.size())
{
if (std::find(deleted_recipients.begin(), deleted_recipients.end(), R) != deleted_recipients.end())
{
continue;
}
}
}
if (R)
{
ReceiptStatus rstat = R->Receive(M);
Emesary::ReceiptStatus rstat = R->Receive(M);
switch (rstat)
{
case ReceiptStatusFail:
return_status = ReceiptStatusFail;
case ReceiptStatus::Fail:
return_status = ReceiptStatus::Fail;
break;
case ReceiptStatusPending:
return_status = ReceiptStatusPending;
case ReceiptStatus::Pending:
return_status = ReceiptStatus::Pending;
break;
case ReceiptStatusPendingFinished:
return rstat;
case ReceiptStatusNotProcessed:
case ReceiptStatus::PendingFinished:
return_status = rstat;
finished = true;
break;
case ReceiptStatusOK:
if (return_status == ReceiptStatusNotProcessed)
case ReceiptStatus::NotProcessed:
break;
case ReceiptStatus::OK:
if (return_status == ReceiptStatus::NotProcessed)
return_status = rstat;
break;
case ReceiptStatusAbort:
return ReceiptStatusAbort;
case ReceiptStatus::Abort:
finished = true;
return_status = ReceiptStatus::Abort;
break;
case ReceiptStatusFinished:
return ReceiptStatusOK;
case ReceiptStatus::Finished:
finished = true;
return_status = ReceiptStatus::OK;;
break;
}
}
}
else
break;
} while (!finished);
receiveDepth--;
return return_status;
}
// number of currently registered recipients
virtual int Count()
{
std::lock_guard<std::mutex> scopeLock(_lock);
return recipient_list.size();
}
// number of sent messages.
int SentMessageCount()
int SentMessageCount() const
{
return sentMessageCount;
}
// number of currently registered recipients
// better to avoid using the size members on the list directly as
// using the atomics is lock free and threadsafe.
virtual size_t Count() const
{
return (recipientCount + pendingAdditions) - pendingDeletions;
}
// ascertain if a receipt status can be interpreted as failure.
static bool Failed(ReceiptStatus receiptStatus)
{
//
// failed is either Fail or Abort.
// NotProcessed isn't a failure because it hasn't been processed.
return receiptStatus == ReceiptStatusFail
|| receiptStatus == ReceiptStatusAbort;
return receiptStatus == ReceiptStatus::Fail
|| receiptStatus == ReceiptStatus::Abort;
}
};
}

View File

@ -64,10 +64,10 @@ namespace nasal
class NasalMainLoopRecipient : public simgear::Emesary::IReceiver {
public:
NasalMainLoopRecipient() : receiveCount(0), CanWait(false), Active(false) {
simgear::Emesary::GlobalTransmitter::instance()->Register(*this);
simgear::Emesary::GlobalTransmitter::instance()->Register(this);
}
virtual ~NasalMainLoopRecipient() {
simgear::Emesary::GlobalTransmitter::instance()->DeRegister(*this);
simgear::Emesary::GlobalTransmitter::instance()->DeRegister(this);
}
std::atomic<int> receiveCount;
@ -99,16 +99,16 @@ namespace nasal
gct.terminate();
break;
}
return simgear::Emesary::ReceiptStatusOK;
return simgear::Emesary::ReceiptStatus::OK;
}
auto *gccn = dynamic_cast<simgear::Notifications::NasalGarbageCollectionConfigurationNotification *>(&n);
if (gccn) {
CanWait = gccn->GetCanWait();
Active = gccn->GetActive();
return simgear::Emesary::ReceiptStatusOK;
return simgear::Emesary::ReceiptStatus::OK;
}
return simgear::Emesary::ReceiptStatusNotProcessed;
return simgear::Emesary::ReceiptStatus::NotProcessed;
}
protected:
bool CanWait;