OLD | NEW |
| (Empty) |
1 // Copyright (c) 2010 The Chromium OS Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #ifndef CHROMEOS_PLATFORM_UPDATE_ENGINE_MULTI_HTTP_FETCHER_H__ | |
6 #define CHROMEOS_PLATFORM_UPDATE_ENGINE_MULTI_HTTP_FETCHER_H__ | |
7 | |
8 #include <deque> | |
9 #include <tr1/memory> | |
10 #include <utility> | |
11 #include <vector> | |
12 | |
13 #include "update_engine/http_fetcher.h" | |
14 #include "update_engine/utils.h" | |
15 | |
16 // This class is a simple wrapper around an HttpFetcher. The client | |
17 // specifies a vector of byte ranges. MultiHttpFetcher will fetch bytes | |
18 // from those offsets. Pass -1 as a length to specify unlimited length. | |
19 // It really only would make sense for the last range specified to have | |
20 // unlimited length. | |
21 | |
22 namespace chromeos_update_engine { | |
23 | |
24 template<typename BaseHttpFetcher> | |
25 class MultiHttpFetcher : public HttpFetcher, public HttpFetcherDelegate { | |
26 public: | |
27 typedef std::vector<std::pair<off_t, off_t> > RangesVect; | |
28 typedef std::vector<std::tr1::shared_ptr<BaseHttpFetcher> > FetchersVect; | |
29 | |
30 MultiHttpFetcher(ProxyResolver* proxy_resolver) | |
31 : HttpFetcher(proxy_resolver), | |
32 sent_transfer_complete_(false), | |
33 pending_next_fetcher_(false), | |
34 current_index_(0), | |
35 bytes_received_this_fetcher_(0) {} | |
36 ~MultiHttpFetcher() {} | |
37 | |
38 void set_ranges(const RangesVect& ranges) { | |
39 ranges_ = ranges; | |
40 fetchers_.resize(ranges_.size()); // Allocate the fetchers | |
41 for (typename std::vector<std::tr1::shared_ptr<BaseHttpFetcher> | |
42 >::iterator it = fetchers_.begin(), e = fetchers_.end(); | |
43 it != e; ++it) { | |
44 (*it) = std::tr1::shared_ptr<BaseHttpFetcher>( | |
45 new BaseHttpFetcher(proxy_resolver_)); | |
46 (*it)->set_delegate(this); | |
47 } | |
48 LOG(INFO) << "done w/ list"; | |
49 } | |
50 | |
51 void SetOffset(off_t offset) {} // for now, doesn't support this | |
52 | |
53 // Begins the transfer to the specified URL. | |
54 void BeginTransfer(const std::string& url) { | |
55 url_ = url; | |
56 if (ranges_.empty()) { | |
57 if (delegate_) | |
58 delegate_->TransferComplete(this, true); | |
59 return; | |
60 } | |
61 current_index_ = 0; | |
62 LOG(INFO) << "starting first transfer"; | |
63 StartTransfer(); | |
64 } | |
65 | |
66 void TransferTerminated(HttpFetcher* fetcher) { | |
67 LOG(INFO) << "Received transfer terminated."; | |
68 if (pending_next_fetcher_) { | |
69 pending_next_fetcher_ = false; | |
70 if (++current_index_ >= ranges_.size()) { | |
71 SendTransferComplete(fetcher, true); | |
72 } else { | |
73 StartTransfer(); | |
74 } | |
75 return; | |
76 } | |
77 current_index_ = ranges_.size(); | |
78 sent_transfer_complete_ = true; // a fib | |
79 if (delegate_) { | |
80 // Note that after the callback returns this object may be destroyed. | |
81 delegate_->TransferTerminated(this); | |
82 } | |
83 } | |
84 | |
85 void TerminateTransfer() { | |
86 // If the current fetcher is already being terminated, just wait for its | |
87 // TransferTerminated callback. | |
88 if (pending_next_fetcher_) { | |
89 pending_next_fetcher_ = false; | |
90 return; | |
91 } | |
92 // If there's a current active fetcher terminate it and wait for its | |
93 // TransferTerminated callback. | |
94 if (current_index_ < fetchers_.size()) { | |
95 fetchers_[current_index_]->TerminateTransfer(); | |
96 return; | |
97 } | |
98 // Transfer is terminated before it got started and before any ranges were | |
99 // added. | |
100 TransferTerminated(this); | |
101 } | |
102 | |
103 void Pause() { | |
104 if (current_index_ < fetchers_.size()) | |
105 fetchers_[current_index_]->Pause(); | |
106 } | |
107 | |
108 void Unpause() { | |
109 if (current_index_ < fetchers_.size()) | |
110 fetchers_[current_index_]->Unpause(); | |
111 } | |
112 | |
113 // These functions are overloaded in LibcurlHttp fetcher for testing purposes. | |
114 void set_idle_seconds(int seconds) { | |
115 for (typename std::vector<std::tr1::shared_ptr<BaseHttpFetcher> >::iterator | |
116 it = fetchers_.begin(), | |
117 e = fetchers_.end(); it != e; ++it) { | |
118 (*it)->set_idle_seconds(seconds); | |
119 } | |
120 } | |
121 void set_retry_seconds(int seconds) { | |
122 for (typename std::vector<std::tr1::shared_ptr<BaseHttpFetcher> >::iterator | |
123 it = fetchers_.begin(), | |
124 e = fetchers_.end(); it != e; ++it) { | |
125 (*it)->set_retry_seconds(seconds); | |
126 } | |
127 } | |
128 void SetConnectionAsExpensive(bool is_expensive) { | |
129 for (typename std::vector<std::tr1::shared_ptr<BaseHttpFetcher> >::iterator | |
130 it = fetchers_.begin(), | |
131 e = fetchers_.end(); it != e; ++it) { | |
132 (*it)->SetConnectionAsExpensive(is_expensive); | |
133 } | |
134 } | |
135 void SetBuildType(bool is_official) { | |
136 for (typename std::vector<std::tr1::shared_ptr<BaseHttpFetcher> >::iterator | |
137 it = fetchers_.begin(), | |
138 e = fetchers_.end(); it != e; ++it) { | |
139 (*it)->SetBuildType(is_official); | |
140 } | |
141 } | |
142 | |
143 virtual void SetProxies(const std::deque<std::string>& proxies) { | |
144 for (typename FetchersVect::iterator it = fetchers_.begin(), | |
145 e = fetchers_.end(); it != e; ++it) { | |
146 (*it)->SetProxies(proxies); | |
147 } | |
148 } | |
149 | |
150 private: | |
151 void SendTransferComplete(HttpFetcher* fetcher, bool successful) { | |
152 if (sent_transfer_complete_) | |
153 return; | |
154 LOG(INFO) << "Sending transfer complete"; | |
155 sent_transfer_complete_ = true; | |
156 http_response_code_ = fetcher->http_response_code(); | |
157 if (delegate_) | |
158 delegate_->TransferComplete(this, successful); | |
159 } | |
160 | |
161 void StartTransfer() { | |
162 if (current_index_ >= ranges_.size()) { | |
163 return; | |
164 } | |
165 LOG(INFO) << "Starting a transfer @" << ranges_[current_index_].first << "(" | |
166 << ranges_[current_index_].second << ")"; | |
167 bytes_received_this_fetcher_ = 0; | |
168 fetchers_[current_index_]->SetOffset(ranges_[current_index_].first); | |
169 if (delegate_) | |
170 delegate_->SeekToOffset(ranges_[current_index_].first); | |
171 fetchers_[current_index_]->BeginTransfer(url_); | |
172 } | |
173 | |
174 void ReceivedBytes(HttpFetcher* fetcher, const char* bytes, int length) { | |
175 TEST_AND_RETURN(current_index_ < ranges_.size()); | |
176 TEST_AND_RETURN(fetcher == fetchers_[current_index_].get()); | |
177 TEST_AND_RETURN(!pending_next_fetcher_); | |
178 off_t next_size = length; | |
179 if (ranges_[current_index_].second >= 0) { | |
180 next_size = std::min(next_size, | |
181 ranges_[current_index_].second - | |
182 bytes_received_this_fetcher_); | |
183 } | |
184 LOG_IF(WARNING, next_size <= 0) << "Asked to write length <= 0"; | |
185 if (delegate_) { | |
186 delegate_->ReceivedBytes(this, bytes, next_size); | |
187 } | |
188 bytes_received_this_fetcher_ += length; | |
189 if (ranges_[current_index_].second >= 0 && | |
190 bytes_received_this_fetcher_ >= ranges_[current_index_].second) { | |
191 // Terminates the current fetcher. Waits for its TransferTerminated | |
192 // callback before starting the next fetcher so that we don't end up | |
193 // signalling the delegate that the whole multi-transfer is complete | |
194 // before all fetchers are really done and cleaned up. | |
195 pending_next_fetcher_ = true; | |
196 fetcher->TerminateTransfer(); | |
197 } | |
198 } | |
199 | |
200 void TransferComplete(HttpFetcher* fetcher, bool successful) { | |
201 TEST_AND_RETURN(!pending_next_fetcher_); | |
202 LOG(INFO) << "Received transfer complete."; | |
203 if (current_index_ >= ranges_.size()) { | |
204 SendTransferComplete(fetcher, true); | |
205 return; | |
206 } | |
207 if (ranges_[current_index_].second < 0) { | |
208 // We're done with the current operation | |
209 current_index_++; | |
210 if (current_index_ >= ranges_.size() || !successful) { | |
211 SendTransferComplete(fetcher, successful); | |
212 } else { | |
213 // Do the next transfer | |
214 StartTransfer(); | |
215 } | |
216 return; | |
217 } | |
218 if (bytes_received_this_fetcher_ < ranges_[current_index_].second) { | |
219 LOG(WARNING) << "Received insufficient bytes from fetcher. " | |
220 << "Ending early"; | |
221 SendTransferComplete(fetcher, false); | |
222 return; | |
223 } | |
224 LOG(INFO) << "Got spurious TransferComplete. Ignoring."; | |
225 } | |
226 | |
227 // If true, do not send any more data or TransferComplete to the delegate. | |
228 bool sent_transfer_complete_; | |
229 | |
230 // If true, the next fetcher needs to be started when TransferTerminated is | |
231 // received from the current fetcher. | |
232 bool pending_next_fetcher_; | |
233 | |
234 RangesVect ranges_; | |
235 FetchersVect fetchers_; | |
236 | |
237 RangesVect::size_type current_index_; // index into ranges_, fetchers_ | |
238 off_t bytes_received_this_fetcher_; | |
239 | |
240 DISALLOW_COPY_AND_ASSIGN(MultiHttpFetcher); | |
241 }; | |
242 | |
243 } // namespace chromeos_update_engine | |
244 | |
245 #endif // CHROMEOS_PLATFORM_UPDATE_ENGINE_MULTI_HTTP_FETCHER_H__ | |
OLD | NEW |