Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: components/cronet/ios/cronet_bidirectional_stream.cc

Issue 2050483002: [Cronet] Coalesce small buffers into single QUIC packet in GRPC on iOS. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Address Helen's comments. Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "components/cronet/ios/cronet_bidirectional_stream.h" 5 #include "components/cronet/ios/cronet_bidirectional_stream.h"
6 6
7 #include <memory> 7 #include <memory>
8 #include <string> 8 #include <string>
9 #include <vector> 9 #include <vector>
10 10
(...skipping 14 matching lines...) Expand all
25 #include "net/http/http_transaction_factory.h" 25 #include "net/http/http_transaction_factory.h"
26 #include "net/http/http_util.h" 26 #include "net/http/http_util.h"
27 #include "net/spdy/spdy_header_block.h" 27 #include "net/spdy/spdy_header_block.h"
28 #include "net/ssl/ssl_info.h" 28 #include "net/ssl/ssl_info.h"
29 #include "net/url_request/http_user_agent_settings.h" 29 #include "net/url_request/http_user_agent_settings.h"
30 #include "net/url_request/url_request_context.h" 30 #include "net/url_request/url_request_context.h"
31 #include "url/gurl.h" 31 #include "url/gurl.h"
32 32
33 namespace cronet { 33 namespace cronet {
34 34
35 CronetBidirectionalStream::WriteBuffers::WriteBuffers() {}
36
37 CronetBidirectionalStream::WriteBuffers::~WriteBuffers() {}
38
39 void CronetBidirectionalStream::WriteBuffers::Clear() {
40 write_buffer_list.clear();
41 write_buffer_len_list.clear();
42 }
43
44 void CronetBidirectionalStream::WriteBuffers::AppendBuffer(
45 const scoped_refptr<net::IOBuffer>& buffer,
46 int buffer_size) {
47 write_buffer_list.push_back(buffer);
48 write_buffer_len_list.push_back(buffer_size);
49 }
50
51 void CronetBidirectionalStream::WriteBuffers::MoveTo(WriteBuffers* target) {
52 std::move(write_buffer_list.begin(), write_buffer_list.end(),
53 std::back_inserter(target->write_buffer_list));
54 std::move(write_buffer_len_list.begin(), write_buffer_len_list.end(),
55 std::back_inserter(target->write_buffer_len_list));
56 Clear();
57 }
58
59 bool CronetBidirectionalStream::WriteBuffers::Empty() const {
60 return write_buffer_list.empty();
61 }
62
35 CronetBidirectionalStream::CronetBidirectionalStream( 63 CronetBidirectionalStream::CronetBidirectionalStream(
36 CronetEnvironment* environment, 64 CronetEnvironment* environment,
37 Delegate* delegate) 65 Delegate* delegate)
38 : read_state_(NOT_STARTED), 66 : read_state_(NOT_STARTED),
39 write_state_(NOT_STARTED), 67 write_state_(NOT_STARTED),
40 write_end_of_stream_(false), 68 write_end_of_stream_(false),
69 request_headers_sent_(false),
70 disable_auto_flush_(false),
71 delay_headers_until_flush_(false),
41 environment_(environment), 72 environment_(environment),
73 pending_write_data_(new WriteBuffers()),
74 flushing_write_data_(new WriteBuffers()),
75 sending_write_data_(new WriteBuffers()),
42 delegate_(delegate) {} 76 delegate_(delegate) {}
43 77
44 CronetBidirectionalStream::~CronetBidirectionalStream() { 78 CronetBidirectionalStream::~CronetBidirectionalStream() {
45 DCHECK(environment_->IsOnNetworkThread()); 79 DCHECK(environment_->IsOnNetworkThread());
46 } 80 }
47 81
48 int CronetBidirectionalStream::Start(const char* url, 82 int CronetBidirectionalStream::Start(const char* url,
49 int priority, 83 int priority,
50 const char* method, 84 const char* method,
51 const net::HttpRequestHeaders& headers, 85 const net::HttpRequestHeaders& headers,
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
91 scoped_refptr<net::WrappedIOBuffer> write_buffer( 125 scoped_refptr<net::WrappedIOBuffer> write_buffer(
92 new net::WrappedIOBuffer(buffer)); 126 new net::WrappedIOBuffer(buffer));
93 127
94 environment_->PostToNetworkThread( 128 environment_->PostToNetworkThread(
95 FROM_HERE, 129 FROM_HERE,
96 base::Bind(&CronetBidirectionalStream::WriteDataOnNetworkThread, 130 base::Bind(&CronetBidirectionalStream::WriteDataOnNetworkThread,
97 base::Unretained(this), write_buffer, count, end_of_stream)); 131 base::Unretained(this), write_buffer, count, end_of_stream));
98 return true; 132 return true;
99 } 133 }
100 134
135 void CronetBidirectionalStream::Flush() {
136 environment_->PostToNetworkThread(
137 FROM_HERE, base::Bind(&CronetBidirectionalStream::FlushOnNetworkThread,
138 base::Unretained(this)));
139 }
140
101 void CronetBidirectionalStream::Cancel() { 141 void CronetBidirectionalStream::Cancel() {
102 environment_->PostToNetworkThread( 142 environment_->PostToNetworkThread(
103 FROM_HERE, base::Bind(&CronetBidirectionalStream::CancelOnNetworkThread, 143 FROM_HERE, base::Bind(&CronetBidirectionalStream::CancelOnNetworkThread,
104 base::Unretained(this))); 144 base::Unretained(this)));
105 } 145 }
106 146
107 void CronetBidirectionalStream::Destroy() { 147 void CronetBidirectionalStream::Destroy() {
108 // Destroy could be called from any thread, including network thread (if 148 // Destroy could be called from any thread, including network thread (if
109 // posting task to executor throws an exception), but is posted, so |this| 149 // posting task to executor throws an exception), but is posted, so |this|
110 // is valid until calling task is complete. 150 // is valid until calling task is complete.
111 environment_->PostToNetworkThread( 151 environment_->PostToNetworkThread(
112 FROM_HERE, base::Bind(&CronetBidirectionalStream::DestroyOnNetworkThread, 152 FROM_HERE, base::Bind(&CronetBidirectionalStream::DestroyOnNetworkThread,
113 base::Unretained(this))); 153 base::Unretained(this)));
114 } 154 }
115 155
116 void CronetBidirectionalStream::OnStreamReady(bool request_headers_sent) { 156 void CronetBidirectionalStream::OnStreamReady(bool request_headers_sent) {
117 DCHECK(environment_->IsOnNetworkThread()); 157 DCHECK(environment_->IsOnNetworkThread());
118 DCHECK(write_state_ == STARTED); 158 DCHECK_EQ(STARTED, write_state_);
119 write_state_ = WAITING_FOR_WRITE; 159 request_headers_sent_ = request_headers_sent;
120 if (write_end_of_stream_) 160 write_state_ = WAITING_FOR_FLUSH;
161 if (write_end_of_stream_) {
162 if (!request_headers_sent) {
163 // If there is no data to write, then just send headers explicitly.
164 bidi_stream_->SendRequestHeaders();
165 request_headers_sent_ = true;
166 }
121 write_state_ = WRITING_DONE; 167 write_state_ = WRITING_DONE;
168 }
122 delegate_->OnStreamReady(); 169 delegate_->OnStreamReady();
123 } 170 }
124 171
125 void CronetBidirectionalStream::OnHeadersReceived( 172 void CronetBidirectionalStream::OnHeadersReceived(
126 const net::SpdyHeaderBlock& response_headers) { 173 const net::SpdyHeaderBlock& response_headers) {
127 DCHECK(environment_->IsOnNetworkThread()); 174 DCHECK(environment_->IsOnNetworkThread());
128 DCHECK(read_state_ == STARTED); 175 DCHECK_EQ(STARTED, read_state_);
129 read_state_ = WAITING_FOR_READ; 176 read_state_ = WAITING_FOR_READ;
130 // Get http status code from response headers. 177 // Get http status code from response headers.
131 int http_status_code = 0; 178 int http_status_code = 0;
132 const auto http_status_header = response_headers.find(":status"); 179 const auto http_status_header = response_headers.find(":status");
133 if (http_status_header != response_headers.end()) 180 if (http_status_header != response_headers.end())
134 base::StringToInt(http_status_header->second, &http_status_code); 181 base::StringToInt(http_status_header->second, &http_status_code);
135 const char* protocol = "unknown"; 182 const char* protocol = "unknown";
136 switch (bidi_stream_->GetProtocol()) { 183 switch (bidi_stream_->GetProtocol()) {
137 case net::kProtoHTTP2: 184 case net::kProtoHTTP2:
138 protocol = "h2"; 185 protocol = "h2";
139 break; 186 break;
140 case net::kProtoQUIC1SPDY3: 187 case net::kProtoQUIC1SPDY3:
141 protocol = "quic/1+spdy/3"; 188 protocol = "quic/1+spdy/3";
142 break; 189 break;
143 default: 190 default:
144 break; 191 break;
145 } 192 }
146 delegate_->OnHeadersReceived(response_headers, protocol); 193 delegate_->OnHeadersReceived(response_headers, protocol);
147 } 194 }
148 195
149 void CronetBidirectionalStream::OnDataRead(int bytes_read) { 196 void CronetBidirectionalStream::OnDataRead(int bytes_read) {
150 DCHECK(environment_->IsOnNetworkThread()); 197 DCHECK(environment_->IsOnNetworkThread());
151 DCHECK(read_state_ == READING); 198 DCHECK_EQ(READING, read_state_);
152 read_state_ = WAITING_FOR_READ; 199 read_state_ = WAITING_FOR_READ;
153 delegate_->OnDataRead(read_buffer_->data(), bytes_read); 200 delegate_->OnDataRead(read_buffer_->data(), bytes_read);
154 201
155 // Free the read buffer. 202 // Free the read buffer.
156 read_buffer_ = nullptr; 203 read_buffer_ = nullptr;
157 if (bytes_read == 0) 204 if (bytes_read == 0)
158 read_state_ = READING_DONE; 205 read_state_ = READING_DONE;
159 MaybeOnSucceded(); 206 MaybeOnSucceded();
160 } 207 }
161 208
162 void CronetBidirectionalStream::OnDataSent() { 209 void CronetBidirectionalStream::OnDataSent() {
163 DCHECK(environment_->IsOnNetworkThread()); 210 DCHECK(environment_->IsOnNetworkThread());
164 DCHECK(write_state_ == WRITING); 211 DCHECK_EQ(WRITING, write_state_);
165 write_state_ = WAITING_FOR_WRITE; 212 write_state_ = WAITING_FOR_FLUSH;
166 delegate_->OnDataSent(write_buffer_->data()); 213 for (const scoped_refptr<net::IOBuffer>& buffer :
167 // Free the write buffer. 214 sending_write_data_->buffers()) {
168 write_buffer_ = nullptr; 215 delegate_->OnDataSent(buffer->data());
169 if (write_end_of_stream_) 216 }
217 sending_write_data_->Clear();
218 // Send data flushed while other data was sending.
219 if (!flushing_write_data_->Empty()) {
220 SendFlushingWriteData();
221 return;
222 }
223 if (write_end_of_stream_ && pending_write_data_->Empty())
170 write_state_ = WRITING_DONE; 224 write_state_ = WRITING_DONE;
171 MaybeOnSucceded(); 225 MaybeOnSucceded();
kapishnikov 2016/06/16 16:16:47 For future CL: I think MaybeOnSucceded() can go in
172 } 226 }
173 227
174 void CronetBidirectionalStream::OnTrailersReceived( 228 void CronetBidirectionalStream::OnTrailersReceived(
175 const net::SpdyHeaderBlock& response_trailers) { 229 const net::SpdyHeaderBlock& response_trailers) {
176 DCHECK(environment_->IsOnNetworkThread()); 230 DCHECK(environment_->IsOnNetworkThread());
177 delegate_->OnTrailersReceived(response_trailers); 231 delegate_->OnTrailersReceived(response_trailers);
178 } 232 }
179 233
180 void CronetBidirectionalStream::OnFailed(int error) { 234 void CronetBidirectionalStream::OnFailed(int error) {
181 DCHECK(environment_->IsOnNetworkThread()); 235 DCHECK(environment_->IsOnNetworkThread());
182 bidi_stream_.reset(); 236 bidi_stream_.reset();
183 read_state_ = write_state_ = ERROR; 237 read_state_ = write_state_ = ERROR;
184 delegate_->OnFailed(error); 238 delegate_->OnFailed(error);
185 } 239 }
186 240
187 void CronetBidirectionalStream::StartOnNetworkThread( 241 void CronetBidirectionalStream::StartOnNetworkThread(
188 std::unique_ptr<net::BidirectionalStreamRequestInfo> request_info) { 242 std::unique_ptr<net::BidirectionalStreamRequestInfo> request_info) {
189 DCHECK(environment_->IsOnNetworkThread()); 243 DCHECK(environment_->IsOnNetworkThread());
190 DCHECK(!bidi_stream_); 244 DCHECK(!bidi_stream_);
191 DCHECK(environment_->GetURLRequestContext()); 245 DCHECK(environment_->GetURLRequestContext());
192 request_info->extra_headers.SetHeaderIfMissing( 246 request_info->extra_headers.SetHeaderIfMissing(
193 net::HttpRequestHeaders::kUserAgent, environment_->user_agent()); 247 net::HttpRequestHeaders::kUserAgent, environment_->user_agent());
194 bidi_stream_.reset(new net::BidirectionalStream( 248 bidi_stream_.reset(new net::BidirectionalStream(
195 std::move(request_info), environment_->GetURLRequestContext() 249 std::move(request_info), environment_->GetURLRequestContext()
196 ->http_transaction_factory() 250 ->http_transaction_factory()
197 ->GetSession(), 251 ->GetSession(),
198 /*send_request_headers_automatically=*/true, this)); 252 !delay_headers_until_flush_, this));
199 DCHECK(read_state_ == NOT_STARTED && write_state_ == NOT_STARTED); 253 DCHECK(read_state_ == NOT_STARTED && write_state_ == NOT_STARTED);
200 read_state_ = write_state_ = STARTED; 254 read_state_ = write_state_ = STARTED;
201 } 255 }
202 256
203 void CronetBidirectionalStream::ReadDataOnNetworkThread( 257 void CronetBidirectionalStream::ReadDataOnNetworkThread(
204 scoped_refptr<net::WrappedIOBuffer> read_buffer, 258 scoped_refptr<net::WrappedIOBuffer> read_buffer,
205 int buffer_size) { 259 int buffer_size) {
206 DCHECK(environment_->IsOnNetworkThread()); 260 DCHECK(environment_->IsOnNetworkThread());
207 DCHECK(read_buffer); 261 DCHECK(read_buffer);
208 DCHECK(!read_buffer_); 262 DCHECK(!read_buffer_);
(...skipping 18 matching lines...) Expand all
227 } 281 }
228 OnDataRead(bytes_read); 282 OnDataRead(bytes_read);
229 } 283 }
230 284
231 void CronetBidirectionalStream::WriteDataOnNetworkThread( 285 void CronetBidirectionalStream::WriteDataOnNetworkThread(
232 scoped_refptr<net::WrappedIOBuffer> write_buffer, 286 scoped_refptr<net::WrappedIOBuffer> write_buffer,
233 int buffer_size, 287 int buffer_size,
234 bool end_of_stream) { 288 bool end_of_stream) {
235 DCHECK(environment_->IsOnNetworkThread()); 289 DCHECK(environment_->IsOnNetworkThread());
236 DCHECK(write_buffer); 290 DCHECK(write_buffer);
237 DCHECK(!write_buffer_); 291 DCHECK(!write_end_of_stream_);
238 if (write_state_ != WAITING_FOR_WRITE) { 292 if (!bidi_stream_ || write_end_of_stream_) {
239 DLOG(ERROR) << "Unexpected Write Data in write_state " << write_state_; 293 DLOG(ERROR) << "Unexpected Flush Data in write_state " << write_state_;
240 // Invoke OnFailed unless it is already invoked. 294 // Invoke OnFailed unless it is already invoked.
241 if (write_state_ != ERROR) 295 if (write_state_ != ERROR)
242 OnFailed(net::ERR_UNEXPECTED); 296 OnFailed(net::ERR_UNEXPECTED);
243 return; 297 return;
244 } 298 }
299 pending_write_data_->AppendBuffer(write_buffer, buffer_size);
300 write_end_of_stream_ = end_of_stream;
301 if (!disable_auto_flush_)
302 FlushOnNetworkThread();
303 }
304
305 void CronetBidirectionalStream::FlushOnNetworkThread() {
306 DCHECK(environment_->IsOnNetworkThread());
307 if (!bidi_stream_)
308 return;
309 // If there is no data to flush, may need to send headers.
310 if (pending_write_data_->Empty()) {
311 if (!request_headers_sent_) {
312 request_headers_sent_ = true;
313 bidi_stream_->SendRequestHeaders();
314 }
315 return;
316 }
317 // If request headers are not sent yet, they will be sent with the data.
318 if (!request_headers_sent_)
319 request_headers_sent_ = true;
320
321 // Move pending data to the flushing list.
322 pending_write_data_->MoveTo(flushing_write_data_.get());
323 DCHECK(pending_write_data_->Empty());
324 if (write_state_ != WRITING)
325 SendFlushingWriteData();
326 }
327
328 void CronetBidirectionalStream::SendFlushingWriteData() {
329 // If previous send is not done, or there is nothing to flush, then exit.
330 if (write_state_ == WRITING || flushing_write_data_->Empty())
331 return;
332 DCHECK(sending_write_data_->Empty());
245 write_state_ = WRITING; 333 write_state_ = WRITING;
246 write_end_of_stream_ = end_of_stream; 334 flushing_write_data_->MoveTo(sending_write_data_.get());
247 335 bidi_stream_->SendvData(sending_write_data_->buffers(),
248 write_buffer_ = write_buffer; 336 sending_write_data_->lengths(), write_end_of_stream_);
xunjieli 2016/06/20 15:24:24 While I was doing the same change on Android side,
249 bidi_stream_->SendData(write_buffer_.get(), buffer_size, end_of_stream);
250 } 337 }
251 338
252 void CronetBidirectionalStream::CancelOnNetworkThread() { 339 void CronetBidirectionalStream::CancelOnNetworkThread() {
253 DCHECK(environment_->IsOnNetworkThread()); 340 DCHECK(environment_->IsOnNetworkThread());
254 if (!bidi_stream_) 341 if (!bidi_stream_)
255 return; 342 return;
256 read_state_ = write_state_ = CANCELED; 343 read_state_ = write_state_ = CANCELED;
257 bidi_stream_.reset(); 344 bidi_stream_.reset();
258 delegate_->OnCanceled(); 345 delegate_->OnCanceled();
259 } 346 }
260 347
261 void CronetBidirectionalStream::DestroyOnNetworkThread() { 348 void CronetBidirectionalStream::DestroyOnNetworkThread() {
262 DCHECK(environment_->IsOnNetworkThread()); 349 DCHECK(environment_->IsOnNetworkThread());
263 delete this; 350 delete this;
264 } 351 }
265 352
266 void CronetBidirectionalStream::MaybeOnSucceded() { 353 void CronetBidirectionalStream::MaybeOnSucceded() {
267 DCHECK(environment_->IsOnNetworkThread()); 354 DCHECK(environment_->IsOnNetworkThread());
268 if (read_state_ == READING_DONE && write_state_ == WRITING_DONE) { 355 if (read_state_ == READING_DONE && write_state_ == WRITING_DONE) {
269 read_state_ = write_state_ = SUCCESS; 356 read_state_ = write_state_ = SUCCESS;
270 bidi_stream_.reset(); 357 bidi_stream_.reset();
271 delegate_->OnSucceeded(); 358 delegate_->OnSucceeded();
272 } 359 }
273 } 360 }
274 361
275 } // namespace cronet 362 } // namespace cronet
OLDNEW
« no previous file with comments | « components/cronet/ios/cronet_bidirectional_stream.h ('k') | components/cronet/ios/cronet_c_for_grpc.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698