TerraSync: counter to fetch number of bytes which have been extracted from a tarball file

This commit is contained in:
legoboyvdlp R 2021-02-12 16:49:40 +00:00 committed by Automatic Release Builder
parent 6f9f694eff
commit f030816385
5 changed files with 99 additions and 68 deletions

View File

@ -20,15 +20,16 @@
#include "HTTPRepository.hxx" #include "HTTPRepository.hxx"
#include <iostream>
#include <cassert>
#include <algorithm> #include <algorithm>
#include <sstream> #include <cassert>
#include <cstdlib>
#include <filesystem>
#include <fstream>
#include <iostream>
#include <limits>
#include <map> #include <map>
#include <set> #include <set>
#include <fstream> #include <sstream>
#include <limits>
#include <cstdlib>
#include <fcntl.h> #include <fcntl.h>
@ -143,8 +144,8 @@ class HTTPDirectory
HTTPRepository::EntryType type; HTTPRepository::EntryType type;
std::string name, hash; std::string name, hash;
size_t sizeInBytes = 0; size_t sizeInBytes = 0;
SGPath path; // absolute path on disk SGPath path; // absolute path on disk
}; };
typedef std::vector<ChildInfo> ChildInfoList; typedef std::vector<ChildInfo> ChildInfoList;
@ -218,48 +219,48 @@ public:
return; return;
} }
char* buf = nullptr; char* buf = nullptr;
size_t bufSize = 0; size_t bufSize = 0;
for (auto &child : children) { for (auto& child : children) {
if (child.type != HTTPRepository::FileType) if (child.type != HTTPRepository::FileType)
continue;
if (child.path.exists())
continue;
SGPath cp = _repository->installedCopyPath;
cp.append(relativePath());
cp.append(child.name);
if (!cp.exists()) {
continue;
}
SGBinaryFile src(cp);
SGBinaryFile dst(child.path);
src.open(SG_IO_IN);
dst.open(SG_IO_OUT);
if (bufSize < cp.sizeInBytes()) {
bufSize = cp.sizeInBytes();
free(buf);
buf = (char*)malloc(bufSize);
if (!buf) {
continue; continue;
}
}
if (child.path.exists()) src.read(buf, cp.sizeInBytes());
continue; dst.write(buf, cp.sizeInBytes());
src.close();
dst.close();
SGPath cp = _repository->installedCopyPath; // reset caching
cp.append(relativePath()); child.path.set_cached(false);
cp.append(child.name); child.path.set_cached(true);
if (!cp.exists()) {
continue;
}
SGBinaryFile src(cp); std::string hash = computeHashForPath(child.path);
SGBinaryFile dst(child.path); updatedFileContents(child.path, hash);
src.open(SG_IO_IN);
dst.open(SG_IO_OUT);
if (bufSize < cp.sizeInBytes()) {
bufSize = cp.sizeInBytes();
free(buf);
buf = (char *)malloc(bufSize);
if (!buf) {
continue;
}
}
src.read(buf, cp.sizeInBytes());
dst.write(buf, cp.sizeInBytes());
src.close();
dst.close();
// reset caching
child.path.set_cached(false);
child.path.set_cached(true);
std::string hash = computeHashForPath(child.path);
updatedFileContents(child.path, hash);
} }
free(buf); free(buf);
@ -449,6 +450,7 @@ public:
return; return;
} }
compressedBytes = p.sizeInBytes();
buffer = (uint8_t *)malloc(bufferSize); buffer = (uint8_t *)malloc(bufferSize);
} }
@ -461,6 +463,7 @@ public:
} }
size_t rd = file.read((char*)buffer, bufferSize); size_t rd = file.read((char*)buffer, bufferSize);
repo->bytesExtracted += rd;
extractor.extractBytes(buffer, rd); extractor.extractBytes(buffer, rd);
if (file.eof()) { if (file.eof()) {
@ -487,8 +490,14 @@ public:
return HTTPRepoPrivate::ProcessContinue; return HTTPRepoPrivate::ProcessContinue;
} }
size_t archiveSizeBytes() const
{
return compressedBytes;
}
~ArchiveExtractTask() { free(buffer); } ~ArchiveExtractTask() { free(buffer); }
private: private:
// intentionally small so we extract incrementally on Windows // intentionally small so we extract incrementally on Windows
// where Defender throttles many small files, sorry // where Defender throttles many small files, sorry
@ -500,6 +509,7 @@ public:
uint8_t *buffer = nullptr; uint8_t *buffer = nullptr;
SGBinaryFile file; SGBinaryFile file;
ArchiveExtractor extractor; ArchiveExtractor extractor;
std::size_t compressedBytes;
}; };
using ArchiveExtractTaskPtr = std::shared_ptr<ArchiveExtractTask>; using ArchiveExtractTaskPtr = std::shared_ptr<ArchiveExtractTask>;
@ -556,7 +566,7 @@ public:
auto cb = [t](HTTPRepoPrivate* repo) { auto cb = [t](HTTPRepoPrivate* repo) {
return t->run(repo); return t->run(repo);
}; };
_repository->bytesToExtract += t->archiveSizeBytes();
_repository->addTask(cb); _repository->addTask(cb);
} else { } else {
SG_LOG(SG_TERRASYNC, SG_ALERT, "Unable to remove old file/directory " << removePath); SG_LOG(SG_TERRASYNC, SG_ALERT, "Unable to remove old file/directory " << removePath);
@ -952,6 +962,11 @@ size_t HTTPRepository::bytesDownloaded() const
return result; return result;
} }
size_t HTTPRepository::bytesToExtract() const
{
return _d->bytesToExtract - _d->bytesExtracted;
}
void HTTPRepository::setInstalledCopyPath(const SGPath& copyPath) void HTTPRepository::setInstalledCopyPath(const SGPath& copyPath)
{ {
_d->installedCopyPath = copyPath; _d->installedCopyPath = copyPath;
@ -1308,7 +1323,7 @@ HTTPRepository::failure() const
} }
Dir dir(absPath); Dir dir(absPath);
bool result = dir.remove(true); bool result = dir.remove(true);
return result; return result;
} }

