Refactor HTTP content-encoding support.

Move content-encoding handler into its own file, which 
simplifies the main code. As part of this, fix a bug where we
didn't flush the ZLIB buffers on response completion.
This commit is contained in:
James Turner 2013-10-05 23:26:35 +01:00
parent d658b5fc38
commit 2f023803e7
5 changed files with 374 additions and 195 deletions

View File

@ -16,6 +16,7 @@ set(HEADERS
sg_socket_udp.hxx
HTTPClient.hxx
HTTPRequest.hxx
HTTPContentDecode.hxx
DAVMultiStatus.hxx
SVNRepository.hxx
SVNDirectory.hxx
@ -36,6 +37,7 @@ set(SOURCES
sg_socket_udp.cxx
HTTPClient.cxx
HTTPRequest.cxx
HTTPContentDecode.cxx
DAVMultiStatus.cxx
SVNRepository.cxx
SVNDirectory.cxx

View File

@ -34,10 +34,8 @@
#include <boost/foreach.hpp>
#include <boost/algorithm/string/case_conv.hpp>
#include <zlib.h>
#include <simgear/io/sg_netChat.hxx>
#include <simgear/io/lowlevel.hxx>
#include <simgear/io/HTTPContentDecode.hxx>
#include <simgear/misc/strutils.hxx>
#include <simgear/compiler.h>
#include <simgear/debug/logstream.hxx>
@ -65,19 +63,6 @@ namespace HTTP
extern const int DEFAULT_HTTP_PORT = 80;
const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
const unsigned int MAX_INFLIGHT_REQUESTS = 32;
const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024;
const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS;
// see http://www.ietf.org/rfc/rfc1952.txt for these values and
// detailed description of the logic
const int GZIP_HEADER_ID1 = 31;
const int GZIP_HEADER_ID2 = 139;
const int GZIP_HEADER_METHOD_DEFLATE = 8;
const unsigned int GZIP_HEADER_SIZE = 10;
const int GZIP_HEADER_FEXTRA = 1 << 2;
const int GZIP_HEADER_FNAME = 1 << 3;
const int GZIP_HEADER_COMMENT = 1 << 4;
const int GZIP_HEADER_CRC = 1 << 1;
class Connection;
typedef std::multimap<std::string, Connection*> ConnectionDict;
@ -109,23 +94,12 @@ public:
Connection(Client* pr) :
client(pr),
state(STATE_CLOSED),
port(DEFAULT_HTTP_PORT),
zlibInflateBuffer(NULL),
zlibInflateBufferSize(0),
zlibOutputBuffer(NULL)
port(DEFAULT_HTTP_PORT)
{
}
virtual ~Connection()
{
if (zlibInflateBuffer) {
free(zlibInflateBuffer);
}
if (zlibOutputBuffer) {
free(zlibOutputBuffer);
}
}
void setServer(const string& h, short p)
@ -159,6 +133,7 @@ public:
SG_LOG(SG_IO, SG_INFO, "HTTP socket error");
activeRequest->setFailure(error, "socket error");
activeRequest = NULL;
_contentDecoder.reset();
}
state = STATE_SOCKET_ERROR;
@ -186,6 +161,7 @@ public:
sentRequests.erase(it);
}
activeRequest = NULL;
_contentDecoder.reset();
}
state = STATE_CLOSED;
@ -209,6 +185,7 @@ public:
SG_LOG(SG_IO, SG_DEBUG, "HTTP socket timeout");
activeRequest->setFailure(ETIMEDOUT, "socket timeout");
activeRequest = NULL;
_contentDecoder.reset();
}
state = STATE_SOCKET_ERROR;
@ -240,7 +217,7 @@ public:
bodyTransferSize = -1;
chunkedTransfer = false;
contentGZip = contentDeflate = false;
_contentDecoder.reset();
}
void tryStartNextRequest()
@ -349,132 +326,12 @@ public:
client->receivedBytes(static_cast<unsigned int>(n));
if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) {
if (contentGZip || contentDeflate) {
expandCompressedData(s, n);
} else {
activeRequest->processBodyBytes(s, n);
}
_contentDecoder.receivedBytes(s, n);
} else {
buffer += string(s, n);
}
}
void expandCompressedData(const char* s, int n)
{
int reqSize = n + zlib.avail_in;
if (reqSize > zlibInflateBufferSize) {
// reallocate
unsigned char* newBuf = (unsigned char*) malloc(reqSize);
memcpy(newBuf, zlib.next_in, zlib.avail_in);
memcpy(newBuf + zlib.avail_in, s, n);
free(zlibInflateBuffer);
zlibInflateBuffer = newBuf;
zlibInflateBufferSize = reqSize;
} else {
// important to use memmove here, since it's very likely
// the source and destination ranges overlap
memmove(zlibInflateBuffer, zlib.next_in, zlib.avail_in);
memcpy(zlibInflateBuffer + zlib.avail_in, s, n);
}
zlib.next_in = (unsigned char*) zlibInflateBuffer;
zlib.avail_in = reqSize;
zlib.next_out = zlibOutputBuffer;
zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
if (contentGZip && !handleGZipHeader()) {
return;
}
int writtenSize = 0;
do {
int result = inflate(&zlib, Z_NO_FLUSH);
if (result == Z_OK || result == Z_STREAM_END) {
// nothing to do
} else {
SG_LOG(SG_IO, SG_WARN, "HTTP: got Zlib error:" << result);
return;
}
writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - zlib.avail_out;
if (result == Z_STREAM_END) {
break;
}
} while ((writtenSize == 0) && (zlib.avail_in > 0));
if (writtenSize > 0) {
activeRequest->processBodyBytes((const char*) zlibOutputBuffer, writtenSize);
}
}
bool handleGZipHeader()
{
// we clear this down to contentDeflate once the GZip header has been seen
if (zlib.avail_in < GZIP_HEADER_SIZE) {
return false; // need more header bytes
}
if ((zlibInflateBuffer[0] != GZIP_HEADER_ID1) ||
(zlibInflateBuffer[1] != GZIP_HEADER_ID2) ||
(zlibInflateBuffer[2] != GZIP_HEADER_METHOD_DEFLATE))
{
return false; // invalid GZip header
}
char flags = zlibInflateBuffer[3];
unsigned int gzipHeaderSize = GZIP_HEADER_SIZE;
if (flags & GZIP_HEADER_FEXTRA) {
gzipHeaderSize += 2;
if (zlib.avail_in < gzipHeaderSize) {
return false; // need more header bytes
}
unsigned short extraHeaderBytes = *(reinterpret_cast<unsigned short*>(zlibInflateBuffer + GZIP_HEADER_FEXTRA));
if ( sgIsBigEndian() ) {
sgEndianSwap( &extraHeaderBytes );
}
gzipHeaderSize += extraHeaderBytes;
if (zlib.avail_in < gzipHeaderSize) {
return false; // need more header bytes
}
}
if (flags & GZIP_HEADER_FNAME) {
gzipHeaderSize++;
while (gzipHeaderSize <= zlib.avail_in) {
if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
break; // found terminating NULL character
}
}
}
if (flags & GZIP_HEADER_COMMENT) {
gzipHeaderSize++;
while (gzipHeaderSize <= zlib.avail_in) {
if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
break; // found terminating NULL character
}
}
}
if (flags & GZIP_HEADER_CRC) {
gzipHeaderSize += 2;
}
if (zlib.avail_in < gzipHeaderSize) {
return false; // need more header bytes
}
zlib.next_in += gzipHeaderSize;
zlib.avail_in -= gzipHeaderSize;
// now we've processed the GZip header, can decode as deflate
contentGZip = false;
contentDeflate = true;
return true;
}
virtual void foundTerminator(void)
{
idleTime.stamp();
@ -572,34 +429,6 @@ private:
string h = strutils::simplify(buffer);
if (h.empty()) { // blank line terminates headers
headersComplete();
if (contentGZip || contentDeflate) {
memset(&zlib, 0, sizeof(z_stream));
if (!zlibOutputBuffer) {
zlibOutputBuffer = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE);
}
// NULLs means we'll get default alloc+free methods
// which is absolutely fine
zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
zlib.next_out = zlibOutputBuffer;
if (inflateInit2(&zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) {
SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed");
}
}
if (chunkedTransfer) {
state = STATE_GETTING_CHUNKED;
} else if (noMessageBody || (bodyTransferSize == 0)) {
// force the state to GETTING_BODY, to simplify logic in
// responseComplete and handleClose
state = STATE_GETTING_BODY;
responseComplete();
} else {
setByteCount(bodyTransferSize); // may be -1, that's fine
state = STATE_GETTING_BODY;
}
return;
}
@ -628,13 +457,7 @@ private:
} else if (lkey == "transfer-encoding") {
processTransferEncoding(value);
} else if (lkey == "content-encoding") {
if (value == "gzip") {
contentGZip = true;
} else if (value == "deflate") {
contentDeflate = true;
} else if (value != "identity") {
SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << value);
}
_contentDecoder.setEncoding(value);
}
}
@ -692,14 +515,25 @@ private:
void headersComplete()
{
activeRequest->responseHeadersComplete();
_contentDecoder.initWithRequest(activeRequest);
if (chunkedTransfer) {
state = STATE_GETTING_CHUNKED;
} else if (noMessageBody || (bodyTransferSize == 0)) {
// force the state to GETTING_BODY, to simplify logic in
// responseComplete and handleClose
state = STATE_GETTING_BODY;
responseComplete();
} else {
setByteCount(bodyTransferSize); // may be -1, that's fine
state = STATE_GETTING_BODY;
}
}
void responseComplete()
{
Request_ptr completedRequest = activeRequest;
if (contentDeflate) {
inflateEnd(&zlib);
}
_contentDecoder.finish();
assert(sentRequests.front() == activeRequest);
sentRequests.pop_front();
@ -753,15 +587,11 @@ private:
bool chunkedTransfer;
bool noMessageBody;
int requestBodyBytesToSend;
z_stream zlib;
unsigned char* zlibInflateBuffer;
int zlibInflateBufferSize;
unsigned char* zlibOutputBuffer;
bool contentGZip, contentDeflate;
RequestList queuedRequests;
RequestList sentRequests;
ContentDecoder _contentDecoder;
};
Client::Client() :

