Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(59)

Side by Side Diff: components/grpc_support/bidirectional_stream_c.cc

Issue 2492703002: Third try at landing gRPC refactoring. Previous issue failed on the waterfall (Closed)
Patch Set: Change DCHECK Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "components/cronet/ios/cronet_c_for_grpc.h" 5 #include "components/grpc_support/include/bidirectional_stream_c.h"
6 6
7 #include <stdbool.h> 7 #include <stdbool.h>
8 8
9 #include <memory> 9 #include <memory>
10 #include <string> 10 #include <string>
11 #include <vector> 11 #include <vector>
12 12
13 #include "base/bind.h" 13 #include "base/bind.h"
14 #include "base/location.h" 14 #include "base/location.h"
15 #include "base/logging.h" 15 #include "base/logging.h"
16 #include "base/macros.h" 16 #include "base/macros.h"
17 #include "base/memory/ptr_util.h" 17 #include "base/memory/ptr_util.h"
18 #include "base/memory/ref_counted.h" 18 #include "base/memory/ref_counted.h"
19 #include "base/strings/string_number_conversions.h" 19 #include "base/strings/string_number_conversions.h"
20 #include "base/strings/string_split.h" 20 #include "base/strings/string_split.h"
21 #include "components/cronet/ios/cronet_bidirectional_stream.h" 21 #include "components/grpc_support/bidirectional_stream.h"
22 #include "components/cronet/ios/cronet_environment.h"
23 #include "net/base/io_buffer.h" 22 #include "net/base/io_buffer.h"
24 #include "net/base/net_errors.h" 23 #include "net/base/net_errors.h"
25 #include "net/base/request_priority.h" 24 #include "net/base/request_priority.h"
26 #include "net/http/bidirectional_stream.h" 25 #include "net/http/bidirectional_stream.h"
27 #include "net/http/bidirectional_stream_request_info.h" 26 #include "net/http/bidirectional_stream_request_info.h"
28 #include "net/http/http_network_session.h" 27 #include "net/http/http_network_session.h"
29 #include "net/http/http_response_headers.h" 28 #include "net/http/http_response_headers.h"
30 #include "net/http/http_status_code.h" 29 #include "net/http/http_status_code.h"
31 #include "net/http/http_transaction_factory.h" 30 #include "net/http/http_transaction_factory.h"
32 #include "net/http/http_util.h" 31 #include "net/http/http_util.h"
33 #include "net/spdy/spdy_header_block.h" 32 #include "net/spdy/spdy_header_block.h"
34 #include "net/ssl/ssl_info.h" 33 #include "net/ssl/ssl_info.h"
35 #include "net/url_request/http_user_agent_settings.h" 34 #include "net/url_request/http_user_agent_settings.h"
36 #include "net/url_request/url_request_context.h" 35 #include "net/url_request/url_request_context.h"
37 #include "url/gurl.h" 36 #include "url/gurl.h"
38 37
39 namespace { 38 namespace {
40 39
41 class HeadersArray : public cronet_bidirectional_stream_header_array { 40 class HeadersArray : public bidirectional_stream_header_array {
42 public: 41 public:
43 HeadersArray(const net::SpdyHeaderBlock& header_block); 42 HeadersArray(const net::SpdyHeaderBlock& header_block);
44 ~HeadersArray(); 43 ~HeadersArray();
45 44
46 private: 45 private:
47 DISALLOW_COPY_AND_ASSIGN(HeadersArray); 46 DISALLOW_COPY_AND_ASSIGN(HeadersArray);
48 base::StringPairs headers_strings_; 47 base::StringPairs headers_strings_;
49 }; 48 };
50 49
51 HeadersArray::HeadersArray(const net::SpdyHeaderBlock& header_block) 50 HeadersArray::HeadersArray(const net::SpdyHeaderBlock& header_block)
52 : headers_strings_(header_block.size()) { 51 : headers_strings_(header_block.size()) {
53 // Count and headers are inherited from parent structure. 52 // Count and headers are inherited from parent structure.
54 count = capacity = header_block.size(); 53 count = capacity = header_block.size();
55 headers = new cronet_bidirectional_stream_header[count]; 54 headers = new bidirectional_stream_header[count];
56 size_t i = 0; 55 size_t i = 0;
57 // Copy headers into |headers_strings_| because string pieces are not 56 // Copy headers into |headers_strings_| because string pieces are not
58 // '\0'-terminated. 57 // '\0'-terminated.
59 for (const auto& it : header_block) { 58 for (const auto& it : header_block) {
60 headers_strings_[i].first = it.first.as_string(); 59 headers_strings_[i].first = it.first.as_string();
61 headers_strings_[i].second = it.second.as_string(); 60 headers_strings_[i].second = it.second.as_string();
62 headers[i].key = headers_strings_[i].first.c_str(); 61 headers[i].key = headers_strings_[i].first.c_str();
63 headers[i].value = headers_strings_[i].second.c_str(); 62 headers[i].value = headers_strings_[i].second.c_str();
64 ++i; 63 ++i;
65 } 64 }
66 } 65 }
67 66
68 HeadersArray::~HeadersArray() { 67 HeadersArray::~HeadersArray() {
69 delete[] headers; 68 delete[] headers;
70 } 69 }
71 70
72 class CronetBidirectionalStreamAdapter 71 class BidirectionalStreamAdapter
73 : public cronet::CronetBidirectionalStream::Delegate { 72 : public grpc_support::BidirectionalStream::Delegate {
74 public: 73 public:
75 CronetBidirectionalStreamAdapter( 74 BidirectionalStreamAdapter(stream_engine* engine,
76 cronet_engine* engine, 75 void* annotation,
77 void* annotation, 76 bidirectional_stream_callback* callback);
78 cronet_bidirectional_stream_callback* callback);
79 77
80 virtual ~CronetBidirectionalStreamAdapter(); 78 virtual ~BidirectionalStreamAdapter();
81 79
82 void OnStreamReady() override; 80 void OnStreamReady() override;
83 81
84 void OnHeadersReceived(const net::SpdyHeaderBlock& headers_block, 82 void OnHeadersReceived(const net::SpdyHeaderBlock& headers_block,
85 const char* negotiated_protocol) override; 83 const char* negotiated_protocol) override;
86 84
87 void OnDataRead(char* data, int size) override; 85 void OnDataRead(char* data, int size) override;
88 86
89 void OnDataSent(const char* data) override; 87 void OnDataSent(const char* data) override;
90 88
91 void OnTrailersReceived(const net::SpdyHeaderBlock& trailers_block) override; 89 void OnTrailersReceived(const net::SpdyHeaderBlock& trailers_block) override;
92 90
93 void OnSucceeded() override; 91 void OnSucceeded() override;
94 92
95 void OnFailed(int error) override; 93 void OnFailed(int error) override;
96 94
97 void OnCanceled() override; 95 void OnCanceled() override;
98 96
99 cronet_bidirectional_stream* c_stream() const { return c_stream_.get(); } 97 bidirectional_stream* c_stream() const { return c_stream_.get(); }
100 98
101 static cronet::CronetBidirectionalStream* GetCronetStream( 99 static grpc_support::BidirectionalStream* GetStream(
102 cronet_bidirectional_stream* stream); 100 bidirectional_stream* stream);
103 101
104 static void DestroyAdapterForStream(cronet_bidirectional_stream* stream); 102 static void DestroyAdapterForStream(bidirectional_stream* stream);
105 103
106 private: 104 private:
107 void DestroyOnNetworkThread(); 105 void DestroyOnNetworkThread();
108 106
109 // None of these objects are owned by |this|. 107 // None of these objects are owned by |this|.
110 cronet::CronetEnvironment* cronet_environment_; 108 net::URLRequestContextGetter* request_context_getter_;
111 cronet::CronetBidirectionalStream* cronet_bidirectional_stream_; 109 grpc_support::BidirectionalStream* bidirectional_stream_;
112 // C side 110 // C side
113 std::unique_ptr<cronet_bidirectional_stream> c_stream_; 111 std::unique_ptr<bidirectional_stream> c_stream_;
114 cronet_bidirectional_stream_callback* c_callback_; 112 bidirectional_stream_callback* c_callback_;
115 }; 113 };
116 114
117 CronetBidirectionalStreamAdapter::CronetBidirectionalStreamAdapter( 115 BidirectionalStreamAdapter::BidirectionalStreamAdapter(
118 cronet_engine* engine, 116 stream_engine* engine,
119 void* annotation, 117 void* annotation,
120 cronet_bidirectional_stream_callback* callback) 118 bidirectional_stream_callback* callback)
121 : cronet_environment_( 119 : request_context_getter_(
122 reinterpret_cast<cronet::CronetEnvironment*>(engine->obj)), 120 reinterpret_cast<net::URLRequestContextGetter*>(engine->obj)),
123 c_stream_(base::MakeUnique<cronet_bidirectional_stream>()), 121 c_stream_(base::MakeUnique<bidirectional_stream>()),
124 c_callback_(callback) { 122 c_callback_(callback) {
125 DCHECK(cronet_environment_); 123 DCHECK(request_context_getter_);
126 cronet_bidirectional_stream_ = 124 bidirectional_stream_ =
127 new cronet::CronetBidirectionalStream(cronet_environment_, this); 125 new grpc_support::BidirectionalStream(request_context_getter_, this);
128 c_stream_->obj = this; 126 c_stream_->obj = this;
129 c_stream_->annotation = annotation; 127 c_stream_->annotation = annotation;
130 } 128 }
131 129
132 CronetBidirectionalStreamAdapter::~CronetBidirectionalStreamAdapter() {} 130 BidirectionalStreamAdapter::~BidirectionalStreamAdapter() {}
133 131
134 void CronetBidirectionalStreamAdapter::OnStreamReady() { 132 void BidirectionalStreamAdapter::OnStreamReady() {
135 DCHECK(c_callback_->on_stream_ready); 133 DCHECK(c_callback_->on_response_headers_received);
136 c_callback_->on_stream_ready(c_stream()); 134 c_callback_->on_stream_ready(c_stream());
137 } 135 }
138 136
139 void CronetBidirectionalStreamAdapter::OnHeadersReceived( 137 void BidirectionalStreamAdapter::OnHeadersReceived(
140 const net::SpdyHeaderBlock& headers_block, 138 const net::SpdyHeaderBlock& headers_block,
141 const char* negotiated_protocol) { 139 const char* negotiated_protocol) {
142 DCHECK(c_callback_->on_response_headers_received); 140 DCHECK(c_callback_->on_response_headers_received);
143 HeadersArray response_headers(headers_block); 141 HeadersArray response_headers(headers_block);
144 c_callback_->on_response_headers_received(c_stream(), &response_headers, 142 c_callback_->on_response_headers_received(c_stream(), &response_headers,
145 negotiated_protocol); 143 negotiated_protocol);
146 } 144 }
147 145
148 void CronetBidirectionalStreamAdapter::OnDataRead(char* data, int size) { 146 void BidirectionalStreamAdapter::OnDataRead(char* data, int size) {
149 DCHECK(c_callback_->on_read_completed); 147 DCHECK(c_callback_->on_read_completed);
150 c_callback_->on_read_completed(c_stream(), data, size); 148 c_callback_->on_read_completed(c_stream(), data, size);
151 } 149 }
152 150
153 void CronetBidirectionalStreamAdapter::OnDataSent(const char* data) { 151 void BidirectionalStreamAdapter::OnDataSent(const char* data) {
154 DCHECK(c_callback_->on_write_completed); 152 DCHECK(c_callback_->on_write_completed);
155 c_callback_->on_write_completed(c_stream(), data); 153 c_callback_->on_write_completed(c_stream(), data);
156 } 154 }
157 155
158 void CronetBidirectionalStreamAdapter::OnTrailersReceived( 156 void BidirectionalStreamAdapter::OnTrailersReceived(
159 const net::SpdyHeaderBlock& trailers_block) { 157 const net::SpdyHeaderBlock& trailers_block) {
160 DCHECK(c_callback_->on_response_trailers_received); 158 DCHECK(c_callback_->on_response_trailers_received);
161 HeadersArray response_trailers(trailers_block); 159 HeadersArray response_trailers(trailers_block);
162 c_callback_->on_response_trailers_received(c_stream(), &response_trailers); 160 c_callback_->on_response_trailers_received(c_stream(), &response_trailers);
163 } 161 }
164 162
165 void CronetBidirectionalStreamAdapter::OnSucceeded() { 163 void BidirectionalStreamAdapter::OnSucceeded() {
166 DCHECK(c_callback_->on_succeded); 164 DCHECK(c_callback_->on_succeded);
167 c_callback_->on_succeded(c_stream()); 165 c_callback_->on_succeded(c_stream());
168 } 166 }
169 167
170 void CronetBidirectionalStreamAdapter::OnFailed(int error) { 168 void BidirectionalStreamAdapter::OnFailed(int error) {
171 DCHECK(c_callback_->on_failed); 169 DCHECK(c_callback_->on_failed);
172 c_callback_->on_failed(c_stream(), error); 170 c_callback_->on_failed(c_stream(), error);
173 } 171 }
174 172
175 void CronetBidirectionalStreamAdapter::OnCanceled() { 173 void BidirectionalStreamAdapter::OnCanceled() {
176 DCHECK(c_callback_->on_canceled); 174 DCHECK(c_callback_->on_canceled);
177 c_callback_->on_canceled(c_stream()); 175 c_callback_->on_canceled(c_stream());
178 } 176 }
179 177
180 cronet::CronetBidirectionalStream* 178 grpc_support::BidirectionalStream* BidirectionalStreamAdapter::GetStream(
181 CronetBidirectionalStreamAdapter::GetCronetStream( 179 bidirectional_stream* stream) {
182 cronet_bidirectional_stream* stream) {
183 DCHECK(stream); 180 DCHECK(stream);
184 CronetBidirectionalStreamAdapter* adapter = 181 BidirectionalStreamAdapter* adapter =
185 static_cast<CronetBidirectionalStreamAdapter*>(stream->obj); 182 static_cast<BidirectionalStreamAdapter*>(stream->obj);
186 DCHECK(adapter->c_stream() == stream); 183 DCHECK(adapter->c_stream() == stream);
187 DCHECK(adapter->cronet_bidirectional_stream_); 184 DCHECK(adapter->bidirectional_stream_);
188 return adapter->cronet_bidirectional_stream_; 185 return adapter->bidirectional_stream_;
189 } 186 }
190 187
191 void CronetBidirectionalStreamAdapter::DestroyAdapterForStream( 188 void BidirectionalStreamAdapter::DestroyAdapterForStream(
192 cronet_bidirectional_stream* stream) { 189 bidirectional_stream* stream) {
193 DCHECK(stream); 190 DCHECK(stream);
194 CronetBidirectionalStreamAdapter* adapter = 191 BidirectionalStreamAdapter* adapter =
195 static_cast<CronetBidirectionalStreamAdapter*>(stream->obj); 192 static_cast<BidirectionalStreamAdapter*>(stream->obj);
196 DCHECK(adapter->c_stream() == stream); 193 DCHECK(adapter->c_stream() == stream);
197 // Destroy could be called from any thread, including network thread (if 194 // Destroy could be called from any thread, including network thread (if
198 // posting task to executor throws an exception), but is posted, so |this| 195 // posting task to executor throws an exception), but is posted, so |this|
199 // is valid until calling task is complete. 196 // is valid until calling task is complete.
200 adapter->cronet_bidirectional_stream_->Destroy(); 197 adapter->bidirectional_stream_->Destroy();
201 adapter->cronet_environment_->PostToNetworkThread( 198 adapter->request_context_getter_->GetNetworkTaskRunner()->PostTask(
202 FROM_HERE, 199 FROM_HERE, base::Bind(&BidirectionalStreamAdapter::DestroyOnNetworkThread,
203 base::Bind(&CronetBidirectionalStreamAdapter::DestroyOnNetworkThread, 200 base::Unretained(adapter)));
204 base::Unretained(adapter)));
205 } 201 }
206 202
207 void CronetBidirectionalStreamAdapter::DestroyOnNetworkThread() { 203 void BidirectionalStreamAdapter::DestroyOnNetworkThread() {
208 DCHECK(cronet_environment_->IsOnNetworkThread()); 204 DCHECK(request_context_getter_->GetNetworkTaskRunner()
205 ->BelongsToCurrentThread());
209 delete this; 206 delete this;
210 } 207 }
211 208
212 } // namespace 209 } // namespace
213 210
214 cronet_bidirectional_stream* cronet_bidirectional_stream_create( 211 bidirectional_stream* bidirectional_stream_create(
215 cronet_engine* engine, 212 stream_engine* engine,
216 void* annotation, 213 void* annotation,
217 cronet_bidirectional_stream_callback* callback) { 214 bidirectional_stream_callback* callback) {
218 // Allocate new C++ adapter that will invoke |callback|. 215 // Allocate new C++ adapter that will invoke |callback|.
219 CronetBidirectionalStreamAdapter* stream_adapter = 216 BidirectionalStreamAdapter* stream_adapter =
220 new CronetBidirectionalStreamAdapter(engine, annotation, callback); 217 new BidirectionalStreamAdapter(engine, annotation, callback);
221 return stream_adapter->c_stream(); 218 return stream_adapter->c_stream();
222 } 219 }
223 220
224 int cronet_bidirectional_stream_destroy(cronet_bidirectional_stream* stream) { 221 int bidirectional_stream_destroy(bidirectional_stream* stream) {
225 CronetBidirectionalStreamAdapter::DestroyAdapterForStream(stream); 222 BidirectionalStreamAdapter::DestroyAdapterForStream(stream);
226 return 1; 223 return 1;
227 } 224 }
228 225
229 void cronet_bidirectional_stream_disable_auto_flush( 226 void bidirectional_stream_disable_auto_flush(bidirectional_stream* stream,
230 cronet_bidirectional_stream* stream, 227 bool disable_auto_flush) {
231 bool disable_auto_flush) { 228 BidirectionalStreamAdapter::GetStream(stream)->disable_auto_flush(
232 CronetBidirectionalStreamAdapter::GetCronetStream(stream)->disable_auto_flush(
233 disable_auto_flush); 229 disable_auto_flush);
234 } 230 }
235 231
236 void cronet_bidirectional_stream_delay_request_headers_until_flush( 232 void bidirectional_stream_delay_request_headers_until_flush(
237 cronet_bidirectional_stream* stream, 233 bidirectional_stream* stream,
238 bool delay_headers_until_flush) { 234 bool delay_headers_until_flush) {
239 CronetBidirectionalStreamAdapter::GetCronetStream(stream) 235 BidirectionalStreamAdapter::GetStream(stream)->delay_headers_until_flush(
240 ->delay_headers_until_flush(delay_headers_until_flush); 236 delay_headers_until_flush);
241 } 237 }
242 238
243 int cronet_bidirectional_stream_start( 239 int bidirectional_stream_start(bidirectional_stream* stream,
244 cronet_bidirectional_stream* stream, 240 const char* url,
245 const char* url, 241 int priority,
246 int priority, 242 const char* method,
247 const char* method, 243 const bidirectional_stream_header_array* headers,
248 const cronet_bidirectional_stream_header_array* headers, 244 bool end_of_stream) {
249 bool end_of_stream) { 245 grpc_support::BidirectionalStream* internal_stream =
250 cronet::CronetBidirectionalStream* cronet_stream = 246 BidirectionalStreamAdapter::GetStream(stream);
251 CronetBidirectionalStreamAdapter::GetCronetStream(stream);
252 net::HttpRequestHeaders request_headers; 247 net::HttpRequestHeaders request_headers;
253 if (headers) { 248 if (headers) {
254 for (size_t i = 0; i < headers->count; ++i) { 249 for (size_t i = 0; i < headers->count; ++i) {
255 std::string name(headers->headers[i].key); 250 std::string name(headers->headers[i].key);
256 std::string value(headers->headers[i].value); 251 std::string value(headers->headers[i].value);
257 if (!net::HttpUtil::IsValidHeaderName(name) || 252 if (!net::HttpUtil::IsValidHeaderName(name) ||
258 !net::HttpUtil::IsValidHeaderValue(value)) { 253 !net::HttpUtil::IsValidHeaderValue(value)) {
259 DLOG(ERROR) << "Invalid Header " << name << "=" << value; 254 DLOG(ERROR) << "Invalid Header " << name << "=" << value;
260 return i + 1; 255 return i + 1;
261 } 256 }
262 request_headers.SetHeader(name, value); 257 request_headers.SetHeader(name, value);
263 } 258 }
264 } 259 }
265 return cronet_stream->Start(url, priority, method, request_headers, 260 return internal_stream->Start(url, priority, method, request_headers,
266 end_of_stream); 261 end_of_stream);
267 } 262 }
268 263
269 int cronet_bidirectional_stream_read(cronet_bidirectional_stream* stream, 264 int bidirectional_stream_read(bidirectional_stream* stream,
270 char* buffer, 265 char* buffer,
271 int capacity) { 266 int capacity) {
272 return CronetBidirectionalStreamAdapter::GetCronetStream(stream)->ReadData( 267 return BidirectionalStreamAdapter::GetStream(stream)->ReadData(buffer,
273 buffer, capacity); 268 capacity);
274 } 269 }
275 270
276 int cronet_bidirectional_stream_write(cronet_bidirectional_stream* stream, 271 int bidirectional_stream_write(bidirectional_stream* stream,
277 const char* buffer, 272 const char* buffer,
278 int count, 273 int count,
279 bool end_of_stream) { 274 bool end_of_stream) {
280 return CronetBidirectionalStreamAdapter::GetCronetStream(stream)->WriteData( 275 return BidirectionalStreamAdapter::GetStream(stream)->WriteData(
281 buffer, count, end_of_stream); 276 buffer, count, end_of_stream);
282 } 277 }
283 278
284 void cronet_bidirectional_stream_flush(cronet_bidirectional_stream* stream) { 279 void bidirectional_stream_flush(bidirectional_stream* stream) {
285 return CronetBidirectionalStreamAdapter::GetCronetStream(stream)->Flush(); 280 return BidirectionalStreamAdapter::GetStream(stream)->Flush();
286 } 281 }
287 282
288 void cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream) { 283 void bidirectional_stream_cancel(bidirectional_stream* stream) {
289 CronetBidirectionalStreamAdapter::GetCronetStream(stream)->Cancel(); 284 BidirectionalStreamAdapter::GetStream(stream)->Cancel();
290 } 285 }
OLDNEW
« no previous file with comments | « components/grpc_support/bidirectional_stream.cc ('k') | components/grpc_support/bidirectional_stream_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698