diff --git a/simgear/threads/SGThread.cxx b/simgear/threads/SGThread.cxx index 0ba71f40..c5686e51 100644 --- a/simgear/threads/SGThread.cxx +++ b/simgear/threads/SGThread.cxx @@ -25,6 +25,7 @@ #endif #include +#include #include "SGThread.hxx" @@ -416,3 +417,98 @@ SGWaitCondition::broadcast() { _privateData->broadcast(); } + + +SGExclusiveThread::SGExclusiveThread() : + _started(false), _terminated(false), last_await_time(0), + dataReady(false), complete(true), process_ran(false), process_running(false) + { + } + + SGExclusiveThread::~SGExclusiveThread() + { + + } + + void SGExclusiveThread::release() { + std::unique_lock lck(mutex_); + if (!complete) { + SG_LOG(SG_NASAL, SG_ALERT, "[SGExclusiveThread] not finished - skipping"); + return; + } + if (!complete.exchange(false)) + SG_LOG(SG_NASAL, SG_ALERT, "[SGExclusiveThread] concurrent failure (2)"); + if (dataReady.exchange(true)) + SG_LOG(SG_NASAL, SG_ALERT, "[SGExclusiveThread] concurrent failure (1)"); + condVar.notify_one(); + } + void SGExclusiveThread::wait() { + std::unique_lock lck(mutex_); + if (!dataReady) + { + do + { + condVar.wait(lck); + } while (!dataReady); + } + } + void SGExclusiveThread::clearAwaitCompletionTime() { + last_await_time = 0; + } + void SGExclusiveThread::awaitCompletion() { + timestamp.stamp(); + std::unique_lock lck(Cmutex_); + if (!complete) + { + do { + CcondVar.wait(lck); + } while (!complete.load()); + } + + if (process_ran) { + last_await_time = timestamp.elapsedUSec(); + process_ran = 0; + } + } + + void SGExclusiveThread::setCompletion() { + std::unique_lock lck(Cmutex_); + if (!dataReady.exchange(false)) + SG_LOG(SG_NASAL, SG_ALERT, "[SGExclusiveThread] atomic operation on dataReady failed (5)\n"); + + if (complete.exchange(true)) + SG_LOG(SG_NASAL, SG_ALERT, "[SGExclusiveThread] atomic operation on complete failed (5)\n"); + CcondVar.notify_one(); + } + void SGExclusiveThread::run() + { + process_running = true; + while (!_terminated) { + wait(); + process_ran = process(); + setCompletion(); + } + process_running = false; + _terminated = false; + _started = false; + } + + void SGExclusiveThread::terminate() { + _terminated = true; + } + bool SGExclusiveThread::stop() + { + return true; + } + void SGExclusiveThread::ensure_running() + { + if (!_started) + { + _started = true; + start(); + } + } + bool SGExclusiveThread::is_running() + { + return process_running; + } diff --git a/simgear/threads/SGThread.hxx b/simgear/threads/SGThread.hxx index d905c624..e5610f4b 100644 --- a/simgear/threads/SGThread.hxx +++ b/simgear/threads/SGThread.hxx @@ -23,7 +23,11 @@ #ifndef SGTHREAD_HXX_INCLUDED #define SGTHREAD_HXX_INCLUDED 1 +#include +#include +#include #include +#include /** * Encapsulate generic threading methods. @@ -184,4 +188,41 @@ private: PrivateData* _privateData; }; +/// +/// an exclusive thread is one that is designed for frame processing; +/// it has the ability to synchronise such that the caller can await +/// the processing to finish. +class SGExclusiveThread : public SGThread{ +private: + std::mutex mutex_; + std::condition_variable condVar; + SGTimeStamp timestamp; + std::mutex Cmutex_; + std::condition_variable CcondVar; + + bool _started; + bool _terminated; + int last_await_time; + + std::atomic dataReady; + std::atomic complete; + std::atomic process_ran; + std::atomic process_running; + +public: + SGExclusiveThread(); + virtual ~SGExclusiveThread(); + void release(); + void wait(); + void clearAwaitCompletionTime(); + virtual void awaitCompletion(); + void setCompletion(); + virtual int process() = 0; + virtual void run(); + void terminate(); + bool stop(); + void ensure_running(); + bool is_running(); +}; + #endif /* SGTHREAD_HXX_INCLUDED */