Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(50)

Side by Side Diff: components/grpc_support/bidirectional_stream.cc

Issue 2273403003: Moving gRPC support interfaces out of cronet and into a new component. (Closed)
Patch Set: Address comments Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "components/cronet/ios/cronet_bidirectional_stream.h" 5 #include "components/grpc_support/bidirectional_stream.h"
6 6
7 #include <memory> 7 #include <memory>
8 #include <string> 8 #include <string>
9 #include <vector> 9 #include <vector>
10 10
11 #include "base/bind.h" 11 #include "base/bind.h"
12 #include "base/location.h" 12 #include "base/location.h"
13 #include "base/logging.h" 13 #include "base/logging.h"
14 #include "base/memory/ref_counted.h" 14 #include "base/memory/ref_counted.h"
15 #include "base/strings/string_number_conversions.h" 15 #include "base/strings/string_number_conversions.h"
16 #include "components/cronet/ios/cronet_environment.h"
17 #include "net/base/io_buffer.h" 16 #include "net/base/io_buffer.h"
18 #include "net/base/net_errors.h" 17 #include "net/base/net_errors.h"
19 #include "net/base/request_priority.h" 18 #include "net/base/request_priority.h"
20 #include "net/http/bidirectional_stream.h" 19 #include "net/http/bidirectional_stream.h"
21 #include "net/http/bidirectional_stream_request_info.h" 20 #include "net/http/bidirectional_stream_request_info.h"
22 #include "net/http/http_network_session.h" 21 #include "net/http/http_network_session.h"
23 #include "net/http/http_response_headers.h" 22 #include "net/http/http_response_headers.h"
24 #include "net/http/http_status_code.h" 23 #include "net/http/http_status_code.h"
25 #include "net/http/http_transaction_factory.h" 24 #include "net/http/http_transaction_factory.h"
26 #include "net/http/http_util.h" 25 #include "net/http/http_util.h"
27 #include "net/spdy/spdy_header_block.h" 26 #include "net/spdy/spdy_header_block.h"
28 #include "net/ssl/ssl_info.h" 27 #include "net/ssl/ssl_info.h"
29 #include "net/url_request/http_user_agent_settings.h" 28 #include "net/url_request/http_user_agent_settings.h"
30 #include "net/url_request/url_request_context.h" 29 #include "net/url_request/url_request_context.h"
30 #include "net/url_request/url_request_context_getter.h"
31 #include "url/gurl.h" 31 #include "url/gurl.h"
32 32
33 namespace cronet { 33 namespace grpc_support {
34 34
35 CronetBidirectionalStream::WriteBuffers::WriteBuffers() {} 35 BidirectionalStream::WriteBuffers::WriteBuffers() {}
36 36
37 CronetBidirectionalStream::WriteBuffers::~WriteBuffers() {} 37 BidirectionalStream::WriteBuffers::~WriteBuffers() {}
38 38
39 void CronetBidirectionalStream::WriteBuffers::Clear() { 39 void BidirectionalStream::WriteBuffers::Clear() {
40 write_buffer_list.clear(); 40 write_buffer_list.clear();
41 write_buffer_len_list.clear(); 41 write_buffer_len_list.clear();
42 } 42 }
43 43
44 void CronetBidirectionalStream::WriteBuffers::AppendBuffer( 44 void BidirectionalStream::WriteBuffers::AppendBuffer(
45 const scoped_refptr<net::IOBuffer>& buffer, 45 const scoped_refptr<net::IOBuffer>& buffer,
46 int buffer_size) { 46 int buffer_size) {
47 write_buffer_list.push_back(buffer); 47 write_buffer_list.push_back(buffer);
48 write_buffer_len_list.push_back(buffer_size); 48 write_buffer_len_list.push_back(buffer_size);
49 } 49 }
50 50
51 void CronetBidirectionalStream::WriteBuffers::MoveTo(WriteBuffers* target) { 51 void BidirectionalStream::WriteBuffers::MoveTo(WriteBuffers* target) {
52 std::move(write_buffer_list.begin(), write_buffer_list.end(), 52 std::move(write_buffer_list.begin(), write_buffer_list.end(),
53 std::back_inserter(target->write_buffer_list)); 53 std::back_inserter(target->write_buffer_list));
54 std::move(write_buffer_len_list.begin(), write_buffer_len_list.end(), 54 std::move(write_buffer_len_list.begin(), write_buffer_len_list.end(),
55 std::back_inserter(target->write_buffer_len_list)); 55 std::back_inserter(target->write_buffer_len_list));
56 Clear(); 56 Clear();
57 } 57 }
58 58
59 bool CronetBidirectionalStream::WriteBuffers::Empty() const { 59 bool BidirectionalStream::WriteBuffers::Empty() const {
60 return write_buffer_list.empty(); 60 return write_buffer_list.empty();
61 } 61 }
62 62
63 CronetBidirectionalStream::CronetBidirectionalStream( 63 BidirectionalStream::BidirectionalStream(
64 CronetEnvironment* environment, 64 net::URLRequestContextGetter* request_context_getter,
65 Delegate* delegate) 65 Delegate* delegate)
66 : read_state_(NOT_STARTED), 66 : read_state_(NOT_STARTED),
67 write_state_(NOT_STARTED), 67 write_state_(NOT_STARTED),
68 write_end_of_stream_(false), 68 write_end_of_stream_(false),
69 request_headers_sent_(false), 69 request_headers_sent_(false),
70 disable_auto_flush_(false), 70 disable_auto_flush_(false),
71 delay_headers_until_flush_(false), 71 delay_headers_until_flush_(false),
72 environment_(environment), 72 request_context_getter_(request_context_getter),
73 pending_write_data_(new WriteBuffers()), 73 pending_write_data_(new WriteBuffers()),
74 flushing_write_data_(new WriteBuffers()), 74 flushing_write_data_(new WriteBuffers()),
75 sending_write_data_(new WriteBuffers()), 75 sending_write_data_(new WriteBuffers()),
76 delegate_(delegate), 76 delegate_(delegate),
77 weak_factory_(this) { 77 weak_factory_(this) {
78 weak_this_ = weak_factory_.GetWeakPtr(); 78 weak_this_ = weak_factory_.GetWeakPtr();
79 } 79 }
80 80
81 CronetBidirectionalStream::~CronetBidirectionalStream() { 81 BidirectionalStream::~BidirectionalStream() {
82 DCHECK(environment_->IsOnNetworkThread()); 82 DCHECK(IsOnNetworkThread());
83 } 83 }
84 84
85 int CronetBidirectionalStream::Start(const char* url, 85 int BidirectionalStream::Start(const char* url,
86 int priority, 86 int priority,
87 const char* method, 87 const char* method,
88 const net::HttpRequestHeaders& headers, 88 const net::HttpRequestHeaders& headers,
89 bool end_of_stream) { 89 bool end_of_stream) {
90 // Prepare request info here to be able to return the error. 90 // Prepare request info here to be able to return the error.
91 std::unique_ptr<net::BidirectionalStreamRequestInfo> request_info( 91 std::unique_ptr<net::BidirectionalStreamRequestInfo> request_info(
92 new net::BidirectionalStreamRequestInfo()); 92 new net::BidirectionalStreamRequestInfo());
93 request_info->url = GURL(url); 93 request_info->url = GURL(url);
94 request_info->priority = static_cast<net::RequestPriority>(priority); 94 request_info->priority = static_cast<net::RequestPriority>(priority);
95 // Http method is a token, just as header name. 95 // Http method is a token, just as header name.
96 request_info->method = method; 96 request_info->method = method;
97 if (!net::HttpUtil::IsValidHeaderName(request_info->method)) 97 if (!net::HttpUtil::IsValidHeaderName(request_info->method))
98 return -1; 98 return -1;
99 request_info->extra_headers.CopyFrom(headers); 99 request_info->extra_headers.CopyFrom(headers);
100 request_info->end_stream_on_headers = end_of_stream; 100 request_info->end_stream_on_headers = end_of_stream;
101 write_end_of_stream_ = end_of_stream; 101 write_end_of_stream_ = end_of_stream;
102 DCHECK(environment_); 102 PostToNetworkThread(FROM_HERE,
103 environment_->PostToNetworkThread( 103 base::Bind(&BidirectionalStream::StartOnNetworkThread,
104 FROM_HERE, base::Bind(&CronetBidirectionalStream::StartOnNetworkThread, 104 weak_this_, base::Passed(&request_info)));
105 weak_this_, base::Passed(&request_info)));
106 return 0; 105 return 0;
107 } 106 }
108 107
109 bool CronetBidirectionalStream::ReadData(char* buffer, int capacity) { 108 bool BidirectionalStream::ReadData(char* buffer, int capacity) {
110 if (!buffer) 109 if (!buffer)
111 return false; 110 return false;
112 scoped_refptr<net::WrappedIOBuffer> read_buffer( 111 scoped_refptr<net::WrappedIOBuffer> read_buffer(
113 new net::WrappedIOBuffer(buffer)); 112 new net::WrappedIOBuffer(buffer));
114 113
115 environment_->PostToNetworkThread( 114 PostToNetworkThread(FROM_HERE,
116 FROM_HERE, base::Bind(&CronetBidirectionalStream::ReadDataOnNetworkThread, 115 base::Bind(&BidirectionalStream::ReadDataOnNetworkThread,
117 weak_this_, read_buffer, capacity)); 116 weak_this_, read_buffer, capacity));
118 return true; 117 return true;
119 } 118 }
120 119
121 bool CronetBidirectionalStream::WriteData(const char* buffer, 120 bool BidirectionalStream::WriteData(const char* buffer,
122 int count, 121 int count,
123 bool end_of_stream) { 122 bool end_of_stream) {
124 if (!buffer) 123 if (!buffer)
125 return false; 124 return false;
126 125
127 scoped_refptr<net::WrappedIOBuffer> write_buffer( 126 scoped_refptr<net::WrappedIOBuffer> write_buffer(
128 new net::WrappedIOBuffer(buffer)); 127 new net::WrappedIOBuffer(buffer));
129 128
130 environment_->PostToNetworkThread( 129 PostToNetworkThread(
131 FROM_HERE, 130 FROM_HERE, base::Bind(&BidirectionalStream::WriteDataOnNetworkThread,
132 base::Bind(&CronetBidirectionalStream::WriteDataOnNetworkThread, 131 weak_this_, write_buffer, count, end_of_stream));
133 weak_this_, write_buffer, count, end_of_stream));
134 return true; 132 return true;
135 } 133 }
136 134
137 void CronetBidirectionalStream::Flush() { 135 void BidirectionalStream::Flush() {
138 environment_->PostToNetworkThread( 136 PostToNetworkThread(
139 FROM_HERE, 137 FROM_HERE,
140 base::Bind(&CronetBidirectionalStream::FlushOnNetworkThread, weak_this_)); 138 base::Bind(&BidirectionalStream::FlushOnNetworkThread, weak_this_));
141 } 139 }
142 140
143 void CronetBidirectionalStream::Cancel() { 141 void BidirectionalStream::Cancel() {
144 environment_->PostToNetworkThread( 142 PostToNetworkThread(
145 FROM_HERE, base::Bind(&CronetBidirectionalStream::CancelOnNetworkThread, 143 FROM_HERE,
146 weak_this_)); 144 base::Bind(&BidirectionalStream::CancelOnNetworkThread, weak_this_));
147 } 145 }
148 146
149 void CronetBidirectionalStream::Destroy() { 147 void BidirectionalStream::Destroy() {
150 // Destroy could be called from any thread, including network thread (if 148 // Destroy could be called from any thread, including network thread (if
151 // posting task to executor throws an exception), but is posted, so |this| 149 // posting task to executor throws an exception), but is posted, so |this|
152 // is valid until calling task is complete. 150 // is valid until calling task is complete.
153 environment_->PostToNetworkThread( 151 PostToNetworkThread(FROM_HERE,
154 FROM_HERE, base::Bind(&CronetBidirectionalStream::DestroyOnNetworkThread, 152 base::Bind(&BidirectionalStream::DestroyOnNetworkThread,
155 base::Unretained(this))); 153 base::Unretained(this)));
156 } 154 }
157 155
158 void CronetBidirectionalStream::OnStreamReady(bool request_headers_sent) { 156 void BidirectionalStream::OnStreamReady(bool request_headers_sent) {
159 DCHECK(environment_->IsOnNetworkThread()); 157 DCHECK(IsOnNetworkThread());
160 DCHECK_EQ(STARTED, write_state_); 158 DCHECK_EQ(STARTED, write_state_);
161 if (!bidi_stream_) 159 if (!bidi_stream_)
162 return; 160 return;
163 request_headers_sent_ = request_headers_sent; 161 request_headers_sent_ = request_headers_sent;
164 write_state_ = WAITING_FOR_FLUSH; 162 write_state_ = WAITING_FOR_FLUSH;
165 if (write_end_of_stream_) { 163 if (write_end_of_stream_) {
166 if (!request_headers_sent) { 164 if (!request_headers_sent) {
167 // If there is no data to write, then just send headers explicitly. 165 // If there is no data to write, then just send headers explicitly.
168 bidi_stream_->SendRequestHeaders(); 166 bidi_stream_->SendRequestHeaders();
169 request_headers_sent_ = true; 167 request_headers_sent_ = true;
170 } 168 }
171 write_state_ = WRITING_DONE; 169 write_state_ = WRITING_DONE;
172 } 170 }
173 delegate_->OnStreamReady(); 171 delegate_->OnStreamReady();
174 } 172 }
175 173
176 void CronetBidirectionalStream::OnHeadersReceived( 174 void BidirectionalStream::OnHeadersReceived(
177 const net::SpdyHeaderBlock& response_headers) { 175 const net::SpdyHeaderBlock& response_headers) {
178 DCHECK(environment_->IsOnNetworkThread()); 176 DCHECK(IsOnNetworkThread());
179 DCHECK_EQ(STARTED, read_state_); 177 DCHECK_EQ(STARTED, read_state_);
180 if (!bidi_stream_) 178 if (!bidi_stream_)
181 return; 179 return;
182 read_state_ = WAITING_FOR_READ; 180 read_state_ = WAITING_FOR_READ;
183 // Get http status code from response headers. 181 // Get http status code from response headers.
184 int http_status_code = 0; 182 int http_status_code = 0;
185 const auto http_status_header = response_headers.find(":status"); 183 const auto http_status_header = response_headers.find(":status");
186 if (http_status_header != response_headers.end()) 184 if (http_status_header != response_headers.end())
187 base::StringToInt(http_status_header->second, &http_status_code); 185 base::StringToInt(http_status_header->second, &http_status_code);
188 const char* protocol = "unknown"; 186 const char* protocol = "unknown";
189 switch (bidi_stream_->GetProtocol()) { 187 switch (bidi_stream_->GetProtocol()) {
190 case net::kProtoHTTP2: 188 case net::kProtoHTTP2:
191 protocol = "h2"; 189 protocol = "h2";
192 break; 190 break;
193 case net::kProtoQUIC: 191 case net::kProtoQUIC:
194 protocol = "quic/1+spdy/3"; 192 protocol = "quic/1+spdy/3";
195 break; 193 break;
196 default: 194 default:
197 break; 195 break;
198 } 196 }
199 delegate_->OnHeadersReceived(response_headers, protocol); 197 delegate_->OnHeadersReceived(response_headers, protocol);
200 } 198 }
201 199
202 void CronetBidirectionalStream::OnDataRead(int bytes_read) { 200 void BidirectionalStream::OnDataRead(int bytes_read) {
203 DCHECK(environment_->IsOnNetworkThread()); 201 DCHECK(IsOnNetworkThread());
204 DCHECK_EQ(READING, read_state_); 202 DCHECK_EQ(READING, read_state_);
205 if (!bidi_stream_) 203 if (!bidi_stream_)
206 return; 204 return;
207 read_state_ = WAITING_FOR_READ; 205 read_state_ = WAITING_FOR_READ;
208 delegate_->OnDataRead(read_buffer_->data(), bytes_read); 206 delegate_->OnDataRead(read_buffer_->data(), bytes_read);
209 207
210 // Free the read buffer. 208 // Free the read buffer.
211 read_buffer_ = nullptr; 209 read_buffer_ = nullptr;
212 if (bytes_read == 0) 210 if (bytes_read == 0)
213 read_state_ = READING_DONE; 211 read_state_ = READING_DONE;
214 MaybeOnSucceded(); 212 MaybeOnSucceded();
215 } 213 }
216 214
217 void CronetBidirectionalStream::OnDataSent() { 215 void BidirectionalStream::OnDataSent() {
218 DCHECK(environment_->IsOnNetworkThread()); 216 DCHECK(IsOnNetworkThread());
219 if (!bidi_stream_) 217 if (!bidi_stream_)
220 return; 218 return;
221 DCHECK_EQ(WRITING, write_state_); 219 DCHECK_EQ(WRITING, write_state_);
222 write_state_ = WAITING_FOR_FLUSH; 220 write_state_ = WAITING_FOR_FLUSH;
223 for (const scoped_refptr<net::IOBuffer>& buffer : 221 for (const scoped_refptr<net::IOBuffer>& buffer :
224 sending_write_data_->buffers()) { 222 sending_write_data_->buffers()) {
225 delegate_->OnDataSent(buffer->data()); 223 delegate_->OnDataSent(buffer->data());
226 } 224 }
227 sending_write_data_->Clear(); 225 sending_write_data_->Clear();
228 // Send data flushed while other data was sending. 226 // Send data flushed while other data was sending.
229 if (!flushing_write_data_->Empty()) { 227 if (!flushing_write_data_->Empty()) {
230 SendFlushingWriteData(); 228 SendFlushingWriteData();
231 return; 229 return;
232 } 230 }
233 if (write_end_of_stream_ && pending_write_data_->Empty()) { 231 if (write_end_of_stream_ && pending_write_data_->Empty()) {
234 write_state_ = WRITING_DONE; 232 write_state_ = WRITING_DONE;
235 MaybeOnSucceded(); 233 MaybeOnSucceded();
236 } 234 }
237 } 235 }
238 236
239 void CronetBidirectionalStream::OnTrailersReceived( 237 void BidirectionalStream::OnTrailersReceived(
240 const net::SpdyHeaderBlock& response_trailers) { 238 const net::SpdyHeaderBlock& response_trailers) {
241 DCHECK(environment_->IsOnNetworkThread()); 239 DCHECK(IsOnNetworkThread());
242 if (!bidi_stream_) 240 if (!bidi_stream_)
243 return; 241 return;
244 delegate_->OnTrailersReceived(response_trailers); 242 delegate_->OnTrailersReceived(response_trailers);
245 } 243 }
246 244
247 void CronetBidirectionalStream::OnFailed(int error) { 245 void BidirectionalStream::OnFailed(int error) {
248 DCHECK(environment_->IsOnNetworkThread()); 246 DCHECK(IsOnNetworkThread());
249 if (!bidi_stream_ && read_state_ != NOT_STARTED) 247 if (!bidi_stream_ && read_state_ != NOT_STARTED)
250 return; 248 return;
251 read_state_ = write_state_ = ERROR; 249 read_state_ = write_state_ = ERR;
252 weak_factory_.InvalidateWeakPtrs(); 250 weak_factory_.InvalidateWeakPtrs();
253 // Delete underlying |bidi_stream_| asynchronously as it may still be used. 251 // Delete underlying |bidi_stream_| asynchronously as it may still be used.
254 environment_->PostToNetworkThread( 252 PostToNetworkThread(FROM_HERE,
255 FROM_HERE, base::Bind(&base::DeletePointer<net::BidirectionalStream>, 253 base::Bind(&base::DeletePointer<net::BidirectionalStream>,
256 bidi_stream_.release())); 254 bidi_stream_.release()));
257 delegate_->OnFailed(error); 255 delegate_->OnFailed(error);
258 } 256 }
259 257
260 void CronetBidirectionalStream::StartOnNetworkThread( 258 void BidirectionalStream::StartOnNetworkThread(
261 std::unique_ptr<net::BidirectionalStreamRequestInfo> request_info) { 259 std::unique_ptr<net::BidirectionalStreamRequestInfo> request_info) {
262 DCHECK(environment_->IsOnNetworkThread()); 260 DCHECK(IsOnNetworkThread());
263 DCHECK(!bidi_stream_); 261 DCHECK(!bidi_stream_);
264 DCHECK(environment_->GetURLRequestContext()); 262 DCHECK(request_context_getter_->GetURLRequestContext());
263 net::URLRequestContext* request_context =
264 request_context_getter_->GetURLRequestContext();
265 request_info->extra_headers.SetHeaderIfMissing( 265 request_info->extra_headers.SetHeaderIfMissing(
266 net::HttpRequestHeaders::kUserAgent, environment_->user_agent()); 266 net::HttpRequestHeaders::kUserAgent,
267 request_context->http_user_agent_settings()->GetUserAgent());
267 bidi_stream_.reset(new net::BidirectionalStream( 268 bidi_stream_.reset(new net::BidirectionalStream(
268 std::move(request_info), environment_->GetURLRequestContext() 269 std::move(request_info),
269 ->http_transaction_factory() 270 request_context->http_transaction_factory()->GetSession(),
270 ->GetSession(),
271 !delay_headers_until_flush_, this)); 271 !delay_headers_until_flush_, this));
272 DCHECK(read_state_ == NOT_STARTED && write_state_ == NOT_STARTED); 272 DCHECK(read_state_ == NOT_STARTED && write_state_ == NOT_STARTED);
273 read_state_ = write_state_ = STARTED; 273 read_state_ = write_state_ = STARTED;
274 } 274 }
275 275
276 void CronetBidirectionalStream::ReadDataOnNetworkThread( 276 void BidirectionalStream::ReadDataOnNetworkThread(
277 scoped_refptr<net::WrappedIOBuffer> read_buffer, 277 scoped_refptr<net::WrappedIOBuffer> read_buffer,
278 int buffer_size) { 278 int buffer_size) {
279 DCHECK(environment_->IsOnNetworkThread()); 279 DCHECK(IsOnNetworkThread());
280 DCHECK(read_buffer); 280 DCHECK(read_buffer);
281 DCHECK(!read_buffer_); 281 DCHECK(!read_buffer_);
282 if (read_state_ != WAITING_FOR_READ) { 282 if (read_state_ != WAITING_FOR_READ) {
283 DLOG(ERROR) << "Unexpected Read Data in read_state " << read_state_; 283 DLOG(ERROR) << "Unexpected Read Data in read_state " << read_state_;
284 // Invoke OnFailed unless it is already invoked. 284 // Invoke OnFailed unless it is already invoked.
285 if (read_state_ != ERROR) 285 if (read_state_ != ERR)
286 OnFailed(net::ERR_UNEXPECTED); 286 OnFailed(net::ERR_UNEXPECTED);
287 return; 287 return;
288 } 288 }
289 read_state_ = READING; 289 read_state_ = READING;
290 read_buffer_ = read_buffer; 290 read_buffer_ = read_buffer;
291 291
292 int bytes_read = bidi_stream_->ReadData(read_buffer_.get(), buffer_size); 292 int bytes_read = bidi_stream_->ReadData(read_buffer_.get(), buffer_size);
293 // If IO is pending, wait for the BidirectionalStream to call OnDataRead. 293 // If IO is pending, wait for the BidirectionalStream to call OnDataRead.
294 if (bytes_read == net::ERR_IO_PENDING) 294 if (bytes_read == net::ERR_IO_PENDING)
295 return; 295 return;
296 296
297 if (bytes_read < 0) { 297 if (bytes_read < 0) {
298 OnFailed(bytes_read); 298 OnFailed(bytes_read);
299 return; 299 return;
300 } 300 }
301 OnDataRead(bytes_read); 301 OnDataRead(bytes_read);
302 } 302 }
303 303
304 void CronetBidirectionalStream::WriteDataOnNetworkThread( 304 void BidirectionalStream::WriteDataOnNetworkThread(
305 scoped_refptr<net::WrappedIOBuffer> write_buffer, 305 scoped_refptr<net::WrappedIOBuffer> write_buffer,
306 int buffer_size, 306 int buffer_size,
307 bool end_of_stream) { 307 bool end_of_stream) {
308 DCHECK(environment_->IsOnNetworkThread()); 308 DCHECK(IsOnNetworkThread());
309 DCHECK(write_buffer); 309 DCHECK(write_buffer);
310 DCHECK(!write_end_of_stream_); 310 DCHECK(!write_end_of_stream_);
311 if (!bidi_stream_ || write_end_of_stream_) { 311 if (!bidi_stream_ || write_end_of_stream_) {
312 DLOG(ERROR) << "Unexpected Flush Data in write_state " << write_state_; 312 DLOG(ERROR) << "Unexpected Flush Data in write_state " << write_state_;
313 // Invoke OnFailed unless it is already invoked. 313 // Invoke OnFailed unless it is already invoked.
314 if (write_state_ != ERROR) 314 if (write_state_ != ERR)
315 OnFailed(net::ERR_UNEXPECTED); 315 OnFailed(net::ERR_UNEXPECTED);
316 return; 316 return;
317 } 317 }
318 pending_write_data_->AppendBuffer(write_buffer, buffer_size); 318 pending_write_data_->AppendBuffer(write_buffer, buffer_size);
319 write_end_of_stream_ = end_of_stream; 319 write_end_of_stream_ = end_of_stream;
320 if (!disable_auto_flush_) 320 if (!disable_auto_flush_)
321 FlushOnNetworkThread(); 321 FlushOnNetworkThread();
322 } 322 }
323 323
324 void CronetBidirectionalStream::FlushOnNetworkThread() { 324 void BidirectionalStream::FlushOnNetworkThread() {
325 DCHECK(environment_->IsOnNetworkThread()); 325 DCHECK(IsOnNetworkThread());
326 if (!bidi_stream_) 326 if (!bidi_stream_)
327 return; 327 return;
328 // If there is no data to flush, may need to send headers. 328 // If there is no data to flush, may need to send headers.
329 if (pending_write_data_->Empty()) { 329 if (pending_write_data_->Empty()) {
330 if (!request_headers_sent_) { 330 if (!request_headers_sent_) {
331 request_headers_sent_ = true; 331 request_headers_sent_ = true;
332 bidi_stream_->SendRequestHeaders(); 332 bidi_stream_->SendRequestHeaders();
333 } 333 }
334 return; 334 return;
335 } 335 }
336 // If request headers are not sent yet, they will be sent with the data. 336 // If request headers are not sent yet, they will be sent with the data.
337 if (!request_headers_sent_) 337 if (!request_headers_sent_)
338 request_headers_sent_ = true; 338 request_headers_sent_ = true;
339 339
340 // Move pending data to the flushing list. 340 // Move pending data to the flushing list.
341 pending_write_data_->MoveTo(flushing_write_data_.get()); 341 pending_write_data_->MoveTo(flushing_write_data_.get());
342 DCHECK(pending_write_data_->Empty()); 342 DCHECK(pending_write_data_->Empty());
343 if (write_state_ != WRITING) 343 if (write_state_ != WRITING)
344 SendFlushingWriteData(); 344 SendFlushingWriteData();
345 } 345 }
346 346
347 void CronetBidirectionalStream::SendFlushingWriteData() { 347 void BidirectionalStream::SendFlushingWriteData() {
348 DCHECK(bidi_stream_); 348 DCHECK(bidi_stream_);
349 // If previous send is not done, or there is nothing to flush, then exit. 349 // If previous send is not done, or there is nothing to flush, then exit.
350 if (write_state_ == WRITING || flushing_write_data_->Empty()) 350 if (write_state_ == WRITING || flushing_write_data_->Empty())
351 return; 351 return;
352 DCHECK(sending_write_data_->Empty()); 352 DCHECK(sending_write_data_->Empty());
353 write_state_ = WRITING; 353 write_state_ = WRITING;
354 flushing_write_data_->MoveTo(sending_write_data_.get()); 354 flushing_write_data_->MoveTo(sending_write_data_.get());
355 bidi_stream_->SendvData(sending_write_data_->buffers(), 355 bidi_stream_->SendvData(sending_write_data_->buffers(),
356 sending_write_data_->lengths(), 356 sending_write_data_->lengths(),
357 write_end_of_stream_ && pending_write_data_->Empty()); 357 write_end_of_stream_ && pending_write_data_->Empty());
358 } 358 }
359 359
360 void CronetBidirectionalStream::CancelOnNetworkThread() { 360 void BidirectionalStream::CancelOnNetworkThread() {
361 DCHECK(environment_->IsOnNetworkThread()); 361 DCHECK(IsOnNetworkThread());
362 if (!bidi_stream_) 362 if (!bidi_stream_)
363 return; 363 return;
364 read_state_ = write_state_ = CANCELED; 364 read_state_ = write_state_ = CANCELED;
365 bidi_stream_.reset(); 365 bidi_stream_.reset();
366 weak_factory_.InvalidateWeakPtrs(); 366 weak_factory_.InvalidateWeakPtrs();
367 delegate_->OnCanceled(); 367 delegate_->OnCanceled();
368 } 368 }
369 369
370 void CronetBidirectionalStream::DestroyOnNetworkThread() { 370 void BidirectionalStream::DestroyOnNetworkThread() {
371 DCHECK(environment_->IsOnNetworkThread()); 371 DCHECK(IsOnNetworkThread());
372 delete this; 372 delete this;
373 } 373 }
374 374
375 void CronetBidirectionalStream::MaybeOnSucceded() { 375 void BidirectionalStream::MaybeOnSucceded() {
376 DCHECK(environment_->IsOnNetworkThread()); 376 DCHECK(IsOnNetworkThread());
377 if (!bidi_stream_) 377 if (!bidi_stream_)
378 return; 378 return;
379 if (read_state_ == READING_DONE && write_state_ == WRITING_DONE) { 379 if (read_state_ == READING_DONE && write_state_ == WRITING_DONE) {
380 read_state_ = write_state_ = SUCCESS; 380 read_state_ = write_state_ = SUCCESS;
381 weak_factory_.InvalidateWeakPtrs(); 381 weak_factory_.InvalidateWeakPtrs();
382 // Delete underlying |bidi_stream_| asynchronously as it may still be used. 382 // Delete underlying |bidi_stream_| asynchronously as it may still be used.
383 environment_->PostToNetworkThread( 383 PostToNetworkThread(
384 FROM_HERE, base::Bind(&base::DeletePointer<net::BidirectionalStream>, 384 FROM_HERE, base::Bind(&base::DeletePointer<net::BidirectionalStream>,
385 bidi_stream_.release())); 385 bidi_stream_.release()));
386 delegate_->OnSucceeded(); 386 delegate_->OnSucceeded();
387 } 387 }
388 } 388 }
389 389
390 bool BidirectionalStream::IsOnNetworkThread() {
391 return request_context_getter_->GetNetworkTaskRunner()
392 ->BelongsToCurrentThread();
393 }
394
395 void BidirectionalStream::PostToNetworkThread(
396 const tracked_objects::Location& from_here,
397 const base::Closure& task) {
398 request_context_getter_->GetNetworkTaskRunner()->PostTask(from_here, task);
399 }
400
390 } // namespace cronet 401 } // namespace cronet
OLDNEW
« no previous file with comments | « components/grpc_support/bidirectional_stream.h ('k') | components/grpc_support/bidirectional_stream_c.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698