Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(35)

Side by Side Diff: net/spdy/bidirectional_stream_spdy_job.cc

Issue 1326503003: Added a net::BidirectionalStream to expose a bidirectional streaming interface (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Fix bug and added a test Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/bidirectional_stream_spdy_job.h"
6
7 #include "base/bind.h"
8 #include "base/memory/scoped_ptr.h"
9 #include "base/time/time.h"
10 #include "base/timer/timer.h"
11 #include "net/base/request_priority.h"
12 #include "net/spdy/spdy_buffer.h"
13 #include "net/spdy/spdy_header_block.h"
14 #include "net/spdy/spdy_http_utils.h"
15 #include "net/spdy/spdy_stream.h"
16
17 namespace net {
18
19 const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1);
20
21 BidirectionalStreamSpdyJob::BidirectionalStreamSpdyJob(
22 const base::WeakPtr<SpdySession>& spdy_session,
23 scoped_ptr<base::Timer> timer)
24 : spdy_session_(spdy_session),
25 timer_(timer.release()),
26 stream_closed_(false),
27 closed_stream_status_(ERR_FAILED),
28 more_read_data_pending_(false),
29 weak_factory_(this) {}
30
31 BidirectionalStreamSpdyJob::BidirectionalStreamSpdyJob(
32 const base::WeakPtr<SpdySession>& spdy_session)
33 : BidirectionalStreamSpdyJob(
34 spdy_session,
35 make_scoped_ptr(new base::Timer(false, false))) {}
36
37 BidirectionalStreamSpdyJob::~BidirectionalStreamSpdyJob() {
38 if (stream_.get()) {
39 stream_->DetachDelegate();
40 DCHECK(!stream_.get());
41 }
42 }
43
44 void BidirectionalStreamSpdyJob::Start(
45 const HttpRequestInfo& request_info,
46 RequestPriority priority,
47 const BoundNetLog& net_log,
48 BidirectionalStreamJob::Delegate* delegate) {
49 delegate_ = delegate;
50 DCHECK(!stream_);
51 if (!spdy_session_) {
52 delegate_->OnFailed(ERR_CONNECTION_CLOSED);
53 return;
54 }
55
56 request_info_ = request_info;
57
58 int rv = stream_request_.StartRequest(
59 SPDY_BIDIRECTIONAL_STREAM, spdy_session_, request_info_.url, priority,
60 net_log, base::Bind(&BidirectionalStreamSpdyJob::OnStreamInitialized,
61 weak_factory_.GetWeakPtr()));
62 if (rv != ERR_IO_PENDING)
63 OnStreamInitialized(rv);
64 }
65
66 int BidirectionalStreamSpdyJob::ReadData(IOBuffer* buf, int buf_len) {
67 if (stream_.get())
68 CHECK(!stream_->IsIdle());
69
70 CHECK(buf);
71 CHECK(buf_len);
72 CHECK(!timer_->IsRunning()) << "There should be only one ReadData in flight";
73
74 if (!stream_closed_)
75 CHECK(stream_);
76
77 // If there is data buffered, complete the IO immediately.
78 if (!data_queue_.IsEmpty()) {
79 return data_queue_.Dequeue(buf->data(), buf_len);
80 } else if (stream_closed_) {
81 return closed_stream_status_;
mef 2015/12/07 21:18:07 Shouldn't it return 0? The comment says that it r
xunjieli 2015/12/08 16:08:46 It should return a non-negative number that indica
82 }
83 // Read will complete asynchronously and Delegate::OnReadCompleted will be
84 // called upon completion.
85 user_buffer_ = buf;
86 user_buffer_len_ = buf_len;
87 return ERR_IO_PENDING;
88 }
89
90 void BidirectionalStreamSpdyJob::SendData(IOBuffer* data,
91 int length,
92 bool end_stream) {
93 CHECK(!stream_closed_);
94 CHECK(stream_);
95
96 stream_->SendData(data, length,
97 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
98 }
99
100 void BidirectionalStreamSpdyJob::Cancel() {
101 if (!stream_)
102 return;
103 // Cancels the stream and detaches the delegate so it doesn't get called back.
104 stream_->DetachDelegate();
105 DCHECK(!stream_);
106 }
107
108 void BidirectionalStreamSpdyJob::OnRequestHeadersSent() {
109 delegate_->OnRequestHeadersSent();
110 }
111
112 SpdyResponseHeadersStatus BidirectionalStreamSpdyJob::OnResponseHeadersUpdated(
113 const SpdyHeaderBlock& response_headers) {
114 delegate_->OnHeaders(response_headers);
115 return RESPONSE_HEADERS_ARE_COMPLETE;
116 }
117
118 void BidirectionalStreamSpdyJob::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
119 DCHECK(stream_);
120 DCHECK(!stream_closed_);
121
122 if (buffer) {
123 data_queue_.Enqueue(buffer.Pass());
124 if (user_buffer_) {
125 // Handing small chunks of data to the caller creates measurable overhead.
126 // So buffer data in short time-spans and send a single read notification.
127 ScheduleBufferedRead();
128 }
129 }
130 // If |buffer| is null, BidirectionalStreamSpdyJob::OnClose will be invoked by
131 // SpdyStream to indicate the end of stream.
132 }
133
134 void BidirectionalStreamSpdyJob::OnDataSent() {
135 DCHECK(stream_);
136 DCHECK(!stream_closed_);
137
138 delegate_->OnDataSent();
139 }
140
141 void BidirectionalStreamSpdyJob::OnTrailers(const SpdyHeaderBlock& trailers) {
142 DCHECK(stream_);
143 DCHECK(!stream_closed_);
144
145 delegate_->OnTrailers(trailers);
146 }
147
148 void BidirectionalStreamSpdyJob::OnClose(int status) {
149 DCHECK(stream_);
150
151 stream_closed_ = true;
152 closed_stream_status_ = status;
153 stream_.reset();
154
155 if (status != OK) {
156 delegate_->OnFailed(status);
157 return;
158 }
159 // Complete any remaining read, as all data has been buffered.
160 // If user has not called ReadData (i.e |user_buffer_| is nullptr), this will
161 // do nothing.
162 DCHECK_EQ(OK, status);
163 timer_->Stop();
164 DoBufferedRead();
165 }
166
167 void BidirectionalStreamSpdyJob::SendRequestHeaders() {
168 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
169 CreateSpdyHeadersFromHttpRequest(request_info_, request_info_.extra_headers,
170 stream_->GetProtocolVersion(), true,
171 headers.get());
172 bool end_stream = (request_info_.method == "GET");
mef 2015/12/07 21:18:07 I suggest that we add at least 'HEAD', but I would
xunjieli 2015/12/08 16:08:46 Done.
mef 2015/12/11 20:06:12 Per our chat I've meant that we need to pass an ex
xunjieli 2015/12/11 23:48:39 Done.
173 stream_->SendRequestHeaders(
174 headers.Pass(), end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
175 }
176
177 void BidirectionalStreamSpdyJob::OnStreamInitialized(int rv) {
178 DCHECK_NE(ERR_IO_PENDING, rv);
179 if (rv == OK) {
180 stream_ = stream_request_.ReleaseStream();
181 stream_->SetDelegate(this);
182 SendRequestHeaders();
183 return;
184 }
185 delegate_->OnFailed(static_cast<Error>(rv));
186 }
187
188 void BidirectionalStreamSpdyJob::ScheduleBufferedRead() {
189 // If there is already a scheduled DoBufferedRead, don't issue
190 // another one. Mark that we have received more data and return.
191 if (timer_->IsRunning()) {
192 more_read_data_pending_ = true;
193 return;
194 }
195
196 more_read_data_pending_ = false;
197 timer_->Start(FROM_HERE, kBufferTime,
198 base::Bind(&BidirectionalStreamSpdyJob::DoBufferedRead,
199 weak_factory_.GetWeakPtr()));
200 }
201
202 void BidirectionalStreamSpdyJob::DoBufferedRead() {
203 DCHECK(!timer_->IsRunning());
204 // If the stream errored out, do not complete the read.
205 if (!stream_ && !stream_closed_)
206 return;
207 if (stream_closed_ && closed_stream_status_ != OK)
208 return;
209
210 // When |more_read_data_pending_| is true, it means that more data has arrived
211 // since started waiting. Wait a little longer and continue to buffer.
212 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
213 ScheduleBufferedRead();
214 return;
215 }
216
217 int rv = 0;
218 if (user_buffer_) {
219 rv = ReadData(user_buffer_.get(), user_buffer_len_);
220 DCHECK_NE(ERR_IO_PENDING, rv);
221 user_buffer_ = nullptr;
222 user_buffer_len_ = 0;
223 delegate_->OnReadCompleted(rv);
224 }
225 }
226
227 bool BidirectionalStreamSpdyJob::ShouldWaitForMoreBufferedData() const {
228 if (stream_closed_)
229 return false;
230 DCHECK_GT(user_buffer_len_, 0);
231 return data_queue_.GetTotalSize() < static_cast<size_t>(user_buffer_len_);
232 }
233
234 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698