Merge branch 'next' of git.gitorious.org:fg/simgear into next

This commit is contained in:
Martin Spott 2011-09-07 17:42:54 +02:00
commit ed7f2df04e
10 changed files with 544 additions and 391 deletions

View File

@ -1487,6 +1487,14 @@
RelativePath="..\..\simgear\threads\SGQueue.hxx" RelativePath="..\..\simgear\threads\SGQueue.hxx"
> >
</File> </File>
<File
RelativePath="..\..\simgear\threads\SGThread.hxx"
>
</File>
<File
RelativePath="..\..\simgear\threads\SGThread.cxx"
>
</File>
</Filter> </Filter>
<Filter <Filter
Name="Lib_sgstructure" Name="Lib_sgstructure"

View File

@ -2,7 +2,7 @@
include (SimGearComponent) include (SimGearComponent)
set(HEADERS set(HEADERS
iochannel.hxx iochannel.hxx
lowlevel.hxx lowlevel.hxx
raw_socket.hxx raw_socket.hxx
@ -18,7 +18,7 @@ set(HEADERS
HTTPRequest.hxx HTTPRequest.hxx
) )
set(SOURCES set(SOURCES
iochannel.cxx iochannel.cxx
lowlevel.cxx lowlevel.cxx
raw_socket.cxx raw_socket.cxx
@ -37,19 +37,20 @@ set(SOURCES
simgear_component(io io "${SOURCES}" "${HEADERS}") simgear_component(io io "${SOURCES}" "${HEADERS}")
add_executable(test_sock socktest.cxx) add_executable(test_sock socktest.cxx)
target_link_libraries(test_sock sgio sgstructure sgdebug ${OPENTHREADS_LIBRARY}) target_link_libraries(test_sock sgio sgstructure sgthreads sgdebug
${CMAKE_THREAD_LIBS_INIT}
${RT_LIBRARY})
add_executable(test_http test_HTTP.cxx) add_executable(test_http test_HTTP.cxx)
target_link_libraries(test_http target_link_libraries(test_http
sgio sgstructure sgtiming sgmisc sgdebug sgio sgstructure sgthreads sgtiming sgmisc sgdebug
${RT_LIBRARY} ${CMAKE_THREAD_LIBS_INIT}
${OPENTHREADS_LIBRARY}) ${RT_LIBRARY})
add_test(http ${EXECUTABLE_OUTPUT_PATH}/test_http) add_test(http ${EXECUTABLE_OUTPUT_PATH}/test_http)
add_executable(httpget httpget.cxx) add_executable(httpget httpget.cxx)
target_link_libraries(httpget target_link_libraries(httpget
sgio sgstructure sgtiming sgmisc sgdebug sgio sgstructure sgthreads sgtiming sgmisc sgdebug
${RT_LIBRARY} ${CMAKE_THREAD_LIBS_INIT}
${OPENTHREADS_LIBRARY}) ${RT_LIBRARY})

View File

@ -1,20 +1,20 @@
/* /*
simgear::Socket, adapted from PLIB Socket by James Turner simgear::Socket, adapted from PLIB Socket by James Turner
Copyright (C) 2010 James Turner Copyright (C) 2010 James Turner
PLIB - A Suite of Portable Game Libraries PLIB - A Suite of Portable Game Libraries
Copyright (C) 1998,2002 Steve Baker Copyright (C) 1998,2002 Steve Baker
This library is free software; you can redistribute it and/or This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public modify it under the terms of the GNU Library General Public
License as published by the Free Software Foundation; either License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version. version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful, This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Library General Public License for more details. Library General Public License for more details.
You should have received a copy of the GNU Library General Public You should have received a copy of the GNU Library General Public
License along with this library; if not, write to the Free Software License along with this library; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
@ -47,7 +47,7 @@
# include <sys/socket.h> # include <sys/socket.h>
# include <netinet/in.h> # include <netinet/in.h>
# include <arpa/inet.h> # include <arpa/inet.h>
# include <sys/time.h> # include <sys/time.h>
# include <unistd.h> # include <unistd.h>
# include <netdb.h> # include <netdb.h>
# include <fcntl.h> # include <fcntl.h>
@ -61,40 +61,43 @@
#include <simgear/debug/logstream.hxx> #include <simgear/debug/logstream.hxx>
#include <simgear/structure/exception.hxx> #include <simgear/structure/exception.hxx>
#include <simgear/threads/SGThread.hxx>
#include <OpenThreads/Thread>
#include <OpenThreads/Mutex>
#include <OpenThreads/Condition>
namespace { namespace {
class Resolver : public OpenThreads::Thread class Resolver : public SGThread
{ {
public: public:
static Resolver* instance() static Resolver* instance()
{ {
if (!static_instance) { if (!static_instance) {
OpenThreads::Thread::Init();
static_instance = new Resolver; static_instance = new Resolver;
atexit(&Resolver::cleanup); atexit(&Resolver::cleanup);
static_instance->start(); static_instance->start();
} }
return static_instance; return static_instance;
} }
static void cleanup() static void cleanup()
{ {
static_instance->cancel(); static_instance->shutdown();
static_instance->join();
} }
Resolver() Resolver() :
_done(false)
{ {
// take the lock initially, thread will wait upon it once running
_lock.lock();
} }
void shutdown()
{
_lock.lock();
_done = true;
_wait.signal();
_lock.unlock();
}
simgear::IPAddress* lookup(const string& host) simgear::IPAddress* lookup(const string& host)
{ {
simgear::IPAddress* result = NULL; simgear::IPAddress* result = NULL;
@ -109,7 +112,7 @@ public:
_lock.unlock(); _lock.unlock();
return result; return result;
} }
simgear::IPAddress* lookupSync(const string& host) simgear::IPAddress* lookupSync(const string& host)
{ {
simgear::IPAddress* result = NULL; simgear::IPAddress* result = NULL;
@ -140,31 +143,33 @@ protected:
*/ */
virtual void run() virtual void run()
{ {
while (true) { _lock.lock();
_wait.wait(&_lock); while (!_done) {
AddressCache::iterator it; AddressCache::iterator it;
for (it = _cache.begin(); it != _cache.end(); ++it) { for (it = _cache.begin(); it != _cache.end(); ++it) {
if (it->second == NULL) { if (it->second == NULL) {
string h = it->first; string h = it->first;
_lock.unlock(); _lock.unlock();
simgear::IPAddress* addr = new simgear::IPAddress; simgear::IPAddress* addr = new simgear::IPAddress;
// may take seconds or even minutes! // may take seconds or even minutes!
lookupHost(h.c_str(), *addr); lookupHost(h.c_str(), *addr);
_lock.lock(); _lock.lock();
// cahce may have changed while we had the lock released - // cahce may have changed while we had the lock released -
// so iterators may be invalid: restart the traversal // so iterators may be invalid: restart the traversal
it = _cache.begin(); it = _cache.begin();
_cache[h] = addr; _cache[h] = addr;
} // of found un-resolved entry } // of found un-resolved entry
} // of un-resolved address iteration } // of un-resolved address iteration
_wait.wait(_lock);
} // of thread run loop } // of thread run loop
_lock.unlock();
} }
private: private:
static Resolver* static_instance; static Resolver* static_instance;
/** /**
* The actual synchronous, blocking host lookup function * The actual synchronous, blocking host lookup function
* do *not* call this with any locks (mutexs) held, since depending * do *not* call this with any locks (mutexs) held, since depending
@ -177,7 +182,7 @@ private:
memset(&hints, 0, sizeof(struct addrinfo)); memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_INET; hints.ai_family = AF_INET;
bool ok = false; bool ok = false;
struct addrinfo* result0 = NULL; struct addrinfo* result0 = NULL;
int err = getaddrinfo(host, NULL, &hints, &result0); int err = getaddrinfo(host, NULL, &hints, &result0);
if (err) { if (err) {
@ -205,21 +210,22 @@ private:
freeaddrinfo(result0); freeaddrinfo(result0);
return ok; return ok;
} }
OpenThreads::Mutex _lock; SGMutex _lock;
OpenThreads::Condition _wait; SGWaitCondition _wait;
typedef std::map<string, simgear::IPAddress*> AddressCache; typedef std::map<string, simgear::IPAddress*> AddressCache;
AddressCache _cache; AddressCache _cache;
bool _done;
}; };
Resolver* Resolver::static_instance = NULL; Resolver* Resolver::static_instance = NULL;
} // of anonymous namespace } // of anonymous namespace
namespace simgear namespace simgear
{ {
IPAddress::IPAddress ( const char* host, int port ) IPAddress::IPAddress ( const char* host, int port )
{ {
set ( host, port ) ; set ( host, port ) ;
@ -266,18 +272,18 @@ void IPAddress::set ( const char* host, int port )
addr->sin_addr.s_addr = INADDR_ANY; addr->sin_addr.s_addr = INADDR_ANY;
return; return;
} }
if (strcmp(host, "<broadcast>") == 0) { if (strcmp(host, "<broadcast>") == 0) {
addr->sin_addr.s_addr = INADDR_BROADCAST; addr->sin_addr.s_addr = INADDR_BROADCAST;
return; return;
} }
// check the cache // check the cache
IPAddress* cached = Resolver::instance()->lookupSync(host); IPAddress* cached = Resolver::instance()->lookupSync(host);
if (cached) { if (cached) {
memcpy(addr, cached->getAddr(), cached->getAddrLen()); memcpy(addr, cached->getAddr(), cached->getAddrLen());
} }
addr->sin_port = htons (port); // fix up port after getaddrinfo addr->sin_port = htons (port); // fix up port after getaddrinfo
} }
@ -289,12 +295,12 @@ IPAddress::~IPAddress()
} }
bool IPAddress::lookupNonblocking(const char* host, IPAddress& addr) bool IPAddress::lookupNonblocking(const char* host, IPAddress& addr)
{ {
IPAddress* cached = Resolver::instance()->lookup(host); IPAddress* cached = Resolver::instance()->lookup(host);
if (!cached) { if (!cached) {
return false; return false;
} }
addr = *cached; addr = *cached;
return true; return true;
} }
@ -313,9 +319,9 @@ const char* IPAddress::getHost () const
return buf; return buf;
} }
unsigned int IPAddress::getIP () const unsigned int IPAddress::getIP () const
{ {
return addr->sin_addr.s_addr; return addr->sin_addr.s_addr;
} }
unsigned int IPAddress::getPort() const unsigned int IPAddress::getPort() const
@ -328,9 +334,9 @@ void IPAddress::setPort(int port)
addr->sin_port = htons(port); addr->sin_port = htons(port);
} }
unsigned int IPAddress::getFamily () const unsigned int IPAddress::getFamily () const
{ {
return addr->sin_family; return addr->sin_family;
} }
const char* IPAddress::getLocalHost () const char* IPAddress::getLocalHost ()
@ -371,7 +377,7 @@ struct sockaddr* IPAddress::getAddr() const
addr = (struct sockaddr_in*) malloc(sizeof(struct sockaddr_in)); addr = (struct sockaddr_in*) malloc(sizeof(struct sockaddr_in));
memset(addr, 0, sizeof(struct sockaddr_in)); memset(addr, 0, sizeof(struct sockaddr_in));
} }
return (struct sockaddr*) addr; return (struct sockaddr*) addr;
} }
@ -456,7 +462,7 @@ void Socket::setBroadcast ( bool broadcast )
} else { } else {
result = ::setsockopt( handle, SOL_SOCKET, SO_BROADCAST, NULL, 0 ); result = ::setsockopt( handle, SOL_SOCKET, SO_BROADCAST, NULL, 0 );
} }
if ( result < 0 ) { if ( result < 0 ) {
throw sg_exception("Socket::setBroadcast failed"); throw sg_exception("Socket::setBroadcast failed");
} }
@ -476,7 +482,7 @@ int Socket::bind ( const char* host, int port )
} }
#endif #endif
// 224.0.0.0 - 239.255.255.255 are multicast // 224.0.0.0 - 239.255.255.255 are multicast
// Usage of 239.x.x.x is recommended for local scope // Usage of 239.x.x.x is recommended for local scope
// Reference: http://tools.ietf.org/html/rfc5771 // Reference: http://tools.ietf.org/html/rfc5771
if( ntohl(addr.getIP()) >= 0xe0000000 && ntohl(addr.getIP()) <= 0xefffffff ) { if( ntohl(addr.getIP()) >= 0xe0000000 && ntohl(addr.getIP()) <= 0xefffffff ) {
@ -486,7 +492,7 @@ int Socket::bind ( const char* host, int port )
a.sin_addr.S_un.S_addr = INADDR_ANY; a.sin_addr.S_un.S_addr = INADDR_ANY;
a.sin_family = AF_INET; a.sin_family = AF_INET;
a.sin_port = htons(port); a.sin_port = htons(port);
if( (result = ::bind(handle,(const sockaddr*)&a,sizeof(a))) < 0 ) { if( (result = ::bind(handle,(const sockaddr*)&a,sizeof(a))) < 0 ) {
SG_LOG(SG_IO, SG_ALERT, "bind(any:" << port << ") failed. Errno " << errno << " (" << strerror(errno) << ")"); SG_LOG(SG_IO, SG_ALERT, "bind(any:" << port << ") failed. Errno " << errno << " (" << strerror(errno) << ")");
return result; return result;
@ -636,7 +642,7 @@ int Socket::select ( Socket** reads, Socket** writes, int timeout )
{ {
fd_set r,w; fd_set r,w;
int retval; int retval;
FD_ZERO (&r); FD_ZERO (&r);
FD_ZERO (&w); FD_ZERO (&w);
@ -674,7 +680,7 @@ int Socket::select ( Socket** reads, Socket** writes, int timeout )
// It bothers me that select()'s first argument does not appear to // It bothers me that select()'s first argument does not appear to
// work as advertised... [it hangs like this if called with // work as advertised... [it hangs like this if called with
// anything less than FD_SETSIZE, which seems wasteful?] // anything less than FD_SETSIZE, which seems wasteful?]
// Note: we ignore the 'exception' fd_set - I have never had a // Note: we ignore the 'exception' fd_set - I have never had a
// need to use it. The name is somewhat misleading - the only // need to use it. The name is somewhat misleading - the only
// thing I have ever seen it used for is to detect urgent data - // thing I have ever seen it used for is to detect urgent data -

View File

@ -1,6 +1,6 @@
#include "StringTable.hxx" #include "StringTable.hxx"
#include <OpenThreads/ScopedLock> #include <simgear/threads/SGGuard.hxx>
namespace simgear namespace simgear
{ {
@ -8,8 +8,7 @@ using namespace std;
const string* StringTable::insert(const string& str) const string* StringTable::insert(const string& str)
{ {
using namespace OpenThreads; SGGuard<SGMutex> lock(_mutex);
ScopedLock<Mutex> lock(_mutex);
StringContainer::iterator it = _strings.insert(str).first; StringContainer::iterator it = _strings.insert(str).first;
return &*it; return &*it;
} }

View File

@ -3,7 +3,7 @@
#include <string> #include <string>
#include <OpenThreads/Mutex> #include <simgear/threads/SGThread.hxx>
#include <boost/multi_index_container.hpp> #include <boost/multi_index_container.hpp>
#include <boost/multi_index/hashed_index.hpp> #include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/identity.hpp> #include <boost/multi_index/identity.hpp>
@ -21,7 +21,7 @@ class StringTable
{ {
const std::string* insert(const std::string& str); const std::string* insert(const std::string& str);
private: private:
OpenThreads::Mutex _mutex; SGMutex _mutex;
StringContainer _strings; StringContainer _strings;
}; };
} }

View File

@ -11,13 +11,12 @@
#include <memory> #include <memory>
#include <simgear/props/props_io.hxx> #include <simgear/props/props_io.hxx>
#include <OpenThreads/Mutex>
#include <OpenThreads/ScopedLock>
#include "commands.hxx" #include "commands.hxx"
#include <simgear/math/SGMath.hxx> #include <simgear/math/SGMath.hxx>
#include <simgear/structure/exception.hxx> #include <simgear/structure/exception.hxx>
#include <simgear/threads/SGThread.hxx>
#include <simgear/threads/SGGuard.hxx>
#include <simgear/debug/logstream.hxx> #include <simgear/debug/logstream.hxx>
@ -36,7 +35,7 @@ SGCommandMgr::~SGCommandMgr ()
// no-op // no-op
} }
OpenThreads::Mutex SGCommandMgr::_instanceMutex; SGMutex SGCommandMgr::_instanceMutex;
SGCommandMgr* SGCommandMgr*
SGCommandMgr::instance() SGCommandMgr::instance()
@ -45,7 +44,7 @@ SGCommandMgr::instance()
if (mgr.get()) if (mgr.get())
return mgr.get(); return mgr.get();
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_instanceMutex); SGGuard<SGMutex> lock(_instanceMutex);
if (mgr.get()) if (mgr.get())
return mgr.get(); return mgr.get();
@ -85,8 +84,8 @@ SGCommandMgr::execute (const std::string &name, const SGPropertyNode * arg) cons
command_t command = getCommand(name); command_t command = getCommand(name);
if (command == 0) if (command == 0)
return false; return false;
try { try {
return (*command)(arg); return (*command)(arg);
} catch (sg_exception& e) { } catch (sg_exception& e) {

View File

@ -17,8 +17,7 @@
#include <map> #include <map>
#include <vector> #include <vector>
#include <OpenThreads/Mutex> #include <simgear/threads/SGThread.hxx>
#include <simgear/math/sg_types.hxx> #include <simgear/math/sg_types.hxx>
#include <simgear/props/props.hxx> #include <simgear/props/props.hxx>
@ -107,7 +106,7 @@ private:
typedef std::map<std::string,command_t> command_map; typedef std::map<std::string,command_t> command_map;
command_map _commands; command_map _commands;
static OpenThreads::Mutex _instanceMutex; static SGMutex _instanceMutex;
}; };

View File

@ -5,9 +5,8 @@
#include <cassert> #include <cassert>
#include <queue> #include <queue>
#include <OpenThreads/Mutex> #include "SGGuard.hxx"
#include <OpenThreads/ScopedLock> #include "SGThread.hxx"
#include <OpenThreads/Condition>
/** /**
* SGQueue defines an interface for a FIFO. * SGQueue defines an interface for a FIFO.
@ -66,7 +65,7 @@ public:
protected: protected:
/** /**
* *
*/ */
std::queue<T> fifo; std::queue<T> fifo;
}; };
@ -74,7 +73,7 @@ protected:
/** /**
* A simple thread safe queue. All access functions are guarded with a mutex. * A simple thread safe queue. All access functions are guarded with a mutex.
*/ */
template<class T, class SGLOCK=OpenThreads::Mutex> template<class T>
class SGLockedQueue : public SGQueue<T> class SGLockedQueue : public SGQueue<T>
{ {
public: public:
@ -95,7 +94,7 @@ public:
* @return bool True if queue is empty, otherwisr false. * @return bool True if queue is empty, otherwisr false.
*/ */
virtual bool empty() { virtual bool empty() {
OpenThreads::ScopedLock<SGLOCK> g(mutex); SGGuard<SGMutex> g(mutex);
return this->fifo.empty(); return this->fifo.empty();
} }
@ -105,7 +104,7 @@ public:
* @param T object to add. * @param T object to add.
*/ */
virtual void push( const T& item ) { virtual void push( const T& item ) {
OpenThreads::ScopedLock<SGLOCK> g(mutex); SGGuard<SGMutex> g(mutex);
this->fifo.push( item ); this->fifo.push( item );
} }
@ -115,7 +114,7 @@ public:
* @return T next available object. * @return T next available object.
*/ */
virtual T front() { virtual T front() {
OpenThreads::ScopedLock<SGLOCK> g(mutex); SGGuard<SGMutex> g(mutex);
assert( ! this->fifo.empty() ); assert( ! this->fifo.empty() );
T item = this->fifo.front(); T item = this->fifo.front();
return item; return item;
@ -127,7 +126,7 @@ public:
* @return T next available object. * @return T next available object.
*/ */
virtual T pop() { virtual T pop() {
OpenThreads::ScopedLock<SGLOCK> g(mutex); SGGuard<SGMutex> g(mutex);
//if (fifo.empty()) throw NoSuchElementException(); //if (fifo.empty()) throw NoSuchElementException();
assert( ! this->fifo.empty() ); assert( ! this->fifo.empty() );
// if (fifo.empty()) // if (fifo.empty())
@ -146,7 +145,7 @@ public:
* @return size_t size of queue. * @return size_t size of queue.
*/ */
virtual size_t size() { virtual size_t size() {
OpenThreads::ScopedLock<SGLOCK> g(mutex); SGGuard<SGMutex> g(mutex);
return this->fifo.size(); return this->fifo.size();
} }
@ -155,7 +154,7 @@ private:
/** /**
* Mutex to serialise access. * Mutex to serialise access.
*/ */
SGLOCK mutex; SGMutex mutex;
private: private:
// Prevent copying. // Prevent copying.
@ -182,10 +181,10 @@ public:
~SGBlockingQueue() {} ~SGBlockingQueue() {}
/** /**
* *
*/ */
virtual bool empty() { virtual bool empty() {
OpenThreads::ScopedLock<OpenThreads::Mutex> g(mutex); SGGuard<SGMutex> g(mutex);
return this->fifo.empty(); return this->fifo.empty();
} }
@ -195,7 +194,7 @@ public:
* @param T object to add. * @param T object to add.
*/ */
virtual void push( const T& item ) { virtual void push( const T& item ) {
OpenThreads::ScopedLock<OpenThreads::Mutex> g(mutex); SGGuard<SGMutex> g(mutex);
this->fifo.push( item ); this->fifo.push( item );
not_empty.signal(); not_empty.signal();
} }
@ -207,7 +206,7 @@ public:
* @return T next available object. * @return T next available object.
*/ */
virtual T front() { virtual T front() {
OpenThreads::ScopedLock<OpenThreads::Mutex> g(mutex); SGGuard<SGMutex> g(mutex);
assert(this->fifo.empty() != true); assert(this->fifo.empty() != true);
//if (fifo.empty()) throw ?? //if (fifo.empty()) throw ??
@ -223,10 +222,10 @@ public:
* @return T next available object. * @return T next available object.
*/ */
virtual T pop() { virtual T pop() {
OpenThreads::ScopedLock<OpenThreads::Mutex> g(mutex); SGGuard<SGMutex> g(mutex);
while (this->fifo.empty()) while (this->fifo.empty())
not_empty.wait(&mutex); not_empty.wait(mutex);
assert(this->fifo.empty() != true); assert(this->fifo.empty() != true);
//if (fifo.empty()) throw ?? //if (fifo.empty()) throw ??
@ -242,7 +241,7 @@ public:
* @return size_t size of queue. * @return size_t size of queue.
*/ */
virtual size_t size() { virtual size_t size() {
OpenThreads::ScopedLock<OpenThreads::Mutex> g(mutex); SGGuard<SGMutex> g(mutex);
return this->fifo.size(); return this->fifo.size();
} }
@ -251,12 +250,12 @@ private:
/** /**
* Mutex to serialise access. * Mutex to serialise access.
*/ */
OpenThreads::Mutex mutex; SGMutex mutex;
/** /**
* Condition to signal when queue not empty. * Condition to signal when queue not empty.
*/ */
OpenThreads::Condition not_empty; SGWaitCondition not_empty;
private: private:
// Prevent copying. // Prevent copying.
@ -284,18 +283,18 @@ public:
~SGBlockingDeque() {} ~SGBlockingDeque() {}
/** /**
* *
*/ */
virtual void clear() { virtual void clear() {
OpenThreads::ScopedLock<OpenThreads::Mutex> g(mutex); SGGuard<SGMutex> g(mutex);
this->queue.clear(); this->queue.clear();
} }
/** /**
* *
*/ */
virtual bool empty() { virtual bool empty() {
OpenThreads::ScopedLock<OpenThreads::Mutex> g(mutex); SGGuard<SGMutex> g(mutex);
return this->queue.empty(); return this->queue.empty();
} }
@ -305,7 +304,7 @@ public:
* @param T object to add. * @param T object to add.
*/ */
virtual void push_front( const T& item ) { virtual void push_front( const T& item ) {
OpenThreads::ScopedLock<OpenThreads::Mutex> g(mutex); SGGuard<SGMutex> g(mutex);
this->queue.push_front( item ); this->queue.push_front( item );
not_empty.signal(); not_empty.signal();
} }
@ -316,7 +315,7 @@ public:
* @param T object to add. * @param T object to add.
*/ */
virtual void push_back( const T& item ) { virtual void push_back( const T& item ) {
OpenThreads::ScopedLock<OpenThreads::Mutex> g(mutex); SGGuard<SGMutex> g(mutex);
this->queue.push_back( item ); this->queue.push_back( item );
not_empty.signal(); not_empty.signal();
} }
@ -328,7 +327,7 @@ public:
* @return T next available object. * @return T next available object.
*/ */
virtual T front() { virtual T front() {
OpenThreads::ScopedLock<OpenThreads::Mutex> g(mutex); SGGuard<SGMutex> g(mutex);
assert(this->queue.empty() != true); assert(this->queue.empty() != true);
//if (queue.empty()) throw ?? //if (queue.empty()) throw ??
@ -344,10 +343,10 @@ public:
* @return T next available object. * @return T next available object.
*/ */
virtual T pop_front() { virtual T pop_front() {
OpenThreads::ScopedLock<OpenThreads::Mutex> g(mutex); SGGuard<SGMutex> g(mutex);
while (this->queue.empty()) while (this->queue.empty())
not_empty.wait(&mutex); not_empty.wait(mutex);
assert(this->queue.empty() != true); assert(this->queue.empty() != true);
//if (queue.empty()) throw ?? //if (queue.empty()) throw ??
@ -364,10 +363,10 @@ public:
* @return T next available object. * @return T next available object.
*/ */
virtual T pop_back() { virtual T pop_back() {
OpenThreads::ScopedLock<OpenThreads::Mutex> g(mutex); SGGuard<SGMutex> g(mutex);
while (this->queue.empty()) while (this->queue.empty())
not_empty.wait(&mutex); not_empty.wait(mutex);
assert(this->queue.empty() != true); assert(this->queue.empty() != true);
//if (queue.empty()) throw ?? //if (queue.empty()) throw ??
@ -383,7 +382,7 @@ public:
* @return size_t size of queue. * @return size_t size of queue.
*/ */
virtual size_t size() { virtual size_t size() {
OpenThreads::ScopedLock<OpenThreads::Mutex> g(mutex); SGGuard<SGMutex> g(mutex);
return this->queue.size(); return this->queue.size();
} }
@ -392,12 +391,12 @@ private:
/** /**
* Mutex to serialise access. * Mutex to serialise access.
*/ */
OpenThreads::Mutex mutex; SGMutex mutex;
/** /**
* Condition to signal when queue not empty. * Condition to signal when queue not empty.
*/ */
OpenThreads::Condition not_empty; SGWaitCondition not_empty;
private: private:
// Prevent copying. // Prevent copying.

View File

@ -1,107 +1,410 @@
#include <simgear/compiler.h> // SGThread - Simple pthread class wrappers.
//
// Written by Bernie Bright, started April 2001.
//
// Copyright (C) 2001 Bernard Bright - bbright@bigpond.net.au
// Copyright (C) 2011 Mathias Froehlich
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License as
// published by the Free Software Foundation; either version 2 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//
#if defined(_MSC_VER) || defined(__MINGW32__) #ifdef HAVE_CONFIG_H
# include <time.h> # include <simgear_config.h>
#else
# if defined ( sgi ) && !defined( __GNUC__ )
// This works around a bug triggered when using MipsPro 7.4.1
// and (at least) IRIX 6.5.20
# include <iostream>
# endif
# include <sys/time.h>
#endif
#if _MSC_VER >= 1300
# include <winsock2.h>
#endif #endif
#include <simgear/compiler.h>
#include "SGThread.hxx" #include "SGThread.hxx"
void* #ifdef _WIN32
start_handler( void* arg )
/////////////////////////////////////////////////////////////////////////////
/// win32 threads
/////////////////////////////////////////////////////////////////////////////
#include <list>
#include <windows.h>
struct SGThread::PrivateData {
PrivateData() :
_handle(INVALID_HANDLE_VALUE)
{
}
~PrivateData()
{
if (_handle == INVALID_HANDLE_VALUE)
return;
CloseHandle(_handle);
_handle = INVALID_HANDLE_VALUE;
}
static DWORD WINAPI start_routine(LPVOID data)
{
SGThread* thread = reinterpret_cast<SGThread*>(data);
thread->run();
return 0;
}
bool start(SGThread& thread)
{
if (_handle != INVALID_HANDLE_VALUE)
return false;
_handle = CreateThread(0, 0, start_routine, &thread, 0, 0);
if (_handle == INVALID_HANDLE_VALUE)
return false;
return true;
}
void join()
{
if (_handle == INVALID_HANDLE_VALUE)
return;
DWORD ret = WaitForSingleObject(_handle, 0);
if (ret != WAIT_OBJECT_0)
return;
CloseHandle(_handle);
_handle = INVALID_HANDLE_VALUE;
}
HANDLE _handle;
};
struct SGMutex::PrivateData {
PrivateData()
{
InitializeCriticalSection((LPCRITICAL_SECTION)&_criticalSection);
}
~PrivateData()
{
DeleteCriticalSection((LPCRITICAL_SECTION)&_criticalSection);
}
void lock(void)
{
EnterCriticalSection((LPCRITICAL_SECTION)&_criticalSection);
}
void unlock(void)
{
LeaveCriticalSection((LPCRITICAL_SECTION)&_criticalSection);
}
CRITICAL_SECTION _criticalSection;
};
struct SGWaitCondition::PrivateData {
~PrivateData(void)
{
// The waiters list should be empty anyway
_mutex.lock();
while (!_pool.empty()) {
CloseHandle(_pool.front());
_pool.pop_front();
}
_mutex.unlock();
}
void signal(void)
{
_mutex.lock();
if (!_waiters.empty())
SetEvent(_waiters.back());
_mutex.unlock();
}
void broadcast(void)
{
_mutex.lock();
for (std::list<HANDLE>::iterator i = _waiters.begin(); i != _waiters.end(); ++i)
SetEvent(*i);
_mutex.unlock();
}
bool wait(SGMutex::PrivateData& externalMutex, DWORD msec)
{
_mutex.lock();
if (_pool.empty())
_waiters.push_front(CreateEvent(NULL, FALSE, FALSE, NULL));
else
_waiters.splice(_waiters.begin(), _pool, _pool.begin());
std::list<HANDLE>::iterator i = _waiters.begin();
_mutex.unlock();
externalMutex.unlock();
DWORD result = WaitForSingleObject(*i, msec);
externalMutex.lock();
_mutex.lock();
if (result != WAIT_OBJECT_0)
result = WaitForSingleObject(*i, 0);
_pool.splice(_pool.begin(), _waiters, i);
_mutex.unlock();
return result == WAIT_OBJECT_0;
}
void wait(SGMutex::PrivateData& externalMutex)
{
wait(externalMutex, INFINITE);
}
// Protect the list of waiters
SGMutex::PrivateData _mutex;
std::list<HANDLE> _waiters;
std::list<HANDLE> _pool;
};
#else
/////////////////////////////////////////////////////////////////////////////
/// posix threads
/////////////////////////////////////////////////////////////////////////////
#include <pthread.h>
#include <cassert>
#include <cerrno>
#include <sys/time.h>
struct SGThread::PrivateData {
PrivateData() :
_started(false)
{
}
~PrivateData()
{
// If we are still having a started thread and nobody waited,
// now detach ...
if (!_started)
return;
pthread_detach(_thread);
}
static void *start_routine(void* data)
{
SGThread* thread = reinterpret_cast<SGThread*>(data);
thread->run();
return 0;
}
bool start(SGThread& thread)
{
if (_started)
return false;
int ret = pthread_create(&_thread, 0, start_routine, &thread);
if (0 != ret)
return false;
_started = true;
return true;
}
void join()
{
if (!_started)
return;
pthread_join(_thread, 0);
_started = false;
}
pthread_t _thread;
bool _started;
};
struct SGMutex::PrivateData {
PrivateData()
{
int err = pthread_mutex_init(&_mutex, 0);
assert(err == 0);
(void)err;
}
~PrivateData()
{
int err = pthread_mutex_destroy(&_mutex);
assert(err == 0);
(void)err;
}
void lock(void)
{
int err = pthread_mutex_lock(&_mutex);
assert(err == 0);
(void)err;
}
void unlock(void)
{
int err = pthread_mutex_unlock(&_mutex);
assert(err == 0);
(void)err;
}
pthread_mutex_t _mutex;
};
struct SGWaitCondition::PrivateData {
PrivateData(void)
{
int err = pthread_cond_init(&_condition, NULL);
assert(err == 0);
(void)err;
}
~PrivateData(void)
{
int err = pthread_cond_destroy(&_condition);
assert(err == 0);
(void)err;
}
void signal(void)
{
int err = pthread_cond_signal(&_condition);
assert(err == 0);
(void)err;
}
void broadcast(void)
{
int err = pthread_cond_broadcast(&_condition);
assert(err == 0);
(void)err;
}
void wait(SGMutex::PrivateData& mutex)
{
int err = pthread_cond_wait(&_condition, &mutex._mutex);
assert(err == 0);
(void)err;
}
bool wait(SGMutex::PrivateData& mutex, unsigned msec)
{
struct timespec ts;
#ifdef HAVE_CLOCK_GETTIME
if (0 != clock_gettime(CLOCK_REALTIME, &ts))
return false;
#else
struct timeval tv;
if (0 != gettimeofday(&tv, NULL))
return false;
ts.tv_sec = tv.tv_sec;
ts.tv_nsec = tv.tv_usec * 1000;
#endif
ts.tv_nsec += 1000000*(msec % 1000);
if (1000000000 <= ts.tv_nsec) {
ts.tv_nsec -= 1000000000;
ts.tv_sec += 1;
}
ts.tv_sec += msec / 1000;
int evalue = pthread_cond_timedwait(&_condition, &mutex._mutex, &ts);
if (evalue == 0)
return true;
assert(evalue == ETIMEDOUT);
return false;
}
pthread_cond_t _condition;
};
#endif
SGThread::SGThread() :
_privateData(new PrivateData)
{ {
SGThread* thr = static_cast<SGThread*>(arg); }
thr->run();
return 0; SGThread::~SGThread()
{
delete _privateData;
_privateData = 0;
}
bool
SGThread::start()
{
return _privateData->start(*this);
} }
void void
SGThread::set_cancel( cancel_t mode ) SGThread::join()
{ {
switch (mode) _privateData->join();
{ }
case CANCEL_DISABLE:
pthread_setcancelstate( PTHREAD_CANCEL_DISABLE, 0 ); SGMutex::SGMutex() :
break; _privateData(new PrivateData)
case CANCEL_DEFERRED: {
pthread_setcanceltype( PTHREAD_CANCEL_DEFERRED, 0 ); }
pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, 0 );
break; SGMutex::~SGMutex()
case CANCEL_IMMEDIATE: {
pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS, 0 ); delete _privateData;
pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, 0 ); _privateData = 0;
break; }
default:
break; void
} SGMutex::lock()
{
_privateData->lock();
}
void
SGMutex::unlock()
{
_privateData->unlock();
}
SGWaitCondition::SGWaitCondition() :
_privateData(new PrivateData)
{
}
SGWaitCondition::~SGWaitCondition()
{
delete _privateData;
_privateData = 0;
}
void
SGWaitCondition::wait(SGMutex& mutex)
{
_privateData->wait(*mutex._privateData);
} }
bool bool
SGMutex::trylock() SGWaitCondition::wait(SGMutex& mutex, unsigned msec)
{ {
int status = pthread_mutex_lock( &mutex ); return _privateData->wait(*mutex._privateData, msec);
if (status == EBUSY)
{
return false;
}
assert( status == 0 );
return true;
} }
#if defined(_MSC_VER) || defined(__MINGW32__) void
int gettimeofday(struct timeval* tp, void* tzp) { SGWaitCondition::signal()
LARGE_INTEGER t;
if(QueryPerformanceCounter(&t)) {
/* hardware supports a performance counter */
LARGE_INTEGER f;
QueryPerformanceFrequency(&f);
tp->tv_sec = t.QuadPart/f.QuadPart;
tp->tv_usec = ((float)t.QuadPart/f.QuadPart*1000*1000)
- (tp->tv_sec*1000*1000);
} else {
/* hardware doesn't support a performance counter, so get the
time in a more traditional way. */
DWORD t;
t = timeGetTime();
tp->tv_sec = t / 1000;
tp->tv_usec = t % 1000;
}
/* 0 indicates that the call succeeded. */
return 0;
}
#endif
bool
SGPthreadCond::wait( SGMutex& mutex, unsigned long ms )
{ {
struct timeval now; _privateData->signal();
::gettimeofday( &now, 0 );
// Wait time is now + ms milliseconds
unsigned int sec = ms / 1000;
unsigned int nsec = (ms % 1000) * 1000;
struct timespec abstime;
abstime.tv_sec = now.tv_sec + sec;
abstime.tv_nsec = now.tv_usec*1000 + nsec;
int status = pthread_cond_timedwait( &cond, &mutex.mutex, &abstime );
if (status == ETIMEDOUT)
{
return false;
}
assert( status == 0 );
return true;
} }
void
SGWaitCondition::broadcast()
{
_privateData->broadcast();
}

View File

@ -3,6 +3,7 @@
// Written by Bernie Bright, started April 2001. // Written by Bernie Bright, started April 2001.
// //
// Copyright (C) 2001 Bernard Bright - bbright@bigpond.net.au // Copyright (C) 2001 Bernard Bright - bbright@bigpond.net.au
// Copyright (C) 2011 Mathias Froehlich
// //
// This program is free software; you can redistribute it and/or // This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License as // modify it under the terms of the GNU General Public License as
@ -18,41 +19,18 @@
// along with this program; if not, write to the Free Software // along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
// //
// $Id$
#ifndef SGTHREAD_HXX_INCLUDED #ifndef SGTHREAD_HXX_INCLUDED
#define SGTHREAD_HXX_INCLUDED 1 #define SGTHREAD_HXX_INCLUDED 1
#include <simgear/compiler.h> #include <simgear/compiler.h>
#include <pthread.h>
#include <cassert>
#include <cerrno>
class SGThread;
extern "C" {
void* start_handler( void* );
};
/** /**
* Encapsulate generic threading methods. * Encapsulate generic threading methods.
* Users derive a class from SGThread and implement the run() member function. * Users derive a class from SGThread and implement the run() member function.
*/ */
class SGThread class SGThread {
{
public: public:
/**
* SGThread cancelation modes.
*/
enum cancel_t
{
CANCEL_DISABLE = 0,
CANCEL_DEFERRED,
CANCEL_IMMEDIATE
};
public:
/** /**
* Create a new thread object. * Create a new thread object.
* When a SGThread object is created it does not begin execution * When a SGThread object is created it does not begin execution
@ -62,18 +40,9 @@ public:
/** /**
* Start the underlying thread of execution. * Start the underlying thread of execution.
* @param cpu An optional parameter to specify on which CPU to run this
* thread (only supported on IRIX at this time).
* @return Pthread error code if execution fails, otherwise returns 0. * @return Pthread error code if execution fails, otherwise returns 0.
*/ */
int start( unsigned cpu = 0 ); bool start();
/**
* Sends a cancellation request to the underlying thread. The target
* thread will either ignore the request, honor it immediately or defer
* it until it reaches a cancellation point.
*/
void cancel();
/** /**
* Suspends the exection of the calling thread until this thread * Suspends the exection of the calling thread until this thread
@ -89,82 +58,31 @@ protected:
*/ */
virtual ~SGThread(); virtual ~SGThread();
/**
* Set the threads cancellation mode.
* @param mode The required cancellation mode.
*/
void set_cancel( cancel_t mode );
/** /**
* All threads execute by deriving the run() method of SGThread. * All threads execute by deriving the run() method of SGThread.
* If this function terminates then the thread also terminates. * If this function terminates then the thread also terminates.
*/ */
virtual void run() = 0; virtual void run() = 0;
private:
/**
* Pthread thread identifier.
*/
pthread_t tid;
friend void* start_handler( void* );
private: private:
// Disable copying. // Disable copying.
SGThread( const SGThread& ); SGThread(const SGThread&);
SGThread& operator=( const SGThread& ); SGThread& operator=(const SGThread&);
struct PrivateData;
PrivateData* _privateData;
friend struct PrivateData;
}; };
inline class SGWaitCondition;
SGThread::SGThread()
{
}
inline
SGThread::~SGThread()
{
}
inline int
SGThread::start( unsigned cpu )
{
int status = pthread_create( &tid, 0, start_handler, this );
assert( status == 0 );
(void)status;
#if defined( sgi )
if ( !status && !cpu )
pthread_setrunon_np( cpu );
#endif
return status;
}
inline void
SGThread::join()
{
int status = pthread_join( tid, 0 );
assert( status == 0 );
(void)status;
}
inline void
SGThread::cancel()
{
int status = pthread_cancel( tid );
assert( status == 0 );
(void)status;
}
/** /**
* A mutex is used to protect a section of code such that at any time * A mutex is used to protect a section of code such that at any time
* only a single thread can execute the code. * only a single thread can execute the code.
*/ */
class SGMutex class SGMutex {
{
friend class SGPthreadCond;
public: public:
/** /**
* Create a new mutex. * Create a new mutex.
* Under Linux this is a 'fast' mutex. * Under Linux this is a 'fast' mutex.
@ -186,86 +104,46 @@ public:
* mutex is already locked and owned by the calling thread, the calling * mutex is already locked and owned by the calling thread, the calling
* thread is suspended until the mutex is unlocked, effectively causing * thread is suspended until the mutex is unlocked, effectively causing
* the calling thread to deadlock. * the calling thread to deadlock.
*
* @see SGMutex::trylock
*/ */
void lock(); void lock();
/**
* Try to lock the mutex for the current thread. Behaves like lock except
* that it doesn't block the calling thread.
* @return true if mutex was successfully locked, otherwise false.
* @see SGMutex::lock
*/
bool trylock();
/** /**
* Unlock this mutex. * Unlock this mutex.
* It is assumed that the mutex is locked and owned by the calling thread. * It is assumed that the mutex is locked and owned by the calling thread.
*/ */
void unlock(); void unlock();
protected: private:
struct PrivateData;
PrivateData* _privateData;
/** friend class SGWaitCondition;
* Pthread mutex.
*/
pthread_mutex_t mutex;
}; };
inline SGMutex::SGMutex()
{
int status = pthread_mutex_init( &mutex, 0 );
assert( status == 0 );
(void)status;
}
inline SGMutex::~SGMutex()
{
int status = pthread_mutex_destroy( &mutex );
assert( status == 0 );
(void)status;
}
inline void SGMutex::lock()
{
int status = pthread_mutex_lock( &mutex );
assert( status == 0 );
(void)status;
}
inline void SGMutex::unlock()
{
int status = pthread_mutex_unlock( &mutex );
assert( status == 0 );
(void)status;
}
/** /**
* A condition variable is a synchronization device that allows threads to * A condition variable is a synchronization device that allows threads to
* suspend execution until some predicate on shared data is satisfied. * suspend execution until some predicate on shared data is satisfied.
* A condition variable is always associated with a mutex to avoid race * A condition variable is always associated with a mutex to avoid race
* conditions. * conditions.
*/ */
class SGPthreadCond class SGWaitCondition {
{
public: public:
/** /**
* Create a new condition variable. * Create a new condition variable.
*/ */
SGPthreadCond(); SGWaitCondition();
/** /**
* Destroy the condition object. * Destroy the condition object.
*/ */
~SGPthreadCond(); ~SGWaitCondition();
/** /**
* Wait for this condition variable to be signaled. * Wait for this condition variable to be signaled.
* *
* @param SGMutex& reference to a locked mutex. * @param SGMutex& reference to a locked mutex.
*/ */
void wait( SGMutex& ); void wait(SGMutex&);
/** /**
* Wait for this condition variable to be signaled for at most * Wait for this condition variable to be signaled for at most
@ -274,9 +152,9 @@ public:
* @param mutex reference to a locked mutex. * @param mutex reference to a locked mutex.
* @param ms milliseconds to wait for a signal. * @param ms milliseconds to wait for a signal.
* *
* @return * @return
*/ */
bool wait( SGMutex& mutex, unsigned long ms ); bool wait(SGMutex& mutex, unsigned msec);
/** /**
* Wake one thread waiting on this condition variable. * Wake one thread waiting on this condition variable.
@ -294,50 +172,11 @@ public:
private: private:
// Disable copying. // Disable copying.
SGPthreadCond(const SGPthreadCond& ); SGWaitCondition(const SGWaitCondition&);
SGPthreadCond& operator=(const SGPthreadCond& ); SGWaitCondition& operator=(const SGWaitCondition&);
private: struct PrivateData;
PrivateData* _privateData;
/**
* The Pthread conditon variable.
*/
pthread_cond_t cond;
}; };
inline SGPthreadCond::SGPthreadCond()
{
int status = pthread_cond_init( &cond, 0 );
assert( status == 0 );
(void)status;
}
inline SGPthreadCond::~SGPthreadCond()
{
int status = pthread_cond_destroy( &cond );
assert( status == 0 );
(void)status;
}
inline void SGPthreadCond::signal()
{
int status = pthread_cond_signal( &cond );
assert( status == 0 );
(void)status;
}
inline void SGPthreadCond::broadcast()
{
int status = pthread_cond_broadcast( &cond );
assert( status == 0 );
(void)status;
}
inline void SGPthreadCond::wait( SGMutex& mutex )
{
int status = pthread_cond_wait( &cond, &mutex.mutex );
assert( status == 0 );
(void)status;
}
#endif /* SGTHREAD_HXX_INCLUDED */ #endif /* SGTHREAD_HXX_INCLUDED */