hla: lift the event loop logic from RTI to HLAFederate.

This commit is contained in:
Mathias Froehlich 2011-10-02 10:11:10 +02:00
parent 9e27511ef9
commit f665431132
5 changed files with 267 additions and 114 deletions

View File

@ -1,4 +1,4 @@
// Copyright (C) 2009 - 2010 Mathias Froehlich - Mathias.Froehlich@web.de
// Copyright (C) 2009 - 2011 Mathias Froehlich - Mathias.Froehlich@web.de
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
@ -27,7 +27,8 @@
namespace simgear {
HLAFederate::HLAFederate()
HLAFederate::HLAFederate() :
_version(RTI13)
{
}
@ -370,7 +371,17 @@ HLAFederate::enableTimeConstrained()
SG_LOG(SG_NETWORK, SG_WARN, "HLA: Accessing unconnected federate!");
return false;
}
return _rtiFederate->enableTimeConstrained();
if (!_rtiFederate->enableTimeConstrained()) {
SG_LOG(SG_NETWORK, SG_WARN, "HLA: Could not enable time constrained!");
return false;
}
while (!_rtiFederate->getTimeConstrainedEnabled()) {
_rtiFederate->processMessage();
}
return true;
}
bool
@ -390,7 +401,17 @@ HLAFederate::enableTimeRegulation(const SGTimeStamp& lookahead)
SG_LOG(SG_NETWORK, SG_WARN, "HLA: Accessing unconnected federate!");
return false;
}
return _rtiFederate->enableTimeRegulation(lookahead);
if (!_rtiFederate->enableTimeRegulation(lookahead)) {
SG_LOG(SG_NETWORK, SG_WARN, "HLA: Could not enable time regulation!");
return false;
}
while (!_rtiFederate->getTimeRegulationEnabled()) {
_rtiFederate->processMessage();
}
return true;
}
bool
@ -404,23 +425,73 @@ HLAFederate::disableTimeRegulation()
}
bool
HLAFederate::timeAdvanceRequestBy(const SGTimeStamp& dt)
HLAFederate::modifyLookahead(const SGTimeStamp& timeStamp)
{
if (!_rtiFederate.valid()) {
SG_LOG(SG_NETWORK, SG_WARN, "HLA: Accessing unconnected federate!");
return false;
}
return _rtiFederate->timeAdvanceRequestBy(dt);
return _rtiFederate->modifyLookahead(timeStamp);
}
bool
HLAFederate::timeAdvanceRequest(const SGTimeStamp& dt)
HLAFederate::timeAdvanceBy(const SGTimeStamp& timeIncrement)
{
if (!_rtiFederate.valid()) {
SG_LOG(SG_NETWORK, SG_WARN, "HLA: Accessing unconnected federate!");
return false;
}
return _rtiFederate->timeAdvanceRequest(dt);
SGTimeStamp timeStamp;
if (!_rtiFederate->queryFederateTime(timeStamp)) {
SG_LOG(SG_NETWORK, SG_WARN, "HLA: Could not query federate time!");
return false;
}
if (!_rtiFederate->timeAdvanceRequest(timeStamp + timeIncrement)) {
SG_LOG(SG_NETWORK, SG_WARN, "HLA: Time advance request failed!");
return false;
}
return processMessages();
}
bool
HLAFederate::timeAdvance(const SGTimeStamp& timeStamp)
{
if (!_rtiFederate.valid()) {
SG_LOG(SG_NETWORK, SG_WARN, "HLA: Accessing unconnected federate!");
return false;
}
if (!_rtiFederate->timeAdvanceRequest(timeStamp)) {
SG_LOG(SG_NETWORK, SG_WARN, "HLA: Time advance request failed!");
return false;
}
return processMessages();
}
bool
HLAFederate::timeAdvanceAvailable()
{
if (!_rtiFederate.valid()) {
SG_LOG(SG_NETWORK, SG_WARN, "HLA: Accessing unconnected federate!");
return false;
}
SGTimeStamp timeStamp;
if (!_rtiFederate->queryGALT(timeStamp)) {
SG_LOG(SG_NETWORK, SG_WARN, "HLA: Could not query GALT!");
return false;
}
if (!_rtiFederate->timeAdvanceRequestAvailable(timeStamp)) {
SG_LOG(SG_NETWORK, SG_WARN, "HLA: Time advance request failed!");
return false;
}
return processMessages();
}
bool
@ -433,16 +504,6 @@ HLAFederate::queryFederateTime(SGTimeStamp& timeStamp)
return _rtiFederate->queryFederateTime(timeStamp);
}
bool
HLAFederate::modifyLookahead(const SGTimeStamp& timeStamp)
{
if (!_rtiFederate.valid()) {
SG_LOG(SG_NETWORK, SG_WARN, "HLA: Accessing unconnected federate!");
return false;
}
return _rtiFederate->modifyLookahead(timeStamp);
}
bool
HLAFederate::queryLookahead(SGTimeStamp& timeStamp)
{
@ -454,13 +515,41 @@ HLAFederate::queryLookahead(SGTimeStamp& timeStamp)
}
bool
HLAFederate::tick()
HLAFederate::processMessage()
{
if (!_rtiFederate.valid()) {
SG_LOG(SG_NETWORK, SG_WARN, "HLA: Accessing unconnected federate!");
return false;
}
return _rtiFederate->tick();
return _rtiFederate->processMessage();
}
bool
HLAFederate::processMessage(const SGTimeStamp& timeout)
{
if (!_rtiFederate.valid()) {
SG_LOG(SG_NETWORK, SG_WARN, "HLA: Accessing unconnected federate!");
return false;
}
return _rtiFederate->processMessages(timeout.toSecs(), 0);
}
bool
HLAFederate::processMessages()
{
if (!_rtiFederate.valid()) {
SG_LOG(SG_NETWORK, SG_WARN, "HLA: Accessing unconnected federate!");
return false;
}
while (_rtiFederate->getTimeAdvancePending()) {
_rtiFederate->processMessage();
}
// Now flush just what is left
while (!_rtiFederate->processMessages(0, 0));
return true;
}
bool
@ -470,7 +559,7 @@ HLAFederate::tick(const double& minimum, const double& maximum)
SG_LOG(SG_NETWORK, SG_WARN, "HLA: Accessing unconnected federate!");
return false;
}
return _rtiFederate->tick(minimum, maximum);
return _rtiFederate->processMessages(minimum, maximum);
}
bool

