Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(507)

Side by Side Diff: net/spdy/bidirectional_stream_spdy_job.cc

Issue 1326503003: Added a net::BidirectionalStream to expose a bidirectional streaming interface (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Use SPDY_BIDIRECTIONAL_STREAM Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/bidirectional_stream_spdy_job.h"
6
7 #include "base/bind.h"
8 #include "base/memory/scoped_ptr.h"
9 #include "base/time/time.h"
10 #include "base/timer/timer.h"
11 #include "net/base/request_priority.h"
12 #include "net/spdy/spdy_buffer.h"
13 #include "net/spdy/spdy_header_block.h"
14 #include "net/spdy/spdy_http_utils.h"
15 #include "net/spdy/spdy_stream.h"
16
17 namespace net {
18
19 const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1);
20
21 BidirectionalStreamSpdyJob::BidirectionalStreamSpdyJob(
22 const base::WeakPtr<SpdySession>& spdy_session,
23 scoped_ptr<base::Timer> timer)
24 : spdy_session_(spdy_session),
25 timer_(timer.release()),
26 stream_closed_(false),
27 closed_stream_status_(ERR_FAILED),
28 more_read_data_pending_(false),
29 weak_factory_(this) {}
30
31 BidirectionalStreamSpdyJob::BidirectionalStreamSpdyJob(
32 const base::WeakPtr<SpdySession>& spdy_session)
33 : BidirectionalStreamSpdyJob(
34 spdy_session,
35 make_scoped_ptr(new base::Timer(false, false))) {}
36
37 BidirectionalStreamSpdyJob::~BidirectionalStreamSpdyJob() {
38 if (stream_.get()) {
39 stream_->DetachDelegate();
40 DCHECK(!stream_.get());
41 }
42 }
43
44 void BidirectionalStreamSpdyJob::Start(
45 const HttpRequestInfo& request_info,
46 RequestPriority priority,
47 const BoundNetLog& net_log,
48 BidirectionalStreamJob::Delegate* delegate) {
49 delegate_ = delegate;
50 DCHECK(!stream_);
51 if (!spdy_session_) {
52 delegate_->OnFailed(ERR_CONNECTION_CLOSED);
53 return;
54 }
55
56 request_info_ = request_info;
57
58 int rv = stream_request_.StartRequest(
59 SPDY_BIDIRECTIONAL_STREAM, spdy_session_, request_info_.url, priority,
60 net_log, base::Bind(&BidirectionalStreamSpdyJob::OnStreamInitialized,
61 weak_factory_.GetWeakPtr()));
62 if (rv != ERR_IO_PENDING)
63 OnStreamInitialized(rv);
64 }
65
66 int BidirectionalStreamSpdyJob::ReadData(IOBuffer* buf, int buf_len) {
67 if (stream_.get())
68 CHECK(!stream_->IsIdle());
69
70 CHECK(buf);
71 CHECK(buf_len);
72 CHECK(!timer_->IsRunning()) << "There should be only one ReadData in flight";
73
74 if (!stream_closed_)
75 CHECK(stream_);
76
77 // If there is data buffered, complete the IO immediately.
78 if (!data_queue_.IsEmpty()) {
79 return data_queue_.Dequeue(buf->data(), buf_len);
80 } else if (stream_closed_) {
81 return closed_stream_status_;
82 }
83 // Read will complete asynchronously and Delegate::OnReadCompleted will be
84 // called upon completion.
85 user_buffer_ = buf;
86 user_buffer_len_ = buf_len;
87 return ERR_IO_PENDING;
88 }
89
90 void BidirectionalStreamSpdyJob::SendData(IOBuffer* data,
91 int length,
92 bool end_stream) {
93 CHECK(!stream_closed_);
94 CHECK(stream_);
95
96 stream_->SendData(data, length,
97 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
98 }
99
100 void BidirectionalStreamSpdyJob::Cancel() {
101 if (!stream_)
102 return;
103 // Cancels the stream and detaches the delegate so it doesn't get called back.
104 stream_->DetachDelegate();
105 DCHECK(!stream_);
106 }
107
108 void BidirectionalStreamSpdyJob::OnRequestHeadersSent() {
109 delegate_->OnRequestHeadersSent();
110 }
111
112 SpdyResponseHeadersStatus BidirectionalStreamSpdyJob::OnResponseHeadersUpdated(
113 const SpdyHeaderBlock& response_headers) {
114 delegate_->OnHeaders(response_headers);
115 return RESPONSE_HEADERS_ARE_COMPLETE;
116 }
117
118 void BidirectionalStreamSpdyJob::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
119 DCHECK(stream_);
120 DCHECK(!stream_closed_);
121
122 if (buffer) {
123 data_queue_.Enqueue(buffer.Pass());
124 if (user_buffer_) {
125 // Handing small chunks of data to the caller creates measurable overhead.
126 // So buffer data in short time-spans and send a single read notification.
127 ScheduleBufferedRead();
128 }
129 }
130 // If |buffer| is null, BidirectionalStreamSpdyJob::OnClose will be invoked by
131 // SpdyStream to indicate the end of stream.
132 }
133
134 void BidirectionalStreamSpdyJob::OnDataSent() {
135 DCHECK(stream_);
136 DCHECK(!stream_closed_);
137
138 delegate_->OnDataSent();
139 }
140
141 void BidirectionalStreamSpdyJob::OnTrailers(const SpdyHeaderBlock& trailers) {
142 DCHECK(stream_);
143 DCHECK(!stream_closed_);
144
145 delegate_->OnTrailers(trailers);
146 }
147
148 void BidirectionalStreamSpdyJob::OnClose(int status) {
149 DCHECK(stream_);
150
151 stream_closed_ = true;
152 closed_stream_status_ = status;
153 stream_.reset();
154
155 if (status != OK) {
156 delegate_->OnFailed(status);
157 return;
158 }
159 // Complete any remaining read, as all data has been buffered.
160 // If user has not called ReadData (i.e |user_buffer_| is nullptr), this will
161 // do nothing.
162 if (!data_queue_.IsEmpty()) {
xunjieli 2015/12/07 17:35:54 BUG: If a former ReadData returned ERR_IO_PENDING,
xunjieli 2015/12/07 19:05:45 Done.
163 DCHECK_EQ(OK, status);
164 timer_->Stop();
165 DoBufferedRead();
166 }
167 }
168
169 void BidirectionalStreamSpdyJob::SendRequestHeaders() {
170 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
171 CreateSpdyHeadersFromHttpRequest(request_info_, request_info_.extra_headers,
172 stream_->GetProtocolVersion(), true,
173 headers.get());
174 bool end_stream = (request_info_.method == "GET");
175 stream_->SendRequestHeaders(
176 headers.Pass(), end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
177 }
178
179 void BidirectionalStreamSpdyJob::OnStreamInitialized(int rv) {
180 DCHECK_NE(ERR_IO_PENDING, rv);
181 if (rv == OK) {
182 stream_ = stream_request_.ReleaseStream();
183 stream_->SetDelegate(this);
184 SendRequestHeaders();
185 return;
186 }
187 delegate_->OnFailed(static_cast<Error>(rv));
188 }
189
190 void BidirectionalStreamSpdyJob::ScheduleBufferedRead() {
191 // If there is already a scheduled DoBufferedRead, don't issue
192 // another one. Mark that we have received more data and return.
193 if (timer_->IsRunning()) {
194 more_read_data_pending_ = true;
195 return;
196 }
197
198 more_read_data_pending_ = false;
199 timer_->Start(FROM_HERE, kBufferTime,
200 base::Bind(&BidirectionalStreamSpdyJob::DoBufferedRead,
201 weak_factory_.GetWeakPtr()));
202 }
203
204 void BidirectionalStreamSpdyJob::DoBufferedRead() {
205 DCHECK(!timer_->IsRunning());
206 // If the stream errored out, do not complete the read.
207 if (!stream_ && !stream_closed_)
208 return;
209 if (stream_closed_ && closed_stream_status_ != OK)
210 return;
211
212 // When |more_read_data_pending_| is true, it means that more data has arrived
213 // since started waiting. Wait a little longer and continue to buffer.
214 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
215 ScheduleBufferedRead();
216 return;
217 }
218
219 int rv = 0;
220 if (user_buffer_) {
221 rv = ReadData(user_buffer_.get(), user_buffer_len_);
222 DCHECK_NE(ERR_IO_PENDING, rv);
223 user_buffer_ = nullptr;
224 user_buffer_len_ = 0;
225 delegate_->OnReadCompleted(rv);
226 }
227 }
228
229 bool BidirectionalStreamSpdyJob::ShouldWaitForMoreBufferedData() const {
230 if (stream_closed_)
231 return false;
232 DCHECK_GT(user_buffer_len_, 0);
233 return data_queue_.GetTotalSize() < static_cast<size_t>(user_buffer_len_);
234 }
235
236 } // namespace net
OLDNEW
« no previous file with comments | « net/spdy/bidirectional_stream_spdy_job.h ('k') | net/spdy/bidirectional_stream_spdy_job_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698