Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(76)

Side by Side Diff: components/cronet/ios/cronet_bidirectional_stream.cc

Issue 2050483002: [Cronet] Coalesce small buffers into single QUIC packet in GRPC on iOS. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Remove flushing_write_data_. Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "components/cronet/ios/cronet_bidirectional_stream.h" 5 #include "components/cronet/ios/cronet_bidirectional_stream.h"
6 6
7 #include <memory> 7 #include <memory>
8 #include <string> 8 #include <string>
9 #include <vector> 9 #include <vector>
10 10
(...skipping 14 matching lines...) Expand all
25 #include "net/http/http_transaction_factory.h" 25 #include "net/http/http_transaction_factory.h"
26 #include "net/http/http_util.h" 26 #include "net/http/http_util.h"
27 #include "net/spdy/spdy_header_block.h" 27 #include "net/spdy/spdy_header_block.h"
28 #include "net/ssl/ssl_info.h" 28 #include "net/ssl/ssl_info.h"
29 #include "net/url_request/http_user_agent_settings.h" 29 #include "net/url_request/http_user_agent_settings.h"
30 #include "net/url_request/url_request_context.h" 30 #include "net/url_request/url_request_context.h"
31 #include "url/gurl.h" 31 #include "url/gurl.h"
32 32
33 namespace cronet { 33 namespace cronet {
34 34
35 CronetBidirectionalStream::WriteBuffers::WriteBuffers() {}
36
37 CronetBidirectionalStream::WriteBuffers::~WriteBuffers() {}
38
39 void CronetBidirectionalStream::WriteBuffers::Clear() {
40 write_buffer_list.clear();
41 write_buffer_len_list.clear();
42 }
43
44 void CronetBidirectionalStream::WriteBuffers::AppendBuffer(
45 const scoped_refptr<net::IOBuffer>& buffer,
46 int buffer_size) {
47 write_buffer_list.push_back(buffer);
48 write_buffer_len_list.push_back(buffer_size);
49 }
50
51 bool CronetBidirectionalStream::WriteBuffers::Empty() const {
52 return write_buffer_list.empty();
53 }
54
35 CronetBidirectionalStream::CronetBidirectionalStream( 55 CronetBidirectionalStream::CronetBidirectionalStream(
36 CronetEnvironment* environment, 56 CronetEnvironment* environment,
37 Delegate* delegate) 57 Delegate* delegate)
38 : read_state_(NOT_STARTED), 58 : read_state_(NOT_STARTED),
39 write_state_(NOT_STARTED), 59 write_state_(NOT_STARTED),
40 write_end_of_stream_(false), 60 write_end_of_stream_(false),
61 request_headers_sent_(false),
62 disable_auto_flush_(false),
63 delay_headers_until_flush_(false),
41 environment_(environment), 64 environment_(environment),
65 pending_write_data_(new WriteBuffers()),
66 sending_write_data_(new WriteBuffers()),
42 delegate_(delegate) {} 67 delegate_(delegate) {}
43 68
44 CronetBidirectionalStream::~CronetBidirectionalStream() { 69 CronetBidirectionalStream::~CronetBidirectionalStream() {
45 DCHECK(environment_->IsOnNetworkThread()); 70 DCHECK(environment_->IsOnNetworkThread());
46 } 71 }
47 72
48 int CronetBidirectionalStream::Start(const char* url, 73 int CronetBidirectionalStream::Start(const char* url,
49 int priority, 74 int priority,
50 const char* method, 75 const char* method,
51 const net::HttpRequestHeaders& headers, 76 const net::HttpRequestHeaders& headers,
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
91 scoped_refptr<net::WrappedIOBuffer> write_buffer( 116 scoped_refptr<net::WrappedIOBuffer> write_buffer(
92 new net::WrappedIOBuffer(buffer)); 117 new net::WrappedIOBuffer(buffer));
93 118
94 environment_->PostToNetworkThread( 119 environment_->PostToNetworkThread(
95 FROM_HERE, 120 FROM_HERE,
96 base::Bind(&CronetBidirectionalStream::WriteDataOnNetworkThread, 121 base::Bind(&CronetBidirectionalStream::WriteDataOnNetworkThread,
97 base::Unretained(this), write_buffer, count, end_of_stream)); 122 base::Unretained(this), write_buffer, count, end_of_stream));
98 return true; 123 return true;
99 } 124 }
100 125
126 void CronetBidirectionalStream::Flush() {
127 environment_->PostToNetworkThread(
128 FROM_HERE, base::Bind(&CronetBidirectionalStream::FlushOnNetworkThread,
129 base::Unretained(this)));
130 }
131
101 void CronetBidirectionalStream::Cancel() { 132 void CronetBidirectionalStream::Cancel() {
102 environment_->PostToNetworkThread( 133 environment_->PostToNetworkThread(
103 FROM_HERE, base::Bind(&CronetBidirectionalStream::CancelOnNetworkThread, 134 FROM_HERE, base::Bind(&CronetBidirectionalStream::CancelOnNetworkThread,
104 base::Unretained(this))); 135 base::Unretained(this)));
105 } 136 }
106 137
107 void CronetBidirectionalStream::Destroy() { 138 void CronetBidirectionalStream::Destroy() {
108 // Destroy could be called from any thread, including network thread (if 139 // Destroy could be called from any thread, including network thread (if
109 // posting task to executor throws an exception), but is posted, so |this| 140 // posting task to executor throws an exception), but is posted, so |this|
110 // is valid until calling task is complete. 141 // is valid until calling task is complete.
111 environment_->PostToNetworkThread( 142 environment_->PostToNetworkThread(
112 FROM_HERE, base::Bind(&CronetBidirectionalStream::DestroyOnNetworkThread, 143 FROM_HERE, base::Bind(&CronetBidirectionalStream::DestroyOnNetworkThread,
113 base::Unretained(this))); 144 base::Unretained(this)));
114 } 145 }
115 146
116 void CronetBidirectionalStream::OnStreamReady(bool request_headers_sent) { 147 void CronetBidirectionalStream::OnStreamReady(bool request_headers_sent) {
117 DCHECK(environment_->IsOnNetworkThread()); 148 DCHECK(environment_->IsOnNetworkThread());
118 DCHECK(write_state_ == STARTED); 149 DCHECK_EQ(STARTED, write_state_);
119 write_state_ = WAITING_FOR_WRITE; 150 request_headers_sent_ = request_headers_sent;
120 if (write_end_of_stream_) 151 write_state_ = WAITING_FOR_FLUSH;
152 if (write_end_of_stream_) {
153 if (!request_headers_sent) {
154 // If there is no data to write, then just send headers explicitly.
155 bidi_stream_->SendRequestHeaders();
156 request_headers_sent_ = true;
157 }
121 write_state_ = WRITING_DONE; 158 write_state_ = WRITING_DONE;
159 }
122 delegate_->OnStreamReady(); 160 delegate_->OnStreamReady();
123 } 161 }
124 162
125 void CronetBidirectionalStream::OnHeadersReceived( 163 void CronetBidirectionalStream::OnHeadersReceived(
126 const net::SpdyHeaderBlock& response_headers) { 164 const net::SpdyHeaderBlock& response_headers) {
127 DCHECK(environment_->IsOnNetworkThread()); 165 DCHECK(environment_->IsOnNetworkThread());
128 DCHECK(read_state_ == STARTED); 166 DCHECK_EQ(STARTED, read_state_);
129 read_state_ = WAITING_FOR_READ; 167 read_state_ = WAITING_FOR_READ;
130 // Get http status code from response headers. 168 // Get http status code from response headers.
131 int http_status_code = 0; 169 int http_status_code = 0;
132 const auto http_status_header = response_headers.find(":status"); 170 const auto http_status_header = response_headers.find(":status");
133 if (http_status_header != response_headers.end()) 171 if (http_status_header != response_headers.end())
134 base::StringToInt(http_status_header->second, &http_status_code); 172 base::StringToInt(http_status_header->second, &http_status_code);
135 const char* protocol = "unknown"; 173 const char* protocol = "unknown";
136 switch (bidi_stream_->GetProtocol()) { 174 switch (bidi_stream_->GetProtocol()) {
137 case net::kProtoHTTP2: 175 case net::kProtoHTTP2:
138 protocol = "h2"; 176 protocol = "h2";
139 break; 177 break;
140 case net::kProtoQUIC1SPDY3: 178 case net::kProtoQUIC1SPDY3:
141 protocol = "quic/1+spdy/3"; 179 protocol = "quic/1+spdy/3";
142 break; 180 break;
143 default: 181 default:
144 break; 182 break;
145 } 183 }
146 delegate_->OnHeadersReceived(response_headers, protocol); 184 delegate_->OnHeadersReceived(response_headers, protocol);
147 } 185 }
148 186
149 void CronetBidirectionalStream::OnDataRead(int bytes_read) { 187 void CronetBidirectionalStream::OnDataRead(int bytes_read) {
150 DCHECK(environment_->IsOnNetworkThread()); 188 DCHECK(environment_->IsOnNetworkThread());
151 DCHECK(read_state_ == READING); 189 DCHECK_EQ(READING, read_state_);
152 read_state_ = WAITING_FOR_READ; 190 read_state_ = WAITING_FOR_READ;
153 delegate_->OnDataRead(read_buffer_->data(), bytes_read); 191 delegate_->OnDataRead(read_buffer_->data(), bytes_read);
154 192
155 // Free the read buffer. 193 // Free the read buffer.
156 read_buffer_ = nullptr; 194 read_buffer_ = nullptr;
157 if (bytes_read == 0) 195 if (bytes_read == 0)
158 read_state_ = READING_DONE; 196 read_state_ = READING_DONE;
159 MaybeOnSucceded(); 197 MaybeOnSucceded();
160 } 198 }
161 199
162 void CronetBidirectionalStream::OnDataSent() { 200 void CronetBidirectionalStream::OnDataSent() {
163 DCHECK(environment_->IsOnNetworkThread()); 201 DCHECK(environment_->IsOnNetworkThread());
164 DCHECK(write_state_ == WRITING); 202 DCHECK_EQ(WRITING, write_state_);
165 write_state_ = WAITING_FOR_WRITE; 203 write_state_ = WAITING_FOR_FLUSH;
166 delegate_->OnDataSent(write_buffer_->data()); 204 for (scoped_refptr<net::IOBuffer> buffer :
xunjieli 2016/06/14 14:29:34 Does const & work here? I am not sure doing this w
mef 2016/06/14 19:54:30 Done.
167 // Free the write buffer. 205 sending_write_data_->write_buffer_list) {
168 write_buffer_ = nullptr; 206 delegate_->OnDataSent(buffer->data());
207 }
208 sending_write_data_->Clear();
209 // Send data flushed while other data was sending.
210 if (!pending_write_data_->Empty()) {
xunjieli 2016/06/14 14:29:34 I am not exactly sure if we should auto flush |pen
kapishnikov 2016/06/14 14:41:58 Can it result in flushing data that were written b
kapishnikov 2016/06/14 14:50:25 Sorry Helen, I didn't see your comment, I think we
mef 2016/06/14 19:54:30 Done.
211 FlushOnNetworkThread();
212 return;
213 }
169 if (write_end_of_stream_) 214 if (write_end_of_stream_)
170 write_state_ = WRITING_DONE; 215 write_state_ = WRITING_DONE;
171 MaybeOnSucceded(); 216 MaybeOnSucceded();
172 } 217 }
173 218
174 void CronetBidirectionalStream::OnTrailersReceived( 219 void CronetBidirectionalStream::OnTrailersReceived(
175 const net::SpdyHeaderBlock& response_trailers) { 220 const net::SpdyHeaderBlock& response_trailers) {
176 DCHECK(environment_->IsOnNetworkThread()); 221 DCHECK(environment_->IsOnNetworkThread());
177 delegate_->OnTrailersReceived(response_trailers); 222 delegate_->OnTrailersReceived(response_trailers);
178 } 223 }
179 224
180 void CronetBidirectionalStream::OnFailed(int error) { 225 void CronetBidirectionalStream::OnFailed(int error) {
181 DCHECK(environment_->IsOnNetworkThread()); 226 DCHECK(environment_->IsOnNetworkThread());
182 bidi_stream_.reset(); 227 bidi_stream_.reset();
183 read_state_ = write_state_ = ERROR; 228 read_state_ = write_state_ = ERROR;
184 delegate_->OnFailed(error); 229 delegate_->OnFailed(error);
185 } 230 }
186 231
187 void CronetBidirectionalStream::StartOnNetworkThread( 232 void CronetBidirectionalStream::StartOnNetworkThread(
188 std::unique_ptr<net::BidirectionalStreamRequestInfo> request_info) { 233 std::unique_ptr<net::BidirectionalStreamRequestInfo> request_info) {
189 DCHECK(environment_->IsOnNetworkThread()); 234 DCHECK(environment_->IsOnNetworkThread());
190 DCHECK(!bidi_stream_); 235 DCHECK(!bidi_stream_);
191 DCHECK(environment_->GetURLRequestContext()); 236 DCHECK(environment_->GetURLRequestContext());
192 request_info->extra_headers.SetHeaderIfMissing( 237 request_info->extra_headers.SetHeaderIfMissing(
193 net::HttpRequestHeaders::kUserAgent, environment_->user_agent()); 238 net::HttpRequestHeaders::kUserAgent, environment_->user_agent());
194 bidi_stream_.reset(new net::BidirectionalStream( 239 bidi_stream_.reset(new net::BidirectionalStream(
195 std::move(request_info), environment_->GetURLRequestContext() 240 std::move(request_info), environment_->GetURLRequestContext()
196 ->http_transaction_factory() 241 ->http_transaction_factory()
197 ->GetSession(), 242 ->GetSession(),
198 /*send_request_headers_automatically=*/true, this)); 243 !delay_headers_until_flush_, this));
199 DCHECK(read_state_ == NOT_STARTED && write_state_ == NOT_STARTED); 244 DCHECK(read_state_ == NOT_STARTED && write_state_ == NOT_STARTED);
200 read_state_ = write_state_ = STARTED; 245 read_state_ = write_state_ = STARTED;
201 } 246 }
202 247
203 void CronetBidirectionalStream::ReadDataOnNetworkThread( 248 void CronetBidirectionalStream::ReadDataOnNetworkThread(
204 scoped_refptr<net::WrappedIOBuffer> read_buffer, 249 scoped_refptr<net::WrappedIOBuffer> read_buffer,
205 int buffer_size) { 250 int buffer_size) {
206 DCHECK(environment_->IsOnNetworkThread()); 251 DCHECK(environment_->IsOnNetworkThread());
207 DCHECK(read_buffer); 252 DCHECK(read_buffer);
208 DCHECK(!read_buffer_); 253 DCHECK(!read_buffer_);
(...skipping 18 matching lines...) Expand all
227 } 272 }
228 OnDataRead(bytes_read); 273 OnDataRead(bytes_read);
229 } 274 }
230 275
231 void CronetBidirectionalStream::WriteDataOnNetworkThread( 276 void CronetBidirectionalStream::WriteDataOnNetworkThread(
232 scoped_refptr<net::WrappedIOBuffer> write_buffer, 277 scoped_refptr<net::WrappedIOBuffer> write_buffer,
233 int buffer_size, 278 int buffer_size,
234 bool end_of_stream) { 279 bool end_of_stream) {
235 DCHECK(environment_->IsOnNetworkThread()); 280 DCHECK(environment_->IsOnNetworkThread());
236 DCHECK(write_buffer); 281 DCHECK(write_buffer);
237 DCHECK(!write_buffer_); 282 DCHECK(!write_end_of_stream_);
238 if (write_state_ != WAITING_FOR_WRITE) { 283 if (!bidi_stream_ || write_end_of_stream_) {
239 DLOG(ERROR) << "Unexpected Write Data in write_state " << write_state_; 284 DLOG(ERROR) << "Unexpected Flush Data in write_state " << write_state_;
240 // Invoke OnFailed unless it is already invoked. 285 // Invoke OnFailed unless it is already invoked.
241 if (write_state_ != ERROR) 286 if (write_state_ != ERROR)
242 OnFailed(net::ERR_UNEXPECTED); 287 OnFailed(net::ERR_UNEXPECTED);
243 return; 288 return;
244 } 289 }
290 pending_write_data_->AppendBuffer(write_buffer, buffer_size);
291 write_end_of_stream_ = end_of_stream;
292 if (!disable_auto_flush_)
293 FlushOnNetworkThread();
294 }
295
296 void CronetBidirectionalStream::FlushOnNetworkThread() {
297 DCHECK(environment_->IsOnNetworkThread());
298 if (!bidi_stream_)
299 return;
300 // If there is no data to flush, may need to send headers.
301 if (pending_write_data_->Empty()) {
302 if (!request_headers_sent_) {
303 request_headers_sent_ = true;
304 bidi_stream_->SendRequestHeaders();
305 }
306 return;
307 }
308 // If request headers are not sent yet, they will be sent with the data.
309 request_headers_sent_ = true;
xunjieli 2016/06/14 14:29:34 Maybe move this line inside of "if (!request_heade
mef 2016/06/14 19:54:30 Done.
310 // If previous send is not done, or there is nothing to flush, then exit.
311 if (write_state_ == WRITING)
312 return;
313 DCHECK(!pending_write_data_->Empty());
314 DCHECK(sending_write_data_->Empty());
245 write_state_ = WRITING; 315 write_state_ = WRITING;
246 write_end_of_stream_ = end_of_stream; 316 sending_write_data_.swap(pending_write_data_);
247 317 bidi_stream_->SendvData(sending_write_data_->write_buffer_list,
248 write_buffer_ = write_buffer; 318 sending_write_data_->write_buffer_len_list,
249 bidi_stream_->SendData(write_buffer_.get(), buffer_size, end_of_stream); 319 write_end_of_stream_);
250 } 320 }
251 321
252 void CronetBidirectionalStream::CancelOnNetworkThread() { 322 void CronetBidirectionalStream::CancelOnNetworkThread() {
253 DCHECK(environment_->IsOnNetworkThread()); 323 DCHECK(environment_->IsOnNetworkThread());
254 if (!bidi_stream_) 324 if (!bidi_stream_)
255 return; 325 return;
256 read_state_ = write_state_ = CANCELED; 326 read_state_ = write_state_ = CANCELED;
257 bidi_stream_.reset(); 327 bidi_stream_.reset();
258 delegate_->OnCanceled(); 328 delegate_->OnCanceled();
259 } 329 }
260 330
261 void CronetBidirectionalStream::DestroyOnNetworkThread() { 331 void CronetBidirectionalStream::DestroyOnNetworkThread() {
262 DCHECK(environment_->IsOnNetworkThread()); 332 DCHECK(environment_->IsOnNetworkThread());
263 delete this; 333 delete this;
264 } 334 }
265 335
266 void CronetBidirectionalStream::MaybeOnSucceded() { 336 void CronetBidirectionalStream::MaybeOnSucceded() {
267 DCHECK(environment_->IsOnNetworkThread()); 337 DCHECK(environment_->IsOnNetworkThread());
268 if (read_state_ == READING_DONE && write_state_ == WRITING_DONE) { 338 if (read_state_ == READING_DONE && write_state_ == WRITING_DONE) {
269 read_state_ = write_state_ = SUCCESS; 339 read_state_ = write_state_ = SUCCESS;
270 bidi_stream_.reset(); 340 bidi_stream_.reset();
271 delegate_->OnSucceeded(); 341 delegate_->OnSucceeded();
272 } 342 }
273 } 343 }
274 344
275 } // namespace cronet 345 } // namespace cronet
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698