| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2010 The Chromium OS Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #ifndef CHROMEOS_PLATFORM_UPDATE_ENGINE_MULTI_HTTP_FETCHER_H__ | |
| 6 #define CHROMEOS_PLATFORM_UPDATE_ENGINE_MULTI_HTTP_FETCHER_H__ | |
| 7 | |
| 8 #include <deque> | |
| 9 #include <tr1/memory> | |
| 10 #include <utility> | |
| 11 #include <vector> | |
| 12 | |
| 13 #include "update_engine/http_fetcher.h" | |
| 14 #include "update_engine/utils.h" | |
| 15 | |
| 16 // This class is a simple wrapper around an HttpFetcher. The client | |
| 17 // specifies a vector of byte ranges. MultiHttpFetcher will fetch bytes | |
| 18 // from those offsets. Pass -1 as a length to specify unlimited length. | |
| 19 // It really only would make sense for the last range specified to have | |
| 20 // unlimited length. | |
| 21 | |
| 22 namespace chromeos_update_engine { | |
| 23 | |
| 24 template<typename BaseHttpFetcher> | |
| 25 class MultiHttpFetcher : public HttpFetcher, public HttpFetcherDelegate { | |
| 26 public: | |
| 27 typedef std::vector<std::pair<off_t, off_t> > RangesVect; | |
| 28 typedef std::vector<std::tr1::shared_ptr<BaseHttpFetcher> > FetchersVect; | |
| 29 | |
| 30 MultiHttpFetcher(ProxyResolver* proxy_resolver) | |
| 31 : HttpFetcher(proxy_resolver), | |
| 32 sent_transfer_complete_(false), | |
| 33 pending_next_fetcher_(false), | |
| 34 current_index_(0), | |
| 35 bytes_received_this_fetcher_(0) {} | |
| 36 ~MultiHttpFetcher() {} | |
| 37 | |
| 38 void set_ranges(const RangesVect& ranges) { | |
| 39 ranges_ = ranges; | |
| 40 fetchers_.resize(ranges_.size()); // Allocate the fetchers | |
| 41 for (typename std::vector<std::tr1::shared_ptr<BaseHttpFetcher> | |
| 42 >::iterator it = fetchers_.begin(), e = fetchers_.end(); | |
| 43 it != e; ++it) { | |
| 44 (*it) = std::tr1::shared_ptr<BaseHttpFetcher>( | |
| 45 new BaseHttpFetcher(proxy_resolver_)); | |
| 46 (*it)->set_delegate(this); | |
| 47 } | |
| 48 LOG(INFO) << "done w/ list"; | |
| 49 } | |
| 50 | |
| 51 void SetOffset(off_t offset) {} // for now, doesn't support this | |
| 52 | |
| 53 // Begins the transfer to the specified URL. | |
| 54 void BeginTransfer(const std::string& url) { | |
| 55 url_ = url; | |
| 56 if (ranges_.empty()) { | |
| 57 if (delegate_) | |
| 58 delegate_->TransferComplete(this, true); | |
| 59 return; | |
| 60 } | |
| 61 current_index_ = 0; | |
| 62 LOG(INFO) << "starting first transfer"; | |
| 63 StartTransfer(); | |
| 64 } | |
| 65 | |
| 66 void TransferTerminated(HttpFetcher* fetcher) { | |
| 67 LOG(INFO) << "Received transfer terminated."; | |
| 68 if (pending_next_fetcher_) { | |
| 69 pending_next_fetcher_ = false; | |
| 70 if (++current_index_ >= ranges_.size()) { | |
| 71 SendTransferComplete(fetcher, true); | |
| 72 } else { | |
| 73 StartTransfer(); | |
| 74 } | |
| 75 return; | |
| 76 } | |
| 77 current_index_ = ranges_.size(); | |
| 78 sent_transfer_complete_ = true; // a fib | |
| 79 if (delegate_) { | |
| 80 // Note that after the callback returns this object may be destroyed. | |
| 81 delegate_->TransferTerminated(this); | |
| 82 } | |
| 83 } | |
| 84 | |
| 85 void TerminateTransfer() { | |
| 86 // If the current fetcher is already being terminated, just wait for its | |
| 87 // TransferTerminated callback. | |
| 88 if (pending_next_fetcher_) { | |
| 89 pending_next_fetcher_ = false; | |
| 90 return; | |
| 91 } | |
| 92 // If there's a current active fetcher terminate it and wait for its | |
| 93 // TransferTerminated callback. | |
| 94 if (current_index_ < fetchers_.size()) { | |
| 95 fetchers_[current_index_]->TerminateTransfer(); | |
| 96 return; | |
| 97 } | |
| 98 // Transfer is terminated before it got started and before any ranges were | |
| 99 // added. | |
| 100 TransferTerminated(this); | |
| 101 } | |
| 102 | |
| 103 void Pause() { | |
| 104 if (current_index_ < fetchers_.size()) | |
| 105 fetchers_[current_index_]->Pause(); | |
| 106 } | |
| 107 | |
| 108 void Unpause() { | |
| 109 if (current_index_ < fetchers_.size()) | |
| 110 fetchers_[current_index_]->Unpause(); | |
| 111 } | |
| 112 | |
| 113 // These functions are overloaded in LibcurlHttp fetcher for testing purposes. | |
| 114 void set_idle_seconds(int seconds) { | |
| 115 for (typename std::vector<std::tr1::shared_ptr<BaseHttpFetcher> >::iterator | |
| 116 it = fetchers_.begin(), | |
| 117 e = fetchers_.end(); it != e; ++it) { | |
| 118 (*it)->set_idle_seconds(seconds); | |
| 119 } | |
| 120 } | |
| 121 void set_retry_seconds(int seconds) { | |
| 122 for (typename std::vector<std::tr1::shared_ptr<BaseHttpFetcher> >::iterator | |
| 123 it = fetchers_.begin(), | |
| 124 e = fetchers_.end(); it != e; ++it) { | |
| 125 (*it)->set_retry_seconds(seconds); | |
| 126 } | |
| 127 } | |
| 128 void SetConnectionAsExpensive(bool is_expensive) { | |
| 129 for (typename std::vector<std::tr1::shared_ptr<BaseHttpFetcher> >::iterator | |
| 130 it = fetchers_.begin(), | |
| 131 e = fetchers_.end(); it != e; ++it) { | |
| 132 (*it)->SetConnectionAsExpensive(is_expensive); | |
| 133 } | |
| 134 } | |
| 135 void SetBuildType(bool is_official) { | |
| 136 for (typename std::vector<std::tr1::shared_ptr<BaseHttpFetcher> >::iterator | |
| 137 it = fetchers_.begin(), | |
| 138 e = fetchers_.end(); it != e; ++it) { | |
| 139 (*it)->SetBuildType(is_official); | |
| 140 } | |
| 141 } | |
| 142 | |
| 143 virtual void SetProxies(const std::deque<std::string>& proxies) { | |
| 144 for (typename FetchersVect::iterator it = fetchers_.begin(), | |
| 145 e = fetchers_.end(); it != e; ++it) { | |
| 146 (*it)->SetProxies(proxies); | |
| 147 } | |
| 148 } | |
| 149 | |
| 150 private: | |
| 151 void SendTransferComplete(HttpFetcher* fetcher, bool successful) { | |
| 152 if (sent_transfer_complete_) | |
| 153 return; | |
| 154 LOG(INFO) << "Sending transfer complete"; | |
| 155 sent_transfer_complete_ = true; | |
| 156 http_response_code_ = fetcher->http_response_code(); | |
| 157 if (delegate_) | |
| 158 delegate_->TransferComplete(this, successful); | |
| 159 } | |
| 160 | |
| 161 void StartTransfer() { | |
| 162 if (current_index_ >= ranges_.size()) { | |
| 163 return; | |
| 164 } | |
| 165 LOG(INFO) << "Starting a transfer @" << ranges_[current_index_].first << "(" | |
| 166 << ranges_[current_index_].second << ")"; | |
| 167 bytes_received_this_fetcher_ = 0; | |
| 168 fetchers_[current_index_]->SetOffset(ranges_[current_index_].first); | |
| 169 if (delegate_) | |
| 170 delegate_->SeekToOffset(ranges_[current_index_].first); | |
| 171 fetchers_[current_index_]->BeginTransfer(url_); | |
| 172 } | |
| 173 | |
| 174 void ReceivedBytes(HttpFetcher* fetcher, const char* bytes, int length) { | |
| 175 TEST_AND_RETURN(current_index_ < ranges_.size()); | |
| 176 TEST_AND_RETURN(fetcher == fetchers_[current_index_].get()); | |
| 177 TEST_AND_RETURN(!pending_next_fetcher_); | |
| 178 off_t next_size = length; | |
| 179 if (ranges_[current_index_].second >= 0) { | |
| 180 next_size = std::min(next_size, | |
| 181 ranges_[current_index_].second - | |
| 182 bytes_received_this_fetcher_); | |
| 183 } | |
| 184 LOG_IF(WARNING, next_size <= 0) << "Asked to write length <= 0"; | |
| 185 if (delegate_) { | |
| 186 delegate_->ReceivedBytes(this, bytes, next_size); | |
| 187 } | |
| 188 bytes_received_this_fetcher_ += length; | |
| 189 if (ranges_[current_index_].second >= 0 && | |
| 190 bytes_received_this_fetcher_ >= ranges_[current_index_].second) { | |
| 191 // Terminates the current fetcher. Waits for its TransferTerminated | |
| 192 // callback before starting the next fetcher so that we don't end up | |
| 193 // signalling the delegate that the whole multi-transfer is complete | |
| 194 // before all fetchers are really done and cleaned up. | |
| 195 pending_next_fetcher_ = true; | |
| 196 fetcher->TerminateTransfer(); | |
| 197 } | |
| 198 } | |
| 199 | |
| 200 void TransferComplete(HttpFetcher* fetcher, bool successful) { | |
| 201 TEST_AND_RETURN(!pending_next_fetcher_); | |
| 202 LOG(INFO) << "Received transfer complete."; | |
| 203 if (current_index_ >= ranges_.size()) { | |
| 204 SendTransferComplete(fetcher, true); | |
| 205 return; | |
| 206 } | |
| 207 if (ranges_[current_index_].second < 0) { | |
| 208 // We're done with the current operation | |
| 209 current_index_++; | |
| 210 if (current_index_ >= ranges_.size() || !successful) { | |
| 211 SendTransferComplete(fetcher, successful); | |
| 212 } else { | |
| 213 // Do the next transfer | |
| 214 StartTransfer(); | |
| 215 } | |
| 216 return; | |
| 217 } | |
| 218 if (bytes_received_this_fetcher_ < ranges_[current_index_].second) { | |
| 219 LOG(WARNING) << "Received insufficient bytes from fetcher. " | |
| 220 << "Ending early"; | |
| 221 SendTransferComplete(fetcher, false); | |
| 222 return; | |
| 223 } | |
| 224 LOG(INFO) << "Got spurious TransferComplete. Ignoring."; | |
| 225 } | |
| 226 | |
| 227 // If true, do not send any more data or TransferComplete to the delegate. | |
| 228 bool sent_transfer_complete_; | |
| 229 | |
| 230 // If true, the next fetcher needs to be started when TransferTerminated is | |
| 231 // received from the current fetcher. | |
| 232 bool pending_next_fetcher_; | |
| 233 | |
| 234 RangesVect ranges_; | |
| 235 FetchersVect fetchers_; | |
| 236 | |
| 237 RangesVect::size_type current_index_; // index into ranges_, fetchers_ | |
| 238 off_t bytes_received_this_fetcher_; | |
| 239 | |
| 240 DISALLOW_COPY_AND_ASSIGN(MultiHttpFetcher); | |
| 241 }; | |
| 242 | |
| 243 } // namespace chromeos_update_engine | |
| 244 | |
| 245 #endif // CHROMEOS_PLATFORM_UPDATE_ENGINE_MULTI_HTTP_FETCHER_H__ | |
| OLD | NEW |