Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(64)

Side by Side Diff: net/spdy/bidirectional_stream_spdy_job.cc

Issue 1326503003: Added a net::BidirectionalStream to expose a bidirectional streaming interface (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Get rid of Delegate::OnClose Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/bidirectional_stream_spdy_job.h"
6
7 #include "base/bind.h"
8 #include "base/memory/scoped_ptr.h"
9 #include "base/time/time.h"
10 #include "base/timer/timer.h"
11 #include "net/base/request_priority.h"
12 #include "net/spdy/spdy_buffer.h"
13 #include "net/spdy/spdy_header_block.h"
14 #include "net/spdy/spdy_http_utils.h"
15 #include "net/spdy/spdy_stream.h"
16
17 namespace net {
18
19 const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1);
20
21 BidirectionalStreamSpdyJob::BidirectionalStreamSpdyJob(
22 const base::WeakPtr<SpdySession>& spdy_session,
23 scoped_ptr<base::Timer> timer)
24 : spdy_session_(spdy_session),
25 timer_(timer.release()),
26 stream_closed_(false),
27 closed_stream_status_(ERR_FAILED),
28 more_read_data_pending_(false),
29 weak_factory_(this) {}
30
31 BidirectionalStreamSpdyJob::BidirectionalStreamSpdyJob(
32 const base::WeakPtr<SpdySession>& spdy_session)
33 : BidirectionalStreamSpdyJob(
34 spdy_session,
35 make_scoped_ptr(new base::Timer(false, false))) {}
36
37 BidirectionalStreamSpdyJob::~BidirectionalStreamSpdyJob() {
38 if (stream_.get()) {
39 stream_->DetachDelegate();
40 DCHECK(!stream_.get());
41 }
42 }
43
44 void BidirectionalStreamSpdyJob::Start(
45 const HttpRequestInfo& request_info,
46 RequestPriority priority,
47 const BoundNetLog& net_log,
48 BidirectionalStreamJob::Delegate* delegate) {
49 delegate_ = delegate;
50 DCHECK(!stream_);
51 if (!spdy_session_) {
52 delegate_->OnFailed(ERR_CONNECTION_CLOSED);
53 return;
54 }
55
56 request_info_ = request_info;
57
58 int rv = stream_request_.StartRequest(
59 SPDY_REQUEST_RESPONSE_STREAM, spdy_session_, request_info_.url, priority,
60 net_log, base::Bind(&BidirectionalStreamSpdyJob::OnStreamInitialized,
61 weak_factory_.GetWeakPtr()));
62 if (rv != ERR_IO_PENDING)
63 OnStreamInitialized(rv);
64 }
65
66 int BidirectionalStreamSpdyJob::ReadData(IOBuffer* buf, int buf_len) {
67 if (stream_.get())
68 CHECK(!stream_->IsIdle());
69
70 CHECK(buf);
71 CHECK(buf_len);
72 CHECK(!timer_->IsRunning()) << "There should be only one ReadData in flight";
73
74 if (!stream_closed_)
75 CHECK(stream_);
76
77 // If there is data buffered, complete the IO immediately.
78 if (!data_queue_.IsEmpty()) {
79 return data_queue_.Dequeue(buf->data(), buf_len);
80 } else if (stream_closed_) {
81 return closed_stream_status_;
82 }
83 // Read will complete asynchronously and Delegate::OnReadCompleted will be
84 // called upon completion.
85 user_buffer_ = buf;
86 user_buffer_len_ = buf_len;
87 return ERR_IO_PENDING;
88 }
89
90 void BidirectionalStreamSpdyJob::SendData(IOBuffer* data,
91 int length,
92 bool end_stream) {
93 stream_->SendData(data, length,
94 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
95 }
96
97 void BidirectionalStreamSpdyJob::Cancel() {
98 if (!stream_)
99 return;
100 // Cancels the stream and detaches the delegate so it doesn't get called back.
101 stream_->DetachDelegate();
102 DCHECK(!stream_);
103 }
104
105 void BidirectionalStreamSpdyJob::OnRequestHeadersSent() {
106 delegate_->OnRequestHeadersSent();
107 }
108
109 SpdyResponseHeadersStatus BidirectionalStreamSpdyJob::OnResponseHeadersUpdated(
110 const SpdyHeaderBlock& response_headers) {
111 delegate_->OnHeaders(response_headers);
112 return RESPONSE_HEADERS_ARE_COMPLETE;
113 }
114
115 void BidirectionalStreamSpdyJob::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
116 DCHECK(stream_);
117 DCHECK(!stream_closed_);
118
119 if (buffer) {
120 data_queue_.Enqueue(buffer.Pass());
121 if (user_buffer_) {
122 // Handing small chunks of data to the caller creates measurable overhead.
123 // So buffer data in short time-spans and send a single read notification.
124 ScheduleBufferedRead();
125 }
126 }
127 // If |buffer| is null, BidirectionalStreamSpdyJob::OnClose will be invoked by
128 // SpdyStream to indicate the end of stream.
129 }
130
131 void BidirectionalStreamSpdyJob::OnDataSent() {
132 DCHECK(stream_);
133 DCHECK(!stream_closed_);
134
135 delegate_->OnDataSent();
136 }
137
138 void BidirectionalStreamSpdyJob::OnTrailers(const SpdyHeaderBlock& trailers) {
139 DCHECK(stream_);
140 DCHECK(!stream_closed_);
141
142 delegate_->OnTrailers(trailers);
143 }
144
145 void BidirectionalStreamSpdyJob::OnClose(int status) {
146 DCHECK(stream_);
147
148 stream_closed_ = true;
149 closed_stream_status_ = status;
150 stream_.reset();
151
152 if (status != OK) {
153 delegate_->OnFailed(status);
154 return;
155 }
156 // Complete any remaining read, as all data has been buffered.
157 // If user has not called ReadData (i.e |user_buffer_| is nullptr), this will
158 // do nothing.
159 if (!data_queue_.IsEmpty()) {
160 DCHECK_EQ(OK, status);
161 timer_->Stop();
162 DoBufferedRead();
163 }
164 }
165
166 void BidirectionalStreamSpdyJob::SendRequestHeaders() {
167 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
168 CreateSpdyHeadersFromHttpRequest(request_info_, request_info_.extra_headers,
169 stream_->GetProtocolVersion(), true,
170 headers.get());
171 bool end_stream = (request_info_.method == "GET");
172 stream_->SendRequestHeaders(
173 headers.Pass(), end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
174 }
175
176 void BidirectionalStreamSpdyJob::OnStreamInitialized(int rv) {
177 DCHECK_NE(ERR_IO_PENDING, rv);
178 if (rv == OK) {
179 stream_ = stream_request_.ReleaseStream();
180 stream_->SetDelegate(this);
181 SendRequestHeaders();
182 return;
183 }
184 delegate_->OnFailed(static_cast<Error>(rv));
185 }
186
187 void BidirectionalStreamSpdyJob::ScheduleBufferedRead() {
188 // If there is already a scheduled DoBufferedRead, don't issue
189 // another one. Mark that we have received more data and return.
190 if (timer_->IsRunning()) {
191 more_read_data_pending_ = true;
192 return;
193 }
194
195 more_read_data_pending_ = false;
196 timer_->Start(FROM_HERE, kBufferTime,
197 base::Bind(&BidirectionalStreamSpdyJob::DoBufferedRead,
198 weak_factory_.GetWeakPtr()));
199 }
200
201 void BidirectionalStreamSpdyJob::DoBufferedRead() {
202 DCHECK(!timer_->IsRunning());
203 // If the stream errored out, do not complete the read.
204 if (!stream_ && !stream_closed_)
205 return;
206 if (stream_closed_ && closed_stream_status_ != OK)
207 return;
208
209 // When |more_read_data_pending_| is true, it means that more data has arrived
210 // since started waiting. Wait a little longer and continue to buffer.
211 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
212 ScheduleBufferedRead();
213 return;
214 }
215
216 int rv = 0;
217 if (user_buffer_) {
218 rv = ReadData(user_buffer_.get(), user_buffer_len_);
219 DCHECK_NE(ERR_IO_PENDING, rv);
220 user_buffer_ = nullptr;
221 user_buffer_len_ = 0;
222 delegate_->OnReadCompleted(rv);
223 }
224 }
225
226 bool BidirectionalStreamSpdyJob::ShouldWaitForMoreBufferedData() const {
227 if (stream_closed_)
228 return false;
229 DCHECK_GT(user_buffer_len_, 0);
230 return data_queue_.GetTotalSize() < static_cast<size_t>(user_buffer_len_);
231 }
232
233 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698