View File

@ -1,4 +1,4 @@
// Copyright (C) 2009 - 2010 Mathias Froehlich - Mathias.Froehlich@web.de
// Copyright (C) 2009 - 2011 Mathias Froehlich - Mathias.Froehlich@web.de
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
@ -82,21 +82,42 @@ public:
bool resignDestroyFederationExecution();
/// Time management
bool enableTimeConstrained();
bool disableTimeConstrained();
bool enableTimeRegulation(const SGTimeStamp& lookahead);
bool disableTimeRegulation();
bool modifyLookahead(const SGTimeStamp& lookahead);
bool timeAdvanceRequestBy(const SGTimeStamp& dt);
bool timeAdvanceRequest(const SGTimeStamp& dt);
/// Advance the logical time by the given time increment.
/// Depending on the time constrained mode, this might
/// block until the time advance is granted.
bool timeAdvanceBy(const SGTimeStamp& timeIncrement);
/// Advance the logical time to the given time.
/// Depending on the time constrained mode, this might
/// block until the time advance is granted.
bool timeAdvance(const SGTimeStamp& timeStamp);
/// Advance the logical time as far as time advances are available.
/// This call should not block and advance the logical time
/// as far as currently possible.
bool timeAdvanceAvailable();
bool queryFederateTime(SGTimeStamp& timeStamp);
bool modifyLookahead(const SGTimeStamp& timeStamp);
bool queryLookahead(SGTimeStamp& timeStamp);
/// Process messages
bool tick();
/// Process one messsage
bool processMessage();
/// Process one message but do not wait longer than the relative timeout.
bool processMessage(const SGTimeStamp& timeout);
/// Process messages until the federate can proceed with the
/// next simulation step. That is flush all pending messages and
/// depending on the time constrained mode process messages until
/// a pending time advance is granted.
bool processMessages();
/// Legacy tick call
bool tick(const double& minimum, const double& maximum);
class ObjectModelFactory {
@ -128,13 +149,21 @@ public:
const HLAInteractionClass* getInteractionClass(const std::string& name) const;
private:
HLAFederate(const HLAFederate&);
HLAFederate& operator=(const HLAFederate&);
/// The underlying interface to the rti implementation
SGSharedPtr<RTIFederate> _rtiFederate;
/// Parameters required to connect to an rti
Version _version;
std::list<std::string> _connectArguments;
/// Parameters for the federation execution
std::string _federationExecutionName;
std::string _federationObjectModel;
/// Parameters for the federate
std::string _federateType;
std::string _federateName;

View File

@ -579,7 +579,6 @@ private:
RTI13Federate::RTI13Federate(const std::list<std::string>& stringList) :
_joined(false),
_tickTimeout(10),
_ambassador(new RTI13Ambassador),
_federateAmbassador(new FederateAmbassador)
{
@ -728,13 +727,9 @@ RTI13Federate::registerFederationSynchronizationPoint(const std::string& label,
}
bool
RTI13Federate::waitForFederationSynchronizationPointAnnounced(const std::string& label)
RTI13Federate::getFederationSynchronizationPointAnnounced(const std::string& label)
{
while (!_federateAmbassador->getFederationSynchronizationPointAnnounced(label)) {
_ambassador->tick(_tickTimeout, 0);
_federateAmbassador->processQueues();
}
return true;
return _federateAmbassador->getFederationSynchronizationPointAnnounced(label);
}
bool
@ -766,13 +761,9 @@ RTI13Federate::synchronizationPointAchieved(const std::string& label)
}
bool
RTI13Federate::waitForFederationSynchronized(const std::string& label)
RTI13Federate::getFederationSynchronized(const std::string& label)
{
while (!_federateAmbassador->getFederationSynchronized(label)) {
_ambassador->tick(_tickTimeout, 0);
_federateAmbassador->processQueues();
}
return true;
return _federateAmbassador->getFederationSynchronized(label);
}
bool
@ -816,11 +807,6 @@ RTI13Federate::enableTimeConstrained()
return false;
}
while (!_federateAmbassador->_timeConstrainedEnabled) {
_ambassador->tick(_tickTimeout, 0);
_federateAmbassador->processQueues();
}
return true;
}
@ -863,6 +849,12 @@ RTI13Federate::disableTimeConstrained()
return true;
}
bool
RTI13Federate::getTimeConstrainedEnabled()
{
return _federateAmbassador->_timeConstrainedEnabled;
}
bool
RTI13Federate::enableTimeRegulation(const SGTimeStamp& lookahead)
{
@ -910,11 +902,6 @@ RTI13Federate::enableTimeRegulation(const SGTimeStamp& lookahead)
return false;
}
while (!_federateAmbassador->_timeRegulationEnabled) {
_ambassador->tick(_tickTimeout, 0);
_federateAmbassador->processQueues();
}
return true;
}
@ -958,19 +945,44 @@ RTI13Federate::disableTimeRegulation()
}
bool
RTI13Federate::timeAdvanceRequestBy(const SGTimeStamp& dt)
RTI13Federate::modifyLookahead(const SGTimeStamp& timeStamp)
{
if (!_ambassador.valid()) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not disable time regulation at unconnected federate.");
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not modify lookahead.");
return false;
}
SGTimeStamp fedTime = _federateAmbassador->_federateTime + dt;
return timeAdvanceRequest(fedTime);
try {
_ambassador->modifyLookahead(timeStamp);
} catch (RTI::InvalidLookahead& e) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not modify lookahead: " << e._name << " " << e._reason);
return false;
} catch (RTI::FederateNotExecutionMember& e) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not modify lookahead: " << e._name << " " << e._reason);
return false;
} catch (RTI::ConcurrentAccessAttempted& e) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not modify lookahead: " << e._name << " " << e._reason);
return false;
} catch (RTI::SaveInProgress& e) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not modify lookahead: " << e._name << " " << e._reason);
return false;
} catch (RTI::RestoreInProgress& e) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not modify lookahead: " << e._name << " " << e._reason);
return false;
} catch (RTI::RTIinternalError& e) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not modify lookahead: " << e._name << " " << e._reason);
return false;
}
return true;
}
bool
RTI13Federate::timeAdvanceRequest(const SGTimeStamp& fedTime)
RTI13Federate::getTimeRegulationEnabled()
{
return _federateAmbassador->_timeRegulationEnabled;
}
bool
RTI13Federate::timeAdvanceRequest(const SGTimeStamp& timeStamp)
{
if (!_ambassador.valid()) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not disable time regulation at unconnected federate.");
@ -978,7 +990,7 @@ RTI13Federate::timeAdvanceRequest(const SGTimeStamp& fedTime)
}
try {
_ambassador->timeAdvanceRequest(fedTime);
_ambassador->timeAdvanceRequest(timeStamp);
_federateAmbassador->_timeAdvancePending = true;
} catch (RTI::InvalidFederationTime& e) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not resign federation execution: " << e._name << " " << e._reason);
@ -1012,14 +1024,61 @@ RTI13Federate::timeAdvanceRequest(const SGTimeStamp& fedTime)
return false;
}
while (_federateAmbassador->_timeAdvancePending) {
_ambassador->tick(_tickTimeout, 0);
_federateAmbassador->processQueues();
return true;
}
bool
RTI13Federate::timeAdvanceRequestAvailable(const SGTimeStamp& timeStamp)
{
if (!_ambassador.valid()) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not disable time regulation at unconnected federate.");
return false;
}
try {
_ambassador->timeAdvanceRequestAvailable(timeStamp);
_federateAmbassador->_timeAdvancePending = true;
} catch (RTI::InvalidFederationTime& e) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not resign federation execution: " << e._name << " " << e._reason);
return false;
} catch (RTI::FederationTimeAlreadyPassed& e) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not resign federation execution: " << e._name << " " << e._reason);
return false;
} catch (RTI::TimeAdvanceAlreadyInProgress& e) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not resign federation execution: " << e._name << " " << e._reason);
return false;
} catch (RTI::EnableTimeRegulationPending& e) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not resign federation execution: " << e._name << " " << e._reason);
return false;
} catch (RTI::EnableTimeConstrainedPending& e) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not resign federation execution: " << e._name << " " << e._reason);
return false;
} catch (RTI::FederateNotExecutionMember& e) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not resign federation execution: " << e._name << " " << e._reason);
return false;
} catch (RTI::ConcurrentAccessAttempted& e) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not resign federation execution: " << e._name << " " << e._reason);
return false;
} catch (RTI::SaveInProgress& e) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not resign federation execution: " << e._name << " " << e._reason);
return false;
} catch (RTI::RestoreInProgress& e) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not resign federation execution: " << e._name << " " << e._reason);
return false;
} catch (RTI::RTIinternalError& e) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not resign federation execution: " << e._name << " " << e._reason);
return false;
}
return true;
}
bool
RTI13Federate::getTimeAdvancePending()
{
return _federateAmbassador->_timeAdvancePending;
}
bool
RTI13Federate::queryFederateTime(SGTimeStamp& timeStamp)
{
@ -1049,37 +1108,6 @@ RTI13Federate::queryFederateTime(SGTimeStamp& timeStamp)
return true;
}
bool
RTI13Federate::modifyLookahead(const SGTimeStamp& timeStamp)
{
if (!_ambassador.valid()) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not modify lookahead.");
return false;
}
try {
_ambassador->modifyLookahead(timeStamp);
} catch (RTI::InvalidLookahead& e) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not modify lookahead: " << e._name << " " << e._reason);
return false;
} catch (RTI::FederateNotExecutionMember& e) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not modify lookahead: " << e._name << " " << e._reason);
return false;
} catch (RTI::ConcurrentAccessAttempted& e) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not modify lookahead: " << e._name << " " << e._reason);
return false;
} catch (RTI::SaveInProgress& e) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not modify lookahead: " << e._name << " " << e._reason);
return false;
} catch (RTI::RestoreInProgress& e) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not modify lookahead: " << e._name << " " << e._reason);
return false;
} catch (RTI::RTIinternalError& e) {
SG_LOG(SG_NETWORK, SG_WARN, "RTI: Could not modify lookahead: " << e._name << " " << e._reason);
return false;
}
return true;
}
bool
RTI13Federate::queryLookahead(SGTimeStamp& timeStamp)
{
@ -1168,7 +1196,7 @@ RTI13Federate::queryLITS(SGTimeStamp& timeStamp)
}
bool
RTI13Federate::tick()
RTI13Federate::processMessage()
{
bool result = _ambassador->tick();
_federateAmbassador->processQueues();
@ -1176,10 +1204,17 @@ RTI13Federate::tick()
}
bool
RTI13Federate::tick(const double& minimum, const double& maximum)
RTI13Federate::processMessages(const double& minimum, const double& maximum)
{
bool result = _ambassador->tick(minimum, maximum);
bool result = _ambassador->tick(minimum, 0);
_federateAmbassador->processQueues();
if (!result)
return false;
SGTimeStamp timeStamp = SGTimeStamp::now() + SGTimeStamp::fromSec(maximum);
do {
result = _ambassador->tick(0, 0);
_federateAmbassador->processQueues();
} while (result && SGTimeStamp::now() <= timeStamp);
return result;
}

