From f030816385237b75df9ed325d4ca210ff002ca92 Mon Sep 17 00:00:00 2001 From: legoboyvdlp R Date: Fri, 12 Feb 2021 16:49:40 +0000 Subject: [PATCH] TerraSync: counter to fetch number of bytes which have been extracted from a tarball file --- simgear/io/HTTPRepository.cxx | 109 +++++++++++++++----------- simgear/io/HTTPRepository.hxx | 2 + simgear/io/HTTPRepository_private.hxx | 17 ++-- simgear/scene/tsync/terrasync.cxx | 38 ++++++--- simgear/scene/tsync/terrasync.hxx | 1 + 5 files changed, 99 insertions(+), 68 deletions(-) diff --git a/simgear/io/HTTPRepository.cxx b/simgear/io/HTTPRepository.cxx index 1b91b738..2d53a9b3 100644 --- a/simgear/io/HTTPRepository.cxx +++ b/simgear/io/HTTPRepository.cxx @@ -20,15 +20,16 @@ #include "HTTPRepository.hxx" -#include -#include #include -#include +#include +#include +#include +#include +#include +#include #include #include -#include -#include -#include +#include #include @@ -143,8 +144,8 @@ class HTTPDirectory HTTPRepository::EntryType type; std::string name, hash; - size_t sizeInBytes = 0; - SGPath path; // absolute path on disk + size_t sizeInBytes = 0; + SGPath path; // absolute path on disk }; typedef std::vector ChildInfoList; @@ -218,48 +219,48 @@ public: return; } - char* buf = nullptr; - size_t bufSize = 0; + char* buf = nullptr; + size_t bufSize = 0; - for (auto &child : children) { - if (child.type != HTTPRepository::FileType) + for (auto& child : children) { + if (child.type != HTTPRepository::FileType) + continue; + + if (child.path.exists()) + continue; + + SGPath cp = _repository->installedCopyPath; + cp.append(relativePath()); + cp.append(child.name); + if (!cp.exists()) { + continue; + } + + SGBinaryFile src(cp); + SGBinaryFile dst(child.path); + src.open(SG_IO_IN); + dst.open(SG_IO_OUT); + + if (bufSize < cp.sizeInBytes()) { + bufSize = cp.sizeInBytes(); + free(buf); + buf = (char*)malloc(bufSize); + if (!buf) { continue; + } + } - if (child.path.exists()) - continue; + src.read(buf, cp.sizeInBytes()); + dst.write(buf, cp.sizeInBytes()); + src.close(); + dst.close(); - SGPath cp = _repository->installedCopyPath; - cp.append(relativePath()); - cp.append(child.name); - if (!cp.exists()) { - continue; - } + // reset caching + child.path.set_cached(false); + child.path.set_cached(true); - SGBinaryFile src(cp); - SGBinaryFile dst(child.path); - src.open(SG_IO_IN); - dst.open(SG_IO_OUT); - - if (bufSize < cp.sizeInBytes()) { - bufSize = cp.sizeInBytes(); - free(buf); - buf = (char *)malloc(bufSize); - if (!buf) { - continue; - } - } - - src.read(buf, cp.sizeInBytes()); - dst.write(buf, cp.sizeInBytes()); - src.close(); - dst.close(); - - // reset caching - child.path.set_cached(false); - child.path.set_cached(true); - - std::string hash = computeHashForPath(child.path); - updatedFileContents(child.path, hash); + std::string hash = computeHashForPath(child.path); + updatedFileContents(child.path, hash); } free(buf); @@ -449,6 +450,7 @@ public: return; } + compressedBytes = p.sizeInBytes(); buffer = (uint8_t *)malloc(bufferSize); } @@ -461,6 +463,7 @@ public: } size_t rd = file.read((char*)buffer, bufferSize); + repo->bytesExtracted += rd; extractor.extractBytes(buffer, rd); if (file.eof()) { @@ -487,8 +490,14 @@ public: return HTTPRepoPrivate::ProcessContinue; } + size_t archiveSizeBytes() const + { + return compressedBytes; + } + ~ArchiveExtractTask() { free(buffer); } + private: // intentionally small so we extract incrementally on Windows // where Defender throttles many small files, sorry @@ -500,6 +509,7 @@ public: uint8_t *buffer = nullptr; SGBinaryFile file; ArchiveExtractor extractor; + std::size_t compressedBytes; }; using ArchiveExtractTaskPtr = std::shared_ptr; @@ -556,7 +566,7 @@ public: auto cb = [t](HTTPRepoPrivate* repo) { return t->run(repo); }; - + _repository->bytesToExtract += t->archiveSizeBytes(); _repository->addTask(cb); } else { SG_LOG(SG_TERRASYNC, SG_ALERT, "Unable to remove old file/directory " << removePath); @@ -952,6 +962,11 @@ size_t HTTPRepository::bytesDownloaded() const return result; } +size_t HTTPRepository::bytesToExtract() const +{ + return _d->bytesToExtract - _d->bytesExtracted; +} + void HTTPRepository::setInstalledCopyPath(const SGPath& copyPath) { _d->installedCopyPath = copyPath; @@ -1308,7 +1323,7 @@ HTTPRepository::failure() const } Dir dir(absPath); - bool result = dir.remove(true); + bool result = dir.remove(true); return result; } diff --git a/simgear/io/HTTPRepository.hxx b/simgear/io/HTTPRepository.hxx index 6f087962..bcb26c06 100644 --- a/simgear/io/HTTPRepository.hxx +++ b/simgear/io/HTTPRepository.hxx @@ -73,6 +73,8 @@ public: virtual size_t bytesDownloaded() const; + virtual size_t bytesToExtract() const; + /** * optionally provide the location of an installer copy of this * repository. When a file is missing it will be copied from this tree. diff --git a/simgear/io/HTTPRepository_private.hxx b/simgear/io/HTTPRepository_private.hxx index 56ab28a5..09f6f888 100644 --- a/simgear/io/HTTPRepository_private.hxx +++ b/simgear/io/HTTPRepository_private.hxx @@ -60,22 +60,23 @@ public: HTTPRepository::FailureVec failures; int maxPermittedFailures = 16; - HTTPRepoPrivate(HTTPRepository *parent) - : p(parent), isUpdating(false), status(HTTPRepository::REPO_NO_ERROR), - totalDownloaded(0) { - ; + HTTPRepoPrivate(HTTPRepository* parent) + : p(parent) + { } ~HTTPRepoPrivate(); HTTPRepository *p; // link back to outer - HTTP::Client *http; + HTTP::Client* http = nullptr; std::string baseUrl; SGPath basePath; - bool isUpdating; - HTTPRepository::ResultCode status; + bool isUpdating = false; + HTTPRepository::ResultCode status = HTTPRepository::REPO_NO_ERROR; HTTPDirectory_ptr rootDir; - size_t totalDownloaded; + size_t totalDownloaded = 0; + size_t bytesToExtract = 0; + size_t bytesExtracted = 0; HTTPRepository::SyncPredicate syncPredicate; HTTP::Request_ptr updateFile(HTTPDirectory *dir, const std::string &name, diff --git a/simgear/scene/tsync/terrasync.cxx b/simgear/scene/tsync/terrasync.cxx index e93f168a..d3f9edd2 100644 --- a/simgear/scene/tsync/terrasync.cxx +++ b/simgear/scene/tsync/terrasync.cxx @@ -165,6 +165,7 @@ public: SGTimeStamp stamp; bool busy = false; ///< is the slot working or idle unsigned int pendingKBytes = 0; + unsigned int pendingExtractKBytes = 0; unsigned int nextWarnTimeout = 0; }; @@ -197,18 +198,18 @@ static unsigned int syncSlotForType(SyncItem::Type ty) struct TerrasyncThreadState { - TerrasyncThreadState() : - _busy(false), - _stalled(false), - _hasServer(false), - _fail_count(0), - _updated_tile_count(0), - _success_count(0), - _consecutive_errors(0), - _cache_hits(0), - _transfer_rate(0), - _total_kb_downloaded(0), - _totalKbPending(0) + TerrasyncThreadState() : _busy(false), + _stalled(false), + _hasServer(false), + _fail_count(0), + _updated_tile_count(0), + _success_count(0), + _consecutive_errors(0), + _cache_hits(0), + _transfer_rate(0), + _total_kb_downloaded(0), + _totalKbPending(0), + _extractTotalKbPending(0) {} bool _busy; @@ -223,6 +224,7 @@ struct TerrasyncThreadState // kbytes, not bytes, because bytes might overflow 2^31 int _total_kb_downloaded; unsigned int _totalKbPending; + unsigned int _extractTotalKbPending; }; /////////////////////////////////////////////////////////////////////////////// @@ -582,6 +584,7 @@ void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot) #endif // convert bytes to kbytes here slot.pendingKBytes = (slot.repository->bytesToDownload() >> 10); + slot.pendingExtractKBytes = (slot.repository->bytesToExtract() >> 10); return; // easy, still working } @@ -611,6 +614,7 @@ void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot) slot.busy = false; slot.repository.reset(); slot.pendingKBytes = 0; + slot.pendingExtractKBytes = 0; slot.currentItem = {}; } @@ -646,7 +650,8 @@ void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot) slot.nextWarnTimeout = 30 * 1000; slot.stamp.stamp(); slot.busy = true; - slot.pendingKBytes = slot.repository->bytesToDownload(); + slot.pendingKBytes = slot.repository->bytesToDownload() >> 10; + slot.pendingExtractKBytes = slot.repository->bytesToExtract() >> 10; SG_LOG(SG_TERRASYNC, SG_INFO, "sync of " << slot.repository->baseUrl() << " started, queue size is " << slot.queue.size()); } @@ -791,20 +796,24 @@ void SGTerraSync::WorkerThread::runInternal() bool anySlotBusy = false; unsigned int newPendingCount = 0; + unsigned int newExtractCount = 0; // how much is left to extract // update each sync slot in turn for (unsigned int slot=0; slot < NUM_SYNC_SLOTS; ++slot) { updateSyncSlot(_syncSlots[slot]); newPendingCount += _syncSlots[slot].pendingKBytes; + newExtractCount += _syncSlots[slot].pendingExtractKBytes; anySlotBusy |= _syncSlots[slot].busy; } { std::lock_guard g(_stateLock); _state._totalKbPending = newPendingCount; // approximately atomic update + _state._extractTotalKbPending = newExtractCount; _state._busy = anySlotBusy; } + if (!anySlotBusy) { // wait on the blocking deque here, otherwise we spin // the loop very fast, since _http::update with no connections @@ -1130,6 +1139,7 @@ void SGTerraSync::bind() _transferRateBytesSecNode = _terraRoot->getNode("transfer-rate-bytes-sec", true); _pendingKbytesNode = _terraRoot->getNode("pending-kbytes", true); _downloadedKBtesNode = _terraRoot->getNode("downloaded-kbytes", true); + _extractPendingKbytesNode = _terraRoot->getNode("extract-pending-kbytes", true); _enabledNode = _terraRoot->getNode("enabled", true); _availableNode = _terraRoot->getNode("available", true); _maxErrorsNode = _terraRoot->getNode("max-errors", true); @@ -1155,6 +1165,7 @@ void SGTerraSync::unbind() _transferRateBytesSecNode.clear(); _pendingKbytesNode.clear(); _downloadedKBtesNode.clear(); + _extractPendingKbytesNode.clear(); _enabledNode.clear(); _availableNode.clear(); _maxErrorsNode.clear(); @@ -1191,6 +1202,7 @@ void SGTerraSync::update(double) _transferRateBytesSecNode->setIntValue(copiedState._transfer_rate); _pendingKbytesNode->setIntValue(copiedState._totalKbPending); _downloadedKBtesNode->setIntValue(copiedState._total_kb_downloaded); + _extractPendingKbytesNode->setIntValue(copiedState._extractTotalKbPending); _stalledNode->setBoolValue(_workerThread->isStalled()); _activeNode->setBoolValue(worker_running); diff --git a/simgear/scene/tsync/terrasync.hxx b/simgear/scene/tsync/terrasync.hxx index 0b55eed3..6cd0b107 100644 --- a/simgear/scene/tsync/terrasync.hxx +++ b/simgear/scene/tsync/terrasync.hxx @@ -107,6 +107,7 @@ private: SGPropertyNode_ptr _transferRateBytesSecNode; SGPropertyNode_ptr _pendingKbytesNode; SGPropertyNode_ptr _downloadedKBtesNode; + SGPropertyNode_ptr _extractPendingKbytesNode; SGPropertyNode_ptr _maxErrorsNode; // we manually bind+init TerraSync during early startup