View File

@ -0,0 +1,274 @@
// Written by James Turner
//
// Copyright (C) 2013 James Turner <zakalawe@mac.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//
#include "HTTPContentDecode.hxx"
#include <cassert>
#include <cstdlib> // rand()
#include <iostream>
#include <simgear/debug/logstream.hxx>
#include <simgear/structure/exception.hxx>
#include <simgear/io/lowlevel.hxx> // for sgEndian stuff
namespace simgear
{
namespace HTTP
{
const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024;
const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS;
// see http://www.ietf.org/rfc/rfc1952.txt for these values and
// detailed description of the logic
const int GZIP_HEADER_ID1 = 31;
const int GZIP_HEADER_ID2 = 139;
const int GZIP_HEADER_METHOD_DEFLATE = 8;
const unsigned int GZIP_HEADER_SIZE = 10;
const int GZIP_HEADER_FEXTRA = 1 << 2;
const int GZIP_HEADER_FNAME = 1 << 3;
const int GZIP_HEADER_COMMENT = 1 << 4;
const int GZIP_HEADER_CRC = 1 << 1;
ContentDecoder::ContentDecoder() :
_output(NULL),
_zlib(NULL),
_input(NULL),
_inputAllocated(0),
_inputSize(0)
{
}
ContentDecoder::~ContentDecoder()
{
free(_output);
free(_input);
free(_zlib);
}
void ContentDecoder::setEncoding(const std::string& encoding)
{
std::cout << "setEncoding:" << encoding << std::endl;
if (encoding == "gzip") {
_contentDeflate = true;
_needGZipHeader = true;
} else if (encoding == "deflate") {
_contentDeflate = true;
_needGZipHeader = false;
} else if (encoding != "identity") {
SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << encoding);
}
}
void ContentDecoder::reset()
{
_request = NULL;
_contentDeflate = false;
_needGZipHeader = false;
_inputSize = 0;
}
void ContentDecoder::initWithRequest(Request_ptr req)
{
_request = req;
if (!_contentDeflate) {
return;
}
if (!_zlib) {
_zlib = (z_stream*) malloc(sizeof(z_stream));
}
memset(_zlib, 0, sizeof(z_stream));
if (!_output) {
_output = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE);
}
_inputSize = 0;
// NULLs means we'll get default alloc+free methods
// which is absolutely fine
_zlib->avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
_zlib->next_out = _output;
if (inflateInit2(_zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) {
SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed");
}
}
void ContentDecoder::finish()
{
if (_contentDeflate) {
runDecoder();
inflateEnd(_zlib);
}
}
void ContentDecoder::receivedBytes(const char* n, size_t s)
{
if (!_contentDeflate) {
_request->processBodyBytes(n, s);
return;
}
// allocate more space if needed (this will only happen rarely once the
// buffer has hit something proportionate to the server's compression
// window size)
size_t requiredSize = _inputSize + s;
if (requiredSize > _inputAllocated) {
reallocateInputBuffer(requiredSize);
}
// copy newly recieved bytes into the buffer
memcpy(_input + _inputSize, n, s);
_inputSize += s;
if (_needGZipHeader && !consumeGZipHeader()) {
std::cout << "waiting on GZIP header" << std::endl;
// still waiting on the full GZIP header, so done
return;
}
runDecoder();
}
void ContentDecoder::consumeBytes(size_t consumed)
{
assert(_inputSize >= consumed);
// move existing (consumed) bytes down
if (consumed > 0) {
size_t newSize = _inputSize - consumed;
memmove(_input, _input + consumed, newSize);
_inputSize = newSize;
}
}
void ContentDecoder::reallocateInputBuffer(size_t newSize)
{
std::cout << "reallocate:" << newSize << std::endl;
_input = (unsigned char*) realloc(_input, newSize);
_inputAllocated = newSize;
}
void ContentDecoder::runDecoder()
{
_zlib->next_in = (unsigned char*) _input;
_zlib->avail_in = _inputSize;
int writtenSize;
// loop, running zlib() inflate and sending output bytes to
// our request body handler. Keep calling inflate until no bytes are
// written, and ZLIB has consumed all available input
do {
_zlib->next_out = _output;
_zlib->avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
int result = inflate(_zlib, Z_NO_FLUSH);
if (result == Z_OK || result == Z_STREAM_END) {
// nothing to do
} else if (result == Z_BUF_ERROR) {
// transient error, fall through
} else {
// _error = result;
return;
}
writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - _zlib->avail_out;
if (writtenSize > 0) {
_request->processBodyBytes((char*) _output, writtenSize);
}
if (result == Z_STREAM_END) {
break;
}
} while ((_zlib->avail_in > 0) || (writtenSize > 0));
// update input buffers based on what we consumed
consumeBytes(_inputSize - _zlib->avail_in);
}
bool ContentDecoder::consumeGZipHeader()
{
size_t avail = _inputSize;
if (avail < GZIP_HEADER_SIZE) {
return false; // need more header bytes
}
if ((_input[0] != GZIP_HEADER_ID1) ||
(_input[1] != GZIP_HEADER_ID2) ||
(_input[2] != GZIP_HEADER_METHOD_DEFLATE))
{
return false; // invalid GZip header
}
char flags = _input[3];
unsigned int gzipHeaderSize = GZIP_HEADER_SIZE;
if (flags & GZIP_HEADER_FEXTRA) {
gzipHeaderSize += 2;
if (avail < gzipHeaderSize) {
return false; // need more header bytes
}
unsigned short extraHeaderBytes = *(reinterpret_cast<unsigned short*>(_input + GZIP_HEADER_FEXTRA));
if ( sgIsBigEndian() ) {
sgEndianSwap( &extraHeaderBytes );
}
gzipHeaderSize += extraHeaderBytes;
if (avail < gzipHeaderSize) {
return false; // need more header bytes
}
}
#if 0
if (flags & GZIP_HEADER_FNAME) {
gzipHeaderSize++;
while (gzipHeaderSize <= avail) {
if (_input[gzipHeaderSize-1] == 0) {
break; // found terminating NULL character
}
}
}
if (flags & GZIP_HEADER_COMMENT) {
gzipHeaderSize++;
while (gzipHeaderSize <= avail) {
if (_input[gzipHeaderSize-1] == 0) {
break; // found terminating NULL character
}
}
}
#endif
if (flags & GZIP_HEADER_CRC) {
gzipHeaderSize += 2;
}
if (avail < gzipHeaderSize) {
return false; // need more header bytes
}
consumeBytes(gzipHeaderSize);
_needGZipHeader = false;
return true;
}
} // of namespace HTTP
} // of namespace simgear

