Asynchronous host lookups+caching, attempt #2

This commit is contained in:
James Turner 2011-08-30 12:51:03 +01:00
parent 959791ffde
commit 1cb9a79fd4
6 changed files with 235 additions and 50 deletions

View File

@ -37,17 +37,17 @@ set(SOURCES
simgear_component(io io "${SOURCES}" "${HEADERS}") simgear_component(io io "${SOURCES}" "${HEADERS}")
add_executable(test_sock socktest.cxx) add_executable(test_sock socktest.cxx)
target_link_libraries(test_sock sgio sgstructure sgdebug) target_link_libraries(test_sock sgio sgstructure sgdebug sgthreads)
add_executable(test_http test_HTTP.cxx) add_executable(test_http test_HTTP.cxx)
target_link_libraries(test_http target_link_libraries(test_http
sgio sgstructure sgtiming sgmisc sgdebug sgio sgstructure sgtiming sgmisc sgdebug sgthreads
${RT_LIBRARY}) ${RT_LIBRARY})
add_test(http ${EXECUTABLE_OUTPUT_PATH}/test_http) add_test(http ${EXECUTABLE_OUTPUT_PATH}/test_http)
add_executable(httpget httpget.cxx) add_executable(httpget httpget.cxx)
target_link_libraries(httpget target_link_libraries(httpget
sgio sgstructure sgtiming sgmisc sgdebug sgio sgstructure sgtiming sgmisc sgdebug sgthreads
${RT_LIBRARY}) ${RT_LIBRARY})

View File

@ -44,6 +44,7 @@ tcp_server_LDADD = \
$(top_builddir)/simgear/debug/libsgdebug.a \ $(top_builddir)/simgear/debug/libsgdebug.a \
$(top_builddir)/simgear/bucket/libsgbucket.a \ $(top_builddir)/simgear/bucket/libsgbucket.a \
$(top_builddir)/simgear/misc/libsgmisc.a \ $(top_builddir)/simgear/misc/libsgmisc.a \
$(top_builddir)/simgear/threads/libsgthreads.a \
-lz \ -lz \
$(network_LIBS) \ $(network_LIBS) \
$(base_LIBS) $(base_LIBS)
@ -56,6 +57,7 @@ tcp_client_LDADD = \
$(top_builddir)/simgear/debug/libsgdebug.a \ $(top_builddir)/simgear/debug/libsgdebug.a \
$(top_builddir)/simgear/bucket/libsgbucket.a \ $(top_builddir)/simgear/bucket/libsgbucket.a \
$(top_builddir)/simgear/misc/libsgmisc.a \ $(top_builddir)/simgear/misc/libsgmisc.a \
$(top_builddir)/simgear/threads/libsgthreads.a \
-lz \ -lz \
$(network_LIBS) \ $(network_LIBS) \
$(base_LIBS) $(base_LIBS)
@ -68,6 +70,7 @@ socktest_LDADD = \
$(top_builddir)/simgear/debug/libsgdebug.a \ $(top_builddir)/simgear/debug/libsgdebug.a \
$(top_builddir)/simgear/bucket/libsgbucket.a \ $(top_builddir)/simgear/bucket/libsgbucket.a \
$(top_builddir)/simgear/misc/libsgmisc.a \ $(top_builddir)/simgear/misc/libsgmisc.a \
$(top_builddir)/simgear/threads/libsgthreads.a \
-lz \ -lz \
$(network_LIBS) \ $(network_LIBS) \
$(base_LIBS) $(base_LIBS)
@ -80,6 +83,7 @@ lowtest_LDADD = \
$(top_builddir)/simgear/debug/libsgdebug.a \ $(top_builddir)/simgear/debug/libsgdebug.a \
$(top_builddir)/simgear/bucket/libsgbucket.a \ $(top_builddir)/simgear/bucket/libsgbucket.a \
$(top_builddir)/simgear/misc/libsgmisc.a \ $(top_builddir)/simgear/misc/libsgmisc.a \
$(top_builddir)/simgear/threads/libsgthreads.a \
$(base_LIBS) -lz $(base_LIBS) -lz
decode_binobj_SOURCES = decode_binobj.cxx decode_binobj_SOURCES = decode_binobj.cxx
@ -89,4 +93,5 @@ decode_binobj_LDADD = \
$(top_builddir)/simgear/debug/libsgdebug.a \ $(top_builddir)/simgear/debug/libsgdebug.a \
$(top_builddir)/simgear/bucket/libsgbucket.a \ $(top_builddir)/simgear/bucket/libsgbucket.a \
$(top_builddir)/simgear/misc/libsgmisc.a \ $(top_builddir)/simgear/misc/libsgmisc.a \
$(top_builddir)/simgear/threads/libsgthreads.a \
$(base_LIBS) -lz $(base_LIBS) -lz

View File

@ -57,8 +57,161 @@
#define socklen_t int #define socklen_t int
#endif #endif
#include <map>
#include <simgear/debug/logstream.hxx> #include <simgear/debug/logstream.hxx>
#include <simgear/structure/exception.hxx> #include <simgear/structure/exception.hxx>
#include <simgear/threads/SGThread.hxx>
namespace {
class Resolver : public SGThread
{
public:
static Resolver* instance()
{
if (!static_instance) {
static_instance = new Resolver;
atexit(&Resolver::cleanup);
static_instance->start();
}
return static_instance;
}
static void cleanup()
{
static_instance->cancel();
}
Resolver()
{
// take the lock initially, thread will wait upon it once running
_lock.lock();
}
simgear::IPAddress* lookup(const string& host)
{
simgear::IPAddress* result = NULL;
_lock.lock();
AddressCache::iterator it = _cache.find(host);
if (it == _cache.end()) {
_cache[host] = NULL; // mark as needing looked up
_wait.signal(); // if the thread was sleeping, poke it
} else {
result = it->second;
}
_lock.unlock();
return result;
}
simgear::IPAddress* lookupSync(const string& host)
{
simgear::IPAddress* result = NULL;
_lock.lock();
AddressCache::iterator it = _cache.find(host);
if (it == _cache.end()) {
_lock.unlock();
result = new simgear::IPAddress;
bool ok = lookupHost(host.c_str(), *result);
_lock.lock();
if (ok) {
_cache[host] = result; // mark as needing looked up
} else {
delete result;
result = NULL;
}
} else { // found in cache, easy
result = it->second;
}
_lock.unlock();
return result;
}
protected:
/**
* run method waits on a condition (_wait), and when awoken,
* finds any unresolved entries in _cache, resolves them, and goes
* back to sleep.
*/
virtual void run()
{
while (true) {
_wait.wait(_lock);
AddressCache::iterator it;
for (it = _cache.begin(); it != _cache.end(); ++it) {
if (it->second == NULL) {
string h = it->first;
_lock.unlock();
simgear::IPAddress* addr = new simgear::IPAddress;
// may take seconds or even minutes!
lookupHost(h.c_str(), *addr);
_lock.lock();
// cahce may have changed while we had the lock released -
// so iterators may be invalid: restart the traversal
it = _cache.begin();
_cache[h] = addr;
} // of found un-resolved entry
} // of un-resolved address iteration
} // of thread run loop
}
private:
static Resolver* static_instance;
/**
* The actual synchronous, blocking host lookup function
* do *not* call this with any locks (mutexs) held, since depending
* on local system configuration / network availability, it
* may block for seconds or minutes.
*/
bool lookupHost(const char* host, simgear::IPAddress& addr)
{
struct addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_INET;
bool ok = false;
struct addrinfo* result0 = NULL;
int err = getaddrinfo(host, NULL, &hints, &result0);
if (err) {
SG_LOG(SG_IO, SG_WARN, "getaddrinfo failed for '" << host << "' : " << gai_strerror(err));
return false;
} else {
struct addrinfo* result;
for (result = result0; result != NULL; result = result->ai_next) {
if (result->ai_family != AF_INET) { // only accept IP4 for the moment
continue;
}
if (result->ai_addrlen != addr.getAddrLen()) {
SG_LOG(SG_IO, SG_ALERT, "mismatch in socket address sizes: got " <<
result->ai_addrlen << ", expected " << addr.getAddrLen());
continue;
}
memcpy(addr.getAddr(), result->ai_addr, result->ai_addrlen);
ok = true;
break;
} // of getaddrinfo results iteration
} // of getaddrinfo succeeded
freeaddrinfo(result0);
return ok;
}
SGMutex _lock;
SGPthreadCond _wait;
typedef std::map<string, simgear::IPAddress*> AddressCache;
AddressCache _cache;
};
Resolver* Resolver::static_instance = NULL;
} // of anonymous namespace
namespace simgear namespace simgear
{ {
@ -115,33 +268,12 @@ void IPAddress::set ( const char* host, int port )
return; return;
} }
struct addrinfo hints; // check the cache
memset(&hints, 0, sizeof(struct addrinfo)); IPAddress* cached = Resolver::instance()->lookupSync(host);
hints.ai_family = AF_INET; if (cached) {
memcpy(addr, cached->getAddr(), cached->getAddrLen());
}
struct addrinfo* result0 = NULL;
int err = getaddrinfo(host, NULL, &hints, &result0);
if (err) {
SG_LOG(SG_IO, SG_WARN, "getaddrinfo failed for '" << host << "' : " << gai_strerror(err));
} else {
struct addrinfo* result;
for (result = result0; result != NULL; result = result->ai_next) {
if (result->ai_family != AF_INET) { // only accept IP4 for the moment
continue;
}
if (result->ai_addrlen != getAddrLen()) {
SG_LOG(SG_IO, SG_ALERT, "mismatch in socket address sizes: got " <<
result->ai_addrlen << ", expected " << getAddrLen());
continue;
}
memcpy(addr, result->ai_addr, result->ai_addrlen);
break;
} // of getaddrinfo results iteration
} // of getaddrinfo succeeded
freeaddrinfo(result0);
addr->sin_port = htons (port); // fix up port after getaddrinfo addr->sin_port = htons (port); // fix up port after getaddrinfo
} }
@ -152,6 +284,17 @@ IPAddress::~IPAddress()
} }
} }
bool IPAddress::lookupNonblocking(const char* host, IPAddress& addr)
{
IPAddress* cached = Resolver::instance()->lookup(host);
if (!cached) {
return false;
}
addr = *cached;
return true;
}
/* Create a string object representing an IP address. /* Create a string object representing an IP address.
This is always a string of the form 'dd.dd.dd.dd' (with variable This is always a string of the form 'dd.dd.dd.dd' (with variable
size numbers). */ size numbers). */
@ -176,6 +319,11 @@ unsigned int IPAddress::getPort() const
return ntohs(addr->sin_port); return ntohs(addr->sin_port);
} }
void IPAddress::setPort(int port)
{
addr->sin_port = htons(port);
}
unsigned int IPAddress::getFamily () const unsigned int IPAddress::getFamily () const
{ {
return addr->sin_family; return addr->sin_family;
@ -215,6 +363,11 @@ unsigned int IPAddress::getAddrLen() const
struct sockaddr* IPAddress::getAddr() const struct sockaddr* IPAddress::getAddr() const
{ {
if (addr == NULL) {
addr = (struct sockaddr_in*) malloc(sizeof(struct sockaddr_in));
memset(addr, 0, sizeof(struct sockaddr_in));
}
return (struct sockaddr*) addr; return (struct sockaddr*) addr;
} }

View File

@ -40,18 +40,22 @@ namespace simgear
*/ */
class IPAddress class IPAddress
{ {
struct sockaddr_in* addr; mutable struct sockaddr_in* addr;
public: public:
IPAddress () : addr(0) {} IPAddress () : addr(0) {}
IPAddress ( const char* host, int port ) ; IPAddress ( const char* host, int port ) ;
~IPAddress(); ~IPAddress();
static bool lookupNonblocking(const char* host, IPAddress& addr);
IPAddress( const IPAddress& other ); IPAddress( const IPAddress& other );
const IPAddress& operator=(const IPAddress& other); const IPAddress& operator=(const IPAddress& other);
void set ( const char* host, int port ) ; void set ( const char* host, int port ) ;
const char* getHost () const ; const char* getHost () const ;
unsigned int getPort() const ; unsigned int getPort() const ;
void setPort(int port);
unsigned int getIP () const ; unsigned int getIP () const ;
unsigned int getFamily () const ; unsigned int getFamily () const ;
static const char* getLocalHost () ; static const char* getLocalHost () ;
@ -69,7 +73,7 @@ public:
class Socket class Socket
{ {
int handle ; int handle ;
public: public:
Socket () ; Socket () ;

View File

@ -38,6 +38,7 @@
#include <simgear/debug/logstream.hxx> #include <simgear/debug/logstream.hxx>
namespace simgear { namespace simgear {
static NetChannel* channels = 0 ; static NetChannel* channels = 0 ;
@ -46,6 +47,7 @@ NetChannel::NetChannel ()
{ {
closed = true ; closed = true ;
connected = false ; connected = false ;
resolving_host = false;
accepting = false ; accepting = false ;
write_blocked = false ; write_blocked = false ;
should_delete = false ; should_delete = false ;
@ -83,7 +85,6 @@ NetChannel::setHandle (int handle, bool is_connected)
close () ; close () ;
Socket::setHandle ( handle ) ; Socket::setHandle ( handle ) ;
connected = is_connected ; connected = is_connected ;
//if ( connected ) this->handleConnect();
closed = false ; closed = false ;
} }
@ -107,21 +108,12 @@ NetChannel::listen ( int backlog )
} }
int int
NetChannel::connect ( const char* host, int port ) NetChannel::connect ( const char* h, int p )
{ {
int result = Socket::connect ( host, port ) ; host = h;
if (result == 0) { port = p;
connected = true ; resolving_host = true;
//this->handleConnect(); return handleResolve();
return 0;
} else if (isNonBlockingError ()) {
return 0;
} else {
// some other error condition
this->handleError (result);
close();
return -1;
}
} }
int int
@ -189,12 +181,10 @@ NetChannel::handleReadEvent (void)
if (accepting) { if (accepting) {
if (!connected) { if (!connected) {
connected = true ; connected = true ;
//this->handleConnect();
} }
this->handleAccept(); this->handleAccept();
} else if (!connected) { } else if (!connected) {
connected = true ; connected = true ;
//this->handleConnect();
this->handleRead(); this->handleRead();
} else { } else {
this->handleRead(); this->handleRead();
@ -206,12 +196,35 @@ NetChannel::handleWriteEvent (void)
{ {
if (!connected) { if (!connected) {
connected = true ; connected = true ;
//this->handleConnect();
} }
write_blocked = false ; write_blocked = false ;
this->handleWrite(); this->handleWrite();
} }
int
NetChannel::handleResolve()
{
IPAddress addr;
if (!IPAddress::lookupNonblocking(host.c_str(), addr)) {
return 0; // not looked up yet, wait longer
}
resolving_host = false;
addr.setPort(port);
int result = Socket::connect ( &addr ) ;
if (result == 0) {
connected = true ;
return 0;
} else if (isNonBlockingError ()) {
return 0;
} else {
// some other error condition
handleError (result);
close();
return -1;
}
}
bool bool
NetChannel::poll (unsigned int timeout) NetChannel::poll (unsigned int timeout)
{ {
@ -236,6 +249,12 @@ NetChannel::poll (unsigned int timeout)
} }
else if ( ! ch -> closed ) else if ( ! ch -> closed )
{ {
if (ch -> resolving_host )
{
ch -> handleResolve();
continue;
}
nopen++ ; nopen++ ;
if (ch -> readable()) { if (ch -> readable()) {
assert(nreads<MAX_SOCKETS); assert(nreads<MAX_SOCKETS);

View File

@ -54,14 +54,17 @@
#define SG_NET_CHANNEL_H #define SG_NET_CHANNEL_H
#include <simgear/io/raw_socket.hxx> #include <simgear/io/raw_socket.hxx>
#include <string>
namespace simgear namespace simgear
{ {
class NetChannel : public Socket class NetChannel : public Socket
{ {
bool closed, connected, accepting, write_blocked, should_delete ; bool closed, connected, accepting, write_blocked, should_delete, resolving_host ;
NetChannel* next_channel ; NetChannel* next_channel ;
std::string host;
int port;
friend bool netPoll (unsigned int timeout); friend bool netPoll (unsigned int timeout);
@ -96,6 +99,7 @@ public:
void handleReadEvent (void); void handleReadEvent (void);
void handleWriteEvent (void); void handleWriteEvent (void);
int handleResolve (void);
// These are meant to be overridden. // These are meant to be overridden.
virtual void handleClose (void) { virtual void handleClose (void) {