Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(212)

Side by Side Diff: net/spdy/bidirectional_stream_spdy_job.cc

Issue 1326503003: Added a net::BidirectionalStream to expose a bidirectional streaming interface (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Remove unneeded return value Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/bidirectional_stream_spdy_job.h"
6
7 #include "base/bind.h"
8 #include "base/memory/scoped_ptr.h"
9 #include "base/time/time.h"
10 #include "net/base/request_priority.h"
11 #include "net/spdy/spdy_buffer.h"
12 #include "net/spdy/spdy_header_block.h"
13 #include "net/spdy/spdy_http_utils.h"
14 #include "net/spdy/spdy_stream.h"
15
16 namespace net {
17
18 const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1);
19
20 BidirectionalStreamSpdyJob::BidirectionalStreamSpdyJob(
21 const base::WeakPtr<SpdySession>& spdy_session)
22 : spdy_session_(spdy_session),
23 stream_closed_(false),
24 closed_stream_status_(ERR_FAILED),
25 buffered_read_pending_(false),
26 more_read_data_pending_(false),
27 weak_factory_(this) {}
28
29 BidirectionalStreamSpdyJob::~BidirectionalStreamSpdyJob() {
30 if (stream_.get()) {
31 stream_->DetachDelegate();
32 DCHECK(!stream_.get());
33 }
34 }
35
36 void BidirectionalStreamSpdyJob::Start(
37 const HttpRequestInfo& request_info,
38 RequestPriority priority,
39 const BoundNetLog& net_log,
40 BidirectionalStreamJob::Delegate* delegate) {
41 delegate_ = delegate;
42 DCHECK(!stream_);
43 if (!spdy_session_) {
44 delegate_->OnClose(ERR_CONNECTION_CLOSED);
45 return;
46 }
47
48 request_info_ = request_info;
49
50 int rv = stream_request_.StartRequest(
51 SPDY_REQUEST_RESPONSE_STREAM, spdy_session_, request_info_.url, priority,
52 net_log, base::Bind(&BidirectionalStreamSpdyJob::OnStreamInitialized,
53 weak_factory_.GetWeakPtr()));
54 if (rv != ERR_IO_PENDING)
55 OnStreamInitialized(rv);
56 }
57
58 int BidirectionalStreamSpdyJob::ReadData(IOBuffer* buf, int buf_len) {
59 if (stream_.get())
60 CHECK(!stream_->IsIdle());
61
62 CHECK(buf);
63 CHECK(buf_len);
64 if (!stream_closed_)
65 CHECK(stream_);
66
67 // If there is data buffered, complete the IO immediately.
68 if (!data_queue_.IsEmpty()) {
69 return data_queue_.Dequeue(buf->data(), buf_len);
70 } else if (stream_closed_) {
71 return closed_stream_status_;
72 }
73 // Read will complete asynchronously and Delegate::OnReadCompleted will be
74 // called upon completion.
75 user_buffer_ = buf;
76 user_buffer_len_ = buf_len;
77 return ERR_IO_PENDING;
78 }
79
80 void BidirectionalStreamSpdyJob::SendData(IOBuffer* data,
81 int length,
82 bool end_stream) {
83 stream_->SendData(data, length,
84 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
85 }
86
87 void BidirectionalStreamSpdyJob::Cancel() {
88 if (!stream_)
89 return;
90 // Cancels the stream and detaches the delegate so it doesn't get called back.
91 stream_->DetachDelegate();
92 DCHECK(!stream_);
93 }
94
95 void BidirectionalStreamSpdyJob::OnRequestHeadersSent() {
96 delegate_->OnRequestHeadersSent();
97 }
98
99 SpdyResponseHeadersStatus BidirectionalStreamSpdyJob::OnResponseHeadersUpdated(
100 const SpdyHeaderBlock& response_headers) {
101 delegate_->OnHeaders(response_headers);
102 return RESPONSE_HEADERS_ARE_COMPLETE;
103 }
104
105 void BidirectionalStreamSpdyJob::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
106 DCHECK(stream_);
107 DCHECK(!stream_closed_);
108
109 if (buffer) {
110 data_queue_.Enqueue(buffer.Pass());
111 if (user_buffer_) {
112 // Handing small chunks of data to the caller creates measurable overhead.
113 // So buffer data in short time-spans and send a single read notification.
114 ScheduleBufferedRead();
115 }
116 }
117 // If |buffer| is null, onClose will be invoked to indicate the end of stream.
118 }
119
120 void BidirectionalStreamSpdyJob::OnDataSent() {
121 DCHECK(stream_);
122 DCHECK(!stream_closed_);
123
124 delegate_->OnDataSent();
125 }
126
127 void BidirectionalStreamSpdyJob::OnTrailers(const SpdyHeaderBlock& trailers) {
128 DCHECK(stream_);
129 DCHECK(!stream_closed_);
130
131 delegate_->OnTrailers(trailers);
132 }
133
134 void BidirectionalStreamSpdyJob::OnClose(int status) {
135 DCHECK(stream_);
136
137 stream_closed_ = true;
138 closed_stream_status_ = status;
139 stream_.reset();
140
141 if (status == OK) {
142 DoBufferedRead();
143 return;
144 }
145
146 delegate_->OnClose(status);
147 }
148
149 void BidirectionalStreamSpdyJob::SendRequestHeaders() {
150 stream_->SetDelegate(this);
151 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
152 CreateSpdyHeadersFromHttpRequest(request_info_, request_info_.extra_headers,
153 stream_->GetProtocolVersion(), true,
154 headers.get());
155 bool end_stream = (request_info_.method == "GET");
156 stream_->SendRequestHeaders(
157 headers.Pass(), end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
158 }
159
160 void BidirectionalStreamSpdyJob::OnStreamInitialized(int rv) {
161 DCHECK_NE(ERR_IO_PENDING, rv);
162 if (rv == OK) {
163 stream_ = stream_request_.ReleaseStream();
164 SendRequestHeaders();
165 return;
166 }
167 delegate_->OnClose(static_cast<Error>(rv));
168 }
169
170 void BidirectionalStreamSpdyJob::ScheduleBufferedRead() {
171 // If there is already a scheduled DoBufferedRead, don't issue
172 // another one. Mark that we have received more data and return.
173 if (buffered_read_pending_) {
174 more_read_data_pending_ = true;
175 return;
176 }
177
178 more_read_data_pending_ = false;
179 buffered_read_pending_ = true;
180 base::ThreadTaskRunnerHandle::Get()->PostDelayedTask(
181 FROM_HERE, base::Bind(base::IgnoreResult(
182 &BidirectionalStreamSpdyJob::DoBufferedRead),
183 weak_factory_.GetWeakPtr()),
184 kBufferTime);
185 }
186
187 void BidirectionalStreamSpdyJob::DoBufferedRead() {
188 buffered_read_pending_ = false;
189 // If the stream errored out, do not complete the read.
190 if (!stream_ && !stream_closed_)
191 return;
192 if (stream_closed_ && closed_stream_status_ != OK)
193 return;
194
195 // When |more_read_data_pending_| is true, it means that more data has arrived
196 // since started waiting. Wait a little longer and continue to buffer.
197 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
198 ScheduleBufferedRead();
199 return;
200 }
201
202 int rv = 0;
203 if (user_buffer_) {
204 rv = ReadData(user_buffer_.get(), user_buffer_len_);
205 DCHECK_NE(ERR_IO_PENDING, rv);
206 user_buffer_ = nullptr;
207 user_buffer_len_ = 0;
208 delegate_->OnReadCompleted(rv);
209
210 // If all data is read, and BidirectionalStreamSpdyJob::onClose is invoked
211 // previously, let the delegate know about the onClose event.
212 if (data_queue_.IsEmpty() && stream_closed_) {
213 DCHECK_EQ(OK, closed_stream_status_);
214 delegate_->OnClose(OK);
215 }
216 }
217 }
218
219 bool BidirectionalStreamSpdyJob::ShouldWaitForMoreBufferedData() const {
220 if (stream_closed_)
221 return false;
222 DCHECK_GT(user_buffer_len_, 0);
223 return data_queue_.GetTotalSize() < static_cast<size_t>(user_buffer_len_);
224 }
225
226 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698