View File

@ -48,30 +48,32 @@ public:
/// Synchronization Point handling
virtual bool registerFederationSynchronizationPoint(const std::string& label, const RTIData& tag);
virtual bool waitForFederationSynchronizationPointAnnounced(const std::string& label);
virtual bool getFederationSynchronizationPointAnnounced(const std::string& label);
virtual bool synchronizationPointAchieved(const std::string& label);
virtual bool waitForFederationSynchronized(const std::string& label);
virtual bool getFederationSynchronized(const std::string& label);
/// Time management
virtual bool enableTimeConstrained();
virtual bool disableTimeConstrained();
virtual bool getTimeConstrainedEnabled();
virtual bool enableTimeRegulation(const SGTimeStamp& lookahead);
virtual bool disableTimeRegulation();
virtual bool modifyLookahead(const SGTimeStamp& timeStamp);
virtual bool getTimeRegulationEnabled();
virtual bool timeAdvanceRequestBy(const SGTimeStamp& dt);
virtual bool timeAdvanceRequest(const SGTimeStamp& fedTime);
virtual bool timeAdvanceRequest(const SGTimeStamp& timeStamp);
virtual bool timeAdvanceRequestAvailable(const SGTimeStamp& timeStamp);
virtual bool getTimeAdvancePending();
virtual bool queryFederateTime(SGTimeStamp& timeStamp);
virtual bool modifyLookahead(const SGTimeStamp& timeStamp);
virtual bool queryLookahead(SGTimeStamp& timeStamp);
virtual bool queryGALT(SGTimeStamp& timeStamp);
virtual bool queryLITS(SGTimeStamp& timeStamp);
/// Process messages
virtual bool tick();
virtual bool tick(const double& minimum, const double& maximum);
virtual bool processMessage();
virtual bool processMessages(const double& minimum, const double& maximum);
virtual RTI13ObjectClass* createObjectClass(const std::string& name, HLAObjectClass* hlaObjectClass);
@ -86,10 +88,6 @@ private:
RTI::FederateHandle _federateHandle;
bool _joined;
/// The timeout for the single callback tick function in
/// syncronous operations that need to wait for a callback
double _tickTimeout;
/// RTI connection
SGSharedPtr<RTI13Ambassador> _ambassador;

