TerraSync: retry after socket failures

Assume socket failures are intermittent and retry the affected requests, up to a
maximum failure count (currently configured as 16). Add a test case to cover this.
Automatic Release Builder 2020-10-13 08:48:44 +01:00
parent 9c530d6978
commit bd9f04d980
12 changed files with 782 additions and 395 deletions
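
Before the per-file diffs, here is a minimal standalone sketch of the retry policy this commit implements. The names (SketchRequest, SketchRepo) are simplified stand-ins rather than the actual SimGear classes, and the per-file failure bookkeeping done by HTTPRepoPrivate::failedToUpdateChild() is folded into a single counter:

    // Sketch only: socket failures re-queue the request; the sync gives up once
    // a maximum failure count (16, matching this commit) has been reached.
    #include <algorithm>
    #include <deque>
    #include <iostream>
    #include <memory>
    #include <string>
    #include <vector>

    struct SketchRequest {
        std::string url;
        void prepareForRetry() { /* reset per-response state so it can be resent */ }
    };
    using SketchRequestPtr = std::shared_ptr<SketchRequest>;

    enum class RequestFinish { Done, Retry };

    struct SketchRepo {
        std::deque<SketchRequestPtr> queued;
        std::vector<SketchRequestPtr> active;
        int failureCount = 0;
        const int maxPermittedFailures = 16; // mirrors the limit configured here

        void finishedRequest(const SketchRequestPtr& req, RequestFinish how) {
            active.erase(std::remove(active.begin(), active.end(), req), active.end());
            if (how == RequestFinish::Retry && ++failureCount < maxPermittedFailures) {
                req->prepareForRetry(); // wipe partial download state
                queued.push_back(req);  // try the same URL again later
            }
            if (!queued.empty()) {      // start the next pending request
                active.push_back(queued.front());
                queued.pop_front();
                // ... hand the request back to the HTTP client here ...
            }
        }
    };

    int main() {
        SketchRepo repo;
        auto r = std::make_shared<SketchRequest>();
        r->url = "http://example.invalid/repo/dirA/fileAA";
        repo.active.push_back(r);
        repo.finishedRequest(r, RequestFinish::Retry); // simulated socket failure
        std::cout << "requests active after retry: " << repo.active.size() << "\n";
    }

In the commit itself, HTTPRepoPrivate::finishedRequest() takes a RequestFinish flag, Request::prepareForRetry() resets the response state, and the repository reports REPO_PARTIAL_UPDATE and stops retrying once maxPermittedFailures (16) file-level failures have accumulated.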

View File

@ -35,10 +35,12 @@ set(SOURCES
sg_socket.cxx
sg_socket_udp.cxx
HTTPClient.cxx
HTTPTestApi_private.hxx
HTTPFileRequest.cxx
HTTPMemoryRequest.cxx
HTTPRequest.cxx
HTTPRepository.cxx
HTTPRepository_private.hxx
untar.cxx
)
@ -81,6 +83,7 @@ add_test(binobj ${EXECUTABLE_OUTPUT_PATH}/test_binobj)
add_executable(test_repository test_repository.cxx)
target_link_libraries(test_repository ${TEST_LIBS})
target_compile_definitions(test_repository PUBLIC BUILDING_TESTSUITE)
add_test(http_repository ${EXECUTABLE_OUTPUT_PATH}/test_repository)
add_executable(test_untar test_untar.cxx)

View File

@ -37,7 +37,6 @@
#include <simgear/simgear_config.h>
#include <curl/multi.h>
#include <simgear/io/sg_netChat.hxx>
@ -47,6 +46,9 @@
#include <simgear/timing/timestamp.hxx>
#include <simgear/structure/exception.hxx>
#include "HTTPClient_private.hxx"
#include "HTTPTestApi_private.hxx"
#if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
#include "version.h"
#else
@ -64,50 +66,20 @@ namespace HTTP
extern const int DEFAULT_HTTP_PORT = 80;
const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
class Connection;
typedef std::multimap<std::string, Connection*> ConnectionDict;
typedef std::list<Request_ptr> RequestList;
class Client::ClientPrivate
{
public:
CURLM* curlMulti;
void createCurlMulti()
{
curlMulti = curl_multi_init();
// see https://curl.haxx.se/libcurl/c/CURLMOPT_PIPELINING.html
// we request HTTP 1.1 pipelining
curl_multi_setopt(curlMulti, CURLMOPT_PIPELINING, 1 /* aka CURLPIPE_HTTP1 */);
void Client::ClientPrivate::createCurlMulti() {
curlMulti = curl_multi_init();
// see https://curl.haxx.se/libcurl/c/CURLMOPT_PIPELINING.html
// we request HTTP 1.1 pipelining
curl_multi_setopt(curlMulti, CURLMOPT_PIPELINING, 1 /* aka CURLPIPE_HTTP1 */);
#if (LIBCURL_VERSION_MINOR >= 30)
curl_multi_setopt(curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxConnections);
curl_multi_setopt(curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH,
(long) maxPipelineDepth);
curl_multi_setopt(curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS,
(long) maxHostConnections);
curl_multi_setopt(curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS,
(long)maxConnections);
curl_multi_setopt(curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH,
(long)maxPipelineDepth);
curl_multi_setopt(curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS,
(long)maxHostConnections);
#endif
}
typedef std::map<Request_ptr, CURL*> RequestCurlMap;
RequestCurlMap requests;
std::string userAgent;
std::string proxy;
int proxyPort;
std::string proxyAuth;
unsigned int maxConnections;
unsigned int maxHostConnections;
unsigned int maxPipelineDepth;
RequestList pendingRequests;
SGTimeStamp timeTransferSample;
unsigned int bytesTransferred;
unsigned int lastTransferRate;
uint64_t totalBytesDownloaded;
SGPath tlsCertificatePath;
};
}
Client::Client() :
d(new ClientPrivate)
@ -223,12 +195,23 @@ void Client::update(int waitTimeout)
assert(it->second == e);
d->requests.erase(it);
if (msg->data.result == 0) {
req->responseComplete();
} else {
SG_LOG(SG_IO, SG_WARN, "CURL Result:" << msg->data.result << " " << curl_easy_strerror(msg->data.result));
req->setFailure(msg->data.result, curl_easy_strerror(msg->data.result));
}
bool doProcess = true;
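// the test-suite hook (if installed) may consume this transfer result, e.g. to
// simulate a socket failure; when it returns true, skip the normal
// completion / failure handling below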
if (d->testsuiteResponseDoneCallback) {
doProcess =
!d->testsuiteResponseDoneCallback(msg->data.result, req);
}
if (doProcess) {
if (msg->data.result == 0) {
req->responseComplete();
} else {
SG_LOG(SG_IO, SG_WARN,
"CURL Result:" << msg->data.result << " "
<< curl_easy_strerror(msg->data.result));
req->setFailure(msg->data.result,
curl_easy_strerror(msg->data.result));
}
}
curl_multi_remove_handle(d->curlMulti, e);
curl_easy_cleanup(e);
@ -559,6 +542,17 @@ void Client::clearAllConnections()
d->createCurlMulti();
}
/////////////////////////////////////////////////////////////////////
void TestApi::setResponseDoneCallback(Client *cl, ResponseDoneCallback cb) {
cl->d->testsuiteResponseDoneCallback = cb;
}
void TestApi::markRequestAsFailed(Request_ptr req, int curlCode,
const std::string &message) {
req->setFailure(curlCode, message);
}
} // of namespace HTTP
} // of namespace simgear

View File

@ -24,7 +24,8 @@
#ifndef SG_HTTP_CLIENT_HXX
#define SG_HTTP_CLIENT_HXX
#include <memory> // for std::unique_ptr
#include <functional>
#include <memory> // for std::unique_ptr
#include <stdint.h> // for uint64_t
#include <simgear/io/HTTPFileRequest.hxx>
@ -125,6 +126,7 @@ private:
friend class Connection;
friend class Request;
friend class TestApi;
class ClientPrivate;
std::unique_ptr<ClientPrivate> d;

View File

@ -0,0 +1,68 @@
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
#pragma once
#include <list>
#include <map>
#include "HTTPClient.hxx"
#include "HTTPRequest.hxx"
#include <simgear/timing/timestamp.hxx>
#include <curl/multi.h>
namespace simgear {
namespace HTTP {
typedef std::list<Request_ptr> RequestList;
using ResponseDoneCallback =
std::function<bool(int curlResult, Request_ptr req)>;
class Client::ClientPrivate {
public:
CURLM *curlMulti;
void createCurlMulti();
typedef std::map<Request_ptr, CURL *> RequestCurlMap;
RequestCurlMap requests;
std::string userAgent;
std::string proxy;
int proxyPort;
std::string proxyAuth;
unsigned int maxConnections;
unsigned int maxHostConnections;
unsigned int maxPipelineDepth;
RequestList pendingRequests;
SGTimeStamp timeTransferSample;
unsigned int bytesTransferred;
unsigned int lastTransferRate;
uint64_t totalBytesDownloaded;
SGPath tlsCertificatePath;
// only used by unit-tests / test-api, but
// only costs us a pointer here to declare it.
ResponseDoneCallback testsuiteResponseDoneCallback;
};
} // namespace HTTP
} // namespace simgear

View File

@ -45,132 +45,43 @@
#include <simgear/misc/sg_hash.hxx>
#include "HTTPRepository_private.hxx"
namespace simgear
{
class HTTPDirectory;
using HTTPDirectory_ptr = std::unique_ptr<HTTPDirectory>;
namespace {
class HTTPRepoGetRequest : public HTTP::Request
{
public:
HTTPRepoGetRequest(HTTPDirectory* d, const std::string& u) :
HTTP::Request(u),
_directory(d)
{
}
std::string innerResultCodeAsString(HTTPRepository::ResultCode code) {
switch (code) {
case HTTPRepository::REPO_NO_ERROR:
return "no error";
case HTTPRepository::REPO_ERROR_NOT_FOUND:
return "not found";
case HTTPRepository::REPO_ERROR_SOCKET:
return "socket error";
case HTTPRepository::SVN_ERROR_XML:
return "malformed XML";
case HTTPRepository::SVN_ERROR_TXDELTA:
return "malformed XML";
case HTTPRepository::REPO_ERROR_IO:
return "I/O error";
case HTTPRepository::REPO_ERROR_CHECKSUM:
return "checksum verification error";
case HTTPRepository::REPO_ERROR_FILE_NOT_FOUND:
return "file not found";
case HTTPRepository::REPO_ERROR_HTTP:
return "HTTP-level error";
case HTTPRepository::REPO_ERROR_CANCELLED:
return "cancelled";
case HTTPRepository::REPO_PARTIAL_UPDATE:
return "partial update (incomplete)";
}
virtual void cancel();
return "Unknown response code";
}
size_t contentSize() const
{
return _contentSize;
}
void setContentSize(size_t sz)
{
_contentSize = sz;
}
protected:
HTTPDirectory* _directory;
size_t _contentSize = 0;
};
typedef SGSharedPtr<HTTPRepoGetRequest> RepoRequestPtr;
std::string innerResultCodeAsString(HTTPRepository::ResultCode code)
{
switch (code) {
case HTTPRepository::REPO_NO_ERROR: return "no error";
case HTTPRepository::REPO_ERROR_NOT_FOUND: return "not found";
case HTTPRepository::REPO_ERROR_SOCKET: return "socket error";
case HTTPRepository::SVN_ERROR_XML: return "malformed XML";
case HTTPRepository::SVN_ERROR_TXDELTA: return "malformed XML";
case HTTPRepository::REPO_ERROR_IO: return "I/O error";
case HTTPRepository::REPO_ERROR_CHECKSUM: return "checksum verification error";
case HTTPRepository::REPO_ERROR_FILE_NOT_FOUND: return "file not found";
case HTTPRepository::REPO_ERROR_HTTP: return "HTTP-level error";
case HTTPRepository::REPO_ERROR_CANCELLED: return "cancelled";
case HTTPRepository::REPO_PARTIAL_UPDATE: return "partial update (incomplete)";
}
return "Unknown response code";
}
class HTTPRepoPrivate
{
public:
struct HashCacheEntry
{
std::string filePath;
time_t modTime;
size_t lengthBytes;
std::string hashHex;
};
typedef std::vector<HashCacheEntry> HashCache;
HashCache hashes;
int hashCacheDirty = 0;
struct Failure
{
SGPath path;
HTTPRepository::ResultCode error;
};
typedef std::vector<Failure> FailureList;
FailureList failures;
HTTPRepoPrivate(HTTPRepository* parent) :
p(parent),
isUpdating(false),
status(HTTPRepository::REPO_NO_ERROR),
totalDownloaded(0)
{ ; }
~HTTPRepoPrivate();
HTTPRepository* p; // link back to outer
HTTP::Client* http;
std::string baseUrl;
SGPath basePath;
bool isUpdating;
HTTPRepository::ResultCode status;
HTTPDirectory_ptr rootDir;
size_t totalDownloaded;
HTTPRepository::SyncPredicate syncPredicate;
HTTP::Request_ptr updateFile(HTTPDirectory* dir, const std::string& name,
size_t sz);
HTTP::Request_ptr updateDir(HTTPDirectory* dir, const std::string& hash,
size_t sz);
std::string hashForPath(const SGPath& p);
void updatedFileContents(const SGPath& p, const std::string& newHash);
void parseHashCache();
std::string computeHashForPath(const SGPath& p);
void writeHashCache();
void failedToGetRootIndex(HTTPRepository::ResultCode st);
void failedToUpdateChild(const SGPath& relativePath,
HTTPRepository::ResultCode fileStatus);
typedef std::vector<RepoRequestPtr> RequestVector;
RequestVector queuedRequests,
activeRequests;
void makeRequest(RepoRequestPtr req);
void finishedRequest(const RepoRequestPtr& req);
HTTPDirectory* getOrCreateDirectory(const std::string& path);
bool deleteDirectory(const std::string& relPath, const SGPath& absPath);
typedef std::vector<HTTPDirectory_ptr> DirectoryVector;
DirectoryVector directories;
SGPath installedCopyPath;
};
} // namespace
class HTTPDirectory
{
@ -243,6 +154,8 @@ public:
children.clear();
parseDirIndex(children);
std::sort(children.begin(), children.end());
_repository->updatedChildSuccessfully(_relativePath);
}
void failedToUpdate(HTTPRepository::ResultCode status)
@ -470,9 +383,14 @@ public:
SG_LOG(SG_TERRASYNC, SG_WARN, "Checksum error for " << absolutePath() << "/" << file << " " << it->hash << " " << hash);
// we don't erase the file on a hash mismatch, because if we're syncing during the
// middle of a server-side update, the downloaded file may actually become valid.
_repository->failedToUpdateChild(_relativePath, HTTPRepository::REPO_ERROR_CHECKSUM);
_repository->failedToUpdateChild(
_relativePath + "/" + file,
HTTPRepository::REPO_ERROR_CHECKSUM);
} else {
_repository->updatedFileContents(it->path, hash);
_repository->updatedChildSuccessfully(_relativePath + "/" +
file);
_repository->totalDownloaded += sz;
SGPath p = SGPath(absolutePath(), file);
@ -779,6 +697,10 @@ std::string HTTPRepository::resultCodeAsString(ResultCode code)
return innerResultCodeAsString(code);
}
HTTPRepository::FailureVec HTTPRepository::failures() const {
return _d->failures;
}
void HTTPRepository::setFilter(SyncPredicate sp) { _d->syncPredicate = sp; }
HTTPRepository::ResultCode
@ -809,61 +731,80 @@ HTTPRepository::failure() const
}
protected:
virtual void gotBodyData(const char* s, int n)
{
if (!file.get()) {
file.reset(new SGBinaryFile(pathInRepo));
if (!file->open(SG_IO_OUT)) {
SG_LOG(SG_TERRASYNC, SG_WARN, "unable to create file " << pathInRepo);
_directory->repository()->http->cancelRequest(this, "Unable to create output file:" + pathInRepo.utf8Str());
}
void gotBodyData(const char *s, int n) override {
if (!file.get()) {
file.reset(new SGBinaryFile(pathInRepo));
if (!file->open(SG_IO_OUT)) {
SG_LOG(SG_TERRASYNC, SG_WARN,
"unable to create file " << pathInRepo);
_directory->repository()->http->cancelRequest(
this, "Unable to create output file:" + pathInRepo.utf8Str());
}
sha1_init(&hashContext);
}
sha1_write(&hashContext, s, n);
file->write(s, n);
sha1_init(&hashContext);
}
virtual void onDone()
{
file->close();
if (responseCode() == 200) {
std::string hash = strutils::encodeHex(sha1_result(&hashContext), HASH_LENGTH);
_directory->didUpdateFile(fileName, hash, contentSize());
} else if (responseCode() == 404) {
SG_LOG(SG_TERRASYNC, SG_WARN, "terrasync file not found on server: " << fileName << " for " << _directory->absolutePath());
_directory->didFailToUpdateFile(fileName, HTTPRepository::REPO_ERROR_FILE_NOT_FOUND);
} else {
SG_LOG(SG_TERRASYNC, SG_WARN, "terrasync file download error on server: " << fileName << " for " << _directory->absolutePath() <<
"\n\tserver responded: " << responseCode() << "/" << responseReason());
_directory->didFailToUpdateFile(fileName, HTTPRepository::REPO_ERROR_HTTP);
}
sha1_write(&hashContext, s, n);
file->write(s, n);
}
_directory->repository()->finishedRequest(this);
void onDone() override {
file->close();
if (responseCode() == 200) {
std::string hash =
strutils::encodeHex(sha1_result(&hashContext), HASH_LENGTH);
_directory->didUpdateFile(fileName, hash, contentSize());
} else if (responseCode() == 404) {
SG_LOG(SG_TERRASYNC, SG_WARN,
"terrasync file not found on server: "
<< fileName << " for " << _directory->absolutePath());
_directory->didFailToUpdateFile(
fileName, HTTPRepository::REPO_ERROR_FILE_NOT_FOUND);
} else {
SG_LOG(SG_TERRASYNC, SG_WARN,
"terrasync file download error on server: "
<< fileName << " for " << _directory->absolutePath()
<< "\n\tserver responded: " << responseCode() << "/"
<< responseReason());
_directory->didFailToUpdateFile(fileName,
HTTPRepository::REPO_ERROR_HTTP);
// should we ever retry here?
}
virtual void onFail()
{
HTTPRepository::ResultCode code = HTTPRepository::REPO_ERROR_SOCKET;
if (responseCode() == -1) {
code = HTTPRepository::REPO_ERROR_CANCELLED;
}
_directory->repository()->finishedRequest(
this, HTTPRepoPrivate::RequestFinish::Done);
}
if (file) {
file->close();
}
file.reset();
if (pathInRepo.exists()) {
pathInRepo.remove();
}
if (_directory) {
_directory->didFailToUpdateFile(fileName, code);
_directory->repository()->finishedRequest(this);
}
void onFail() override {
HTTPRepository::ResultCode code = HTTPRepository::REPO_ERROR_SOCKET;
if (responseCode() == -1) {
code = HTTPRepository::REPO_ERROR_CANCELLED;
}
if (file) {
file->close();
}
file.reset();
if (pathInRepo.exists()) {
pathInRepo.remove();
}
if (_directory) {
_directory->didFailToUpdateFile(fileName, code);
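// only socket-level failures are worth retrying; an explicit cancellation is final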
const auto doRetry = code == HTTPRepository::REPO_ERROR_SOCKET
? HTTPRepoPrivate::RequestFinish::Retry
: HTTPRepoPrivate::RequestFinish::Done;
_directory->repository()->finishedRequest(this, doRetry);
}
}
void prepareForRetry() override {
HTTP::Request::prepareForRetry();
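// drop the output-file handle; gotBodyData() reopens the file on the retried attempt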
file.reset();
}
private:
static std::string makeUrl(HTTPDirectory* d, const std::string& file)
{
@ -897,80 +838,106 @@ HTTPRepository::failure() const
return _isRootDir;
}
protected:
virtual void gotBodyData(const char* s, int n)
{
body += std::string(s, n);
sha1_write(&hashContext, s, n);
void prepareForRetry() override {
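// reset the accumulated body and hash state so the retried request starts clean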
body.clear();
sha1_init(&hashContext);
HTTP::Request::prepareForRetry();
}
virtual void onDone()
{
if (responseCode() == 200) {
std::string hash = strutils::encodeHex(sha1_result(&hashContext), HASH_LENGTH);
if (!_targetHash.empty() && (hash != _targetHash)) {
SG_LOG(SG_TERRASYNC, SG_WARN,
"Checksum error getting dirIndex for:"
<< _directory->relativePath() << "; expected "
<< _targetHash << " but received " << hash);
protected:
void gotBodyData(const char *s, int n) override {
body += std::string(s, n);
sha1_write(&hashContext, s, n);
}
_directory->failedToUpdate(
HTTPRepository::REPO_ERROR_CHECKSUM);
_directory->repository()->finishedRequest(this);
return;
}
void onDone() override {
if (responseCode() == 200) {
std::string hash =
strutils::encodeHex(sha1_result(&hashContext), HASH_LENGTH);
if (!_targetHash.empty() && (hash != _targetHash)) {
SG_LOG(SG_TERRASYNC, SG_WARN,
"Checksum error getting dirIndex for:"
<< _directory->relativePath() << "; expected "
<< _targetHash << " but received " << hash);
std::string curHash = _directory->repository()->hashForPath(path());
if (hash != curHash) {
simgear::Dir d(_directory->absolutePath());
if (!d.exists()) {
if (!d.create(0700)) {
throw sg_io_exception("Unable to create directory", d.path());
}
}
_directory->failedToUpdate(HTTPRepository::REPO_ERROR_CHECKSUM);
// dir index data has changed, so write to disk and update
// the hash accordingly
sg_ofstream of(pathInRepo(), std::ios::trunc | std::ios::out | std::ios::binary);
if (!of.is_open()) {
throw sg_io_exception("Failed to open directory index file for writing", pathInRepo());
}
of.write(body.data(), body.size());
of.close();
_directory->dirIndexUpdated(hash);
//SG_LOG(SG_TERRASYNC, SG_INFO, "updated dir index " << _directory->absolutePath());
}
_directory->repository()->totalDownloaded += contentSize();
try {
// either way we've confirmed the index is valid so update
// children now
SGTimeStamp st;
st.stamp();
_directory->updateChildrenBasedOnHash();
SG_LOG(SG_TERRASYNC, SG_DEBUG, "after update of:" << _directory->absolutePath() << " child update took:" << st.elapsedMSec());
} catch (sg_exception& ) {
_directory->failedToUpdate(HTTPRepository::REPO_ERROR_IO);
}
} else if (responseCode() == 404) {
_directory->failedToUpdate(HTTPRepository::REPO_ERROR_FILE_NOT_FOUND);
} else {
_directory->failedToUpdate(HTTPRepository::REPO_ERROR_HTTP);
// don't retry checksum failures
_directory->repository()->finishedRequest(
this, HTTPRepoPrivate::RequestFinish::Done);
return;
}
_directory->repository()->finishedRequest(this);
std::string curHash = _directory->repository()->hashForPath(path());
if (hash != curHash) {
simgear::Dir d(_directory->absolutePath());
if (!d.exists()) {
if (!d.create(0700)) {
throw sg_io_exception("Unable to create directory", d.path());
}
}
// dir index data has changed, so write to disk and update
// the hash accordingly
sg_ofstream of(pathInRepo(), std::ios::trunc | std::ios::out |
std::ios::binary);
if (!of.is_open()) {
throw sg_io_exception(
"Failed to open directory index file for writing",
pathInRepo());
}
of.write(body.data(), body.size());
of.close();
_directory->dirIndexUpdated(hash);
// SG_LOG(SG_TERRASYNC, SG_INFO, "updated dir index " <<
// _directory->absolutePath());
}
_directory->repository()->totalDownloaded += contentSize();
try {
// either way we've confirmed the index is valid so update
// children now
SGTimeStamp st;
st.stamp();
_directory->updateChildrenBasedOnHash();
SG_LOG(SG_TERRASYNC, SG_DEBUG,
"after update of:" << _directory->absolutePath()
<< " child update took:"
<< st.elapsedMSec());
} catch (sg_exception &) {
_directory->failedToUpdate(HTTPRepository::REPO_ERROR_IO);
}
} else if (responseCode() == 404) {
_directory->failedToUpdate(
HTTPRepository::REPO_ERROR_FILE_NOT_FOUND);
} else {
_directory->failedToUpdate(HTTPRepository::REPO_ERROR_HTTP);
}
_directory->repository()->finishedRequest(
this, HTTPRepoPrivate::RequestFinish::Done);
}
virtual void onFail()
{
if (_directory) {
_directory->failedToUpdate(HTTPRepository::REPO_ERROR_SOCKET);
_directory->repository()->finishedRequest(this);
}
void onFail() override {
HTTPRepository::ResultCode code = HTTPRepository::REPO_ERROR_SOCKET;
if (responseCode() == -1) {
code = HTTPRepository::REPO_ERROR_CANCELLED;
}
SG_LOG(SG_TERRASYNC, SG_WARN,
"Socket failure getting directory: " << url());
if (_directory) {
_directory->failedToUpdate(code);
const auto doRetry = code == HTTPRepository::REPO_ERROR_SOCKET
? HTTPRepoPrivate::RequestFinish::Retry
: HTTPRepoPrivate::RequestFinish::Done;
_directory->repository()->finishedRequest(this, doRetry);
}
}
private:
static std::string makeUrl(HTTPDirectory* d)
{
@ -1194,10 +1161,9 @@ HTTPRepository::failure() const
DirectoryWithPath p(relPath);
auto it = std::find_if(directories.begin(), directories.end(), p);
if (it != directories.end()) {
HTTPDirectory* d = it->get();
assert(d->absolutePath() == absPath);
directories.erase(it);
} else {
assert((*it)->absolutePath() == absPath);
directories.erase(it);
} else {
// we encounter this code path when deleting an orphaned directory
}
@ -1213,40 +1179,46 @@ HTTPRepository::failure() const
void HTTPRepoPrivate::makeRequest(RepoRequestPtr req)
{
if (activeRequests.size() > 4) {
queuedRequests.push_back(req);
queuedRequests.push_back(req);
} else {
activeRequests.push_back(req);
http->makeRequest(req);
}
}
void HTTPRepoPrivate::finishedRequest(const RepoRequestPtr& req)
{
RequestVector::iterator it = std::find(activeRequests.begin(), activeRequests.end(), req);
// in some cases, for example a checksum failure, we clear the active
// and queued request vectors, so the ::find above can fail
if (it != activeRequests.end()) {
activeRequests.erase(it);
}
void HTTPRepoPrivate::finishedRequest(const RepoRequestPtr &req,
RequestFinish retryRequest) {
auto it = std::find(activeRequests.begin(), activeRequests.end(), req);
// in some cases, we clear the active
// and queued request vectors, so the ::find above can fail
if (it != activeRequests.end()) {
activeRequests.erase(it);
}
if (!queuedRequests.empty()) {
RepoRequestPtr rr = queuedRequests.front();
queuedRequests.erase(queuedRequests.begin());
activeRequests.push_back(rr);
http->makeRequest(rr);
}
if (retryRequest == HTTPRepoPrivate::RequestFinish::Retry) {
SG_LOG(SG_TERRASYNC, SG_INFO, "Retrying request for:" << req->url());
req->prepareForRetry();
queuedRequests.push_back(req);
}
// rate limit how often we write this, since otherwise
// it dominates the time on Windows. 256 seems about right,
// causes a write a few times a minute.
if (hashCacheDirty > 256) {
writeHashCache();
}
if (!queuedRequests.empty()) {
RepoRequestPtr rr = queuedRequests.front();
queuedRequests.erase(queuedRequests.begin());
activeRequests.push_back(rr);
http->makeRequest(rr);
}
if (activeRequests.empty() && queuedRequests.empty()) {
isUpdating = false;
writeHashCache();
}
// rate limit how often we write this, since otherwise
// it dominates the time on Windows. 256 seems about right,
// causes a write a few times a minute.
if (hashCacheDirty > 256) {
writeHashCache();
}
if (activeRequests.empty() && queuedRequests.empty()) {
isUpdating = false;
writeHashCache();
}
}
void HTTPRepoPrivate::failedToGetRootIndex(HTTPRepository::ResultCode st)
@ -1262,36 +1234,49 @@ HTTPRepository::failure() const
void HTTPRepoPrivate::failedToUpdateChild(const SGPath& relativePath,
HTTPRepository::ResultCode fileStatus)
{
if (fileStatus == HTTPRepository::REPO_ERROR_CHECKSUM) {
// stop updating, and mark repository as failed, because this
// usually indicates we need to start a fresh update from the
// root.
// (we could issue a retry here, but we leave that to higher layers)
status = fileStatus;
if (fileStatus == HTTPRepository::REPO_ERROR_CANCELLED) {
// if we were cancelled, don't report or log
return;
} else {
SG_LOG(SG_TERRASYNC, SG_WARN,
"failed to update entry:" << relativePath << " status/code: "
<< innerResultCodeAsString(fileStatus)
<< "/" << fileStatus);
}
queuedRequests.clear();
HTTPRepository::Failure f;
f.path = relativePath;
f.error = fileStatus;
failures.push_back(f);
RequestVector copyOfActive(activeRequests);
RequestVector::iterator rq;
for (rq = copyOfActive.begin(); rq != copyOfActive.end(); ++rq) {
http->cancelRequest(*rq, "Repository updated failed due to checksum error");
}
if (failures.size() >= maxPermittedFailures) {
SG_LOG(SG_TERRASYNC, SG_WARN,
"Repo:" << baseUrl << " exceeded failure count ("
<< failures.size() << "), abandoning");
SG_LOG(SG_TERRASYNC, SG_WARN, "failed to update repository:" << baseUrl
<< "\n\tchecksum failure for: " << relativePath
<< "\n\tthis typically indicates the remote repository is corrupt or was being updated during the sync");
} else if (fileStatus == HTTPRepository::REPO_ERROR_CANCELLED) {
// if we were cancelled, don't report or log
return;
} else {
SG_LOG(SG_TERRASYNC, SG_WARN, "failed to update entry:" << relativePath << " status/code: "
<< innerResultCodeAsString(fileStatus) << "/" << fileStatus);
status = HTTPRepository::REPO_PARTIAL_UPDATE;
queuedRequests.clear();
auto copyOfActiveRequests = activeRequests;
for (auto rq : copyOfActiveRequests) {
http->cancelRequest(rq,
"Abandoning repo sync due to multiple failures");
}
}
}
Failure f;
f.path = relativePath;
f.error = fileStatus;
failures.push_back(f);
void HTTPRepoPrivate::updatedChildSuccessfully(const SGPath &relativePath) {
if (failures.empty()) {
return;
}
// find and remove any existing failures for that path
failures.erase(
std::remove_if(failures.begin(), failures.end(),
[relativePath](const HTTPRepository::Failure &f) {
return f.path == relativePath;
}),
failures.end());
}
} // of namespace simgear

View File

@ -33,64 +33,77 @@ class HTTPRepoPrivate;
class HTTPRepository
{
public:
enum ResultCode {
REPO_NO_ERROR = 0,
REPO_ERROR_NOT_FOUND,
REPO_ERROR_SOCKET,
SVN_ERROR_XML,
SVN_ERROR_TXDELTA,
REPO_ERROR_IO,
REPO_ERROR_CHECKSUM,
REPO_ERROR_FILE_NOT_FOUND,
REPO_ERROR_HTTP,
REPO_ERROR_CANCELLED,
REPO_PARTIAL_UPDATE
};
enum ResultCode {
REPO_NO_ERROR = 0,
REPO_ERROR_NOT_FOUND,
REPO_ERROR_SOCKET,
SVN_ERROR_XML,
SVN_ERROR_TXDELTA,
REPO_ERROR_IO,
REPO_ERROR_CHECKSUM,
REPO_ERROR_FILE_NOT_FOUND,
REPO_ERROR_HTTP,
REPO_ERROR_CANCELLED,
REPO_PARTIAL_UPDATE ///< repository is working, but file-level failures
///< occurred
};
HTTPRepository(const SGPath& root, HTTP::Client* cl);
virtual ~HTTPRepository();
HTTPRepository(const SGPath &root, HTTP::Client *cl);
virtual ~HTTPRepository();
virtual SGPath fsBase() const;
virtual SGPath fsBase() const;
virtual void setBaseUrl(const std::string& url);
virtual std::string baseUrl() const;
virtual void setBaseUrl(const std::string &url);
virtual std::string baseUrl() const;
virtual HTTP::Client* http() const;
virtual HTTP::Client *http() const;
virtual void update();
virtual void update();
virtual bool isDoingSync() const;
virtual bool isDoingSync() const;
virtual ResultCode failure() const;
virtual ResultCode failure() const;
virtual size_t bytesToDownload() const;
virtual size_t bytesToDownload() const;
virtual size_t bytesDownloaded() const;
virtual size_t bytesDownloaded() const;
/**
* optionally provide the location of an installer copy of this
* repository. When a file is missing it will be copied from this tree.
*/
void setInstalledCopyPath(const SGPath& copyPath);
static std::string resultCodeAsString(ResultCode code);
/**
* optionally provide the location of an installer copy of this
* repository. When a file is missing it will be copied from this tree.
*/
void setInstalledCopyPath(const SGPath &copyPath);
enum class SyncAction { Add, Update, Delete, UpToDate };
static std::string resultCodeAsString(ResultCode code);
enum EntryType { FileType, DirectoryType, TarballType };
enum class SyncAction { Add, Update, Delete, UpToDate };
struct SyncItem {
const std::string directory; // relative path in the repository
const EntryType type;
const std::string filename;
const SyncAction action;
const SGPath pathOnDisk; // path the entry does / will have
};
enum EntryType { FileType, DirectoryType, TarballType };
struct SyncItem {
const std::string directory; // relative path in the repository
const EntryType type;
const std::string filename;
const SyncAction action;
const SGPath pathOnDisk; // path the entry does / will have
};
using SyncPredicate = std::function<bool(const SyncItem &item)>;
void setFilter(SyncPredicate sp);
struct Failure {
SGPath path;
ResultCode error;
};
using FailureVec = std::vector<Failure>;
/**
* @brief return file-level failures
*/
FailureVec failures() const;
private:
bool isBare() const;

View File

@ -0,0 +1,122 @@
// HTTPRepository.cxx -- plain HTTP TerraSync remote client
//
// Copyright (C) 2016 James Turner <zakalawe@mac.com>
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License as
// published by the Free Software Foundation; either version 2 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
// USA.
#pragma once
#include <memory>
#include <string>
#include <simgear/io/HTTPClient.hxx>
#include <simgear/misc/sg_path.hxx>
#include "HTTPRepository.hxx"
namespace simgear {
class HTTPDirectory;
using HTTPDirectory_ptr = std::unique_ptr<HTTPDirectory>;
class HTTPRepoGetRequest : public HTTP::Request {
public:
HTTPRepoGetRequest(HTTPDirectory *d, const std::string &u)
: HTTP::Request(u), _directory(d) {}
virtual void cancel();
size_t contentSize() const { return _contentSize; }
void setContentSize(size_t sz) { _contentSize = sz; }
protected:
HTTPDirectory *_directory;
size_t _contentSize = 0;
};
typedef SGSharedPtr<HTTPRepoGetRequest> RepoRequestPtr;
class HTTPRepoPrivate {
public:
struct HashCacheEntry {
std::string filePath;
time_t modTime;
size_t lengthBytes;
std::string hashHex;
};
typedef std::vector<HashCacheEntry> HashCache;
HashCache hashes;
int hashCacheDirty = 0;
HTTPRepository::FailureVec failures;
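// abandon the sync once this many file-level failures have accumulated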
int maxPermittedFailures = 16;
HTTPRepoPrivate(HTTPRepository *parent)
: p(parent), isUpdating(false), status(HTTPRepository::REPO_NO_ERROR),
totalDownloaded(0) {
;
}
~HTTPRepoPrivate();
HTTPRepository *p; // link back to outer
HTTP::Client *http;
std::string baseUrl;
SGPath basePath;
bool isUpdating;
HTTPRepository::ResultCode status;
HTTPDirectory_ptr rootDir;
size_t totalDownloaded;
HTTPRepository::SyncPredicate syncPredicate;
HTTP::Request_ptr updateFile(HTTPDirectory *dir, const std::string &name,
size_t sz);
HTTP::Request_ptr updateDir(HTTPDirectory *dir, const std::string &hash,
size_t sz);
std::string hashForPath(const SGPath &p);
void updatedFileContents(const SGPath &p, const std::string &newHash);
void parseHashCache();
std::string computeHashForPath(const SGPath &p);
void writeHashCache();
void failedToGetRootIndex(HTTPRepository::ResultCode st);
void failedToUpdateChild(const SGPath &relativePath,
HTTPRepository::ResultCode fileStatus);
void updatedChildSuccessfully(const SGPath &relativePath);
typedef std::vector<RepoRequestPtr> RequestVector;
RequestVector queuedRequests, activeRequests;
void makeRequest(RepoRequestPtr req);
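// whether finishedRequest() should re-queue the request (intermittent socket
// failure) or drop it for good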
enum class RequestFinish { Done, Retry };
void finishedRequest(const RepoRequestPtr &req, RequestFinish retryRequest);
HTTPDirectory *getOrCreateDirectory(const std::string &path);
bool deleteDirectory(const std::string &relPath, const SGPath &absPath);
typedef std::vector<HTTPDirectory_ptr> DirectoryVector;
DirectoryVector directories;
SGPath installedCopyPath;
};
} // namespace simgear

View File

@ -56,6 +56,15 @@ Request::~Request()
}
void Request::prepareForRetry() {
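// reset per-response state so this request object can be submitted again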
setReadyState(UNSENT);
_willClose = false;
_connectionCloseHeader = false;
_responseStatus = 0;
_responseLength = 0;
_receivedBodyBytes = 0;
}
//------------------------------------------------------------------------------
Request* Request::done(const Callback& cb)
{

View File

@ -208,7 +208,9 @@ public:
*/
bool serverSupportsPipelining() const;
protected:
virtual void prepareForRetry();
protected:
Request(const std::string& url, const std::string method = "GET");
virtual void requestStart();
@ -222,12 +224,14 @@ protected:
virtual void onFail();
virtual void onAlways();
void setFailure(int code, const std::string& reason);
void setSuccess(int code);
private:
void setFailure(int code, const std::string &reason);
private:
friend class Client;
friend class Connection;
friend class ContentDecoder;
friend class TestApi;
Request(const Request&); // = delete;
Request& operator=(const Request&); // = delete;

View File

@ -0,0 +1,45 @@
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
#pragma once
#include <functional>
#include "HTTPRequest.hxx"
namespace simgear {
namespace HTTP {
class Client;
using ResponseDoneCallback =
std::function<bool(int curlResult, Request_ptr req)>;
/**
* @brief this API is for unit-testing HTTP code.
* Don't use it for anything else. It's for unit-testing.
*/
class TestApi {
public:
// allow the test suite to manipulate requests to simulate network errors;
// without this, it's hard to provoke certain failures in a loop-back
// network situation.
static void setResponseDoneCallback(Client *cl, ResponseDoneCallback cb);
static void markRequestAsFailed(Request_ptr req, int curlCode,
const std::string &message);
};
} // namespace HTTP
} // namespace simgear

View File

@ -1,16 +1,18 @@
#include <cassert>
#include <cstdlib>
#include <errno.h>
#include <fcntl.h>
#include <functional>
#include <iostream>
#include <map>
#include <sstream>
#include <errno.h>
#include <fcntl.h>
#include <simgear/simgear_config.h>
#include "test_HTTP.hxx"
#include "HTTPRepository.hxx"
#include "HTTPClient.hxx"
#include "HTTPRepository.hxx"
#include "HTTPTestApi_private.hxx"
#include "test_HTTP.hxx"
#include <simgear/misc/strutils.hxx>
#include <simgear/misc/sg_hash.hxx>
@ -25,6 +27,8 @@
using namespace simgear;
using TestApi = simgear::HTTP::TestApi;
std::string dataForFile(const std::string& parentName, const std::string& name, int revision)
{
std::ostringstream os;
@ -45,6 +49,9 @@ std::string hashForData(const std::string& d)
return strutils::encodeHex(sha1_result(&info), HASH_LENGTH);
}
class TestRepoEntry;
using AccessCallback = std::function<void(TestRepoEntry &entry)>;
class TestRepoEntry
{
public:
@ -70,7 +77,8 @@ public:
int requestCount;
bool getWillFail;
bool returnCorruptData;
std::unique_ptr<SGCallback> accessCallback;
AccessCallback accessCallback;
void clearRequestCounts();
@ -270,8 +278,8 @@ public:
return;
}
if (entry->accessCallback.get()) {
(*entry->accessCallback)();
if (entry->accessCallback) {
entry->accessCallback(*entry);
}
if (entry->getWillFail) {
@ -282,20 +290,29 @@ public:
entry->requestCount++;
std::string content;
bool closeSocket = false;
size_t contentSize = 0;
if (entry->returnCorruptData) {
content = dataForFile("!$£$!" + entry->parent->name,
"corrupt_" + entry->name,
entry->revision);
contentSize = content.size();
} else {
content = entry->data();
content = entry->data();
contentSize = content.size();
}
std::stringstream d;
d << "HTTP/1.1 " << 200 << " " << reasonForCode(200) << "\r\n";
d << "Content-Length:" << content.size() << "\r\n";
d << "Content-Length:" << contentSize << "\r\n";
d << "\r\n"; // final CRLF to terminate the headers
d << content;
push(d.str().c_str());
if (closeSocket) {
closeWhenDone();
}
} else {
sendErrorResponse(404, false, "");
}
@ -401,6 +418,15 @@ void waitForUpdateComplete(HTTP::Client* cl, HTTPRepository* repo)
std::cerr << "timed out" << std::endl;
}
void runForTime(HTTP::Client *cl, HTTPRepository *repo, int msec = 15) {
SGTimeStamp start(SGTimeStamp::now());
while (start.elapsedMSec() < msec) {
cl->update();
testServer.poll();
SGTimeStamp::sleepForMSec(1);
}
}
void testBasicClone(HTTP::Client* cl)
{
std::unique_ptr<HTTPRepository> repo;
@ -618,9 +644,19 @@ void testAbandonCorruptFiles(HTTP::Client* cl)
repo->setBaseUrl("http://localhost:2000/repo");
repo->update();
waitForUpdateComplete(cl, repo.get());
if (repo->failure() != HTTPRepository::REPO_ERROR_CHECKSUM) {
std::cerr << "Got failure state:" << repo->failure() << std::endl;
throw sg_exception("Bad result from corrupt files test");
if (repo->failure() != HTTPRepository::REPO_PARTIAL_UPDATE) {
std::cerr << "Got failure state:" << repo->failure() << std::endl;
throw sg_exception("Bad result from corrupt files test");
}
auto failedFiles = repo->failures();
if (failedFiles.size() != 1) {
throw sg_exception("Bad result from corrupt files test");
}
if (failedFiles.front().path.utf8Str() != "dirB/subdirG/fileBGA") {
throw sg_exception("Bad path from corrupt files test:" +
failedFiles.front().path.utf8Str());
}
repo.reset();
@ -657,15 +693,21 @@ void testServerModifyDuringSync(HTTP::Client* cl)
repo.reset(new HTTPRepository(p, cl));
repo->setBaseUrl("http://localhost:2000/repo");
global_repo->findEntry("dirA/fileAA")->accessCallback.reset(make_callback(&modifyBTree));
global_repo->findEntry("dirA/fileAA")->accessCallback =
[](const TestRepoEntry &r) {
std::cout << "Modifying sub-tree" << std::endl;
global_repo->findEntry("dirB/subdirA/fileBAC")->revision++;
global_repo->defineFile("dirB/subdirZ/fileBZA");
global_repo->findEntry("dirB/subdirB/fileBBB")->revision++;
};
repo->update();
waitForUpdateComplete(cl, repo.get());
global_repo->findEntry("dirA/fileAA")->accessCallback.reset();
global_repo->findEntry("dirA/fileAA")->accessCallback = AccessCallback{};
if (repo->failure() != HTTPRepository::REPO_ERROR_CHECKSUM) {
throw sg_exception("Bad result from modify during sync test");
if (repo->failure() != HTTPRepository::REPO_PARTIAL_UPDATE) {
throw sg_exception("Bad result from modify during sync test");
}
std::cout << "Passed test modify server during sync" << std::endl;
@ -755,6 +797,103 @@ void testCopyInstalledChildren(HTTP::Client* cl)
std::cout << "passed Copy installed children" << std::endl;
}
void testRetryAfterSocketFailure(HTTP::Client *cl) {
global_repo->clearRequestCounts();
global_repo->clearFailFlags();
std::unique_ptr<HTTPRepository> repo;
SGPath p(simgear::Dir::current().path());
p.append("http_repo_retry_after_socket_fail");
simgear::Dir pd(p);
if (pd.exists()) {
pd.removeChildren();
}
repo.reset(new HTTPRepository(p, cl));
repo->setBaseUrl("http://localhost:2000/repo");
int aaFailsRemaining = 2;
int subdirBAFailsRemaining = 2;
TestApi::setResponseDoneCallback(
cl, [&aaFailsRemaining, &subdirBAFailsRemaining](int curlResult,
HTTP::Request_ptr req) {
if (req->url() == "http://localhost:2000/repo/dirA/fileAA") {
if (aaFailsRemaining == 0)
return false;
--aaFailsRemaining;
TestApi::markRequestAsFailed(req, 56, "Simulated socket failure");
return true;
} else if (req->url() ==
"http://localhost:2000/repo/dirB/subdirA/.dirindex") {
if (subdirBAFailsRemaining == 0)
return false;
--subdirBAFailsRemaining;
TestApi::markRequestAsFailed(req, 56, "Simulated socket failure");
return true;
} else {
return false;
}
});
repo->update();
waitForUpdateComplete(cl, repo.get());
if (repo->failure() != HTTPRepository::REPO_NO_ERROR) {
throw sg_exception("Bad result from retry socket failure test");
}
verifyFileState(p, "dirA/fileAA");
verifyFileState(p, "dirB/subdirA/fileBAA");
verifyFileState(p, "dirB/subdirA/fileBAC");
verifyRequestCount("dirA/fileAA", 3);
verifyRequestCount("dirB/subdirA", 3);
verifyRequestCount("dirB/subdirA/fileBAC", 1);
}
void testPersistentSocketFailure(HTTP::Client *cl) {
global_repo->clearRequestCounts();
global_repo->clearFailFlags();
std::unique_ptr<HTTPRepository> repo;
SGPath p(simgear::Dir::current().path());
p.append("http_repo_persistent_socket_fail");
simgear::Dir pd(p);
if (pd.exists()) {
pd.removeChildren();
}
repo.reset(new HTTPRepository(p, cl));
repo->setBaseUrl("http://localhost:2000/repo");
TestApi::setResponseDoneCallback(
cl, [](int curlResult, HTTP::Request_ptr req) {
const auto url = req->url();
if (url.find("http://localhost:2000/repo/dirB") == 0) {
TestApi::markRequestAsFailed(req, 56, "Simulated socket failure");
return true;
}
return false;
});
repo->update();
waitForUpdateComplete(cl, repo.get());
if (repo->failure() != HTTPRepository::REPO_PARTIAL_UPDATE) {
throw sg_exception("Bad result from retry socket failure test");
}
verifyFileState(p, "dirA/fileAA");
verifyRequestCount("dirA/fileAA", 1);
verifyRequestCount("dirD/fileDA", 1);
verifyRequestCount("dirD/subdirDA/fileDAA", 1);
verifyRequestCount("dirD/subdirDB/fileDBA", 1);
}
int main(int argc, char* argv[])
{
sglog().setLogLevels( SG_ALL, SG_INFO );
@ -800,6 +939,8 @@ int main(int argc, char* argv[])
cl.clearAllConnections();
testCopyInstalledChildren(&cl);
testRetryAfterSocketFailure(&cl);
testPersistentSocketFailure(&cl);
std::cout << "all tests passed ok" << std::endl;
return 0;

View File

@ -744,6 +744,7 @@ void SGTerraSync::WorkerThread::fail(SyncItem failedItem)
_state._fail_count++;
failedItem._status = SyncItem::Failed;
_freshTiles.push_back(failedItem);
// note: we also end up here for partial syncs
SG_LOG(SG_TERRASYNC,SG_INFO,
"Failed to sync'" << failedItem._dir << "'");
_completedTiles[ failedItem._dir ] = now + UpdateInterval::FailedAttempt;