Remove non-CURL HTTP option.

James Turner 2016-05-07 10:11:40 +01:00
parent 6e4da15a58
commit 616826ab69
8 changed files with 8 additions and 1217 deletions

View File

@@ -117,7 +117,6 @@ option(ENABLE_RTI "Set to ON to build SimGear with RTI support" OFF)
option(ENABLE_TESTS "Set to OFF to disable building SimGear's test applications" ON)
option(ENABLE_SOUND "Set to OFF to disable building SimGear's sound support" ON)
option(ENABLE_PKGUTIL "Set to ON to build the sg_pkgutil application (default)" ON)
option(ENABLE_CURL "Set to ON to use libCurl as the HTTP client backend" OFF)
option(ENABLE_DNS "Set to ON to use udns library and DNS service resolver" ON)
if (MSVC)
@@ -210,11 +209,7 @@ else()
endif(SIMGEAR_HEADLESS)
find_package(ZLIB REQUIRED)
if (ENABLE_CURL)
find_package(CURL REQUIRED)
message(STATUS "Curl HTTP client: ENABLED")
endif()
find_package(CURL REQUIRED)
if (SYSTEM_EXPAT)
message(STATUS "Requested to use system Expat library, forcing SIMGEAR_SHARED to true")

View File

@@ -50,11 +50,6 @@ set(SOURCES
HTTPRepository.cxx
)
if (NOT ENABLE_CURL)
list(APPEND SOURCES HTTPContentDecode.cxx)
list(APPEND HEADERS HTTPContentDecode.hxx)
endif()
if(ENABLE_DNS)
list(APPEND SOURCES DNSClient.cxx)
list(APPEND HEADERS DNSClient.hxx)

View File

@@ -38,11 +38,7 @@
#include <simgear/simgear_config.h>
#if defined(ENABLE_CURL)
#include <curl/multi.h>
#else
#include <simgear/io/HTTPContentDecode.hxx>
#endif
#include <curl/multi.h>
#include <simgear/io/sg_netChat.hxx>
@@ -76,7 +72,6 @@ typedef std::list<Request_ptr> RequestList;
class Client::ClientPrivate
{
public:
#if defined(ENABLE_CURL)
CURLM* curlMulti;
void createCurlMulti()
@@ -96,11 +91,6 @@ public:
typedef std::map<Request_ptr, CURL*> RequestCurlMap;
RequestCurlMap requests;
#else
NetChannelPoller poller;
// connections by host (potentially more than one)
ConnectionDict connections;
#endif
std::string userAgent;
std::string proxy;
@@ -118,649 +108,6 @@ public:
uint64_t totalBytesDownloaded;
};
#if !defined(ENABLE_CURL)
class Connection : public NetChat
{
public:
Connection(Client* pr, const std::string& conId) :
client(pr),
state(STATE_CLOSED),
port(DEFAULT_HTTP_PORT),
_connectionId(conId),
_maxPipelineLength(255)
{
}
virtual ~Connection()
{
}
virtual void handleBufferRead (NetBuffer& buffer)
{
if( !activeRequest || !activeRequest->isComplete() )
return NetChat::handleBufferRead(buffer);
// Request should be aborted (signaled by setting its state to complete).
// force the state to GETTING_BODY, to simplify logic in
// responseComplete and handleClose
setState(STATE_GETTING_BODY);
responseComplete();
}
void setServer(const std::string& h, short p)
{
host = h;
port = p;
}
void setMaxPipelineLength(unsigned int m)
{
_maxPipelineLength = m;
}
// socket-level errors
virtual void handleError(int error)
{
const char* errStr = strerror(error);
SG_LOG(SG_IO, SG_WARN, _connectionId << " handleError:" << error << " ("
<< errStr << ")");
debugDumpRequests();
if (!activeRequest)
{
// connection level failure, eg name lookup or routing
// we won't have an active request yet, so let's fail all of the
// requests since we presume it's a systematic failure for
// the host in question
BOOST_FOREACH(Request_ptr req, sentRequests) {
req->setFailure(error, errStr);
}
BOOST_FOREACH(Request_ptr req, queuedRequests) {
req->setFailure(error, errStr);
}
sentRequests.clear();
queuedRequests.clear();
}
NetChat::handleError(error);
if (activeRequest) {
activeRequest->setFailure(error, errStr);
activeRequest = NULL;
_contentDecoder.reset();
}
setState(STATE_SOCKET_ERROR);
}
void handleTimeout()
{
handleError(ETIMEDOUT);
}
virtual void handleClose()
{
NetChat::handleClose();
// the server closed the connection while we were receiving the body
bool canCloseState = (state == STATE_GETTING_BODY);
bool isCancelling = (state == STATE_CANCELLING);
if (canCloseState && activeRequest) {
// check bodyTransferSize matches how much we actually transferred
if (bodyTransferSize > 0) {
if (_contentDecoder.getTotalReceivedBytes() != bodyTransferSize) {
SG_LOG(SG_IO, SG_WARN, _connectionId << " saw connection close while still receiving bytes for:" << activeRequest->url()
<< "\n\thave:" << _contentDecoder.getTotalReceivedBytes() << " of " << bodyTransferSize);
}
}
// force state here, so responseComplete can avoid closing the
// socket again
SG_LOG(SG_IO, SG_DEBUG, _connectionId << " saw connection close after getting:" << activeRequest->url());
setState(STATE_CLOSED);
responseComplete();
} else {
if (state == STATE_WAITING_FOR_RESPONSE) {
SG_LOG(SG_IO, SG_DEBUG, _connectionId << ":close while waiting for response, front request is:"
<< sentRequests.front()->url());
assert(!sentRequests.empty());
sentRequests.front()->setFailure(500, "server closed connection unexpectedly");
// no active request, but don't restore the front sent one
sentRequests.erase(sentRequests.begin());
}
if (activeRequest && !isCancelling) {
activeRequest->setFailure(500, "server closed connection");
// remove the failed request from sentRequests, so it does
// not get restored
RequestList::iterator it = std::find(sentRequests.begin(),
sentRequests.end(), activeRequest);
if (it != sentRequests.end()) {
sentRequests.erase(it);
}
activeRequest = NULL;
_contentDecoder.reset();
}
setState(STATE_CLOSED);
}
if (sentRequests.empty()) {
return;
}
// restore sent requests to the queue, so they will be re-sent
// when the connection opens again
queuedRequests.insert(queuedRequests.begin(),
sentRequests.begin(), sentRequests.end());
sentRequests.clear();
}
void queueRequest(const Request_ptr& r)
{
queuedRequests.push_back(r);
tryStartNextRequest();
}
void cancelRequest(const Request_ptr& r)
{
RequestList::iterator it = std::find(sentRequests.begin(),
sentRequests.end(), r);
if (it != sentRequests.end()) {
sentRequests.erase(it);
if ((r == activeRequest) || !activeRequest) {
// either the cancelling request is active, or we're in waiting
// for response state - close now
setState(STATE_CANCELLING);
close();
setState(STATE_CLOSED);
activeRequest = NULL;
_contentDecoder.reset();
} else if (activeRequest) {
SG_LOG(SG_IO, SG_INFO, "con:" << _connectionId << " cancelling non-active: " << r->url());
// has been sent but not active, let the active finish and
// then close. Otherwise cancelling request #2 would mess up
// active transfer #1
activeRequest->setCloseAfterComplete();
}
} // of request has been sent
// simpler case: not sent yet, just remove from the queue
it = std::find(queuedRequests.begin(), queuedRequests.end(), r);
if (it != queuedRequests.end()) {
queuedRequests.erase(it);
}
}
void beginResponse()
{
assert(!sentRequests.empty());
assert(state == STATE_WAITING_FOR_RESPONSE);
activeRequest = sentRequests.front();
try {
SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " saw start of response for " << activeRequest->url());
activeRequest->responseStart(buffer);
} catch (sg_exception& e) {
handleError(EIO);
return;
}
setState(STATE_GETTING_HEADERS);
buffer.clear();
if (activeRequest->responseCode() == 204) {
noMessageBody = true;
} else if (activeRequest->method() == "HEAD") {
noMessageBody = true;
} else {
noMessageBody = false;
}
bodyTransferSize = -1;
chunkedTransfer = false;
_contentDecoder.reset();
}
void tryStartNextRequest()
{
while( !queuedRequests.empty()
&& queuedRequests.front()->isComplete() )
queuedRequests.pop_front();
if (queuedRequests.empty()) {
idleTime.stamp();
return;
}
if (sentRequests.size() >= _maxPipelineLength) {
return;
}
if (state == STATE_CLOSED) {
if (!connectToHost()) {
setState(STATE_SOCKET_ERROR);
return;
}
SG_LOG(SG_IO, SG_DEBUG, "connection " << _connectionId << " connected.");
setTerminator("\r\n");
setState(STATE_IDLE);
}
Request_ptr r = queuedRequests.front();
r->requestStart();
std::stringstream headerData;
std::string path = r->path();
assert(!path.empty());
std::string query = r->query();
std::string bodyData;
if (!client->proxyHost().empty()) {
path = r->scheme() + "://" + r->host() + r->path();
}
if (r->bodyType() == CONTENT_TYPE_URL_ENCODED) {
headerData << r->method() << " " << path << " HTTP/1.1\r\n";
bodyData = query.substr(1); // body is the URL-encoded query string; drop the leading '?'
headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
headerData << "Content-Length:" << bodyData.size() << "\r\n";
} else {
headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
if( r->hasBodyData() )
{
headerData << "Content-Length:" << r->bodyLength() << "\r\n";
headerData << "Content-Type:" << r->bodyType() << "\r\n";
}
}
headerData << "Host: " << r->hostAndPort() << "\r\n";
headerData << "User-Agent:" << client->userAgent() << "\r\n";
headerData << "Accept-Encoding: deflate, gzip\r\n";
if (!client->proxyAuth().empty()) {
headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
}
BOOST_FOREACH(const StringMap::value_type& h, r->requestHeaders()) {
headerData << h.first << ": " << h.second << "\r\n";
}
headerData << "\r\n"; // final CRLF to terminate the headers
if (!bodyData.empty()) {
headerData << bodyData;
}
bool ok = push(headerData.str().c_str());
if (!ok) {
SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
// we've over-stuffed the socket, give up for now, let things
// drain down before trying to start any more requests.
return;
}
if( r->hasBodyData() )
for(size_t body_bytes_sent = 0; body_bytes_sent < r->bodyLength();)
{
char buf[4096];
size_t len = r->getBodyData(buf, body_bytes_sent, 4096);
if( len )
{
if( !bufferSend(buf, len) )
{
SG_LOG(SG_IO,
SG_WARN,
"overflow the HTTP::Connection output buffer");
state = STATE_SOCKET_ERROR;
return;
}
body_bytes_sent += len;
}
else
{
SG_LOG(SG_IO,
SG_WARN,
"HTTP asynchronous request body generation is unsupported");
break;
}
}
SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " did send request:" << r->url());
// successfully sent, remove from queue, and maybe send the next
queuedRequests.pop_front();
sentRequests.push_back(r);
if (state == STATE_IDLE) {
setState(STATE_WAITING_FOR_RESPONSE);
}
// pipelining, let's maybe send the next request right away
tryStartNextRequest();
}
virtual void collectIncomingData(const char* s, int n)
{
idleTime.stamp();
client->receivedBytes(static_cast<unsigned int>(n));
if( (state == STATE_GETTING_BODY)
|| (state == STATE_GETTING_CHUNKED_BYTES) )
_contentDecoder.receivedBytes(s, n);
else
buffer.append(s, n);
}
virtual void foundTerminator(void)
{
idleTime.stamp();
switch (state) {
case STATE_WAITING_FOR_RESPONSE:
beginResponse();
break;
case STATE_GETTING_HEADERS:
processHeader();
buffer.clear();
break;
case STATE_GETTING_BODY:
responseComplete();
break;
case STATE_GETTING_CHUNKED:
processChunkHeader();
break;
case STATE_GETTING_CHUNKED_BYTES:
setTerminator("\r\n");
setState(STATE_GETTING_CHUNKED);
buffer.clear();
break;
case STATE_GETTING_TRAILER:
processTrailer();
buffer.clear();
break;
case STATE_IDLE:
SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?");
default:
break;
}
}
bool hasIdleTimeout() const
{
if ((state != STATE_IDLE) && (state != STATE_CLOSED)) {
return false;
}
assert(sentRequests.empty());
bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 10)); // 10 seconds
return isTimedOut;
}
bool hasErrorTimeout() const
{
if ((state == STATE_IDLE) || (state == STATE_CLOSED)) {
return false;
}
bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 30)); // 30 seconds
return isTimedOut;
}
bool hasError() const
{
return (state == STATE_SOCKET_ERROR);
}
bool shouldStartNext() const
{
return !queuedRequests.empty() && (sentRequests.size() < _maxPipelineLength);
}
bool isActive() const
{
return !queuedRequests.empty() || !sentRequests.empty();
}
std::string connectionId() const
{
return _connectionId;
}
void debugDumpRequests() const
{
SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << _connectionId
<< "; state=" << state << ")");
if (activeRequest) {
SG_LOG(SG_IO, SG_DEBUG, "\tactive:" << activeRequest->url());
} else {
SG_LOG(SG_IO, SG_DEBUG, "\tNo active request");
}
BOOST_FOREACH(Request_ptr req, sentRequests) {
SG_LOG(SG_IO, SG_DEBUG, "\tsent:" << req->url());
}
BOOST_FOREACH(Request_ptr req, queuedRequests) {
SG_LOG(SG_IO, SG_DEBUG, "\tqueued:" << req->url());
}
}
private:
enum ConnectionState {
STATE_IDLE = 0,
STATE_WAITING_FOR_RESPONSE,
STATE_GETTING_HEADERS,
STATE_GETTING_BODY,
STATE_GETTING_CHUNKED,
STATE_GETTING_CHUNKED_BYTES,
STATE_GETTING_TRAILER,
STATE_SOCKET_ERROR,
STATE_CANCELLING, ///< cancelling an active request
STATE_CLOSED ///< connection should be closed now
};
void setState(ConnectionState newState)
{
if (state == newState) {
return;
}
state = newState;
}
bool connectToHost()
{
SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
if (!open()) {
SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
return false;
}
if (connect(host.c_str(), port) != 0) {
SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: connect() failed");
return false;
}
return true;
}
void processHeader()
{
std::string h = strutils::simplify(buffer);
if (h.empty()) { // blank line terminates headers
headersComplete();
return;
}
int colonPos = buffer.find(':');
if (colonPos < 0) {
SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
return;
}
std::string key = strutils::simplify(buffer.substr(0, colonPos));
std::string lkey = boost::to_lower_copy(key);
std::string value = strutils::strip(buffer.substr(colonPos + 1));
// only consider these if getting headers (as opposed to trailers
// of a chunked transfer)
if (state == STATE_GETTING_HEADERS) {
if (lkey == "content-length") {
int sz = strutils::to_int(value);
if (bodyTransferSize <= 0) {
bodyTransferSize = sz;
}
activeRequest->setResponseLength(sz);
} else if (lkey == "transfer-length") {
bodyTransferSize = strutils::to_int(value);
} else if (lkey == "transfer-encoding") {
processTransferEncoding(value);
} else if (lkey == "content-encoding") {
_contentDecoder.setEncoding(value);
}
}
activeRequest->responseHeader(lkey, value);
}
void processTransferEncoding(const std::string& te)
{
if (te == "chunked") {
chunkedTransfer = true;
} else {
SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
// failure
}
}
void processChunkHeader()
{
if (buffer.empty()) {
// blank line after chunk data
return;
}
int chunkSize = 0;
int semiPos = buffer.find(';');
if (semiPos >= 0) {
// extensions ignored for the moment
chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
} else {
chunkSize = strutils::to_int(buffer, 16);
}
buffer.clear();
if (chunkSize == 0) { // trailer start
setState(STATE_GETTING_TRAILER);
return;
}
setState(STATE_GETTING_CHUNKED_BYTES);
setByteCount(chunkSize);
}
void processTrailer()
{
if (buffer.empty()) {
// end of trailers
responseComplete();
return;
}
// process as a normal header
processHeader();
}
void headersComplete()
{
activeRequest->responseHeadersComplete();
_contentDecoder.initWithRequest(activeRequest);
if (!activeRequest->serverSupportsPipelining()) {
SG_LOG(SG_IO, SG_DEBUG, _connectionId << " disabling pipelining since server does not support it");
_maxPipelineLength = 1;
}
if (chunkedTransfer) {
setState(STATE_GETTING_CHUNKED);
} else if (noMessageBody || (bodyTransferSize == 0)) {
// force the state to GETTING_BODY, to simplify logic in
// responseComplete and handleClose
setState(STATE_GETTING_BODY);
responseComplete();
} else {
setByteCount(bodyTransferSize); // may be -1, that's fine
setState(STATE_GETTING_BODY);
}
}
void responseComplete()
{
Request_ptr completedRequest = activeRequest;
_contentDecoder.finish();
assert(sentRequests.front() == activeRequest);
sentRequests.pop_front();
bool doClose = activeRequest->closeAfterComplete();
activeRequest = NULL;
if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
if (doClose) {
SG_LOG(SG_IO, SG_DEBUG, _connectionId << " doClose requested");
// this will bring us into handleClose() above, which updates
// state to STATE_CLOSED
close();
// if we have additional requests waiting, try to start them now
tryStartNextRequest();
}
}
if (state != STATE_CLOSED) {
setState(sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE);
}
// notify request after we change state, so this connection is idle
// if completion triggers other requests (which is likely)
completedRequest->responseComplete();
client->requestFinished(this);
setTerminator("\r\n");
}
Client* client;
Request_ptr activeRequest;
ConnectionState state;
std::string host;
short port;
std::string buffer;
int bodyTransferSize;
SGTimeStamp idleTime;
bool chunkedTransfer;
bool noMessageBody;
RequestList queuedRequests;
RequestList sentRequests;
ContentDecoder _contentDecoder;
std::string _connectionId;
unsigned int _maxPipelineLength;
};
#endif // of !ENABLE_CURL
Client::Client() :
d(new ClientPrivate)
{
@@ -773,7 +120,7 @@ Client::Client() :
d->totalBytesDownloaded = 0;
d->maxPipelineDepth = 5;
setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
#if defined(ENABLE_CURL)
static bool didInitCurlGlobal = false;
if (!didInitCurlGlobal) {
curl_global_init(CURL_GLOBAL_ALL);
@@ -781,48 +128,33 @@ Client::Client() :
}
d->createCurlMulti();
#endif
}
Client::~Client()
{
#if defined(ENABLE_CURL)
curl_multi_cleanup(d->curlMulti);
#endif
}
void Client::setMaxConnections(unsigned int maxCon)
{
d->maxConnections = maxCon;
#if defined(ENABLE_CURL)
curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxCon);
#endif
}
void Client::setMaxHostConnections(unsigned int maxHostCon)
{
d->maxHostConnections = maxHostCon;
#if defined(ENABLE_CURL)
curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS, (long) maxHostCon);
#endif
}
void Client::setMaxPipelineDepth(unsigned int depth)
{
d->maxPipelineDepth = depth;
#if defined(ENABLE_CURL)
curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH, (long) depth);
#else
ConnectionDict::iterator it = d->connections.begin();
for (; it != d->connections.end(); ) {
it->second->setMaxPipelineLength(depth);
}
#endif
}
void Client::update(int waitTimeout)
{
#if defined(ENABLE_CURL)
int remainingActive, messagesInQueue;
curl_multi_perform(d->curlMulti, &remainingActive);
@@ -865,53 +197,6 @@ void Client::update(int waitTimeout)
}
} // of curl message processing loop
SGTimeStamp::sleepForMSec(waitTimeout);
#else
if (!d->poller.hasChannels() && (waitTimeout > 0)) {
SGTimeStamp::sleepForMSec(waitTimeout);
} else {
d->poller.poll(waitTimeout);
}
bool waitingRequests = !d->pendingRequests.empty();
ConnectionDict::iterator it = d->connections.begin();
for (; it != d->connections.end(); ) {
Connection* con = it->second;
if (con->hasIdleTimeout() ||
con->hasError() ||
con->hasErrorTimeout() ||
(!con->isActive() && waitingRequests))
{
if (con->hasErrorTimeout()) {
// tell the connection we're timing it out
con->handleTimeout();
}
// connection has been idle for a while, clean it up
// (or if we have requests waiting for a different host,
// or an error condition)
ConnectionDict::iterator del = it++;
delete del->second;
d->connections.erase(del);
} else {
if (it->second->shouldStartNext()) {
it->second->tryStartNextRequest();
}
++it;
}
} // of connection iteration
if (waitingRequests && (d->connections.size() < d->maxConnections)) {
RequestList waiting(d->pendingRequests);
d->pendingRequests.clear();
// re-submit all waiting requests in order; this takes care of
// finding multiple pending items targeted to the same (new)
// connection
BOOST_FOREACH(Request_ptr req, waiting) {
makeRequest(req);
}
}
#endif
}
void Client::makeRequest(const Request_ptr& r)
@@ -926,7 +211,6 @@ void Client::makeRequest(const Request_ptr& r)
r->_client = this;
#if defined(ENABLE_CURL)
ClientPrivate::RequestCurlMap::iterator rit = d->requests.find(r);
assert(rit == d->requests.end());
@@ -1004,89 +288,10 @@ void Client::makeRequest(const Request_ptr& r)
// this seems premature, but we don't have a callback from Curl we could
// use to trigger when the request is actually sent.
r->requestStart();
#else
if( r->url().find("http://") != 0 ) {
r->setFailure(EINVAL, "only HTTP protocol is supported");
return;
}
std::string host = r->host();
int port = r->port();
if (!d->proxy.empty()) {
host = d->proxy;
port = d->proxyPort;
}
Connection* con = NULL;
std::stringstream ss;
ss << host << "-" << port;
std::string connectionId = ss.str();
bool havePending = !d->pendingRequests.empty();
bool atConnectionsLimit = d->connections.size() >= d->maxConnections;
ConnectionDict::iterator consEnd = d->connections.end();
// assign request to an existing Connection.
// various options exist here, examined in order
ConnectionDict::iterator it = d->connections.find(connectionId);
if (atConnectionsLimit && (it == consEnd)) {
// maximum number of connections active, queue this request
// when a connection goes inactive, we'll start this one
d->pendingRequests.push_back(r);
return;
}
// scan for an idle Connection to the same host (likely if we're
// retrieving multiple resources from the same host in quick succession)
// if we have pending requests (waiting for a free Connection), then
// force new requests on this id to always use the first Connection
// (instead of the random selection below). This ensures that when
// there's pressure on the number of connections to keep alive, one
// host can't DoS every other.
int count = 0;
for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) {
if (havePending || !it->second->isActive()) {
con = it->second;
break;
}
}
bool atHostConnectionsLimit = (count >= d->maxHostConnections);
if (!con && (atConnectionsLimit || atHostConnectionsLimit)) {
// all current connections are busy (active), and we don't
// have free connections to allocate, so let's assign to
// an existing one randomly. Ideally we'd use whichever one will
// complete first but we don't have that info.
int index = rand() % count;
for (it = d->connections.find(connectionId); index > 0; --index, ++it) { ; }
con = it->second;
}
// allocate a new connection object
if (!con) {
static int connectionSuffx = 0;
std::stringstream ss;
ss << connectionId << "-" << connectionSuffx++;
SG_LOG(SG_IO, SG_DEBUG, "allocating new connection for ID:" << ss.str());
con = new Connection(this, ss.str());
con->setServer(host, port);
con->setMaxPipelineLength(d->maxPipelineDepth);
d->poller.addChannel(con);
d->connections.insert(d->connections.end(),
ConnectionDict::value_type(connectionId, con));
}
SG_LOG(SG_IO, SG_DEBUG, "queuing request for " << r->url() << " on:" << con->connectionId());
con->queueRequest(r);
#endif
}
void Client::cancelRequest(const Request_ptr &r, std::string reason)
{
#if defined(ENABLE_CURL)
ClientPrivate::RequestCurlMap::iterator it = d->requests.find(r);
if(it == d->requests.end()) {
// already being removed, presumably inside ::update()
@@ -1102,12 +307,7 @@ void Client::cancelRequest(const Request_ptr &r, std::string reason)
curl_easy_cleanup(it->second);
d->requests.erase(it);
#else
ConnectionDict::iterator it = d->connections.begin();
for (; it != d->connections.end(); ++it) {
(it->second)->cancelRequest(r);
}
#endif
r->setFailure(-1, reason);
}
@@ -1164,16 +364,7 @@ void Client::setProxy( const std::string& proxy,
bool Client::hasActiveRequests() const
{
#if defined(ENABLE_CURL)
return !d->requests.empty();
#else
ConnectionDict::const_iterator it = d->connections.begin();
for (; it != d->connections.end(); ++it) {
if (it->second->isActive()) return true;
}
return false;
#endif
}
void Client::receivedBytes(unsigned int count)
@@ -1277,35 +468,18 @@ size_t Client::requestHeaderCallback(char *rawBuffer, size_t size, size_t nitems
void Client::debugDumpRequests()
{
#if defined(ENABLE_CURL)
SG_LOG(SG_IO, SG_INFO, "== HTTP request dump");
ClientPrivate::RequestCurlMap::iterator it = d->requests.begin();
for (; it != d->requests.end(); ++it) {
SG_LOG(SG_IO, SG_INFO, "\t" << it->first->url());
}
SG_LOG(SG_IO, SG_INFO, "==");
#else
SG_LOG(SG_IO, SG_INFO, "== HTTP connection dump");
ConnectionDict::iterator it = d->connections.begin();
for (; it != d->connections.end(); ++it) {
it->second->debugDumpRequests();
}
SG_LOG(SG_IO, SG_INFO, "==");
#endif
}
void Client::clearAllConnections()
{
#if defined(ENABLE_CURL)
curl_multi_cleanup(d->curlMulti);
d->createCurlMulti();
#else
ConnectionDict::iterator it = d->connections.begin();
for (; it != d->connections.end(); ++it) {
delete it->second;
}
d->connections.clear();
#endif
}
} // of namespace HTTP
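
The curl code paths kept above (curlMulti, curl_multi_perform, the curl message processing loop in Client::update) are now the only HTTP backend. For readers unfamiliar with libcurl's multi interface, here is a minimal standalone sketch of that pattern; it is not part of this commit, and the URL and the 100 ms wait are placeholder values:

#include <curl/curl.h>

int main()
{
    curl_global_init(CURL_GLOBAL_ALL);
    CURLM* multi = curl_multi_init();

    CURL* easy = curl_easy_init();
    curl_easy_setopt(easy, CURLOPT_URL, "http://example.com/");
    curl_multi_add_handle(multi, easy);

    int running = 0;
    do {
        // drive all transfers without blocking
        curl_multi_perform(multi, &running);

        // drain completion messages, as the client's update loop does
        int queued = 0;
        while (CURLMsg* msg = curl_multi_info_read(multi, &queued)) {
            if (msg->msg == CURLMSG_DONE) {
                // msg->easy_handle finished with result msg->data.result
            }
        }

        // wait for socket activity, at most 100 ms
        curl_multi_wait(multi, NULL, 0, 100, NULL);
    } while (running > 0);

    curl_multi_remove_handle(multi, easy);
    curl_easy_cleanup(easy);
    curl_multi_cleanup(multi);
    curl_global_cleanup();
    return 0;
}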

View File

@@ -1,271 +0,0 @@
// Written by James Turner
//
// Copyright (C) 2013 James Turner <zakalawe@mac.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//
#include "HTTPContentDecode.hxx"
#include <cassert>
#include <cstdlib> // rand()
#include <cstring> // for memset, memcpy
#include <simgear/debug/logstream.hxx>
#include <simgear/structure/exception.hxx>
#include <simgear/io/lowlevel.hxx> // for sgEndian stuff
namespace simgear
{
namespace HTTP
{
const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024;
const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS;
// see http://www.ietf.org/rfc/rfc1952.txt for these values and
// a detailed description of the logic
const int GZIP_HEADER_ID1 = 31;
const int GZIP_HEADER_ID2 = 139;
const int GZIP_HEADER_METHOD_DEFLATE = 8;
const unsigned int GZIP_HEADER_SIZE = 10;
const int GZIP_HEADER_FEXTRA = 1 << 2;
const int GZIP_HEADER_FNAME = 1 << 3;
const int GZIP_HEADER_COMMENT = 1 << 4;
const int GZIP_HEADER_CRC = 1 << 1;
ContentDecoder::ContentDecoder() :
_output(NULL),
_zlib(NULL),
_input(NULL),
_inputAllocated(0)
{
reset();
}
ContentDecoder::~ContentDecoder()
{
free(_output);
free(_input);
free(_zlib);
}
void ContentDecoder::setEncoding(const std::string& encoding)
{
if (encoding == "gzip") {
_contentDeflate = true;
_needGZipHeader = true;
} else if (encoding == "deflate") {
_contentDeflate = true;
_needGZipHeader = false;
} else if (encoding != "identity") {
SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << encoding);
}
}
void ContentDecoder::reset()
{
_request = NULL;
_contentDeflate = false;
_needGZipHeader = false;
_inputSize = 0;
_totalReceivedBytes = 0;
}
void ContentDecoder::initWithRequest(Request_ptr req)
{
_request = req;
if (!_contentDeflate) {
return;
}
if (!_zlib) {
_zlib = (z_stream*) malloc(sizeof(z_stream));
}
memset(_zlib, 0, sizeof(z_stream));
if (!_output) {
_output = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE);
}
_inputSize = 0;
// NULLs mean we'll get the default alloc+free methods
// which is absolutely fine
_zlib->avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
_zlib->next_out = _output;
if (inflateInit2(_zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) {
SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed");
}
}
void ContentDecoder::finish()
{
if (_contentDeflate) {
runDecoder();
inflateEnd(_zlib);
}
}
void ContentDecoder::receivedBytes(const char* n, size_t s)
{
_totalReceivedBytes += s;
if (!_contentDeflate) {
_request->processBodyBytes(n, s);
return;
}
// allocate more space if needed (this will only happen rarely once the
// buffer has hit something proportionate to the server's compression
// window size)
size_t requiredSize = _inputSize + s;
if (requiredSize > _inputAllocated) {
reallocateInputBuffer(requiredSize);
}
// copy newly received bytes into the buffer
memcpy(_input + _inputSize, n, s);
_inputSize += s;
if (_needGZipHeader && !consumeGZipHeader()) {
// still waiting on the full GZIP header, so done
return;
}
runDecoder();
}
void ContentDecoder::consumeBytes(size_t consumed)
{
assert(_inputSize >= consumed);
// move existing (consumed) bytes down
if (consumed > 0) {
size_t newSize = _inputSize - consumed;
memmove(_input, _input + consumed, newSize);
_inputSize = newSize;
}
}
void ContentDecoder::reallocateInputBuffer(size_t newSize)
{
_input = (unsigned char*) realloc(_input, newSize);
_inputAllocated = newSize;
}
void ContentDecoder::runDecoder()
{
_zlib->next_in = (unsigned char*) _input;
_zlib->avail_in = _inputSize;
int writtenSize;
// loop, running zlib inflate() and sending output bytes to
// our request body handler. Keep calling inflate until no bytes are
// written, and ZLIB has consumed all available input
do {
_zlib->next_out = _output;
_zlib->avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
int result = inflate(_zlib, Z_NO_FLUSH);
if (result == Z_OK || result == Z_STREAM_END) {
// nothing to do
} else if (result == Z_BUF_ERROR) {
// transient error, fall through
} else {
// _error = result;
return;
}
writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - _zlib->avail_out;
if (writtenSize > 0) {
_request->processBodyBytes((char*) _output, writtenSize);
}
if (result == Z_STREAM_END) {
break;
}
} while ((_zlib->avail_in > 0) || (writtenSize > 0));
// update input buffers based on what we consumed
consumeBytes(_inputSize - _zlib->avail_in);
}
bool ContentDecoder::consumeGZipHeader()
{
size_t avail = _inputSize;
if (avail < GZIP_HEADER_SIZE) {
return false; // need more header bytes
}
if ((_input[0] != GZIP_HEADER_ID1) ||
(_input[1] != GZIP_HEADER_ID2) ||
(_input[2] != GZIP_HEADER_METHOD_DEFLATE))
{
return false; // invalid GZip header
}
char flags = _input[3];
unsigned int gzipHeaderSize = GZIP_HEADER_SIZE;
if (flags & GZIP_HEADER_FEXTRA) {
gzipHeaderSize += 2;
if (avail < gzipHeaderSize) {
return false; // need more header bytes
}
unsigned short extraHeaderBytes = *(reinterpret_cast<unsigned short*>(_input + GZIP_HEADER_FEXTRA));
if ( sgIsBigEndian() ) {
sgEndianSwap( &extraHeaderBytes );
}
gzipHeaderSize += extraHeaderBytes;
if (avail < gzipHeaderSize) {
return false; // need more header bytes
}
}
#if 0
if (flags & GZIP_HEADER_FNAME) {
gzipHeaderSize++;
while (gzipHeaderSize <= avail) {
if (_input[gzipHeaderSize-1] == 0) {
break; // found terminating NULL character
}
}
}
if (flags & GZIP_HEADER_COMMENT) {
gzipHeaderSize++;
while (gzipHeaderSize <= avail) {
if (_input[gzipHeaderSize-1] == 0) {
break; // found terminating NULL character
}
}
}
#endif
if (flags & GZIP_HEADER_CRC) {
gzipHeaderSize += 2;
}
if (avail < gzipHeaderSize) {
return false; // need more header bytes
}
consumeBytes(gzipHeaderSize);
_needGZipHeader = false;
return true;
}
} // of namespace HTTP
} // of namespace simgear
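
The decoder deleted above drove zlib in raw-deflate mode: inflateInit2() with negative window bits (ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS) disables zlib/gzip framing, which is why the gzip header had to be skipped by hand in consumeGZipHeader(). Below is a minimal standalone sketch of that zlib pattern, for reference only; with libcurl the equivalent job can be delegated to the library via CURLOPT_ACCEPT_ENCODING, though whether the new client path does so is outside this diff:

#include <zlib.h>
#include <cstring>
#include <vector>

// Inflate a raw-deflate buffer (no zlib/gzip header), mirroring the
// removed decoder's use of -MAX_WBITS. Returns what could be decoded.
std::vector<unsigned char> inflateRaw(const unsigned char* in, size_t len)
{
    z_stream zs;
    std::memset(&zs, 0, sizeof(zs));   // NULL alloc/free callbacks -> zlib defaults
    if (inflateInit2(&zs, -MAX_WBITS) != Z_OK)
        return std::vector<unsigned char>();

    std::vector<unsigned char> out;
    unsigned char buf[32 * 1024];
    zs.next_in = const_cast<unsigned char*>(in);
    zs.avail_in = static_cast<uInt>(len);

    int result;
    do {
        zs.next_out = buf;
        zs.avail_out = sizeof(buf);
        result = inflate(&zs, Z_NO_FLUSH);
        out.insert(out.end(), buf, buf + (sizeof(buf) - zs.avail_out));
    } while (result == Z_OK && (zs.avail_in > 0 || zs.avail_out == 0));

    inflateEnd(&zs);
    return out;   // partial or empty on error / truncated input
}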

View File

@@ -1,75 +0,0 @@
// Written by James Turner
//
// Copyright (C) 2013 James Turner <zakalawe@mac.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//
#ifndef SG_HTTP_CONTENT_DECODER_HXX
#define SG_HTTP_CONTENT_DECODER_HXX
#include <string>
#include <zlib.h>
#include <simgear/io/HTTPRequest.hxx>
namespace simgear
{
namespace HTTP
{
class ContentDecoder
{
public:
ContentDecoder();
~ContentDecoder();
void reset();
void initWithRequest(Request_ptr req);
void finish();
void setEncoding(const std::string& encoding);
void receivedBytes(const char* n, size_t s);
size_t getTotalReceivedBytes() const
{ return _totalReceivedBytes; }
private:
bool consumeGZipHeader();
void runDecoder();
void consumeBytes(size_t consumed);
void reallocateInputBuffer(size_t newSize);
Request_ptr _request;
unsigned char* _output;
z_stream* _zlib;
unsigned char* _input;
size_t _inputAllocated, _inputSize;
bool _contentDeflate, _needGZipHeader;
size_t _totalReceivedBytes;
};
} // of namespace HTTP
} // of namespace simgear
#endif // of SG_HTTP_CONTENT_DECODER_HXX

View File

@@ -18,9 +18,7 @@
#include <simgear/timing/timestamp.hxx>
#include <simgear/debug/logstream.hxx>
#if defined(ENABLE_CURL)
#include <curl/multi.h>
#endif
using std::cout;
using std::cerr;
@@ -535,11 +533,7 @@ int main(int argc, char* argv[])
#if defined(ENABLE_CURL)
const int HOST_NOT_FOUND_CODE = CURLE_COULDNT_RESOLVE_HOST;
#else
const int HOST_NOT_FOUND_CODE = ENOENT;
#endif
COMPARE(tr->responseCode(), HOST_NOT_FOUND_CODE);
}
@@ -551,11 +545,7 @@ int main(int argc, char* argv[])
cl.makeRequest(tr);
waitForFailed(&cl, tr);
#if defined(ENABLE_CURL)
const int SERVER_NO_DATA_CODE = CURLE_GOT_NOTHING;
#else
const int SERVER_NO_DATA_CODE = 500;
#endif
COMPARE(tr->responseCode(), SERVER_NO_DATA_CODE);
}
@@ -572,7 +562,6 @@ cout << "testing proxy close" << endl;
COMPARE(tr->bodyData, string(body2, body2Size));
}
#if defined(ENABLE_CURL)
{
cl.setProxy("localhost", 2000, "johndoe:swordfish");
TestRequest* tr = new TestRequest("http://www.google.com/test3");
@@ -583,7 +572,6 @@ cout << "testing proxy close" << endl;
COMPARE(tr->responseBytesReceived(), body2Size);
COMPARE(tr->bodyData, string(body2, body2Size));
}
#endif
// pipelining
cout << "testing HTTP 1.1 pipelining" << endl;
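
The failure tests above now compare against libcurl's own result codes (CURLE_COULDNT_RESOLVE_HOST, CURLE_GOT_NOTHING) rather than errno-style values. For reference, a standalone sketch of where such a code comes from, not part of this commit; the hostname is a deliberately unresolvable placeholder:

#include <curl/curl.h>
#include <cstdio>

int main()
{
    curl_global_init(CURL_GLOBAL_ALL);
    CURL* easy = curl_easy_init();
    // the reserved .invalid TLD should never resolve
    curl_easy_setopt(easy, CURLOPT_URL, "http://no.such.host.invalid/");
    CURLcode rc = curl_easy_perform(easy);
    if (rc == CURLE_COULDNT_RESOLVE_HOST)
        std::printf("resolve failed as expected: %s\n", curl_easy_strerror(rc));
    curl_easy_cleanup(easy);
    curl_global_cleanup();
    return 0;
}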

View File

@@ -18,9 +18,7 @@
#include <simgear/timing/timestamp.hxx>
#include <simgear/debug/logstream.hxx>
#if defined(ENABLE_CURL)
#include <curl/multi.h>
#endif
using std::cout;
using std::cerr;
@@ -533,13 +531,7 @@ int main(int argc, char* argv[])
cl.makeRequest(tr);
waitForFailed(&cl, tr);
#if defined(ENABLE_CURL)
const int HOST_NOT_FOUND_CODE = CURLE_COULDNT_RESOLVE_HOST;
#else
const int HOST_NOT_FOUND_CODE = ENOENT;
#endif
COMPARE(tr->responseCode(), HOST_NOT_FOUND_CODE);
}
@@ -551,11 +543,7 @@ int main(int argc, char* argv[])
cl.makeRequest(tr);
waitForFailed(&cl, tr);
#if defined(ENABLE_CURL)
const int SERVER_NO_DATA_CODE = CURLE_GOT_NOTHING;
#else
const int SERVER_NO_DATA_CODE = 500;
#endif
COMPARE(tr->responseCode(), SERVER_NO_DATA_CODE);
}
@@ -572,7 +560,6 @@ cout << "testing proxy close" << endl;
COMPARE(tr->bodyData, string(body2, body2Size));
}
#if defined(ENABLE_CURL)
{
cl.setProxy("localhost", 2000, "johndoe:swordfish");
TestRequest* tr = new TestRequest("http://www.google.com/test3");
@@ -583,7 +570,6 @@ cout << "testing proxy close" << endl;
COMPARE(tr->responseBytesReceived(), body2Size);
COMPARE(tr->bodyData, string(body2, body2Size));
}
#endif
// pipelining
cout << "testing HTTP 1.1 pipelining" << endl;

View File

@@ -18,4 +18,3 @@
#cmakedefine SYSTEM_EXPAT
#cmakedefine ENABLE_SOUND
#cmakedefine ENABLE_CURL