Added Emesary

derived from: https://github.com/Zaretto/Emesary/tree/master/src/Emesary/EmesaryC++

This is a similar implementation to that in Nasal; currently this is only for use in C++ modules as there is no Nasal interface - however this is something that may be added later.

The basic premise of the Emesary messaging system is to allow the decoupled operation of the various components that comprise any system - usually within the same process.

The basic unit of communication is a Notification, which is passed around to any and or all objects that implement the IReceive interface (pure virtual class). Using Interfaces and ihneritance
it is possible to pass around Notifications that have special meanings to certain objects and allow them to perform the appropriate function.

Notifications are created and sent via a call to NotifyAll. Any object within the system can inherit from IReceive and register itself with a Transmitter to receive all notifications that are sent out.

The underlying concept is that one part of a system knows that something needs to be done without needing to know how to do it. The part of the system that needs something done simply creates a notification and sends it out. Once received by the part of the system that is capable of performing the requested Notification the relevant actions will be carried out and a status of OK or Finished returned.

The return code from the Receive method must follow the guidlines as follows:

If a notification is not recognised in the recipient must return ReceiptStatusNotProcessed. Normally a recipient will return ReceiptStatusOK, sometimes ReceiptStatusFail to indicate that the recipient could not properly process the notification.

The overall result of a call to NotifyAll is a composite of the results of each individual recipient's return value; and generally OK or Fail. It is acceptable to use the return code to ascertain success or failure and take appropriate action.

It is generally recommended that more detail specific to the particular purpose be included within the Notification to allow more control.

Notifications can be modified during processing to permit a multistage process; however this needs to be designed correctly.

* ReceiptStatusOK : Notification processed fine by Receive, transmitter continues with next recipient
* ReceiptStatusFail : Notification processed but failed, transmitter continues with next recipient
* ReceiptStatusAbort : Notification processed and the receive method declares a fatal error; the transmitter will stop notifying recipients
* ReceiptStatusFinished : Notification processed and the recipient declares that the processing is completed, the transmitter will stop notifying recipients
* ReceiptStatusNotProcessed : receive method does not recognise the notification

These status are for later implmentation.
* ReceiptStatusPending : (future ues); notification been sent but the return status cannot be determined as it has been queued for later processing, transmitter continues with next recipient
* ReceiptStatusPendingFinished : Message has been definitively handled but the return value cannot be determined. the transmitter will stop notifying recipients
This commit is contained in:
Richard Harrison 2019-06-09 10:58:03 +02:00
commit 7a903361e6
11 changed files with 716 additions and 0 deletions

View File

