| Index: multi_http_fetcher.h
|
| diff --git a/multi_http_fetcher.h b/multi_http_fetcher.h
|
| index 692054dded1f790fd6ebad1f940e4aa4139f515e..533998b033b74d287943f959d9d30b787f13fb08 100644
|
| --- a/multi_http_fetcher.h
|
| +++ b/multi_http_fetcher.h
|
| @@ -10,6 +10,7 @@
|
| #include <vector>
|
|
|
| #include "update_engine/http_fetcher.h"
|
| +#include "update_engine/utils.h"
|
|
|
| // This class is a simple wrapper around an HttpFetcher. The client
|
| // specifies a vector of byte ranges. MultiHttpFetcher will fetch bytes
|
| @@ -26,6 +27,7 @@ class MultiHttpFetcher : public HttpFetcher, public HttpFetcherDelegate {
|
|
|
| MultiHttpFetcher()
|
| : sent_transfer_complete_(false),
|
| + pending_next_fetcher_(false),
|
| current_index_(0),
|
| bytes_received_this_fetcher_(0) {}
|
| ~MultiHttpFetcher() {}
|
| @@ -56,11 +58,41 @@ class MultiHttpFetcher : public HttpFetcher, public HttpFetcherDelegate {
|
| StartTransfer();
|
| }
|
|
|
| - void TerminateTransfer() {
|
| - if (current_index_ < fetchers_.size())
|
| - fetchers_[current_index_]->TerminateTransfer();
|
| + void TransferTerminated(HttpFetcher* fetcher) {
|
| + LOG(INFO) << "Received transfer terminated.";
|
| + if (pending_next_fetcher_) {
|
| + pending_next_fetcher_ = false;
|
| + if (++current_index_ >= ranges_.size()) {
|
| + SendTransferComplete(fetcher, true);
|
| + } else {
|
| + StartTransfer();
|
| + }
|
| + return;
|
| + }
|
| current_index_ = ranges_.size();
|
| sent_transfer_complete_ = true; // a fib
|
| + if (delegate_) {
|
| + // Note that after the callback returns this object may be destroyed.
|
| + delegate_->TransferTerminated(this);
|
| + }
|
| + }
|
| +
|
| + void TerminateTransfer() {
|
| + // If the current fetcher is already being terminated, just wait for its
|
| + // TransferTerminated callback.
|
| + if (pending_next_fetcher_) {
|
| + pending_next_fetcher_ = false;
|
| + return;
|
| + }
|
| + // If there's a current active fetcher terminate it and wait for its
|
| + // TransferTerminated callback.
|
| + if (current_index_ < fetchers_.size()) {
|
| + fetchers_[current_index_]->TerminateTransfer();
|
| + return;
|
| + }
|
| + // Transfer is terminated before it got started and before any ranges were
|
| + // added.
|
| + TransferTerminated(this);
|
| }
|
|
|
| void Pause() {
|
| @@ -127,15 +159,10 @@ class MultiHttpFetcher : public HttpFetcher, public HttpFetcherDelegate {
|
| fetchers_[current_index_]->BeginTransfer(url_);
|
| }
|
|
|
| - void ReceivedBytes(HttpFetcher* fetcher,
|
| - const char* bytes,
|
| - int length) {
|
| - if (current_index_ >= ranges_.size())
|
| - return;
|
| - if (fetcher != fetchers_[current_index_].get()) {
|
| - LOG(WARNING) << "Received bytes from invalid fetcher";
|
| - return;
|
| - }
|
| + void ReceivedBytes(HttpFetcher* fetcher, const char* bytes, int length) {
|
| + TEST_AND_RETURN(current_index_ < ranges_.size());
|
| + TEST_AND_RETURN(fetcher == fetchers_[current_index_].get());
|
| + TEST_AND_RETURN(!pending_next_fetcher_);
|
| off_t next_size = length;
|
| if (ranges_[current_index_].second >= 0) {
|
| next_size = std::min(next_size,
|
| @@ -149,23 +176,22 @@ class MultiHttpFetcher : public HttpFetcher, public HttpFetcherDelegate {
|
| bytes_received_this_fetcher_ += length;
|
| if (ranges_[current_index_].second >= 0 &&
|
| bytes_received_this_fetcher_ >= ranges_[current_index_].second) {
|
| - fetchers_[current_index_]->TerminateTransfer();
|
| - current_index_++;
|
| - if (current_index_ == ranges_.size()) {
|
| - SendTransferComplete(fetchers_[current_index_ - 1].get(), true);
|
| - } else {
|
| - StartTransfer();
|
| - }
|
| + // Terminates the current fetcher. Waits for its TransferTerminated
|
| + // callback before starting the next fetcher so that we don't end up
|
| + // signalling the delegate that the whole multi-transfer is complete
|
| + // before all fetchers are really done and cleaned up.
|
| + pending_next_fetcher_ = true;
|
| + fetcher->TerminateTransfer();
|
| }
|
| }
|
|
|
| void TransferComplete(HttpFetcher* fetcher, bool successful) {
|
| - LOG(INFO) << "Received transfer complete";
|
| + TEST_AND_RETURN(!pending_next_fetcher_);
|
| + LOG(INFO) << "Received transfer complete.";
|
| if (current_index_ >= ranges_.size()) {
|
| SendTransferComplete(fetcher, true);
|
| return;
|
| }
|
| -
|
| if (ranges_[current_index_].second < 0) {
|
| // We're done with the current operation
|
| current_index_++;
|
| @@ -177,27 +203,28 @@ class MultiHttpFetcher : public HttpFetcher, public HttpFetcherDelegate {
|
| }
|
| return;
|
| }
|
| -
|
| if (bytes_received_this_fetcher_ < ranges_[current_index_].second) {
|
| LOG(WARNING) << "Received insufficient bytes from fetcher. "
|
| << "Ending early";
|
| SendTransferComplete(fetcher, false);
|
| return;
|
| - } else {
|
| - LOG(INFO) << "Got spurious TransferComplete. Ingoring.";
|
| }
|
| + LOG(INFO) << "Got spurious TransferComplete. Ignoring.";
|
| }
|
|
|
| // If true, do not send any more data or TransferComplete to the delegate.
|
| bool sent_transfer_complete_;
|
|
|
| + // If true, the next fetcher needs to be started when TransferTerminated is
|
| + // received from the current fetcher.
|
| + bool pending_next_fetcher_;
|
| +
|
| RangesVect ranges_;
|
| std::vector<std::tr1::shared_ptr<BaseHttpFetcher> > fetchers_;
|
|
|
| RangesVect::size_type current_index_; // index into ranges_, fetchers_
|
| off_t bytes_received_this_fetcher_;
|
|
|
| - private:
|
| DISALLOW_COPY_AND_ASSIGN(MultiHttpFetcher);
|
| };
|
|
|
|
|