Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(55)

Side by Side Diff: Source/modules/websockets/NewWebSocketChannelImpl.cpp

Issue 23464050: Implement suspend / resume in NewWebSocketChannelImpl. (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « Source/modules/websockets/NewWebSocketChannelImpl.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright (C) 2013 Google Inc. All rights reserved. 2 * Copyright (C) 2013 Google Inc. All rights reserved.
3 * 3 *
4 * Redistribution and use in source and binary forms, with or without 4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are 5 * modification, are permitted provided that the following conditions are
6 * met: 6 * met:
7 * 7 *
8 * * Redistributions of source code must retain the above copyright 8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer. 9 * notice, this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above 10 * * Redistributions in binary form must reproduce the above
(...skipping 26 matching lines...) Expand all
37 #include "core/platform/NotImplemented.h" 37 #include "core/platform/NotImplemented.h"
38 #include "modules/websockets/WebSocketChannel.h" 38 #include "modules/websockets/WebSocketChannel.h"
39 #include "modules/websockets/WebSocketChannelClient.h" 39 #include "modules/websockets/WebSocketChannelClient.h"
40 #include "public/platform/Platform.h" 40 #include "public/platform/Platform.h"
41 #include "public/platform/WebSocketHandle.h" 41 #include "public/platform/WebSocketHandle.h"
42 #include "public/platform/WebString.h" 42 #include "public/platform/WebString.h"
43 #include "public/platform/WebURL.h" 43 #include "public/platform/WebURL.h"
44 #include "public/platform/WebVector.h" 44 #include "public/platform/WebVector.h"
45 #include "weborigin/SecurityOrigin.h" 45 #include "weborigin/SecurityOrigin.h"
46 #include "wtf/ArrayBuffer.h" 46 #include "wtf/ArrayBuffer.h"
47 #include "wtf/Vector.h"
47 #include "wtf/text/WTFString.h" 48 #include "wtf/text/WTFString.h"
48 49
49 // FIXME: We should implement Inspector notification. 50 // FIXME: We should implement Inspector notification.
50 // FIXME: We should implement send(Blob). 51 // FIXME: We should implement send(Blob).
51 // FIXME: We should implement suspend / resume.
52 // FIXME: We should add log messages. 52 // FIXME: We should add log messages.
53 53
54 using WebKit::WebSocketHandle; 54 using WebKit::WebSocketHandle;
55 55
56 namespace WebCore { 56 namespace WebCore {
57 57
58 namespace { 58 namespace {
59 59
60 bool isClean(int code) 60 bool isClean(int code)
61 { 61 {
62 return code == WebSocketChannel::CloseEventCodeNormalClosure 62 return code == WebSocketChannel::CloseEventCodeNormalClosure
63 || (WebSocketChannel::CloseEventCodeMinimumUserDefined <= code 63 || (WebSocketChannel::CloseEventCodeMinimumUserDefined <= code
64 && code <= WebSocketChannel::CloseEventCodeMaximumUserDefined); 64 && code <= WebSocketChannel::CloseEventCodeMaximumUserDefined);
65 } 65 }
66 66
67 } // namespace 67 } // namespace
68 68
69 class NewWebSocketChannelImpl::Resumer {
70 public:
71 struct PendingEvent {
abarth-chromium 2013/09/10 16:24:15 I would move this class out from inside Resumer in
yhirano 2013/09/12 01:35:49 Done.
72 enum Type {
73 DidConnectComplete,
74 DidReceiveTextMessage,
75 DidReceiveBinaryMessage,
76 DidReceiveError,
77 DidClose,
78 };
79 Type type;
80 Vector<char> message; // for DidReceiveTextMessage / DidReceiveBinaryMes sage
81 int closingCode; // for DidClose
82 String closingReason; // for DidClose
83
84 PendingEvent(Type type) : type(type), closingCode(0) { }
85 PendingEvent(Type, Vector<char>*);
86 PendingEvent(int code, const String& reason) : type(DidClose), closingCo de(code), closingReason(reason) { }
87 };
88
89 Resumer(NewWebSocketChannelImpl*);
abarth-chromium 2013/09/10 16:24:15 Please mark one-argument constructors explicit.
yhirano 2013/09/12 01:35:49 Done.
90 ~Resumer();
91
92 void append(const PendingEvent&);
93 void suspend();
94 void resumeLater();
95 void abort();
96
97 private:
98 class PendingEventProcessor : public RefCounted<PendingEventProcessor> {
abarth-chromium 2013/09/10 16:24:15 I'd also move this class into the anonymous namesp
yhirano 2013/09/12 01:35:49 Done.
99 public:
100 PendingEventProcessor() : m_isAborted(false) { }
101 virtual ~PendingEventProcessor() { }
102 void abort() { m_isAborted = true; }
103 void append(const PendingEvent& e) { m_events.append(e); }
104 void process(NewWebSocketChannelImpl*);
105
106 private:
107 bool m_isAborted;
108 Vector<PendingEvent> m_events;
109 };
110
111 void resumeNow(Timer<Resumer>*);
112
113 NewWebSocketChannelImpl* m_channel;
114 RefPtr<PendingEventProcessor> m_pendingEventProcessor;
115 Timer<Resumer> m_timer;
116 bool m_isAborted;
117 };
118
119 NewWebSocketChannelImpl::Resumer::PendingEvent::PendingEvent(Type type, Vector<c har>* data) : type(type), closingCode(0)
abarth-chromium 2013/09/10 16:24:15 These initializers go on separate lines.
yhirano 2013/09/12 01:35:49 Done.
120 {
121 ASSERT(type == DidReceiveTextMessage || type == DidReceiveBinaryMessage);
122 message.swap(*data);
123 }
124
125 NewWebSocketChannelImpl::Resumer::Resumer(NewWebSocketChannelImpl* channel)
126 : m_channel(channel)
127 , m_pendingEventProcessor(adoptRef(new PendingEventProcessor))
128 , m_timer(this, &Resumer::resumeNow)
129 , m_isAborted(false) { }
130
131 NewWebSocketChannelImpl::Resumer::~Resumer()
132 {
133 abort();
134 }
135
136 void NewWebSocketChannelImpl::Resumer::append(const PendingEvent& event)
137 {
138 if (m_isAborted)
139 return;
140 m_pendingEventProcessor->append(event);
141 }
142
143 void NewWebSocketChannelImpl::Resumer::suspend()
144 {
145 if (m_isAborted)
146 return;
147 m_timer.stop();
148 m_channel->m_isSuspended = true;
149 }
150
151 void NewWebSocketChannelImpl::Resumer::resumeLater()
152 {
153 if (m_isAborted)
154 return;
155 if (!m_timer.isActive())
156 m_timer.startOneShot(0);
157 }
158
159 void NewWebSocketChannelImpl::Resumer::abort()
160 {
161 if (m_isAborted)
162 return;
163 m_isAborted = true;
164 m_timer.stop();
165 m_pendingEventProcessor->abort();
166 m_pendingEventProcessor = 0;
167 }
168
169 void NewWebSocketChannelImpl::Resumer::resumeNow(Timer<Resumer>*)
170 {
171 ASSERT(!m_isAborted);
172 m_channel->m_isSuspended = false;
173
174 ASSERT(m_channel->m_client);
175 if (m_channel->m_handle) {
176 m_channel->sendInternal();
177 m_channel->flowControlIfNecessary();
178 }
179 m_pendingEventProcessor->process(m_channel);
180 // |this| can be aborted here.
181 // |this| can be deleted here.
182 }
183
184 void NewWebSocketChannelImpl::Resumer::PendingEventProcessor::process(NewWebSock etChannelImpl* channel)
185 {
186 RefPtr<PendingEventProcessor> protect(this);
187 for (size_t i = 0; i < m_events.size() && !m_isAborted; ++i) {
188 PendingEvent& event = m_events[i];
189 switch (event.type) {
190 case PendingEvent::DidConnectComplete:
191 channel->handleDidConnect();
192 // |this| can be detached here.
193 break;
194 case PendingEvent::DidReceiveTextMessage:
195 channel->handleTextMessage(&event.message);
196 // |this| can be detached here.
197 break;
198 case PendingEvent::DidReceiveBinaryMessage:
199 channel->handleBinaryMessage(&event.message);
200 // |this| can be detached here.
201 break;
202 case PendingEvent::DidReceiveError:
203 channel->handleDidReceiveMessageError();
204 // |this| can be detached here.
205 break;
206 case PendingEvent::DidClose:
207 channel->handleDidClose(event.closingCode, event.closingReason);
208 // |this| can be detached here.
209 break;
210 }
211 }
212 m_events.clear();
213 }
214
69 NewWebSocketChannelImpl::NewWebSocketChannelImpl(ScriptExecutionContext* context , WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber) 215 NewWebSocketChannelImpl::NewWebSocketChannelImpl(ScriptExecutionContext* context , WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber)
70 : ContextLifecycleObserver(context) 216 : ContextLifecycleObserver(context)
71 , m_handle(adoptPtr(WebKit::Platform::current()->createWebSocketHandle())) 217 , m_handle(adoptPtr(WebKit::Platform::current()->createWebSocketHandle()))
72 , m_client(client) 218 , m_client(client)
219 , m_resumer(adoptPtr(new Resumer(this)))
220 , m_isSuspended(false)
73 , m_sendingQuota(0) 221 , m_sendingQuota(0)
74 , m_receivedDataSizeForFlowControl(receivedDataSizeForFlowControlHighWaterMa rk * 2) // initial quota 222 , m_receivedDataSizeForFlowControl(receivedDataSizeForFlowControlHighWaterMa rk * 2) // initial quota
75 , m_bufferedAmount(0) 223 , m_bufferedAmount(0)
76 , m_sentSizeOfTopMessage(0) 224 , m_sentSizeOfTopMessage(0)
77 { 225 {
78 } 226 }
79 227
228 NewWebSocketChannelImpl::~NewWebSocketChannelImpl()
229 {
230 }
231
80 void NewWebSocketChannelImpl::connect(const KURL& url, const String& protocol) 232 void NewWebSocketChannelImpl::connect(const KURL& url, const String& protocol)
81 { 233 {
82 if (!m_handle) 234 if (!m_handle)
83 return; 235 return;
84 m_url = url; 236 m_url = url;
85 Vector<String> protocols; 237 Vector<String> protocols;
86 // Since protocol is already verified and escaped, we can simply split it. 238 // Since protocol is already verified and escaped, we can simply split it.
87 protocol.split(", ", true, protocols); 239 protocol.split(", ", true, protocols);
88 WebKit::WebVector<WebKit::WebString> webProtocols(protocols.size()); 240 WebKit::WebVector<WebKit::WebString> webProtocols(protocols.size());
89 for (size_t i = 0; i < protocols.size(); ++i) { 241 for (size_t i = 0; i < protocols.size(); ++i) {
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
135 { 287 {
136 ASSERT(m_handle); 288 ASSERT(m_handle);
137 m_handle->close(static_cast<unsigned short>(code), reason); 289 m_handle->close(static_cast<unsigned short>(code), reason);
138 } 290 }
139 291
140 void NewWebSocketChannelImpl::fail(const String& reason, MessageLevel level, con st String& sourceURL, unsigned lineNumber) 292 void NewWebSocketChannelImpl::fail(const String& reason, MessageLevel level, con st String& sourceURL, unsigned lineNumber)
141 { 293 {
142 // m_handle and m_client can be null here. 294 // m_handle and m_client can be null here.
143 if (m_client) 295 if (m_client)
144 m_client->didReceiveMessageError(); 296 m_client->didReceiveMessageError();
145 handleDidClose(CloseEventCodeAbnormalClosure, reason); 297 if (m_isSuspended) {
146 // handleDidClose may delete this object. 298 m_resumer->append(Resumer::PendingEvent(CloseEventCodeAbnormalClosure, r eason));
299 } else {
300 handleDidClose(CloseEventCodeAbnormalClosure, reason);
301 // handleDidClose may delete this object.
302 }
147 } 303 }
148 304
149 void NewWebSocketChannelImpl::disconnect() 305 void NewWebSocketChannelImpl::disconnect()
150 { 306 {
151 if (m_handle) 307 if (m_handle)
152 m_handle->close(CloseEventCodeAbnormalClosure, ""); 308 m_handle->close(CloseEventCodeAbnormalClosure, "");
153 m_handle.clear(); 309 m_handle.clear();
154 m_client = 0; 310 m_client = 0;
311 m_resumer->abort();
155 } 312 }
156 313
157 void NewWebSocketChannelImpl::suspend() 314 void NewWebSocketChannelImpl::suspend()
158 { 315 {
159 notImplemented(); 316 m_resumer->suspend();
160 } 317 }
161 318
162 void NewWebSocketChannelImpl::resume() 319 void NewWebSocketChannelImpl::resume()
163 { 320 {
164 notImplemented(); 321 m_resumer->resumeLater();
165 } 322 }
166 323
167 NewWebSocketChannelImpl::Message::Message(const String& text) 324 NewWebSocketChannelImpl::Message::Message(const String& text)
168 : type(MessageTypeText) 325 : type(MessageTypeText)
169 , text(text.utf8(String::StrictConversionReplacingUnpairedSurrogatesWithFFFD )) { } 326 , text(text.utf8(String::StrictConversionReplacingUnpairedSurrogatesWithFFFD )) { }
170 327
171 NewWebSocketChannelImpl::Message::Message(const Blob& blob) 328 NewWebSocketChannelImpl::Message::Message(const Blob& blob)
172 : type(MessageTypeBlob) 329 : type(MessageTypeBlob)
173 , blob(Blob::create(blob.url(), blob.type(), blob.size())) { } 330 , blob(Blob::create(blob.url(), blob.type(), blob.size())) { }
174 331
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
225 { 382 {
226 if (!m_handle || m_receivedDataSizeForFlowControl < receivedDataSizeForFlowC ontrolHighWaterMark) { 383 if (!m_handle || m_receivedDataSizeForFlowControl < receivedDataSizeForFlowC ontrolHighWaterMark) {
227 return; 384 return;
228 } 385 }
229 m_handle->flowControl(m_receivedDataSizeForFlowControl); 386 m_handle->flowControl(m_receivedDataSizeForFlowControl);
230 m_receivedDataSizeForFlowControl = 0; 387 m_receivedDataSizeForFlowControl = 0;
231 } 388 }
232 389
233 void NewWebSocketChannelImpl::didConnect(WebSocketHandle* handle, bool succeed, const WebKit::WebString& selectedProtocol, const WebKit::WebString& extensions) 390 void NewWebSocketChannelImpl::didConnect(WebSocketHandle* handle, bool succeed, const WebKit::WebString& selectedProtocol, const WebKit::WebString& extensions)
234 { 391 {
235 if (!m_handle) { 392 ASSERT(m_handle);
236 return;
237 }
238 ASSERT(handle == m_handle); 393 ASSERT(handle == m_handle);
239 ASSERT(m_client); 394 ASSERT(m_client);
240 if (!succeed) { 395 if (!succeed) {
241 failAsError("Cannot connect to " + m_url.string() + "."); 396 failAsError("Cannot connect to " + m_url.string() + ".");
242 // failAsError may delete this object. 397 // failAsError may delete this object.
243 return; 398 return;
244 } 399 }
245 m_subprotocol = selectedProtocol; 400 m_subprotocol = selectedProtocol;
246 m_extensions = extensions; 401 m_extensions = extensions;
247 m_client->didConnect(); 402 if (m_isSuspended)
403 m_resumer->append(Resumer::PendingEvent(Resumer::PendingEvent::DidConnec tComplete));
404 else
405 m_client->didConnect();
248 } 406 }
249 407
250 void NewWebSocketChannelImpl::didReceiveData(WebSocketHandle* handle, WebSocketH andle::MessageType type, const char* data, size_t size, bool fin) 408 void NewWebSocketChannelImpl::didReceiveData(WebSocketHandle* handle, WebSocketH andle::MessageType type, const char* data, size_t size, bool fin)
251 { 409 {
252 if (!m_handle) { 410 ASSERT(m_handle);
253 return;
254 }
255 ASSERT(handle == m_handle); 411 ASSERT(handle == m_handle);
256 ASSERT(m_client); 412 ASSERT(m_client);
257 // Non-final frames cannot be empty. 413 // Non-final frames cannot be empty.
258 ASSERT(fin || size); 414 ASSERT(fin || size);
259 switch (type) { 415 switch (type) {
260 case WebSocketHandle::MessageTypeText: 416 case WebSocketHandle::MessageTypeText:
261 ASSERT(m_receivingMessageData.isEmpty()); 417 ASSERT(m_receivingMessageData.isEmpty());
262 m_receivingMessageTypeIsText = true; 418 m_receivingMessageTypeIsText = true;
263 break; 419 break;
264 case WebSocketHandle::MessageTypeBinary: 420 case WebSocketHandle::MessageTypeBinary:
265 ASSERT(m_receivingMessageData.isEmpty()); 421 ASSERT(m_receivingMessageData.isEmpty());
266 m_receivingMessageTypeIsText = false; 422 m_receivingMessageTypeIsText = false;
267 break; 423 break;
268 case WebSocketHandle::MessageTypeContinuation: 424 case WebSocketHandle::MessageTypeContinuation:
269 ASSERT(!m_receivingMessageData.isEmpty()); 425 ASSERT(!m_receivingMessageData.isEmpty());
270 break; 426 break;
271 } 427 }
272 m_receivingMessageData.append(data, size); 428 m_receivingMessageData.append(data, size);
273 m_receivedDataSizeForFlowControl += size; 429 m_receivedDataSizeForFlowControl += size;
274 flowControlIfNecessary(); 430 flowControlIfNecessary();
275 if (!fin) { 431 if (!fin) {
276 return; 432 return;
277 } 433 }
434 if (m_isSuspended) {
435 Resumer::PendingEvent::Type type =
436 m_receivingMessageTypeIsText ? Resumer::PendingEvent::DidReceiveText Message : Resumer::PendingEvent::DidReceiveBinaryMessage;
437 m_resumer->append(Resumer::PendingEvent(type, &m_receivingMessageData));
438 return;
439 }
278 Vector<char> messageData; 440 Vector<char> messageData;
279 messageData.swap(m_receivingMessageData); 441 messageData.swap(m_receivingMessageData);
280 if (m_receivingMessageTypeIsText) { 442 if (m_receivingMessageTypeIsText) {
281 handleTextMessage(&messageData); 443 handleTextMessage(&messageData);
282 // handleTextMessage may delete this object. 444 // handleTextMessage may delete this object.
283 } else { 445 } else {
284 handleBinaryMessage(&messageData); 446 handleBinaryMessage(&messageData);
285 } 447 }
286 } 448 }
287 449
288 450
289 void NewWebSocketChannelImpl::didClose(WebSocketHandle* handle, unsigned short c ode, const WebKit::WebString& reason) 451 void NewWebSocketChannelImpl::didClose(WebSocketHandle* handle, unsigned short c ode, const WebKit::WebString& reason)
290 { 452 {
453 ASSERT(m_handle);
454 m_handle.clear();
291 // FIXME: Maybe we should notify an error to m_client for some didClose mess ages. 455 // FIXME: Maybe we should notify an error to m_client for some didClose mess ages.
292 handleDidClose(code, reason); 456 if (m_isSuspended) {
293 // handleDidClose may delete this object. 457 m_resumer->append(Resumer::PendingEvent(code, reason));
458 } else {
459 handleDidClose(code, reason);
460 // handleDidClose may delete this object.
461 }
462 }
463
464 void NewWebSocketChannelImpl::handleDidConnect()
465 {
466 ASSERT(m_client);
467 ASSERT(!m_isSuspended);
468 m_client->didConnect();
294 } 469 }
295 470
296 void NewWebSocketChannelImpl::handleTextMessage(Vector<char>* messageData) 471 void NewWebSocketChannelImpl::handleTextMessage(Vector<char>* messageData)
297 { 472 {
298 ASSERT(m_client); 473 ASSERT(m_client);
474 ASSERT(!m_isSuspended);
299 ASSERT(messageData); 475 ASSERT(messageData);
300 String message = ""; 476 String message = "";
301 if (m_receivingMessageData.size() > 0) { 477 if (m_receivingMessageData.size() > 0) {
302 message = String::fromUTF8(messageData->data(), messageData->size()); 478 message = String::fromUTF8(messageData->data(), messageData->size());
303 } 479 }
304 // For consistency with handleBinaryMessage, we clear |messageData|. 480 // For consistency with handleBinaryMessage, we clear |messageData|.
305 messageData->clear(); 481 messageData->clear();
306 if (message.isNull()) { 482 if (message.isNull()) {
307 failAsError("Could not decode a text frame as UTF-8."); 483 failAsError("Could not decode a text frame as UTF-8.");
308 // failAsError may delete this object. 484 // failAsError may delete this object.
309 } else { 485 } else {
310 m_client->didReceiveMessage(message); 486 m_client->didReceiveMessage(message);
311 } 487 }
312 } 488 }
313 489
314 void NewWebSocketChannelImpl::handleBinaryMessage(Vector<char>* messageData) 490 void NewWebSocketChannelImpl::handleBinaryMessage(Vector<char>* messageData)
315 { 491 {
316 ASSERT(m_client); 492 ASSERT(m_client);
493 ASSERT(!m_isSuspended);
317 ASSERT(messageData); 494 ASSERT(messageData);
318 OwnPtr<Vector<char> > binaryData = adoptPtr(new Vector<char>); 495 OwnPtr<Vector<char> > binaryData = adoptPtr(new Vector<char>);
319 messageData->swap(*binaryData); 496 messageData->swap(*binaryData);
320 m_client->didReceiveBinaryData(binaryData.release()); 497 m_client->didReceiveBinaryData(binaryData.release());
321 } 498 }
322 499
500 void NewWebSocketChannelImpl::handleDidReceiveMessageError()
501 {
502 ASSERT(m_client);
503 ASSERT(!m_isSuspended);
504 m_client->didReceiveMessageError();
505 }
506
323 void NewWebSocketChannelImpl::handleDidClose(unsigned short code, const String& reason) 507 void NewWebSocketChannelImpl::handleDidClose(unsigned short code, const String& reason)
324 { 508 {
509 ASSERT(!m_isSuspended);
325 m_handle.clear(); 510 m_handle.clear();
511 m_resumer->abort();
326 if (!m_client) { 512 if (!m_client) {
327 return; 513 return;
328 } 514 }
329 WebSocketChannelClient* client = m_client; 515 WebSocketChannelClient* client = m_client;
330 m_client = 0; 516 m_client = 0;
331 WebSocketChannelClient::ClosingHandshakeCompletionStatus status = 517 WebSocketChannelClient::ClosingHandshakeCompletionStatus status =
332 isClean(code) ? WebSocketChannelClient::ClosingHandshakeComplete : WebSo cketChannelClient::ClosingHandshakeIncomplete; 518 isClean(code) ? WebSocketChannelClient::ClosingHandshakeComplete : WebSo cketChannelClient::ClosingHandshakeIncomplete;
333 client->didClose(m_bufferedAmount, status, code, reason); 519 client->didClose(m_bufferedAmount, status, code, reason);
334 // client->didClose may delete this object. 520 // client->didClose may delete this object.
335 } 521 }
336 522
337 } // namespace WebCore 523 } // namespace WebCore
OLDNEW
« no previous file with comments | « Source/modules/websockets/NewWebSocketChannelImpl.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698