View File

@ -0,0 +1,72 @@
// Written by James Turner
//
// Copyright (C) 2013 James Turner <zakalawe@mac.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//
#ifndef SG_HTTP_CONTENT_DECODER_HXX
#define SG_HTTP_CONTENT_DECODER_HXX
#include <string>
#include <zlib.h>
#include <simgear/io/HTTPRequest.hxx>
namespace simgear
{
namespace HTTP
{
class ContentDecoder
{
public:
ContentDecoder();
~ContentDecoder();
void reset();
void initWithRequest(Request_ptr req);
void finish();
void setEncoding(const std::string& encoding);
void receivedBytes(const char* n, size_t s);
private:
bool consumeGZipHeader();
void runDecoder();
void consumeBytes(size_t consumed);
void reallocateInputBuffer(size_t newSize);
Request_ptr _request;
unsigned char* _output;
z_stream* _zlib;
unsigned char* _input;
size_t _inputAllocated, _inputSize;
bool _contentDeflate, _needGZipHeader;
};
} // of namespace HTTP
} // of namespace simgear
#endif // of SG_HTTP_CONTENT_DECODER_HXX

View File

@ -96,6 +96,7 @@ protected:
private:
friend class Client;
friend class Connection;
friend class ContentDecoder;
void processBodyBytes(const char* s, int n);
void setFailure(int code, const std::string& reason);