Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(162)

Side by Side Diff: components/cronet/ios/cronet_bidirectional_stream.cc

Issue 2050483002: [Cronet] Coalesce small buffers into single QUIC packet in GRPC on iOS. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Merge SendFlushingWriteData and FlushDataOnNetworkThread. Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "components/cronet/ios/cronet_bidirectional_stream.h" 5 #include "components/cronet/ios/cronet_bidirectional_stream.h"
6 6
7 #include <memory> 7 #include <memory>
8 #include <string> 8 #include <string>
9 #include <vector> 9 #include <vector>
10 10
(...skipping 14 matching lines...) Expand all
25 #include "net/http/http_transaction_factory.h" 25 #include "net/http/http_transaction_factory.h"
26 #include "net/http/http_util.h" 26 #include "net/http/http_util.h"
27 #include "net/spdy/spdy_header_block.h" 27 #include "net/spdy/spdy_header_block.h"
28 #include "net/ssl/ssl_info.h" 28 #include "net/ssl/ssl_info.h"
29 #include "net/url_request/http_user_agent_settings.h" 29 #include "net/url_request/http_user_agent_settings.h"
30 #include "net/url_request/url_request_context.h" 30 #include "net/url_request/url_request_context.h"
31 #include "url/gurl.h" 31 #include "url/gurl.h"
32 32
33 namespace cronet { 33 namespace cronet {
34 34
35 CronetBidirectionalStream::PendingWriteData::PendingWriteData() {}
36
37 CronetBidirectionalStream::PendingWriteData::~PendingWriteData() {}
38
35 CronetBidirectionalStream::CronetBidirectionalStream( 39 CronetBidirectionalStream::CronetBidirectionalStream(
36 CronetEnvironment* environment, 40 CronetEnvironment* environment,
37 Delegate* delegate) 41 Delegate* delegate)
38 : read_state_(NOT_STARTED), 42 : read_state_(NOT_STARTED),
39 write_state_(NOT_STARTED), 43 write_state_(NOT_STARTED),
40 write_end_of_stream_(false), 44 write_end_of_stream_(false),
45 request_headers_sent_(false),
46 disable_auto_flush_(false),
47 delay_headers_until_flush_(false),
41 environment_(environment), 48 environment_(environment),
42 delegate_(delegate) {} 49 delegate_(delegate) {}
43 50
44 CronetBidirectionalStream::~CronetBidirectionalStream() { 51 CronetBidirectionalStream::~CronetBidirectionalStream() {
45 DCHECK(environment_->IsOnNetworkThread()); 52 DCHECK(environment_->IsOnNetworkThread());
46 } 53 }
47 54
48 int CronetBidirectionalStream::Start(const char* url, 55 int CronetBidirectionalStream::Start(const char* url,
49 int priority, 56 int priority,
50 const char* method, 57 const char* method,
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
91 scoped_refptr<net::WrappedIOBuffer> write_buffer( 98 scoped_refptr<net::WrappedIOBuffer> write_buffer(
92 new net::WrappedIOBuffer(buffer)); 99 new net::WrappedIOBuffer(buffer));
93 100
94 environment_->PostToNetworkThread( 101 environment_->PostToNetworkThread(
95 FROM_HERE, 102 FROM_HERE,
96 base::Bind(&CronetBidirectionalStream::WriteDataOnNetworkThread, 103 base::Bind(&CronetBidirectionalStream::WriteDataOnNetworkThread,
97 base::Unretained(this), write_buffer, count, end_of_stream)); 104 base::Unretained(this), write_buffer, count, end_of_stream));
98 return true; 105 return true;
99 } 106 }
100 107
108 bool CronetBidirectionalStream::Flush() {
109 environment_->PostToNetworkThread(
110 FROM_HERE, base::Bind(&CronetBidirectionalStream::FlushOnNetworkThread,
111 base::Unretained(this)));
112 return true;
113 }
114
101 void CronetBidirectionalStream::Cancel() { 115 void CronetBidirectionalStream::Cancel() {
102 environment_->PostToNetworkThread( 116 environment_->PostToNetworkThread(
103 FROM_HERE, base::Bind(&CronetBidirectionalStream::CancelOnNetworkThread, 117 FROM_HERE, base::Bind(&CronetBidirectionalStream::CancelOnNetworkThread,
104 base::Unretained(this))); 118 base::Unretained(this)));
105 } 119 }
106 120
107 void CronetBidirectionalStream::Destroy() { 121 void CronetBidirectionalStream::Destroy() {
108 // Destroy could be called from any thread, including network thread (if 122 // Destroy could be called from any thread, including network thread (if
109 // posting task to executor throws an exception), but is posted, so |this| 123 // posting task to executor throws an exception), but is posted, so |this|
110 // is valid until calling task is complete. 124 // is valid until calling task is complete.
111 environment_->PostToNetworkThread( 125 environment_->PostToNetworkThread(
112 FROM_HERE, base::Bind(&CronetBidirectionalStream::DestroyOnNetworkThread, 126 FROM_HERE, base::Bind(&CronetBidirectionalStream::DestroyOnNetworkThread,
113 base::Unretained(this))); 127 base::Unretained(this)));
114 } 128 }
115 129
116 void CronetBidirectionalStream::OnStreamReady(bool request_headers_sent) { 130 void CronetBidirectionalStream::OnStreamReady(bool request_headers_sent) {
117 DCHECK(environment_->IsOnNetworkThread()); 131 DCHECK(environment_->IsOnNetworkThread());
118 DCHECK(write_state_ == STARTED); 132 DCHECK_EQ(write_state_, STARTED);
119 write_state_ = WAITING_FOR_WRITE; 133 request_headers_sent_ = request_headers_sent;
120 if (write_end_of_stream_) 134 write_state_ = WAITING_FOR_FLUSH;
135 if (write_end_of_stream_) {
136 if (!request_headers_sent) {
137 // If there is no data to write, then just send headers explicitly.
138 bidi_stream_->SendRequestHeaders();
139 request_headers_sent_ = true;
140 }
121 write_state_ = WRITING_DONE; 141 write_state_ = WRITING_DONE;
142 }
122 delegate_->OnStreamReady(); 143 delegate_->OnStreamReady();
123 } 144 }
124 145
125 void CronetBidirectionalStream::OnHeadersReceived( 146 void CronetBidirectionalStream::OnHeadersReceived(
126 const net::SpdyHeaderBlock& response_headers) { 147 const net::SpdyHeaderBlock& response_headers) {
127 DCHECK(environment_->IsOnNetworkThread()); 148 DCHECK(environment_->IsOnNetworkThread());
128 DCHECK(read_state_ == STARTED); 149 DCHECK_EQ(read_state_, STARTED);
xunjieli 2016/06/10 20:31:37 nit: the expected state should be the first arg. D
mef 2016/06/10 21:15:14 Hrm, I think that's true for ASSERT_EQ, but I see
xunjieli 2016/06/10 22:22:09 I have been doing that way. But if there are place
kapishnikov 2016/06/13 21:21:57 Found this one: https://bugs.chromium.org/p/chromi
mef 2016/06/14 00:20:38 Done.
129 read_state_ = WAITING_FOR_READ; 150 read_state_ = WAITING_FOR_READ;
130 // Get http status code from response headers. 151 // Get http status code from response headers.
131 int http_status_code = 0; 152 int http_status_code = 0;
132 const auto http_status_header = response_headers.find(":status"); 153 const auto http_status_header = response_headers.find(":status");
133 if (http_status_header != response_headers.end()) 154 if (http_status_header != response_headers.end())
134 base::StringToInt(http_status_header->second, &http_status_code); 155 base::StringToInt(http_status_header->second, &http_status_code);
135 const char* protocol = "unknown"; 156 const char* protocol = "unknown";
136 switch (bidi_stream_->GetProtocol()) { 157 switch (bidi_stream_->GetProtocol()) {
137 case net::kProtoHTTP2: 158 case net::kProtoHTTP2:
138 protocol = "h2"; 159 protocol = "h2";
139 break; 160 break;
140 case net::kProtoQUIC1SPDY3: 161 case net::kProtoQUIC1SPDY3:
141 protocol = "quic/1+spdy/3"; 162 protocol = "quic/1+spdy/3";
142 break; 163 break;
143 default: 164 default:
144 break; 165 break;
145 } 166 }
146 delegate_->OnHeadersReceived(response_headers, protocol); 167 delegate_->OnHeadersReceived(response_headers, protocol);
147 } 168 }
148 169
149 void CronetBidirectionalStream::OnDataRead(int bytes_read) { 170 void CronetBidirectionalStream::OnDataRead(int bytes_read) {
150 DCHECK(environment_->IsOnNetworkThread()); 171 DCHECK(environment_->IsOnNetworkThread());
151 DCHECK(read_state_ == READING); 172 DCHECK_EQ(read_state_, READING);
152 read_state_ = WAITING_FOR_READ; 173 read_state_ = WAITING_FOR_READ;
153 delegate_->OnDataRead(read_buffer_->data(), bytes_read); 174 delegate_->OnDataRead(read_buffer_->data(), bytes_read);
154 175
155 // Free the read buffer. 176 // Free the read buffer.
156 read_buffer_ = nullptr; 177 read_buffer_ = nullptr;
157 if (bytes_read == 0) 178 if (bytes_read == 0)
158 read_state_ = READING_DONE; 179 read_state_ = READING_DONE;
159 MaybeOnSucceded(); 180 MaybeOnSucceded();
160 } 181 }
161 182
162 void CronetBidirectionalStream::OnDataSent() { 183 void CronetBidirectionalStream::OnDataSent() {
163 DCHECK(environment_->IsOnNetworkThread()); 184 DCHECK(environment_->IsOnNetworkThread());
164 DCHECK(write_state_ == WRITING); 185 DCHECK_EQ(write_state_, WRITING);
165 write_state_ = WAITING_FOR_WRITE; 186 write_state_ = WAITING_FOR_FLUSH;
166 delegate_->OnDataSent(write_buffer_->data()); 187 for (scoped_refptr<net::IOBuffer> buffer :
167 // Free the write buffer. 188 sending_write_data_->write_buffer_list) {
168 write_buffer_ = nullptr; 189 delegate_->OnDataSent(buffer->data());
190 }
191 sending_write_data_.reset();
192 // Send data flushed while other data was sending.
193 if (flushing_write_data_) {
194 FlushOnNetworkThread();
195 return;
196 }
169 if (write_end_of_stream_) 197 if (write_end_of_stream_)
170 write_state_ = WRITING_DONE; 198 write_state_ = WRITING_DONE;
171 MaybeOnSucceded(); 199 MaybeOnSucceded();
172 } 200 }
173 201
174 void CronetBidirectionalStream::OnTrailersReceived( 202 void CronetBidirectionalStream::OnTrailersReceived(
175 const net::SpdyHeaderBlock& response_trailers) { 203 const net::SpdyHeaderBlock& response_trailers) {
176 DCHECK(environment_->IsOnNetworkThread()); 204 DCHECK(environment_->IsOnNetworkThread());
177 delegate_->OnTrailersReceived(response_trailers); 205 delegate_->OnTrailersReceived(response_trailers);
178 } 206 }
179 207
180 void CronetBidirectionalStream::OnFailed(int error) { 208 void CronetBidirectionalStream::OnFailed(int error) {
181 DCHECK(environment_->IsOnNetworkThread()); 209 DCHECK(environment_->IsOnNetworkThread());
182 bidi_stream_.reset(); 210 bidi_stream_.reset();
183 read_state_ = write_state_ = ERROR; 211 read_state_ = write_state_ = ERROR;
184 delegate_->OnFailed(error); 212 delegate_->OnFailed(error);
185 } 213 }
186 214
187 void CronetBidirectionalStream::StartOnNetworkThread( 215 void CronetBidirectionalStream::StartOnNetworkThread(
188 std::unique_ptr<net::BidirectionalStreamRequestInfo> request_info) { 216 std::unique_ptr<net::BidirectionalStreamRequestInfo> request_info) {
189 DCHECK(environment_->IsOnNetworkThread()); 217 DCHECK(environment_->IsOnNetworkThread());
190 DCHECK(!bidi_stream_); 218 DCHECK(!bidi_stream_);
191 DCHECK(environment_->GetURLRequestContext()); 219 DCHECK(environment_->GetURLRequestContext());
192 request_info->extra_headers.SetHeaderIfMissing( 220 request_info->extra_headers.SetHeaderIfMissing(
193 net::HttpRequestHeaders::kUserAgent, environment_->user_agent()); 221 net::HttpRequestHeaders::kUserAgent, environment_->user_agent());
194 bidi_stream_.reset(new net::BidirectionalStream( 222 bidi_stream_.reset(new net::BidirectionalStream(
195 std::move(request_info), environment_->GetURLRequestContext() 223 std::move(request_info), environment_->GetURLRequestContext()
196 ->http_transaction_factory() 224 ->http_transaction_factory()
197 ->GetSession(), 225 ->GetSession(),
198 /*send_request_headers_automatically=*/true, this)); 226 !delay_headers_until_flush_, this));
199 DCHECK(read_state_ == NOT_STARTED && write_state_ == NOT_STARTED); 227 DCHECK(read_state_ == NOT_STARTED && write_state_ == NOT_STARTED);
200 read_state_ = write_state_ = STARTED; 228 read_state_ = write_state_ = STARTED;
201 } 229 }
202 230
203 void CronetBidirectionalStream::ReadDataOnNetworkThread( 231 void CronetBidirectionalStream::ReadDataOnNetworkThread(
204 scoped_refptr<net::WrappedIOBuffer> read_buffer, 232 scoped_refptr<net::WrappedIOBuffer> read_buffer,
205 int buffer_size) { 233 int buffer_size) {
206 DCHECK(environment_->IsOnNetworkThread()); 234 DCHECK(environment_->IsOnNetworkThread());
207 DCHECK(read_buffer); 235 DCHECK(read_buffer);
208 DCHECK(!read_buffer_); 236 DCHECK(!read_buffer_);
(...skipping 16 matching lines...) Expand all
225 OnFailed(bytes_read); 253 OnFailed(bytes_read);
226 return; 254 return;
227 } 255 }
228 OnDataRead(bytes_read); 256 OnDataRead(bytes_read);
229 } 257 }
230 258
231 void CronetBidirectionalStream::WriteDataOnNetworkThread( 259 void CronetBidirectionalStream::WriteDataOnNetworkThread(
232 scoped_refptr<net::WrappedIOBuffer> write_buffer, 260 scoped_refptr<net::WrappedIOBuffer> write_buffer,
233 int buffer_size, 261 int buffer_size,
234 bool end_of_stream) { 262 bool end_of_stream) {
235 DCHECK(environment_->IsOnNetworkThread()); 263 DCHECK(environment_->IsOnNetworkThread());
kapishnikov 2016/06/13 21:21:57 Can we add DCHECK here to check that write_end_of_
mef 2016/06/14 00:20:37 Done.
236 DCHECK(write_buffer); 264 DCHECK(write_buffer);
237 DCHECK(!write_buffer_); 265 if (!bidi_stream_) {
238 if (write_state_ != WAITING_FOR_WRITE) { 266 DLOG(ERROR) << "Unexpected Flush Data in write_state " << write_state_;
239 DLOG(ERROR) << "Unexpected Write Data in write_state " << write_state_;
240 // Invoke OnFailed unless it is already invoked. 267 // Invoke OnFailed unless it is already invoked.
241 if (write_state_ != ERROR) 268 if (write_state_ != ERROR)
242 OnFailed(net::ERR_UNEXPECTED); 269 OnFailed(net::ERR_UNEXPECTED);
243 return; 270 return;
244 } 271 }
272 if (!pending_write_data_)
273 pending_write_data_.reset(new PendingWriteData());
274 pending_write_data_->write_buffer_list.push_back(write_buffer);
kapishnikov 2016/06/13 21:21:57 Can we create a method inside PendingWriteData str
mef 2016/06/14 00:20:38 Done.
275 pending_write_data_->write_buffer_len_list.push_back(buffer_size);
276 write_end_of_stream_ = end_of_stream;
277 if (!disable_auto_flush_)
278 FlushOnNetworkThread();
279 }
280
281 void CronetBidirectionalStream::FlushOnNetworkThread() {
282 DCHECK(environment_->IsOnNetworkThread());
283 if (!bidi_stream_)
284 return;
285 // If there is no data to flush, may need to send headers.
286 if (!pending_write_data_) {
287 if (!request_headers_sent_) {
288 request_headers_sent_ = true;
289 bidi_stream_->SendRequestHeaders();
290 }
291 return;
292 }
293 // If request headers are not sent yet, they will be sent with the data.
294 request_headers_sent_ = true;
295 // Move pending data to the flushing list.
296 if (flushing_write_data_) {
xunjieli 2016/06/10 20:31:37 I think it is slightly confusing that |flushing_wr
mef 2016/06/10 21:15:14 That's an interesting obeservation. With FlushOnNe
mef 2016/06/14 00:20:38 Done. I think. :)
297 std::move(pending_write_data_->write_buffer_list.begin(),
kapishnikov 2016/06/13 21:21:57 Maybe a new method in PendingWriteData struct? E.g
mef 2016/06/14 00:20:37 I think we don't need to separate pending_write_da
298 pending_write_data_->write_buffer_list.end(),
299 std::back_inserter(flushing_write_data_->write_buffer_list));
300 std::move(pending_write_data_->write_buffer_len_list.begin(),
301 pending_write_data_->write_buffer_len_list.end(),
302 std::back_inserter(flushing_write_data_->write_buffer_len_list));
303 } else {
304 std::swap(flushing_write_data_, pending_write_data_);
305 }
306 // If previous send is not done, or there is nothing to flush, then exit.
307 if ((write_state_ == WRITING) || !flushing_write_data_)
kapishnikov 2016/06/13 21:21:56 I think the content of flushing_write_data_ can ne
mef 2016/06/14 00:20:38 Done.
308 return;
245 write_state_ = WRITING; 309 write_state_ = WRITING;
246 write_end_of_stream_ = end_of_stream; 310 sending_write_data_.swap(flushing_write_data_);
kapishnikov 2016/06/13 21:21:57 Should we add DCHECK that the content of sending_w
mef 2016/06/14 00:20:38 Done.
247 311 bidi_stream_->SendvData(sending_write_data_->write_buffer_list,
248 write_buffer_ = write_buffer; 312 sending_write_data_->write_buffer_len_list,
249 bidi_stream_->SendData(write_buffer_.get(), buffer_size, end_of_stream); 313 write_end_of_stream_);
250 } 314 }
251 315
252 void CronetBidirectionalStream::CancelOnNetworkThread() { 316 void CronetBidirectionalStream::CancelOnNetworkThread() {
253 DCHECK(environment_->IsOnNetworkThread()); 317 DCHECK(environment_->IsOnNetworkThread());
254 if (!bidi_stream_) 318 if (!bidi_stream_)
255 return; 319 return;
256 read_state_ = write_state_ = CANCELED; 320 read_state_ = write_state_ = CANCELED;
257 bidi_stream_.reset(); 321 bidi_stream_.reset();
258 delegate_->OnCanceled(); 322 delegate_->OnCanceled();
259 } 323 }
260 324
261 void CronetBidirectionalStream::DestroyOnNetworkThread() { 325 void CronetBidirectionalStream::DestroyOnNetworkThread() {
262 DCHECK(environment_->IsOnNetworkThread()); 326 DCHECK(environment_->IsOnNetworkThread());
263 delete this; 327 delete this;
264 } 328 }
265 329
266 void CronetBidirectionalStream::MaybeOnSucceded() { 330 void CronetBidirectionalStream::MaybeOnSucceded() {
267 DCHECK(environment_->IsOnNetworkThread()); 331 DCHECK(environment_->IsOnNetworkThread());
268 if (read_state_ == READING_DONE && write_state_ == WRITING_DONE) { 332 if (read_state_ == READING_DONE && write_state_ == WRITING_DONE) {
269 read_state_ = write_state_ = SUCCESS; 333 read_state_ = write_state_ = SUCCESS;
270 bidi_stream_.reset(); 334 bidi_stream_.reset();
271 delegate_->OnSucceeded(); 335 delegate_->OnSucceeded();
272 } 336 }
273 } 337 }
274 338
275 } // namespace cronet 339 } // namespace cronet
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698