View File

@ -73,6 +73,8 @@ public:
virtual size_t bytesDownloaded() const; virtual size_t bytesDownloaded() const;
virtual size_t bytesToExtract() const;
/** /**
* optionally provide the location of an installer copy of this * optionally provide the location of an installer copy of this
* repository. When a file is missing it will be copied from this tree. * repository. When a file is missing it will be copied from this tree.

View File

@ -60,22 +60,23 @@ public:
HTTPRepository::FailureVec failures; HTTPRepository::FailureVec failures;
int maxPermittedFailures = 16; int maxPermittedFailures = 16;
HTTPRepoPrivate(HTTPRepository *parent) HTTPRepoPrivate(HTTPRepository* parent)
: p(parent), isUpdating(false), status(HTTPRepository::REPO_NO_ERROR), : p(parent)
totalDownloaded(0) { {
;
} }
~HTTPRepoPrivate(); ~HTTPRepoPrivate();
HTTPRepository *p; // link back to outer HTTPRepository *p; // link back to outer
HTTP::Client *http; HTTP::Client* http = nullptr;
std::string baseUrl; std::string baseUrl;
SGPath basePath; SGPath basePath;
bool isUpdating; bool isUpdating = false;
HTTPRepository::ResultCode status; HTTPRepository::ResultCode status = HTTPRepository::REPO_NO_ERROR;
HTTPDirectory_ptr rootDir; HTTPDirectory_ptr rootDir;
size_t totalDownloaded; size_t totalDownloaded = 0;
size_t bytesToExtract = 0;
size_t bytesExtracted = 0;
HTTPRepository::SyncPredicate syncPredicate; HTTPRepository::SyncPredicate syncPredicate;
HTTP::Request_ptr updateFile(HTTPDirectory *dir, const std::string &name, HTTP::Request_ptr updateFile(HTTPDirectory *dir, const std::string &name,

View File

@ -165,6 +165,7 @@ public:
SGTimeStamp stamp; SGTimeStamp stamp;
bool busy = false; ///< is the slot working or idle bool busy = false; ///< is the slot working or idle
unsigned int pendingKBytes = 0; unsigned int pendingKBytes = 0;
unsigned int pendingExtractKBytes = 0;
unsigned int nextWarnTimeout = 0; unsigned int nextWarnTimeout = 0;
}; };
@ -197,18 +198,18 @@ static unsigned int syncSlotForType(SyncItem::Type ty)
struct TerrasyncThreadState struct TerrasyncThreadState
{ {
TerrasyncThreadState() : TerrasyncThreadState() : _busy(false),
_busy(false), _stalled(false),
_stalled(false), _hasServer(false),
_hasServer(false), _fail_count(0),
_fail_count(0), _updated_tile_count(0),
_updated_tile_count(0), _success_count(0),
_success_count(0), _consecutive_errors(0),
_consecutive_errors(0), _cache_hits(0),
_cache_hits(0), _transfer_rate(0),
_transfer_rate(0), _total_kb_downloaded(0),
_total_kb_downloaded(0), _totalKbPending(0),
_totalKbPending(0) _extractTotalKbPending(0)
{} {}
bool _busy; bool _busy;
@ -223,6 +224,7 @@ struct TerrasyncThreadState
// kbytes, not bytes, because bytes might overflow 2^31 // kbytes, not bytes, because bytes might overflow 2^31
int _total_kb_downloaded; int _total_kb_downloaded;
unsigned int _totalKbPending; unsigned int _totalKbPending;
unsigned int _extractTotalKbPending;
}; };
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
@ -582,6 +584,7 @@ void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot)
#endif #endif
// convert bytes to kbytes here // convert bytes to kbytes here
slot.pendingKBytes = (slot.repository->bytesToDownload() >> 10); slot.pendingKBytes = (slot.repository->bytesToDownload() >> 10);
slot.pendingExtractKBytes = (slot.repository->bytesToExtract() >> 10);
return; // easy, still working return; // easy, still working
} }
@ -611,6 +614,7 @@ void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot)
slot.busy = false; slot.busy = false;
slot.repository.reset(); slot.repository.reset();
slot.pendingKBytes = 0; slot.pendingKBytes = 0;
slot.pendingExtractKBytes = 0;
slot.currentItem = {}; slot.currentItem = {};
} }
@ -646,7 +650,8 @@ void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot)
slot.nextWarnTimeout = 30 * 1000; slot.nextWarnTimeout = 30 * 1000;
slot.stamp.stamp(); slot.stamp.stamp();
slot.busy = true; slot.busy = true;
slot.pendingKBytes = slot.repository->bytesToDownload(); slot.pendingKBytes = slot.repository->bytesToDownload() >> 10;
slot.pendingExtractKBytes = slot.repository->bytesToExtract() >> 10;
SG_LOG(SG_TERRASYNC, SG_INFO, "sync of " << slot.repository->baseUrl() << " started, queue size is " << slot.queue.size()); SG_LOG(SG_TERRASYNC, SG_INFO, "sync of " << slot.repository->baseUrl() << " started, queue size is " << slot.queue.size());
} }
@ -791,20 +796,24 @@ void SGTerraSync::WorkerThread::runInternal()
bool anySlotBusy = false; bool anySlotBusy = false;
unsigned int newPendingCount = 0; unsigned int newPendingCount = 0;
unsigned int newExtractCount = 0; // how much is left to extract
// update each sync slot in turn // update each sync slot in turn
for (unsigned int slot=0; slot < NUM_SYNC_SLOTS; ++slot) { for (unsigned int slot=0; slot < NUM_SYNC_SLOTS; ++slot) {
updateSyncSlot(_syncSlots[slot]); updateSyncSlot(_syncSlots[slot]);
newPendingCount += _syncSlots[slot].pendingKBytes; newPendingCount += _syncSlots[slot].pendingKBytes;
newExtractCount += _syncSlots[slot].pendingExtractKBytes;
anySlotBusy |= _syncSlots[slot].busy; anySlotBusy |= _syncSlots[slot].busy;
} }
{ {
std::lock_guard<std::mutex> g(_stateLock); std::lock_guard<std::mutex> g(_stateLock);
_state._totalKbPending = newPendingCount; // approximately atomic update _state._totalKbPending = newPendingCount; // approximately atomic update
_state._extractTotalKbPending = newExtractCount;
_state._busy = anySlotBusy; _state._busy = anySlotBusy;
} }
if (!anySlotBusy) { if (!anySlotBusy) {
// wait on the blocking deque here, otherwise we spin // wait on the blocking deque here, otherwise we spin
// the loop very fast, since _http::update with no connections // the loop very fast, since _http::update with no connections
@ -1130,6 +1139,7 @@ void SGTerraSync::bind()
_transferRateBytesSecNode = _terraRoot->getNode("transfer-rate-bytes-sec", true); _transferRateBytesSecNode = _terraRoot->getNode("transfer-rate-bytes-sec", true);
_pendingKbytesNode = _terraRoot->getNode("pending-kbytes", true); _pendingKbytesNode = _terraRoot->getNode("pending-kbytes", true);
_downloadedKBtesNode = _terraRoot->getNode("downloaded-kbytes", true); _downloadedKBtesNode = _terraRoot->getNode("downloaded-kbytes", true);
_extractPendingKbytesNode = _terraRoot->getNode("extract-pending-kbytes", true);
_enabledNode = _terraRoot->getNode("enabled", true); _enabledNode = _terraRoot->getNode("enabled", true);
_availableNode = _terraRoot->getNode("available", true); _availableNode = _terraRoot->getNode("available", true);
_maxErrorsNode = _terraRoot->getNode("max-errors", true); _maxErrorsNode = _terraRoot->getNode("max-errors", true);
@ -1155,6 +1165,7 @@ void SGTerraSync::unbind()
_transferRateBytesSecNode.clear(); _transferRateBytesSecNode.clear();
_pendingKbytesNode.clear(); _pendingKbytesNode.clear();
_downloadedKBtesNode.clear(); _downloadedKBtesNode.clear();
_extractPendingKbytesNode.clear();
_enabledNode.clear(); _enabledNode.clear();
_availableNode.clear(); _availableNode.clear();
_maxErrorsNode.clear(); _maxErrorsNode.clear();
@ -1191,6 +1202,7 @@ void SGTerraSync::update(double)
_transferRateBytesSecNode->setIntValue(copiedState._transfer_rate); _transferRateBytesSecNode->setIntValue(copiedState._transfer_rate);
_pendingKbytesNode->setIntValue(copiedState._totalKbPending); _pendingKbytesNode->setIntValue(copiedState._totalKbPending);
_downloadedKBtesNode->setIntValue(copiedState._total_kb_downloaded); _downloadedKBtesNode->setIntValue(copiedState._total_kb_downloaded);
_extractPendingKbytesNode->setIntValue(copiedState._extractTotalKbPending);
_stalledNode->setBoolValue(_workerThread->isStalled()); _stalledNode->setBoolValue(_workerThread->isStalled());
_activeNode->setBoolValue(worker_running); _activeNode->setBoolValue(worker_running);

View File

@ -107,6 +107,7 @@ private:
SGPropertyNode_ptr _transferRateBytesSecNode; SGPropertyNode_ptr _transferRateBytesSecNode;
SGPropertyNode_ptr _pendingKbytesNode; SGPropertyNode_ptr _pendingKbytesNode;
SGPropertyNode_ptr _downloadedKBtesNode; SGPropertyNode_ptr _downloadedKBtesNode;
SGPropertyNode_ptr _extractPendingKbytesNode;
SGPropertyNode_ptr _maxErrorsNode; SGPropertyNode_ptr _maxErrorsNode;
// we manually bind+init TerraSync during early startup // we manually bind+init TerraSync during early startup