Chromium Code Reviews

Side by Side Diff: mojo/services/html_viewer/websockethandle_impl.cc

Issue 550003005: Mojo: WebSocket interface now reuses the DataPipe for subsequent sends or (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: . Created 6 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
View unified diff |
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/services/html_viewer/websockethandle_impl.h" 5 #include "mojo/services/html_viewer/websockethandle_impl.h"
6 6
7 #include <vector> 7 #include <vector>
8 8
9 #include "base/bind.h"
10 #include "base/memory/scoped_vector.h"
9 #include "mojo/services/html_viewer/blink_basic_type_converters.h" 11 #include "mojo/services/html_viewer/blink_basic_type_converters.h"
12 #include "mojo/services/network/web_socket_data_pipe_queue.h"
10 #include "mojo/services/public/interfaces/network/network_service.mojom.h" 13 #include "mojo/services/public/interfaces/network/network_service.mojom.h"
11 #include "third_party/WebKit/public/platform/WebSerializedOrigin.h" 14 #include "third_party/WebKit/public/platform/WebSerializedOrigin.h"
12 #include "third_party/WebKit/public/platform/WebSocketHandleClient.h" 15 #include "third_party/WebKit/public/platform/WebSocketHandleClient.h"
13 #include "third_party/WebKit/public/platform/WebString.h" 16 #include "third_party/WebKit/public/platform/WebString.h"
14 #include "third_party/WebKit/public/platform/WebURL.h" 17 #include "third_party/WebKit/public/platform/WebURL.h"
15 #include "third_party/WebKit/public/platform/WebVector.h" 18 #include "third_party/WebKit/public/platform/WebVector.h"
16 19
17 using blink::WebSerializedOrigin; 20 using blink::WebSerializedOrigin;
18 using blink::WebSocketHandle; 21 using blink::WebSocketHandle;
19 using blink::WebSocketHandleClient; 22 using blink::WebSocketHandleClient;
(...skipping 40 matching lines...)
60 // Blink WebSocketHandleClient interface. 63 // Blink WebSocketHandleClient interface.
61 class WebSocketClientImpl : public InterfaceImpl<WebSocketClient> { 64 class WebSocketClientImpl : public InterfaceImpl<WebSocketClient> {
62 public: 65 public:
63 explicit WebSocketClientImpl(WebSocketHandleImpl* handle, 66 explicit WebSocketClientImpl(WebSocketHandleImpl* handle,
64 blink::WebSocketHandleClient* client) 67 blink::WebSocketHandleClient* client)
65 : handle_(handle), client_(client) {} 68 : handle_(handle), client_(client) {}
66 virtual ~WebSocketClientImpl() {} 69 virtual ~WebSocketClientImpl() {}
67 70
68 private: 71 private:
69 // WebSocketClient methods: 72 // WebSocketClient methods:
70 virtual void DidConnect( 73 virtual void DidConnect(bool fail,
71 bool fail, 74 const String& selected_subprotocol,
72 const String& selected_subprotocol, 75 const String& extensions,
73 const String& extensions) OVERRIDE { 76 ScopedDataPipeConsumerHandle receive_stream)
77 OVERRIDE {
74 blink::WebSocketHandleClient* client = client_; 78 blink::WebSocketHandleClient* client = client_;
75 WebSocketHandleImpl* handle = handle_; 79 WebSocketHandleImpl* handle = handle_;
80 receive_stream_ = receive_stream.Pass();
81 read_queue_.reset(new WebSocketReadQueue(receive_stream_.get()));
76 if (fail) 82 if (fail)
77 handle->Disconnect(); // deletes |this| 83 handle->Disconnect(); // deletes |this|
78 client->didConnect(handle, 84 client->didConnect(handle,
79 fail, 85 fail,
80 selected_subprotocol.To<WebString>(), 86 selected_subprotocol.To<WebString>(),
81 extensions.To<WebString>()); 87 extensions.To<WebString>());
82 // |handle| can be deleted here. 88 // |handle| can be deleted here.
83 } 89 }
84 90
85 virtual void DidReceiveData(bool fin, 91 virtual void DidReceiveData(bool fin,
86 WebSocket::MessageType type, 92 WebSocket::MessageType type,
87 ScopedDataPipeConsumerHandle data_pipe) OVERRIDE { 93 uint32_t num_bytes) OVERRIDE {
88 uint32_t num_bytes; 94 read_queue_->Read(num_bytes,
89 ReadDataRaw(data_pipe.get(), NULL, &num_bytes, MOJO_READ_DATA_FLAG_QUERY); 95 base::Bind(&WebSocketClientImpl::DidReadFromReceiveStream,
90 std::vector<char> data(num_bytes); 96 base::Unretained(this),
91 ReadDataRaw( 97 fin, type, num_bytes));
92 data_pipe.get(), &data[0], &num_bytes, MOJO_READ_DATA_FLAG_NONE);
93 const char* data_ptr = data.empty() ? NULL : &data[0];
94 client_->didReceiveData(handle_,
95 fin,
96 ConvertTo<WebSocketHandle::MessageType>(type),
97 data_ptr,
98 data.size());
99 // |handle| can be deleted here.
100 } 98 }
101 99
102 virtual void DidReceiveFlowControl(int64_t quota) OVERRIDE { 100 virtual void DidReceiveFlowControl(int64_t quota) OVERRIDE {
103 client_->didReceiveFlowControl(handle_, quota); 101 client_->didReceiveFlowControl(handle_, quota);
104 // |handle| can be deleted here. 102 // |handle| can be deleted here.
105 } 103 }
106 104
107 virtual void DidFail(const String& message) OVERRIDE { 105 virtual void DidFail(const String& message) OVERRIDE {
108 blink::WebSocketHandleClient* client = client_; 106 blink::WebSocketHandleClient* client = client_;
109 WebSocketHandleImpl* handle = handle_; 107 WebSocketHandleImpl* handle = handle_;
110 handle->Disconnect(); // deletes |this| 108 handle->Disconnect(); // deletes |this|
111 client->didFail(handle, message.To<WebString>()); 109 client->didFail(handle, message.To<WebString>());
112 // |handle| can be deleted here. 110 // |handle| can be deleted here.
113 } 111 }
114 112
115 virtual void DidClose(bool was_clean, 113 virtual void DidClose(bool was_clean,
116 uint16_t code, 114 uint16_t code,
117 const String& reason) OVERRIDE { 115 const String& reason) OVERRIDE {
118 blink::WebSocketHandleClient* client = client_; 116 blink::WebSocketHandleClient* client = client_;
119 WebSocketHandleImpl* handle = handle_; 117 WebSocketHandleImpl* handle = handle_;
120 handle->Disconnect(); // deletes |this| 118 handle->Disconnect(); // deletes |this|
121 client->didClose(handle, was_clean, code, reason.To<WebString>()); 119 client->didClose(handle, was_clean, code, reason.To<WebString>());
122 // |handle| can be deleted here. 120 // |handle| can be deleted here.
123 } 121 }
124 122
123 void DidReadFromReceiveStream(bool fin,
124 WebSocket::MessageType type,
125 uint32_t num_bytes,
126 const char* data) {
127 client_->didReceiveData(handle_,
128 fin,
129 ConvertTo<WebSocketHandle::MessageType>(type),
130 data,
131 num_bytes);
132 // |handle_| can be deleted here.
133 }
134
135 // |handle_| owns this object, so it is guaranteed to outlive us.
125 WebSocketHandleImpl* handle_; 136 WebSocketHandleImpl* handle_;
126 blink::WebSocketHandleClient* client_; 137 blink::WebSocketHandleClient* client_;
138 ScopedDataPipeConsumerHandle receive_stream_;
139 scoped_ptr<WebSocketReadQueue> read_queue_;
127 140
128 DISALLOW_COPY_AND_ASSIGN(WebSocketClientImpl); 141 DISALLOW_COPY_AND_ASSIGN(WebSocketClientImpl);
129 }; 142 };
130 143
131 WebSocketHandleImpl::WebSocketHandleImpl(NetworkService* network_service) 144 WebSocketHandleImpl::WebSocketHandleImpl(NetworkService* network_service)
132 : did_close_(false) { 145 : did_close_(false) {
133 network_service->CreateWebSocket(Get(&web_socket_)); 146 network_service->CreateWebSocket(Get(&web_socket_));
134 } 147 }
135 148
136 WebSocketHandleImpl::~WebSocketHandleImpl() { 149 WebSocketHandleImpl::~WebSocketHandleImpl() {
137 if (!did_close_) { 150 if (!did_close_) {
138 // The connection is abruptly disconnected by the renderer without 151 // The connection is abruptly disconnected by the renderer without
139 // closing handshake. 152 // closing handshake.
140 web_socket_->Close(WebSocket::kAbnormalCloseCode, String()); 153 web_socket_->Close(WebSocket::kAbnormalCloseCode, String());
141 } 154 }
142 } 155 }
143 156
144 void WebSocketHandleImpl::connect(const WebURL& url, 157 void WebSocketHandleImpl::connect(const WebURL& url,
145 const WebVector<WebString>& protocols, 158 const WebVector<WebString>& protocols,
146 const WebSerializedOrigin& origin, 159 const WebSerializedOrigin& origin,
147 WebSocketHandleClient* client) { 160 WebSocketHandleClient* client) {
148 client_.reset(new WebSocketClientImpl(this, client)); 161 client_.reset(new WebSocketClientImpl(this, client));
149 WebSocketClientPtr client_ptr; 162 WebSocketClientPtr client_ptr;
150 // TODO(mpcomplete): Is this the right ownership model? Or should mojo own 163 // TODO(mpcomplete): Is this the right ownership model? Or should mojo own
151 // |client_|? 164 // |client_|?
152 WeakBindToProxy(client_.get(), &client_ptr); 165 WeakBindToProxy(client_.get(), &client_ptr);
166
167 DataPipe data_pipe;
168 send_stream_ = data_pipe.producer_handle.Pass();
169 write_queue_.reset(new WebSocketWriteQueue(send_stream_.get()));
153 web_socket_->Connect(url.string().utf8(), 170 web_socket_->Connect(url.string().utf8(),
154 Array<String>::From(protocols), 171 Array<String>::From(protocols),
155 origin.string().utf8(), 172 origin.string().utf8(),
173 data_pipe.consumer_handle.Pass(),
156 client_ptr.Pass()); 174 client_ptr.Pass());
157 } 175 }
158 176
159 void WebSocketHandleImpl::send(bool fin, 177 void WebSocketHandleImpl::send(bool fin,
160 WebSocketHandle::MessageType type, 178 WebSocketHandle::MessageType type,
161 const char* data, 179 const char* data,
162 size_t size) { 180 size_t size) {
163 if (!client_) 181 if (!client_)
164 return; 182 return;
165 183
166 // TODO(mpcomplete): reuse the data pipe for subsequent sends. 184 write_queue_->Write(
167 uint32_t num_bytes = static_cast<uint32_t>(size); 185 data, size,
168 MojoCreateDataPipeOptions options; 186 base::Bind(&WebSocketHandleImpl::DidWriteToSendStream,
169 options.struct_size = sizeof(MojoCreateDataPipeOptions); 187 base::Unretained(this),
170 options.flags = MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE; 188 fin, type, size));
171 options.element_num_bytes = 1;
172 options.capacity_num_bytes = num_bytes;
173 DataPipe data_pipe(options);
174 WriteDataRaw(data_pipe.producer_handle.get(),
175 data,
176 &num_bytes,
177 MOJO_WRITE_DATA_FLAG_ALL_OR_NONE);
178 web_socket_->Send(
179 fin,
180 ConvertTo<WebSocket::MessageType>(type),
181 data_pipe.consumer_handle.Pass());
182 } 189 }
183 190
184 void WebSocketHandleImpl::flowControl(int64_t quota) { 191 void WebSocketHandleImpl::flowControl(int64_t quota) {
185 if (!client_) 192 if (!client_)
186 return; 193 return;
187 194
188 web_socket_->FlowControl(quota); 195 web_socket_->FlowControl(quota);
189 } 196 }
190 197
191 void WebSocketHandleImpl::close(unsigned short code, const WebString& reason) { 198 void WebSocketHandleImpl::close(unsigned short code, const WebString& reason) {
192 web_socket_->Close(code, reason.utf8()); 199 web_socket_->Close(code, reason.utf8());
193 } 200 }
194 201
202 void WebSocketHandleImpl::DidWriteToSendStream(
203 bool fin,
204 WebSocketHandle::MessageType type,
205 uint32_t num_bytes,
206 const char* data) {
207 web_socket_->Send(fin, ConvertTo<WebSocket::MessageType>(type), num_bytes);
208 }
209
195 void WebSocketHandleImpl::Disconnect() { 210 void WebSocketHandleImpl::Disconnect() {
196 did_close_ = true; 211 did_close_ = true;
197 client_.reset(); 212 client_.reset();
198 } 213 }
199 214
200 } // namespace mojo 215 } // namespace mojo
OLDNEW

Powered by Google App Engine