Async lookup attempt #3 - use OpenThreads instead - I'm going to kill off SGThread imminently.

This commit is contained in:
James Turner 2011-08-30 15:14:14 +01:00
parent eafea28266
commit 427d6c3316
7 changed files with 250 additions and 52 deletions

View File

@ -32,6 +32,9 @@ AM_INIT_AUTOMAKE([dist-bzip2])
AC_ARG_ENABLE(headless,
AS_HELP_STRING([--enable-headless],[Enable only packages for headless build]))
AC_ARG_ENABLE(osgdebug,
AS_HELP_STRING([--enable-osgdebug],[Enable debug OSG libraries]))
AC_MSG_CHECKING([for headless mode])
AC_MSG_RESULT([$enable_headless])
@ -437,6 +440,8 @@ AM_CONDITIONAL(EXTGL_NEEDED, test "x$ac_cv_header_windows_h" = "xyes")
CXXCPP="g++ -E"
AC_LANG_PUSH(C++)
LIBS="$base_LIBS"
# OpenSceneGraph
case "${host}" in
*-apple-darwin*)
@ -497,6 +502,10 @@ if test "x$ac_cv_header_osg_Version" != "xyes" -o "x$ac_cv_lib_OpenThreads_OpenT
fi
fi
osg_LIBS="$LIBS"
AC_SUBST(osg_LIBS)
LIBS="$base_LIBS"
AC_CHECK_HEADER(boost/version.hpp)
if test "x$ac_cv_header_boost_version_hpp" != "xyes"; then
echo

View File

@ -37,17 +37,19 @@ set(SOURCES
simgear_component(io io "${SOURCES}" "${HEADERS}")
add_executable(test_sock socktest.cxx)
target_link_libraries(test_sock sgio sgstructure sgdebug)
target_link_libraries(test_sock sgio sgstructure sgdebug ${OPENTHREADS_LIBRARY})
add_executable(test_http test_HTTP.cxx)
target_link_libraries(test_http
sgio sgstructure sgtiming sgmisc sgdebug
${RT_LIBRARY})
${RT_LIBRARY}
${OPENTHREADS_LIBRARY})
add_test(http ${EXECUTABLE_OUTPUT_PATH}/test_http)
add_executable(httpget httpget.cxx)
target_link_libraries(httpget
sgio sgstructure sgtiming sgmisc sgdebug
${RT_LIBRARY})
${RT_LIBRARY}
${OPENTHREADS_LIBRARY})

View File

@ -46,6 +46,7 @@ tcp_server_LDADD = \
$(top_builddir)/simgear/misc/libsgmisc.a \
-lz \
$(network_LIBS) \
$(osg_LIBS) \
$(base_LIBS)
tcp_client_SOURCES = tcp_client.cxx
@ -58,6 +59,7 @@ tcp_client_LDADD = \
$(top_builddir)/simgear/misc/libsgmisc.a \
-lz \
$(network_LIBS) \
$(osg_LIBS) \
$(base_LIBS)
socktest_SOURCES = socktest.cxx
@ -70,6 +72,7 @@ socktest_LDADD = \
$(top_builddir)/simgear/misc/libsgmisc.a \
-lz \
$(network_LIBS) \
$(osg_LIBS) \
$(base_LIBS)
lowtest_SOURCES = lowtest.cxx
@ -80,7 +83,7 @@ lowtest_LDADD = \
$(top_builddir)/simgear/debug/libsgdebug.a \
$(top_builddir)/simgear/bucket/libsgbucket.a \
$(top_builddir)/simgear/misc/libsgmisc.a \
$(base_LIBS) -lz
$(base_LIBS) -lz $(osg_LIBS)
decode_binobj_SOURCES = decode_binobj.cxx
@ -89,4 +92,4 @@ decode_binobj_LDADD = \
$(top_builddir)/simgear/debug/libsgdebug.a \
$(top_builddir)/simgear/bucket/libsgbucket.a \
$(top_builddir)/simgear/misc/libsgmisc.a \
$(base_LIBS) -lz
$(base_LIBS) -lz $(osg_LIBS)

View File

