Index: multi_http_fetcher.h |
diff --git a/multi_http_fetcher.h b/multi_http_fetcher.h |
index 692054dded1f790fd6ebad1f940e4aa4139f515e..533998b033b74d287943f959d9d30b787f13fb08 100644 |
--- a/multi_http_fetcher.h |
+++ b/multi_http_fetcher.h |
@@ -10,6 +10,7 @@ |
#include <vector> |
#include "update_engine/http_fetcher.h" |
+#include "update_engine/utils.h" |
// This class is a simple wrapper around an HttpFetcher. The client |
// specifies a vector of byte ranges. MultiHttpFetcher will fetch bytes |
@@ -26,6 +27,7 @@ class MultiHttpFetcher : public HttpFetcher, public HttpFetcherDelegate { |
MultiHttpFetcher() |
: sent_transfer_complete_(false), |
+ pending_next_fetcher_(false), |
current_index_(0), |
bytes_received_this_fetcher_(0) {} |
~MultiHttpFetcher() {} |
@@ -56,11 +58,41 @@ class MultiHttpFetcher : public HttpFetcher, public HttpFetcherDelegate { |
StartTransfer(); |
} |
- void TerminateTransfer() { |
- if (current_index_ < fetchers_.size()) |
- fetchers_[current_index_]->TerminateTransfer(); |
+ void TransferTerminated(HttpFetcher* fetcher) { |
+ LOG(INFO) << "Received transfer terminated."; |
+ if (pending_next_fetcher_) { |
+ pending_next_fetcher_ = false; |
+ if (++current_index_ >= ranges_.size()) { |
+ SendTransferComplete(fetcher, true); |
+ } else { |
+ StartTransfer(); |
+ } |
+ return; |
+ } |
current_index_ = ranges_.size(); |
sent_transfer_complete_ = true; // a fib |
+ if (delegate_) { |
+ // Note that after the callback returns this object may be destroyed. |
+ delegate_->TransferTerminated(this); |
+ } |
+ } |
+ |
+ void TerminateTransfer() { |
+ // If the current fetcher is already being terminated, just wait for its |
+ // TransferTerminated callback. |
+ if (pending_next_fetcher_) { |
+ pending_next_fetcher_ = false; |
+ return; |
+ } |
+ // If there's a current active fetcher terminate it and wait for its |
+ // TransferTerminated callback. |
+ if (current_index_ < fetchers_.size()) { |
+ fetchers_[current_index_]->TerminateTransfer(); |
+ return; |
+ } |
+ // Transfer is terminated before it got started and before any ranges were |
+ // added. |
+ TransferTerminated(this); |
} |
void Pause() { |
@@ -127,15 +159,10 @@ class MultiHttpFetcher : public HttpFetcher, public HttpFetcherDelegate { |
fetchers_[current_index_]->BeginTransfer(url_); |
} |
- void ReceivedBytes(HttpFetcher* fetcher, |
- const char* bytes, |
- int length) { |
- if (current_index_ >= ranges_.size()) |
- return; |
- if (fetcher != fetchers_[current_index_].get()) { |
- LOG(WARNING) << "Received bytes from invalid fetcher"; |
- return; |
- } |
+ void ReceivedBytes(HttpFetcher* fetcher, const char* bytes, int length) { |
+ TEST_AND_RETURN(current_index_ < ranges_.size()); |
+ TEST_AND_RETURN(fetcher == fetchers_[current_index_].get()); |
+ TEST_AND_RETURN(!pending_next_fetcher_); |
off_t next_size = length; |
if (ranges_[current_index_].second >= 0) { |
next_size = std::min(next_size, |
@@ -149,23 +176,22 @@ class MultiHttpFetcher : public HttpFetcher, public HttpFetcherDelegate { |
bytes_received_this_fetcher_ += length; |
if (ranges_[current_index_].second >= 0 && |
bytes_received_this_fetcher_ >= ranges_[current_index_].second) { |
- fetchers_[current_index_]->TerminateTransfer(); |
- current_index_++; |
- if (current_index_ == ranges_.size()) { |
- SendTransferComplete(fetchers_[current_index_ - 1].get(), true); |
- } else { |
- StartTransfer(); |
- } |
+ // Terminates the current fetcher. Waits for its TransferTerminated |
+ // callback before starting the next fetcher so that we don't end up |
+ // signalling the delegate that the whole multi-transfer is complete |
+ // before all fetchers are really done and cleaned up. |
+ pending_next_fetcher_ = true; |
+ fetcher->TerminateTransfer(); |
} |
} |
void TransferComplete(HttpFetcher* fetcher, bool successful) { |
- LOG(INFO) << "Received transfer complete"; |
+ TEST_AND_RETURN(!pending_next_fetcher_); |
+ LOG(INFO) << "Received transfer complete."; |
if (current_index_ >= ranges_.size()) { |
SendTransferComplete(fetcher, true); |
return; |
} |
- |
if (ranges_[current_index_].second < 0) { |
// We're done with the current operation |
current_index_++; |
@@ -177,27 +203,28 @@ class MultiHttpFetcher : public HttpFetcher, public HttpFetcherDelegate { |
} |
return; |
} |
- |
if (bytes_received_this_fetcher_ < ranges_[current_index_].second) { |
LOG(WARNING) << "Received insufficient bytes from fetcher. " |
<< "Ending early"; |
SendTransferComplete(fetcher, false); |
return; |
- } else { |
- LOG(INFO) << "Got spurious TransferComplete. Ingoring."; |
} |
+ LOG(INFO) << "Got spurious TransferComplete. Ignoring."; |
} |
// If true, do not send any more data or TransferComplete to the delegate. |
bool sent_transfer_complete_; |
+ // If true, the next fetcher needs to be started when TransferTerminated is |
+ // received from the current fetcher. |
+ bool pending_next_fetcher_; |
+ |
RangesVect ranges_; |
std::vector<std::tr1::shared_ptr<BaseHttpFetcher> > fetchers_; |
RangesVect::size_type current_index_; // index into ranges_, fetchers_ |
off_t bytes_received_this_fetcher_; |
- private: |
DISALLOW_COPY_AND_ASSIGN(MultiHttpFetcher); |
}; |