Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(395)

Side by Side Diff: mojo/services/html_viewer/websockethandle_impl.cc

Issue 550003005: Mojo: WebSocket interface now reuses the DataPipe for subsequent sends or (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: give up Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/services/html_viewer/websockethandle_impl.h" 5 #include "mojo/services/html_viewer/websockethandle_impl.h"
6 6
7 #include <vector> 7 #include <vector>
8 8
9 #include "base/bind.h"
10 #include "base/memory/scoped_vector.h"
9 #include "mojo/services/html_viewer/blink_basic_type_converters.h" 11 #include "mojo/services/html_viewer/blink_basic_type_converters.h"
12 #include "mojo/services/public/cpp/network/web_socket_read_queue.h"
13 #include "mojo/services/public/cpp/network/web_socket_write_queue.h"
10 #include "mojo/services/public/interfaces/network/network_service.mojom.h" 14 #include "mojo/services/public/interfaces/network/network_service.mojom.h"
11 #include "third_party/WebKit/public/platform/WebSerializedOrigin.h" 15 #include "third_party/WebKit/public/platform/WebSerializedOrigin.h"
12 #include "third_party/WebKit/public/platform/WebSocketHandleClient.h" 16 #include "third_party/WebKit/public/platform/WebSocketHandleClient.h"
13 #include "third_party/WebKit/public/platform/WebString.h" 17 #include "third_party/WebKit/public/platform/WebString.h"
14 #include "third_party/WebKit/public/platform/WebURL.h" 18 #include "third_party/WebKit/public/platform/WebURL.h"
15 #include "third_party/WebKit/public/platform/WebVector.h" 19 #include "third_party/WebKit/public/platform/WebVector.h"
16 20
17 using blink::WebSerializedOrigin; 21 using blink::WebSerializedOrigin;
18 using blink::WebSocketHandle; 22 using blink::WebSocketHandle;
19 using blink::WebSocketHandleClient; 23 using blink::WebSocketHandleClient;
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
60 // Blink WebSocketHandleClient interface. 64 // Blink WebSocketHandleClient interface.
61 class WebSocketClientImpl : public InterfaceImpl<WebSocketClient> { 65 class WebSocketClientImpl : public InterfaceImpl<WebSocketClient> {
62 public: 66 public:
63 explicit WebSocketClientImpl(WebSocketHandleImpl* handle, 67 explicit WebSocketClientImpl(WebSocketHandleImpl* handle,
64 blink::WebSocketHandleClient* client) 68 blink::WebSocketHandleClient* client)
65 : handle_(handle), client_(client) {} 69 : handle_(handle), client_(client) {}
66 virtual ~WebSocketClientImpl() {} 70 virtual ~WebSocketClientImpl() {}
67 71
68 private: 72 private:
69 // WebSocketClient methods: 73 // WebSocketClient methods:
70 virtual void DidConnect( 74 virtual void DidConnect(bool fail,
71 bool fail, 75 const String& selected_subprotocol,
72 const String& selected_subprotocol, 76 const String& extensions,
73 const String& extensions) OVERRIDE { 77 ScopedDataPipeConsumerHandle receive_stream)
78 OVERRIDE {
74 blink::WebSocketHandleClient* client = client_; 79 blink::WebSocketHandleClient* client = client_;
75 WebSocketHandleImpl* handle = handle_; 80 WebSocketHandleImpl* handle = handle_;
81 receive_stream_ = receive_stream.Pass();
82 read_queue_.reset(new WebSocketReadQueue(receive_stream_.get()));
76 if (fail) 83 if (fail)
77 handle->Disconnect(); // deletes |this| 84 handle->Disconnect(); // deletes |this|
78 client->didConnect(handle, 85 client->didConnect(handle,
79 fail, 86 fail,
80 selected_subprotocol.To<WebString>(), 87 selected_subprotocol.To<WebString>(),
81 extensions.To<WebString>()); 88 extensions.To<WebString>());
82 // |handle| can be deleted here. 89 // |handle| can be deleted here.
83 } 90 }
84 91
85 virtual void DidReceiveData(bool fin, 92 virtual void DidReceiveData(bool fin,
86 WebSocket::MessageType type, 93 WebSocket::MessageType type,
87 ScopedDataPipeConsumerHandle data_pipe) OVERRIDE { 94 uint32_t num_bytes) OVERRIDE {
88 uint32_t num_bytes; 95 read_queue_->Read(num_bytes,
89 ReadDataRaw(data_pipe.get(), NULL, &num_bytes, MOJO_READ_DATA_FLAG_QUERY); 96 base::Bind(&WebSocketClientImpl::DidReadFromReceiveStream,
90 std::vector<char> data(num_bytes); 97 base::Unretained(this),
91 ReadDataRaw( 98 fin, type, num_bytes));
92 data_pipe.get(), &data[0], &num_bytes, MOJO_READ_DATA_FLAG_NONE);
93 const char* data_ptr = data.empty() ? NULL : &data[0];
94 client_->didReceiveData(handle_,
95 fin,
96 ConvertTo<WebSocketHandle::MessageType>(type),
97 data_ptr,
98 data.size());
99 // |handle| can be deleted here.
100 } 99 }
101 100
102 virtual void DidReceiveFlowControl(int64_t quota) OVERRIDE { 101 virtual void DidReceiveFlowControl(int64_t quota) OVERRIDE {
103 client_->didReceiveFlowControl(handle_, quota); 102 client_->didReceiveFlowControl(handle_, quota);
104 // |handle| can be deleted here. 103 // |handle| can be deleted here.
105 } 104 }
106 105
107 virtual void DidFail(const String& message) OVERRIDE { 106 virtual void DidFail(const String& message) OVERRIDE {
108 blink::WebSocketHandleClient* client = client_; 107 blink::WebSocketHandleClient* client = client_;
109 WebSocketHandleImpl* handle = handle_; 108 WebSocketHandleImpl* handle = handle_;
110 handle->Disconnect(); // deletes |this| 109 handle->Disconnect(); // deletes |this|
111 client->didFail(handle, message.To<WebString>()); 110 client->didFail(handle, message.To<WebString>());
112 // |handle| can be deleted here. 111 // |handle| can be deleted here.
113 } 112 }
114 113
115 virtual void DidClose(bool was_clean, 114 virtual void DidClose(bool was_clean,
116 uint16_t code, 115 uint16_t code,
117 const String& reason) OVERRIDE { 116 const String& reason) OVERRIDE {
118 blink::WebSocketHandleClient* client = client_; 117 blink::WebSocketHandleClient* client = client_;
119 WebSocketHandleImpl* handle = handle_; 118 WebSocketHandleImpl* handle = handle_;
120 handle->Disconnect(); // deletes |this| 119 handle->Disconnect(); // deletes |this|
121 client->didClose(handle, was_clean, code, reason.To<WebString>()); 120 client->didClose(handle, was_clean, code, reason.To<WebString>());
122 // |handle| can be deleted here. 121 // |handle| can be deleted here.
123 } 122 }
124 123
124 void DidReadFromReceiveStream(bool fin,
125 WebSocket::MessageType type,
126 uint32_t num_bytes,
127 const char* data) {
128 client_->didReceiveData(handle_,
129 fin,
130 ConvertTo<WebSocketHandle::MessageType>(type),
131 data,
132 num_bytes);
133 // |handle_| can be deleted here.
134 }
135
136 // |handle_| owns this object, so it is guaranteed to outlive us.
125 WebSocketHandleImpl* handle_; 137 WebSocketHandleImpl* handle_;
126 blink::WebSocketHandleClient* client_; 138 blink::WebSocketHandleClient* client_;
139 ScopedDataPipeConsumerHandle receive_stream_;
140 scoped_ptr<WebSocketReadQueue> read_queue_;
127 141
128 DISALLOW_COPY_AND_ASSIGN(WebSocketClientImpl); 142 DISALLOW_COPY_AND_ASSIGN(WebSocketClientImpl);
129 }; 143 };
130 144
131 WebSocketHandleImpl::WebSocketHandleImpl(NetworkService* network_service) 145 WebSocketHandleImpl::WebSocketHandleImpl(NetworkService* network_service)
132 : did_close_(false) { 146 : did_close_(false) {
133 network_service->CreateWebSocket(Get(&web_socket_)); 147 network_service->CreateWebSocket(Get(&web_socket_));
134 } 148 }
135 149
136 WebSocketHandleImpl::~WebSocketHandleImpl() { 150 WebSocketHandleImpl::~WebSocketHandleImpl() {
137 if (!did_close_) { 151 if (!did_close_) {
138 // The connection is abruptly disconnected by the renderer without 152 // The connection is abruptly disconnected by the renderer without
139 // closing handshake. 153 // closing handshake.
140 web_socket_->Close(WebSocket::kAbnormalCloseCode, String()); 154 web_socket_->Close(WebSocket::kAbnormalCloseCode, String());
141 } 155 }
142 } 156 }
143 157
144 void WebSocketHandleImpl::connect(const WebURL& url, 158 void WebSocketHandleImpl::connect(const WebURL& url,
145 const WebVector<WebString>& protocols, 159 const WebVector<WebString>& protocols,
146 const WebSerializedOrigin& origin, 160 const WebSerializedOrigin& origin,
147 WebSocketHandleClient* client) { 161 WebSocketHandleClient* client) {
148 client_.reset(new WebSocketClientImpl(this, client)); 162 client_.reset(new WebSocketClientImpl(this, client));
149 WebSocketClientPtr client_ptr; 163 WebSocketClientPtr client_ptr;
150 // TODO(mpcomplete): Is this the right ownership model? Or should mojo own 164 // TODO(mpcomplete): Is this the right ownership model? Or should mojo own
151 // |client_|? 165 // |client_|?
152 WeakBindToProxy(client_.get(), &client_ptr); 166 WeakBindToProxy(client_.get(), &client_ptr);
167
168 DataPipe data_pipe;
169 send_stream_ = data_pipe.producer_handle.Pass();
170 write_queue_.reset(new WebSocketWriteQueue(send_stream_.get()));
153 web_socket_->Connect(url.string().utf8(), 171 web_socket_->Connect(url.string().utf8(),
154 Array<String>::From(protocols), 172 Array<String>::From(protocols),
155 origin.string().utf8(), 173 origin.string().utf8(),
174 data_pipe.consumer_handle.Pass(),
156 client_ptr.Pass()); 175 client_ptr.Pass());
157 } 176 }
158 177
159 void WebSocketHandleImpl::send(bool fin, 178 void WebSocketHandleImpl::send(bool fin,
160 WebSocketHandle::MessageType type, 179 WebSocketHandle::MessageType type,
161 const char* data, 180 const char* data,
162 size_t size) { 181 size_t size) {
163 if (!client_) 182 if (!client_)
164 return; 183 return;
165 184
166 // TODO(mpcomplete): reuse the data pipe for subsequent sends. 185 uint32_t size32 = static_cast<uint32_t>(size);
167 uint32_t num_bytes = static_cast<uint32_t>(size); 186 write_queue_->Write(
168 MojoCreateDataPipeOptions options; 187 data, size32,
169 options.struct_size = sizeof(MojoCreateDataPipeOptions); 188 base::Bind(&WebSocketHandleImpl::DidWriteToSendStream,
170 options.flags = MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE; 189 base::Unretained(this),
171 options.element_num_bytes = 1; 190 fin, type, size32));
172 options.capacity_num_bytes = num_bytes;
173 DataPipe data_pipe(options);
174 WriteDataRaw(data_pipe.producer_handle.get(),
175 data,
176 &num_bytes,
177 MOJO_WRITE_DATA_FLAG_ALL_OR_NONE);
178 web_socket_->Send(
179 fin,
180 ConvertTo<WebSocket::MessageType>(type),
181 data_pipe.consumer_handle.Pass());
182 } 191 }
183 192
184 void WebSocketHandleImpl::flowControl(int64_t quota) { 193 void WebSocketHandleImpl::flowControl(int64_t quota) {
185 if (!client_) 194 if (!client_)
186 return; 195 return;
187 196
188 web_socket_->FlowControl(quota); 197 web_socket_->FlowControl(quota);
189 } 198 }
190 199
191 void WebSocketHandleImpl::close(unsigned short code, const WebString& reason) { 200 void WebSocketHandleImpl::close(unsigned short code, const WebString& reason) {
192 web_socket_->Close(code, reason.utf8()); 201 web_socket_->Close(code, reason.utf8());
193 } 202 }
194 203
204 void WebSocketHandleImpl::DidWriteToSendStream(
205 bool fin,
206 WebSocketHandle::MessageType type,
207 uint32_t num_bytes,
208 const char* data) {
209 web_socket_->Send(fin, ConvertTo<WebSocket::MessageType>(type), num_bytes);
210 }
211
195 void WebSocketHandleImpl::Disconnect() { 212 void WebSocketHandleImpl::Disconnect() {
196 did_close_ = true; 213 did_close_ = true;
197 client_.reset(); 214 client_.reset();
198 } 215 }
199 216
200 } // namespace mojo 217 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698