Added Emesary to SimGear Core

This commit is contained in:
Richard Harrison 2019-06-03 23:32:34 +02:00
parent 9ac3c1a394
commit 8eb51e813f
6 changed files with 583 additions and 0 deletions

View File

@ -6,6 +6,7 @@ foreach( mylibfolder
bvh
debug
embedded_resources
emesary
ephemeris
io
magvar

View File

@ -0,0 +1,28 @@
include (SimGearComponent)
set(HEADERS
emesary.hxx
notifications.hxx
)
set(SOURCES
emesary.cxx
)
simgear_component(emesary emesary "${SOURCES}" "${HEADERS}")
if(ENABLE_TESTS)
add_executable(test_emesary test_emesary.cxx)
set_target_properties(test_emesary PROPERTIES
COMPILE_DEFINITIONS "SRC_DIR=\"${CMAKE_CURRENT_SOURCE_DIR}\"" )
target_link_libraries(test_emesary ${TEST_LIBS})
add_test(emesary ${EXECUTABLE_OUTPUT_PATH}/test_emesary)
endif(ENABLE_TESTS)

View File

@ -0,0 +1,27 @@
/*---------------------------------------------------------------------------
*
* Title : Emesary - class based inter-object communication
*
* File Type : Implementation File
*
* Description : Templated version of Emesary
* :
* :
* :
* :
*
* References : http://www.chateau-logic.com/content/class-based-inter-object-communication
*
* Author : Richard Harrison (richard@zaretto.com)
*
* Creation Date : 18 March 2002
*
* Version : $Header: $
*
* Copyright © 2002 Richard Harrison All Rights Reserved.
*
*---------------------------------------------------------------------------*/
#include "simgear/emesary/Emesary.hxx"
simgear::Emesary::Transmitter GlobalTransmitter;

325
simgear/emesary/Emesary.hxx Normal file
View File

@ -0,0 +1,325 @@
#pragma once
/*---------------------------------------------------------------------------
*
* Title : Emesary - class based inter-object communication
*
* File Type : Implementation File
*
* Description : Provides generic inter-object communication. For an object to receive a message it
* : must first register with a Transmitter, such as GlobalTransmitter, and implement the
* : IReceiver interface. That's it.
* : To send a message use a Transmitter with an object. That's all there is to it.
*
* References : http://www.chateau-logic.com/content/class-based-inter-object-communication
*
* Author : Richard Harrison (richard@zaretto.com)
*
* Creation Date : 18 March 2002, rewrite 2017
*
* Version : $Header: $
*
* Copyright © 2002 - 2017 Richard Harrison All Rights Reserved.
*
*---------------------------------------------------------------------------*/
#include <typeinfo>
#include <string>
#include <list>
#include <set>
#include <vector>
#include <atomic>
#include <simgear/threads/SGThread.hxx>
namespace simgear
{
namespace Emesary
{
enum ReceiptStatus
{
/// <summary>
/// Processing completed successfully
/// </summary>
ReceiptStatusOK = 0,
/// <summary>
/// Individual item failure
/// </summary>
ReceiptStatusFail = 1,
/// <summary>
/// Fatal error; stop processing any further recipieints of this message. Implicitly fail
/// </summary>
ReceiptStatusAbort = 2,
/// <summary>
/// Definitive completion - do not send message to any further recipieints
/// </summary>
ReceiptStatusFinished = 3,
/// <summary>
/// Return value when method doesn't process a message.
/// </summary>
ReceiptStatusNotProcessed = 4,
/// <summary>
/// Message has been sent but the return status cannot be determined as it has not been processed by the recipient.
/// </summary>
/// <notes>
/// For example a queue or outgoing bridge
/// </notes>
ReceiptStatusPending = 5,
/// <summary>
/// Message has been definitively handled but the return value cannot be determined. The message will not be sent any further
/// </summary>
/// <notes>
/// For example a point to point forwarding bridge
/// </notes>
ReceiptStatusPendingFinished = 6,
};
/// <summary>
/// Interface (base class) for all notifications. The value is an opaque pointer that may be used to store anything, although
/// often it is more convenient to
/// </summary>
class INotification
{
public:
virtual const char *GetType() = 0;
};
/// <summary>
/// Interface (base class) for a recipeint.
/// </summary>
class IReceiver
{
public:
/// <summary>
/// Receive notifiction - must be implemented
/// </summary>
virtual ReceiptStatus Receive(INotification& message) = 0;
/// <summary>
/// Called when registered at a transmitter
/// </summary>
virtual void OnRegisteredAtTransmitter(class Transmitter *p)
{
}
/// <summary>
/// Called when de-registered at a transmitter
/// </summary>
virtual void OnDeRegisteredAtTransmitter(class Transmitter *p)
{
}
};
/// <summary>
/// Interface (base clasee) for a transmitter.
/// Transmits Message derived objects. Each instance of this class provides a
/// databus to which any number of receivers can attach to.
/// </summary>
class ITransmitter
{
public:
/*
* Registers a recipient to receive message from this transmitter
*/
virtual void Register(IReceiver& R) = 0;
/*
* Removes a recipient from from this transmitter
*/
virtual void DeRegister(IReceiver& R) = 0;
/*
* Notify all registered recipients. Stop when receipt status of abort or finished are received.
* The receipt status from this method will be
* - OK > message handled
* - Fail > message not handled. A status of Abort from a recipient will result in our status
* being fail as Abort means that the message was not and cannot be handled, and
* allows for usages such as access controls.
*/
virtual ReceiptStatus NotifyAll(INotification& M) = 0;
/// <summary>
/// number of recipients
/// </summary>
virtual int Count() = 0;
};
/**
* Description: Transmits Message derived objects. Each instance of this class provides a
* databus to which any number of receivers can attach to.
*
* Messages may be inherited and customised between individual systems.
*/
class Transmitter : public ITransmitter
{
protected:
typedef std::list<IReceiver *> RecipientList;
RecipientList recipient_list;
RecipientList deleted_recipients;
int CurrentRecipientIndex = 0;
SGMutex _lock;
std::atomic<int> receiveDepth;
std::atomic<int> sentMessageCount;
void UnlockList()
{
_lock.unlock();
}
void LockList()
{
_lock.lock();
}
public:
Transmitter() : receiveDepth(0), sentMessageCount(0)
{
}
virtual ~Transmitter()
{
}
/**
* Registers an object to receive messsages from this transmitter.
* This object is added to the top of the list of objects to be notified. This is deliberate as
* the sequence of registration and message receipt can influence the way messages are processing
* when ReceiptStatus of Abort or Finished are encountered. So it was a deliberate decision that the
* most recently registered recipients should process the messages/events first.
*/
virtual void Register(IReceiver& r)
{
LockList();
recipient_list.push_back(&r);
r.OnRegisteredAtTransmitter(this);
if (std::find(deleted_recipients.begin(), deleted_recipients.end(), &r) != deleted_recipients.end())
deleted_recipients.remove(&r);
UnlockList();
}
/*
* Removes an object from receving message from this transmitter
*/
virtual void DeRegister(IReceiver& R)
{
LockList();
//printf("Remove %x\n", &R);
if (recipient_list.size())
{
if (std::find(recipient_list.begin(), recipient_list.end(), &R) != recipient_list.end())
{
recipient_list.remove(&R);
R.OnDeRegisteredAtTransmitter(this);
if (std::find(deleted_recipients.begin(), deleted_recipients.end(), &R) == deleted_recipients.end())
deleted_recipients.push_back(&R);
}
}
UnlockList();
}
/*
* Notify all registered recipients. Stop when receipt status of abort or finished are received.
* The receipt status from this method will be
* - OK > message handled
* - Fail > message not handled. A status of Abort from a recipient will result in our status
* being fail as Abort means that the message was not and cannot be handled, and
* allows for usages such as access controls.
* NOTE: When I first designed Emesary I always intended to have message routing and the ability
* for each recipient to specify an area of interest to allow performance improvements
* however this has not yet been implemented - but the concept is still there and
* could be implemented by extending the IReceiver interface to allow for this.
*/
virtual ReceiptStatus NotifyAll(INotification& M)
{
ReceiptStatus return_status = ReceiptStatusNotProcessed;
//printf("Begin receive %d : %x\n", (int)receiveDepth, M);
//fflush(stdout);
sentMessageCount++;
try
{
LockList();
if (receiveDepth == 0)
deleted_recipients.clear();
receiveDepth++;
std::vector<IReceiver*> temp(recipient_list.size());
int idx = 0;
for (RecipientList::iterator i = recipient_list.begin(); i != recipient_list.end(); i++)
{
temp[idx++] = *i;
}
UnlockList();
int tempSize = temp.size();
for (int index = 0; index < tempSize; index++)
{
IReceiver* R = temp[index];
LockList();
if (deleted_recipients.size())
{
if (std::find(deleted_recipients.begin(), deleted_recipients.end(), R) != deleted_recipients.end())
{
UnlockList();
continue;
}
}
UnlockList();
if (R)
{
ReceiptStatus rstat = R->Receive(M);
switch (rstat)
{
case ReceiptStatusFail:
return_status = ReceiptStatusFail;
break;
case ReceiptStatusPending:
return_status = ReceiptStatusPending;
break;
case ReceiptStatusPendingFinished:
return rstat;
case ReceiptStatusNotProcessed:
break;
case ReceiptStatusOK:
if (return_status == ReceiptStatusNotProcessed)
return_status = rstat;
break;
case ReceiptStatusAbort:
return ReceiptStatusAbort;
case ReceiptStatusFinished:
return ReceiptStatusOK;
}
}
}
}
catch (...)
{
throw;
// return_status = ReceiptStatusAbort;
}
receiveDepth--;
//printf("End receive %d : %x\n", (int) receiveDepth, M);
return return_status;
}
virtual int Count()
{
LockList();
return recipient_list.size();
UnlockList();
}
int SentMessageCount()
{
return sentMessageCount;
}
static bool Failed(ReceiptStatus receiptStatus)
{
//
// failed is either Fail or Abort.
// NotProcessed isn't a failure because it hasn't been processed.
return receiptStatus == ReceiptStatusFail
|| receiptStatus == ReceiptStatusAbort;
}
};
Transmitter GlobalTransmitter;
}
}

View File

@ -0,0 +1,76 @@
/*---------------------------------------------------------------------------
*
* Title : Emesary - class based inter-object communication
*
* File Type : Implementation File
*
* Description : Provides generic inter-object communication. For an object to receive a message it
* : must first register with a Transmitter, such as GlobalTransmitter, and implement the
* : IReceiver interface. That's it.
* : To send a message use a Transmitter with an object. That's all there is to it.
*
* References : http://www.chateau-logic.com/content/class-based-inter-object-communication
*
* Author : Richard Harrison (richard@zaretto.com)
*
* Creation Date : 18 March 2002, rewrite 2017
*
* Version : $Header: $
*
* Copyright © 2002 - 2017 Richard Harrison All Rights Reserved.
*
*---------------------------------------------------------------------------*/
#include <typeinfo>
#include <string>
#include <list>
#include <set>
#include <vector>
#include <Windows.h>
#include <process.h>
#include <atomic>
#include <simgear/emesary/emesary.hxx>
namespace simgear
{
namespace Notifications
{
class MainLoopNotification : public simgear::Emesary::INotification
{
public:
enum Type { Started, Stopped, Begin, End };
MainLoopNotification(Type v) : Type(v) {}
virtual Type GetValue() { return Type; }
virtual const char *GetType() { return "MainLoop"; }
protected:
Type Type;
};
class NasalGarbageCollectionConfigurationNotification : public simgear::Emesary::INotification
{
public:
NasalGarbageCollectionConfigurationNotification(bool canWait, bool active) : CanWait(canWait), Active(active) {}
virtual bool GetCanWait() { return CanWait; }
virtual bool GetActive() { return Active; }
virtual const char *GetType() { return "NasalGarbageCollectionConfiguration"; }
virtual bool SetWait(bool wait) {
if (wait == CanWait)
return false;
CanWait = wait;
return true;
}
virtual bool SetActive(bool active) {
if (active == Active)
return false;
Active = active;
return true;
}
public:
bool CanWait;
bool Active;
};
}
}

View File

@ -0,0 +1,126 @@
////////////////////////////////////////////////////////////////////////
// Test harness for Emesary.
////////////////////////////////////////////////////////////////////////
#include <simgear_config.h>
#include <simgear/compiler.h>
#include <iostream>
#include <simgear/emesary/emesary.hxx>
using std::cout;
using std::cerr;
using std::endl;
std::atomic<int> nthread = 0;
std::atomic<int> noperations = 0;
const int MaxIterations = 9999;
class TestThreadNotification : public simgear::Emesary::INotification
{
protected:
const char *baseValue;
public:
TestThreadNotification(const char *v) : baseValue(v) {}
virtual const char* GetType () { return baseValue; }
};
class TestThreadRecipient : public simgear::Emesary::IReceiver
{
public:
TestThreadRecipient() : receiveCount(0)
{
}
std::atomic<int> receiveCount;
virtual simgear::Emesary::ReceiptStatus Receive(simgear::Emesary::INotification &n)
{
if (n.GetType() == (const char*)this)
{
TestThreadNotification *tn = dynamic_cast<TestThreadNotification *>(&n);
receiveCount++;
TestThreadNotification onwardNotification("AL");
simgear::Emesary::GlobalTransmitter.NotifyAll(onwardNotification);
return simgear::Emesary::ReceiptStatusOK;
}
return simgear::Emesary::ReceiptStatusOK;
}
};
class EmesaryTestThread : public SGThread
{
protected:
virtual void run() {
int threadId = nthread.fetch_add(1);
//System.Threading.Interlocked.Increment(ref nthread);
//var rng = new Random();
TestThreadRecipient r;
char temp[100];
sprintf(temp, "Notif %d", threadId);
printf("starting thread %s\n", temp);
TestThreadNotification tn((const char*)&r);
for (int i = 0; i < MaxIterations; i++)
{
simgear::Emesary::GlobalTransmitter.Register(r);
simgear::Emesary::GlobalTransmitter.NotifyAll(tn);
simgear::Emesary::GlobalTransmitter.DeRegister(r);
//System.Threading.Thread.Sleep(rng.Next(MaxSleep));
noperations++;
}
printf("%s invocations %d\n", temp, (int)r.receiveCount);
printf("finish thread %s\n", temp);
}
};
class EmesaryTest
{
public:
void Emesary_MultiThreadTransmitterTest()
{
int num_threads = 12;
std::list<EmesaryTestThread*> threads;
for (int i = 0; i < num_threads; i++)
{
EmesaryTestThread *thread = new EmesaryTestThread();
threads.push_back(thread);
thread->start();
}
for (std::list<EmesaryTestThread*>::iterator i = threads.begin(); i != threads.end(); i++)
{
(*i)->join();
}
}
};
void testEmesaryThreaded()
{
TestThreadRecipient r;
TestThreadNotification tn((const char*)&r);
simgear::Emesary::GlobalTransmitter.Register(r);
for (int i = 0; i < MaxIterations*MaxIterations; i++)
{
simgear::Emesary::GlobalTransmitter.NotifyAll(tn);
//System.Threading.Thread.Sleep(rng.Next(MaxSleep));
noperations++;
}
simgear::Emesary::GlobalTransmitter.DeRegister(r);
printf("invocations %d\n", simgear::Emesary::GlobalTransmitter.SentMessageCount());
EmesaryTest t;
t.Emesary_MultiThreadTransmitterTest();
}
int main(int ac, char ** av)
{
testEmesaryThreaded();
std::cout << "all tests passed" << std::endl;
return 0;
}