View File

@ -1,4 +1,4 @@
// Copyright (C) 2009 - 2010 Mathias Froehlich - Mathias.Froehlich@web.de
// Copyright (C) 2009 - 2011 Mathias Froehlich - Mathias.Froehlich@web.de
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
@ -49,30 +49,32 @@ public:
/// Synchronization Point handling
virtual bool registerFederationSynchronizationPoint(const std::string& label, const RTIData& tag) = 0;
virtual bool waitForFederationSynchronizationPointAnnounced(const std::string& label) = 0;
virtual bool getFederationSynchronizationPointAnnounced(const std::string& label) = 0;
virtual bool synchronizationPointAchieved(const std::string& label) = 0;
virtual bool waitForFederationSynchronized(const std::string& label) = 0;
virtual bool getFederationSynchronized(const std::string& label) = 0;
/// Time management
virtual bool enableTimeConstrained() = 0;
virtual bool disableTimeConstrained() = 0;
virtual bool getTimeConstrainedEnabled() = 0;
virtual bool enableTimeRegulation(const SGTimeStamp& lookahead) = 0;
virtual bool disableTimeRegulation() = 0;
virtual bool modifyLookahead(const SGTimeStamp& timeStamp) = 0;
virtual bool getTimeRegulationEnabled() = 0;
virtual bool timeAdvanceRequestBy(const SGTimeStamp& dt) = 0;
virtual bool timeAdvanceRequest(const SGTimeStamp& fedTime) = 0;
virtual bool timeAdvanceRequestAvailable(const SGTimeStamp& timeStamp) = 0;
virtual bool getTimeAdvancePending() = 0;
virtual bool queryFederateTime(SGTimeStamp& timeStamp) = 0;
virtual bool modifyLookahead(const SGTimeStamp& timeStamp) = 0;
virtual bool queryLookahead(SGTimeStamp& timeStamp) = 0;
virtual bool queryGALT(SGTimeStamp& timeStamp) = 0;
virtual bool queryLITS(SGTimeStamp& timeStamp) = 0;
/// Process messages
virtual bool tick() = 0;
virtual bool tick(const double& minimum, const double& maximum) = 0;
virtual bool processMessage() = 0;
virtual bool processMessages(const double& minimum, const double& maximum) = 0;
virtual RTIObjectClass* createObjectClass(const std::string& name, HLAObjectClass* hlaObjectClass) = 0;
// virtual RTIInteractionClass* createInteractionClass(const std::string& name) = 0;