@ -6,6 +6,7 @@ foreach( mylibfolder
bvh
debug
embedded_resources
emesary
ephemeris
io
magvar

View File

@ -0,0 +1,33 @@
include (SimGearComponent)
set(HEADERS
Emesary.hxx
INotification.hxx
IReceiver.hxx
ITransmitter.hxx
ReceiptStatus.hxx
Transmitter.hxx
notifications.hxx
)
set(SOURCES
emesary.cxx
)
simgear_component(emesary emesary "${SOURCES}" "${HEADERS}")
if(ENABLE_TESTS)
add_executable(test_emesary test_emesary.cxx)
set_target_properties(test_emesary PROPERTIES
COMPILE_DEFINITIONS "SRC_DIR=\"${CMAKE_CURRENT_SOURCE_DIR}\"" )
target_link_libraries(test_emesary ${TEST_LIBS})
add_test(emesary ${EXECUTABLE_OUTPUT_PATH}/test_emesary)
endif(ENABLE_TESTS)

View File

@ -0,0 +1,31 @@
/*---------------------------------------------------------------------------
*
* Title : Emesary - class based inter-object communication
*
* File Type : Implementation File
*
* Description : Emesary main.
* : This only needs to instance the GlobalTransmitter as all of the
* : logic is in the header files (by design)
*
* References : http://www.chateau-logic.com/content/class-based-inter-object-communication
*
* Author : Richard Harrison (richard@zaretto.com)
*
* Creation Date : 18 March 2002
*
* Version : $Header: $
*
* Copyright © 2002 Richard Harrison All Rights Reserved.
*
*---------------------------------------------------------------------------*/
#include "simgear/emesary/Emesary.hxx"
namespace simgear
{
namespace Emesary
{
}
}

View File

@ -0,0 +1,48 @@
#ifndef EMESARY_hxx
#define EMESARY_hxx
/*---------------------------------------------------------------------------
*
* Title : Emesary - class based inter-object communication
*
* File Type : Implementation File
*
* Description : Provides generic inter-object communication. For an object to receive a message it
* : must first register with a Transmitter, such as GlobalTransmitter, and implement the
* : IReceiver interface. That's it.
* : To send a message use a Transmitter with an object. That's all there is to it.
*
* References : http://www.chateau-logic.com/content/class-based-inter-object-communication
*
* Author : Richard Harrison (richard@zaretto.com)
*
* Creation Date : 18 March 2002, rewrite 2017, simgear version 2019
*
* Version : $Header: $
*
* Copyright (C)2019 Richard Harrison Licenced under GPL2 or later.
*
*---------------------------------------------------------------------------*/
#include <typeinfo>
#include "ReceiptStatus.hxx"
#include "INotification.hxx"
#include "IReceiver.hxx"
#include "ITransmitter.hxx"
#include "Transmitter.hxx"
#include <simgear/structure/Singleton.hxx>
namespace simgear
{
namespace Emesary
{
class GlobalTransmitter : public simgear::Singleton<Transmitter>
{
public:
GlobalTransmitter()
{
}
virtual ~GlobalTransmitter() {}
};
}
}
#endif

View File

@ -0,0 +1,54 @@
#ifndef INOTIFICATION_hxx
#define INOTIFICATION_hxx
/*---------------------------------------------------------------------------
*
* Title : Emesary - Notification base class
*
* File Type : Implementation File
*
* Description : Base class (interface) for all Notifications.
* : This is also compatible with the usual implementation of how we
* : implement queued notifications.
*
* References : http://www.chateau-logic.com/content/class-based-inter-object-communication
*
* Author : Richard Harrison (richard@zaretto.com)
*
* Creation Date : 18 March 2002, rewrite 2017, simgear version 2019
*
* Version : $Header: $
*
* Copyright (C)2019 Richard Harrison Licenced under GPL2 or later.
*
*---------------------------------------------------------------------------*/
namespace simgear
{
namespace Emesary
{
/// Interface (base class) for all notifications.
class INotification
{
public:
// text representation of notification type. must be unique across all notifications
virtual const char *GetType() = 0;
/// Used to control the sending of notifications. If this returns false then the Transmitter
/// should not send this notification.
virtual bool IsReadyToSend() { return true; }
/// Used to control the timeout. If this notification has timed out - then the processor is entitled
/// to true.
virtual bool IsTimedOut() { return false; }
/// when this notification has completed the processing recipient must set this to true.
/// the processing recipient is responsible for follow on notifications.
/// a notification can remain as complete until the transmit queue decides to remove it from the queue.
/// there is no requirement that elements are removed immediately upon completion merely that once complete
/// the transmitter should not notify any more elements.
/// The current notification loop may be completed - following the usual convention unless Completed or Abort
/// is returned as the status.
virtual bool IsComplete() { return true; }
};
}
}
#endif

View File

@ -0,0 +1,47 @@
#ifndef IRECEIVER_hxx
#define IRECEIVER_hxx
/*---------------------------------------------------------------------------
*
* Title : Emesary - Receiver base class
*
* File Type : Implementation File
*
* Description : Base class for all recipients.
*
* References : http://www.chateau-logic.com/content/class-based-inter-object-communication
*
* Author : Richard Harrison (richard@zaretto.com)
*
* Creation Date : 18 March 2002, rewrite 2017, simgear version 2019
*
* Version : $Header: $
*
* Copyright (C)2019 Richard Harrison Licenced under GPL2 or later.
*
*---------------------------------------------------------------------------*/
namespace simgear
{
namespace Emesary
{
/// Interface (base class) for a recipeint.
class IReceiver
{
public:
/// Receive notification - must be implemented
virtual ReceiptStatus Receive(INotification& message) = 0;
/// Called when registered at a transmitter
virtual void OnRegisteredAtTransmitter(class Transmitter *p)
{
}
/// Called when de-registered at a transmitter
virtual void OnDeRegisteredAtTransmitter(class Transmitter *p)
{
}
};
}
}
#endif

View File

@ -0,0 +1,52 @@
#ifndef ITRANSMITTER_hxx
#define ITRANSMITTER_hxx
/*---------------------------------------------------------------------------
*
* Title : Emesary - Transmitter base class
*
* File Type : Implementation File
*
* Description : Base class for all transmitters.
*
* References : http://www.chateau-logic.com/content/class-based-inter-object-communication
*
* Author : Richard Harrison (richard@zaretto.com)
*
* Creation Date : 18 March 2002, rewrite 2017, simgear version 2019
*
* Version : $Header: $
*
* Copyright (C)2019 Richard Harrison Licenced under GPL2 or later.
*
*---------------------------------------------------------------------------*/
namespace simgear
{
namespace Emesary
{
/// Interface (base clasee) for a transmitter.
/// Transmits Message derived objects. Each instance of this class provides a
/// event/databus to which any number of receivers can attach to.
class ITransmitter
{
public:
// Registers a recipient to receive message from this transmitter
virtual void Register(IReceiver& R) = 0;
// Removes a recipient from from this transmitter
virtual void DeRegister(IReceiver& R) = 0;
//Notify all registered recipients. Stop when receipt status of abort or finished are received.
//The receipt status from this method will be
// - OK > message handled
// - Fail > message not handled. A status of Abort from a recipient will result in our status
// being fail as Abort means that the message was not and cannot be handled, and
// allows for usages such as access controls.
virtual ReceiptStatus NotifyAll(INotification& M) = 0;
/// number of recipients
virtual int Count() = 0;
};
}
}
#endif

View File

@ -0,0 +1,54 @@
#ifndef RECEIPTSTATUS_hxx
#define RECEIPTSTATUS_hxx
/*---------------------------------------------------------------------------
*
* Title : Emesary - Transmitter base class
*
* File Type : Implementation File
*
* Description : Defines the receipt status that can be returned from
* : a receive method.
*
* References : http://www.chateau-logic.com/content/class-based-inter-object-communication
*
* Author : Richard Harrison (richard@zaretto.com)
*
* Creation Date : 18 March 2002, rewrite 2017, simgear version 2019
*
* Version : $Header: $
*
* Copyright (C)2019 Richard Harrison Licenced under GPL2 or later.
*
*---------------------------------------------------------------------------*/
namespace simgear
{
namespace Emesary
{
enum ReceiptStatus
{
/// Processing completed successfully
ReceiptStatusOK = 0,
/// Individual item failure
ReceiptStatusFail = 1,
/// Fatal error; stop processing any further recipieints of this message. Implicitly fail
ReceiptStatusAbort = 2,
/// Definitive completion - do not send message to any further recipieints
ReceiptStatusFinished = 3,
/// Return value when method doesn't process a message.
ReceiptStatusNotProcessed = 4,
/// Message has been sent but the return status cannot be determined as it has not been processed by the recipient.
/// e.g. a queue or outgoing bridge
ReceiptStatusPending = 5,
/// Message has been definitively handled but the return value cannot be determined. The message will not be sent any further
/// e.g. a point to point forwarding bridge
ReceiptStatusPendingFinished = 6,
};
}
}
#endif

View File

@ -0,0 +1,202 @@
#ifndef TRANSMITTER_hxx
#define TRANSMITTER_hxx
/*---------------------------------------------------------------------------
*
* Title : Emesary - Transmitter base class
*
* File Type : Implementation File
*
* Description : Defines the receipt status that can be returned from
* : a receive method.
*
* References : http://www.chateau-logic.com/content/class-based-inter-object-communication
*
* Author : Richard Harrison (richard@zaretto.com)
*
* Creation Date : 18 March 2002, rewrite 2017, simgear version 2019
*
* Version : $Header: $
*
* Copyright (C)2019 Richard Harrison Licenced under GPL2 or later.
*
*---------------------------------------------------------------------------*/
#include <string>
#include <list>
#include <set>
#include <vector>
#include <atomic>
#include <simgear/threads/SGThread.hxx>
namespace simgear
{
namespace Emesary
{
// Implementation of a ITransmitter
class Transmitter : public ITransmitter
{
protected:
typedef std::list<IReceiver *> RecipientList;
RecipientList recipient_list;
RecipientList deleted_recipients;
int CurrentRecipientIndex = 0;
SGMutex _lock;
std::atomic<int> receiveDepth;
std::atomic<int> sentMessageCount;
void UnlockList()
{
_lock.unlock();
}
void LockList()
{
_lock.lock();
}
public:
Transmitter() : receiveDepth(0), sentMessageCount(0)
{
}
virtual ~Transmitter()
{
}
// Registers an object to receive messsages from this transmitter.
// This object is added to the top of the list of objects to be notified. This is deliberate as
// the sequence of registration and message receipt can influence the way messages are processing
// when ReceiptStatus of Abort or Finished are encountered. So it was a deliberate decision that the
// most recently registered recipients should process the messages/events first.
virtual void Register(IReceiver& r)
{
LockList();
recipient_list.push_back(&r);
r.OnRegisteredAtTransmitter(this);
if (std::find(deleted_recipients.begin(), deleted_recipients.end(), &r) != deleted_recipients.end())
deleted_recipients.remove(&r);
UnlockList();
}
// Removes an object from receving message from this transmitter
virtual void DeRegister(IReceiver& R)
{
LockList();
//printf("Remove %x\n", &R);
if (recipient_list.size())
{
if (std::find(recipient_list.begin(), recipient_list.end(), &R) != recipient_list.end())
{
recipient_list.remove(&R);
R.OnDeRegisteredAtTransmitter(this);
if (std::find(deleted_recipients.begin(), deleted_recipients.end(), &R) == deleted_recipients.end())
deleted_recipients.push_back(&R);
}
}
UnlockList();
}
// Notify all registered recipients. Stop when receipt status of abort or finished are received.
// The receipt status from this method will be
// - OK > message handled
// - Fail > message not handled. A status of Abort from a recipient will result in our status
// being fail as Abort means that the message was not and cannot be handled, and
// allows for usages such as access controls.
virtual ReceiptStatus NotifyAll(INotification& M)
{
ReceiptStatus return_status = ReceiptStatusNotProcessed;
sentMessageCount++;
try
{
LockList();
if (receiveDepth == 0)
deleted_recipients.clear();
receiveDepth++;
std::vector<IReceiver*> temp(recipient_list.size());
int idx = 0;
for (RecipientList::iterator i = recipient_list.begin(); i != recipient_list.end(); i++)
{
temp[idx++] = *i;
}
UnlockList();
int tempSize = temp.size();
for (int index = 0; index < tempSize; index++)
{
IReceiver* R = temp[index];
LockList();
if (deleted_recipients.size())
{
if (std::find(deleted_recipients.begin(), deleted_recipients.end(), R) != deleted_recipients.end())
{
UnlockList();
continue;
}
}
UnlockList();
if (R)
{
ReceiptStatus rstat = R->Receive(M);
switch (rstat)
{
case ReceiptStatusFail:
return_status = ReceiptStatusFail;
break;
case ReceiptStatusPending:
return_status = ReceiptStatusPending;
break;
case ReceiptStatusPendingFinished:
return rstat;
case ReceiptStatusNotProcessed:
break;
case ReceiptStatusOK:
if (return_status == ReceiptStatusNotProcessed)
return_status = rstat;
break;
case ReceiptStatusAbort:
return ReceiptStatusAbort;
case ReceiptStatusFinished:
return ReceiptStatusOK;
}
}
}
}
catch (...)
{
throw;
// return_status = ReceiptStatusAbort;
}
receiveDepth--;
return return_status;
}
// number of currently registered recipients
virtual int Count()
{
LockList();
return recipient_list.size();
UnlockList();
}
// number of sent messages.
int SentMessageCount()
{
return sentMessageCount;
}
// ascertain if a receipt status can be interpreted as failure.
static bool Failed(ReceiptStatus receiptStatus)
{
//
// failed is either Fail or Abort.
// NotProcessed isn't a failure because it hasn't been processed.
return receiptStatus == ReceiptStatusFail
|| receiptStatus == ReceiptStatusAbort;
}
};
}
}
#endif

View File

@ -0,0 +1,68 @@
#ifndef NOTIFICATIONS_hxx
#define NOTIFICATIONS_hxx
/*---------------------------------------------------------------------------
*
* Title : Emesary - class based inter-object communication
*
* File Type : Implementation File
*
* Description : simgear notifications
*
* References : http://www.chateau-logic.com/content/class-based-inter-object-communication
*
* Author : Richard Harrison (richard@zaretto.com)
*
* Creation Date : 18 March 2002, rewrite 2017
*
* Version : $Header: $
*
* Copyright © 2002 - 2017 Richard Harrison All Rights Reserved.
*
*---------------------------------------------------------------------------*/
#include "INotification.hxx"
namespace simgear
{
namespace Notifications
{
class MainLoopNotification : public simgear::Emesary::INotification
{
public:
enum Type { Started, Stopped, Begin, End };
MainLoopNotification(Type v) : Type(v) {}
virtual Type GetValue() { return Type; }
virtual const char *GetType() { return "MainLoop"; }
protected:
Type Type;
};
class NasalGarbageCollectionConfigurationNotification : public simgear::Emesary::INotification
{
public:
NasalGarbageCollectionConfigurationNotification(bool canWait, bool active) : CanWait(canWait), Active(active) {}
virtual bool GetCanWait() { return CanWait; }
virtual bool GetActive() { return Active; }
virtual const char *GetType() { return "NasalGarbageCollectionConfiguration"; }
virtual bool SetWait(bool wait) {
if (wait == CanWait)
return false;
CanWait = wait;
return true;
}
virtual bool SetActive(bool active) {
if (active == Active)
return false;
Active = active;
return true;
}
public:
bool CanWait;
bool Active;
};
}
}
#endif

View File

@ -0,0 +1,126 @@
////////////////////////////////////////////////////////////////////////
// Test harness for Emesary.
////////////////////////////////////////////////////////////////////////
#include <simgear_config.h>
#include <simgear/compiler.h>
#include <iostream>
#include <simgear/emesary/emesary.hxx>
using std::cout;
using std::cerr;
using std::endl;
std::atomic<int> nthread = 0;
std::atomic<int> noperations = 0;
const int MaxIterations = 9999;
class TestThreadNotification : public simgear::Emesary::INotification
{
protected:
const char *baseValue;
public:
TestThreadNotification(const char *v) : baseValue(v) {}
virtual const char* GetType () { return baseValue; }
};
class TestThreadRecipient : public simgear::Emesary::IReceiver
{
public:
TestThreadRecipient() : receiveCount(0)
{
}
std::atomic<int> receiveCount;
virtual simgear::Emesary::ReceiptStatus Receive(simgear::Emesary::INotification &n)
{
if (n.GetType() == (const char*)this)
{
TestThreadNotification *tn = dynamic_cast<TestThreadNotification *>(&n);
receiveCount++;
TestThreadNotification onwardNotification("AL");
simgear::Emesary::GlobalTransmitter::instance()->NotifyAll(onwardNotification);
return simgear::Emesary::ReceiptStatusOK;
}
return simgear::Emesary::ReceiptStatusOK;
}
};
class EmesaryTestThread : public SGThread
{
protected:
virtual void run() {
int threadId = nthread.fetch_add(1);
//System.Threading.Interlocked.Increment(ref nthread);
//var rng = new Random();
TestThreadRecipient r;
char temp[100];
sprintf(temp, "Notif %d", threadId);
printf("starting thread %s\n", temp);
TestThreadNotification tn((const char*)&r);
for (int i = 0; i < MaxIterations; i++)
{
simgear::Emesary::GlobalTransmitter::instance()->Register(r);
simgear::Emesary::GlobalTransmitter::instance()->NotifyAll(tn);
simgear::Emesary::GlobalTransmitter::instance()->DeRegister(r);
//System.Threading.Thread.Sleep(rng.Next(MaxSleep));
noperations++;
}
printf("%s invocations %d\n", temp, (int)r.receiveCount);
printf("finish thread %s\n", temp);
}
};
class EmesaryTest
{
public:
void Emesary_MultiThreadTransmitterTest()
{
int num_threads = 12;
std::list<EmesaryTestThread*> threads;
for (int i = 0; i < num_threads; i++)
{
EmesaryTestThread *thread = new EmesaryTestThread();
threads.push_back(thread);
thread->start();
}
for (std::list<EmesaryTestThread*>::iterator i = threads.begin(); i != threads.end(); i++)
{
(*i)->join();
}
}
};
void testEmesaryThreaded()
{
TestThreadRecipient r;
TestThreadNotification tn((const char*)&r);
simgear::Emesary::GlobalTransmitter::instance()->Register(r);
for (int i = 0; i < MaxIterations*MaxIterations; i++)
{
simgear::Emesary::GlobalTransmitter::instance()->NotifyAll(tn);
//System.Threading.Thread.Sleep(rng.Next(MaxSleep));
noperations++;
}
simgear::Emesary::GlobalTransmitter::instance()->DeRegister(r);
printf("invocations %d\n", simgear::Emesary::GlobalTransmitter::instance()->SentMessageCount());
EmesaryTest t;
t.Emesary_MultiThreadTransmitterTest();
}
int main(int ac, char ** av)
{
testEmesaryThreaded();
std::cout << "all tests passed" << std::endl;
return 0;
}