@ -57,9 +57,166 @@
#define socklen_t int
#endif
#include <map>
#include <simgear/debug/logstream.hxx>
#include <simgear/structure/exception.hxx>
#include <OpenThreads/Thread>
#include <OpenThreads/Mutex>
#include <OpenThreads/Condition>
namespace {
class Resolver : public OpenThreads::Thread
{
public:
static Resolver* instance()
{
if (!static_instance) {
OpenThreads::Thread::Init();
static_instance = new Resolver;
atexit(&Resolver::cleanup);
static_instance->start();
}
return static_instance;
}
static void cleanup()
{
static_instance->cancel();
}
Resolver()
{
// take the lock initially, thread will wait upon it once running
_lock.lock();
}
simgear::IPAddress* lookup(const string& host)
{
simgear::IPAddress* result = NULL;
_lock.lock();
AddressCache::iterator it = _cache.find(host);
if (it == _cache.end()) {
_cache[host] = NULL; // mark as needing looked up
_wait.signal(); // if the thread was sleeping, poke it
} else {
result = it->second;
}
_lock.unlock();
return result;
}
simgear::IPAddress* lookupSync(const string& host)
{
simgear::IPAddress* result = NULL;
_lock.lock();
AddressCache::iterator it = _cache.find(host);
if (it == _cache.end()) {
_lock.unlock();
result = new simgear::IPAddress;
bool ok = lookupHost(host.c_str(), *result);
_lock.lock();
if (ok) {
_cache[host] = result; // mark as needing looked up
} else {
delete result;
result = NULL;
}
} else { // found in cache, easy
result = it->second;
}
_lock.unlock();
return result;
}
protected:
/**
* run method waits on a condition (_wait), and when awoken,
* finds any unresolved entries in _cache, resolves them, and goes
* back to sleep.
*/
virtual void run()
{
while (true) {
_wait.wait(&_lock);
AddressCache::iterator it;
for (it = _cache.begin(); it != _cache.end(); ++it) {
if (it->second == NULL) {
string h = it->first;
_lock.unlock();
simgear::IPAddress* addr = new simgear::IPAddress;
// may take seconds or even minutes!
lookupHost(h.c_str(), *addr);
_lock.lock();
// cahce may have changed while we had the lock released -
// so iterators may be invalid: restart the traversal
it = _cache.begin();
_cache[h] = addr;
} // of found un-resolved entry
} // of un-resolved address iteration
} // of thread run loop
}
private:
static Resolver* static_instance;
/**
* The actual synchronous, blocking host lookup function
* do *not* call this with any locks (mutexs) held, since depending
* on local system configuration / network availability, it
* may block for seconds or minutes.
*/
bool lookupHost(const char* host, simgear::IPAddress& addr)
{
struct addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_INET;
bool ok = false;
struct addrinfo* result0 = NULL;
int err = getaddrinfo(host, NULL, &hints, &result0);
if (err) {
SG_LOG(SG_IO, SG_WARN, "getaddrinfo failed for '" << host << "' : " << gai_strerror(err));
return false;
} else {
struct addrinfo* result;
for (result = result0; result != NULL; result = result->ai_next) {
if (result->ai_family != AF_INET) { // only accept IP4 for the moment
continue;
}
if (result->ai_addrlen != addr.getAddrLen()) {
SG_LOG(SG_IO, SG_ALERT, "mismatch in socket address sizes: got " <<
result->ai_addrlen << ", expected " << addr.getAddrLen());
continue;
}
memcpy(addr.getAddr(), result->ai_addr, result->ai_addrlen);
ok = true;
break;
} // of getaddrinfo results iteration
} // of getaddrinfo succeeded
freeaddrinfo(result0);
return ok;
}
OpenThreads::Mutex _lock;
OpenThreads::Condition _wait;
typedef std::map<string, simgear::IPAddress*> AddressCache;
AddressCache _cache;
};
Resolver* Resolver::static_instance = NULL;
} // of anonymous namespace
namespace simgear
{
@ -115,33 +272,12 @@ void IPAddress::set ( const char* host, int port )
return;
}
struct addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_INET;
// check the cache
IPAddress* cached = Resolver::instance()->lookupSync(host);
if (cached) {
memcpy(addr, cached->getAddr(), cached->getAddrLen());
}
struct addrinfo* result0 = NULL;
int err = getaddrinfo(host, NULL, &hints, &result0);
if (err) {
SG_LOG(SG_IO, SG_WARN, "getaddrinfo failed for '" << host << "' : " << gai_strerror(err));
} else {
struct addrinfo* result;
for (result = result0; result != NULL; result = result->ai_next) {
if (result->ai_family != AF_INET) { // only accept IP4 for the moment
continue;
}
if (result->ai_addrlen != getAddrLen()) {
SG_LOG(SG_IO, SG_ALERT, "mismatch in socket address sizes: got " <<
result->ai_addrlen << ", expected " << getAddrLen());
continue;
}
memcpy(addr, result->ai_addr, result->ai_addrlen);
break;
} // of getaddrinfo results iteration
} // of getaddrinfo succeeded
freeaddrinfo(result0);
addr->sin_port = htons (port); // fix up port after getaddrinfo
}
@ -152,6 +288,17 @@ IPAddress::~IPAddress()
}
}
bool IPAddress::lookupNonblocking(const char* host, IPAddress& addr)
{
IPAddress* cached = Resolver::instance()->lookup(host);
if (!cached) {
return false;
}
addr = *cached;
return true;
}
/* Create a string object representing an IP address.
This is always a string of the form 'dd.dd.dd.dd' (with variable
size numbers). */
@ -176,6 +323,11 @@ unsigned int IPAddress::getPort() const
return ntohs(addr->sin_port);
}
void IPAddress::setPort(int port)
{
addr->sin_port = htons(port);
}
unsigned int IPAddress::getFamily () const
{
return addr->sin_family;
@ -215,6 +367,11 @@ unsigned int IPAddress::getAddrLen() const
struct sockaddr* IPAddress::getAddr() const
{
if (addr == NULL) {
addr = (struct sockaddr_in*) malloc(sizeof(struct sockaddr_in));
memset(addr, 0, sizeof(struct sockaddr_in));
}
return (struct sockaddr*) addr;
}

View File

@ -40,18 +40,22 @@ namespace simgear
*/
class IPAddress
{
struct sockaddr_in* addr;
mutable struct sockaddr_in* addr;
public:
IPAddress () : addr(0) {}
IPAddress ( const char* host, int port ) ;
~IPAddress();
static bool lookupNonblocking(const char* host, IPAddress& addr);
IPAddress( const IPAddress& other );
const IPAddress& operator=(const IPAddress& other);
void set ( const char* host, int port ) ;
const char* getHost () const ;
unsigned int getPort() const ;
void setPort(int port);
unsigned int getIP () const ;
unsigned int getFamily () const ;
static const char* getLocalHost () ;
@ -69,7 +73,7 @@ public:
class Socket
{
int handle ;
public:
Socket () ;

View File

@ -38,6 +38,7 @@
#include <simgear/debug/logstream.hxx>
namespace simgear {
static NetChannel* channels = 0 ;
@ -46,6 +47,7 @@ NetChannel::NetChannel ()
{
closed = true ;
connected = false ;
resolving_host = false;
accepting = false ;
write_blocked = false ;
should_delete = false ;
@ -83,7 +85,6 @@ NetChannel::setHandle (int handle, bool is_connected)
close () ;
Socket::setHandle ( handle ) ;
connected = is_connected ;
//if ( connected ) this->handleConnect();
closed = false ;
}
@ -107,21 +108,12 @@ NetChannel::listen ( int backlog )
}
int
NetChannel::connect ( const char* host, int port )
NetChannel::connect ( const char* h, int p )
{
int result = Socket::connect ( host, port ) ;
if (result == 0) {
connected = true ;
//this->handleConnect();
return 0;
} else if (isNonBlockingError ()) {
return 0;
} else {
// some other error condition
this->handleError (result);
close();
return -1;
}
host = h;
port = p;
resolving_host = true;
return handleResolve();
}
int
@ -189,12 +181,10 @@ NetChannel::handleReadEvent (void)
if (accepting) {
if (!connected) {
connected = true ;
//this->handleConnect();
}
this->handleAccept();
} else if (!connected) {
connected = true ;
//this->handleConnect();
this->handleRead();
} else {
this->handleRead();
@ -206,12 +196,35 @@ NetChannel::handleWriteEvent (void)
{
if (!connected) {
connected = true ;
//this->handleConnect();
}
write_blocked = false ;
this->handleWrite();
}
int
NetChannel::handleResolve()
{
IPAddress addr;
if (!IPAddress::lookupNonblocking(host.c_str(), addr)) {
return 0; // not looked up yet, wait longer
}
resolving_host = false;
addr.setPort(port);
int result = Socket::connect ( &addr ) ;
if (result == 0) {
connected = true ;
return 0;
} else if (isNonBlockingError ()) {
return 0;
} else {
// some other error condition
handleError (result);
close();
return -1;
}
}
bool
NetChannel::poll (unsigned int timeout)
{
@ -236,6 +249,12 @@ NetChannel::poll (unsigned int timeout)
}
else if ( ! ch -> closed )
{
if (ch -> resolving_host )
{
ch -> handleResolve();
continue;
}
nopen++ ;
if (ch -> readable()) {
assert(nreads<MAX_SOCKETS);

View File

@ -54,14 +54,17 @@
#define SG_NET_CHANNEL_H
#include <simgear/io/raw_socket.hxx>
#include <string>
namespace simgear
{
class NetChannel : public Socket
{
bool closed, connected, accepting, write_blocked, should_delete ;
bool closed, connected, accepting, write_blocked, should_delete, resolving_host ;
NetChannel* next_channel ;
std::string host;
int port;
friend bool netPoll (unsigned int timeout);
@ -96,6 +99,7 @@ public:
void handleReadEvent (void);
void handleWriteEvent (void);
int handleResolve (void);
// These are meant to be overridden.
virtual void handleClose (void) {