Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(117)

Side by Side Diff: Source/modules/websockets/NewWebSocketChannelImpl.cpp

Issue 23464050: Implement suspend / resume in NewWebSocketChannelImpl. (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « Source/modules/websockets/NewWebSocketChannelImpl.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright (C) 2013 Google Inc. All rights reserved. 2 * Copyright (C) 2013 Google Inc. All rights reserved.
3 * 3 *
4 * Redistribution and use in source and binary forms, with or without 4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are 5 * modification, are permitted provided that the following conditions are
6 * met: 6 * met:
7 * 7 *
8 * * Redistributions of source code must retain the above copyright 8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer. 9 * notice, this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above 10 * * Redistributions in binary form must reproduce the above
(...skipping 26 matching lines...) Expand all
37 #include "core/platform/NotImplemented.h" 37 #include "core/platform/NotImplemented.h"
38 #include "modules/websockets/WebSocketChannel.h" 38 #include "modules/websockets/WebSocketChannel.h"
39 #include "modules/websockets/WebSocketChannelClient.h" 39 #include "modules/websockets/WebSocketChannelClient.h"
40 #include "public/platform/Platform.h" 40 #include "public/platform/Platform.h"
41 #include "public/platform/WebSocketHandle.h" 41 #include "public/platform/WebSocketHandle.h"
42 #include "public/platform/WebString.h" 42 #include "public/platform/WebString.h"
43 #include "public/platform/WebURL.h" 43 #include "public/platform/WebURL.h"
44 #include "public/platform/WebVector.h" 44 #include "public/platform/WebVector.h"
45 #include "weborigin/SecurityOrigin.h" 45 #include "weborigin/SecurityOrigin.h"
46 #include "wtf/ArrayBuffer.h" 46 #include "wtf/ArrayBuffer.h"
47 #include "wtf/Vector.h"
47 #include "wtf/text/WTFString.h" 48 #include "wtf/text/WTFString.h"
48 49
49 // FIXME: We should implement Inspector notification. 50 // FIXME: We should implement Inspector notification.
50 // FIXME: We should implement send(Blob). 51 // FIXME: We should implement send(Blob).
51 // FIXME: We should implement suspend / resume.
52 // FIXME: We should add log messages. 52 // FIXME: We should add log messages.
53 53
54 using WebKit::WebSocketHandle; 54 using WebKit::WebSocketHandle;
55 55
56 namespace WebCore { 56 namespace WebCore {
57 57
58 namespace { 58 namespace {
59 59
60 struct PendingEvent {
61 enum Type {
62 DidConnectComplete,
63 DidReceiveTextMessage,
64 DidReceiveBinaryMessage,
65 DidReceiveError,
66 DidClose,
67 };
68 Type type;
69 Vector<char> message; // for DidReceiveTextMessage / DidReceiveBinaryMessage
70 int closingCode; // for DidClose
71 String closingReason; // for DidClose
72
73 explicit PendingEvent(Type type) : type(type), closingCode(0) { }
74 PendingEvent(Type, Vector<char>*);
75 PendingEvent(int code, const String& reason) : type(DidClose), closingCode(c ode), closingReason(reason) { }
76 };
77
78 class PendingEventProcessor : public RefCounted<PendingEventProcessor> {
79 public:
80 PendingEventProcessor() : m_isAborted(false) { }
81 virtual ~PendingEventProcessor() { }
82 void abort() { m_isAborted = true; }
83 void append(const PendingEvent& e) { m_events.append(e); }
84 void process(NewWebSocketChannelImpl*);
85
86 private:
87 bool m_isAborted;
88 Vector<PendingEvent> m_events;
89 };
90
91 PendingEvent::PendingEvent(Type type, Vector<char>* data)
92 : type(type)
93 , closingCode(0)
94 {
95 ASSERT(type == DidReceiveTextMessage || type == DidReceiveBinaryMessage);
96 message.swap(*data);
97 }
98
99 void PendingEventProcessor::process(NewWebSocketChannelImpl* channel)
100 {
101 RefPtr<PendingEventProcessor> protect(this);
102 for (size_t i = 0; i < m_events.size() && !m_isAborted; ++i) {
103 PendingEvent& event = m_events[i];
104 switch (event.type) {
105 case PendingEvent::DidConnectComplete:
106 channel->handleDidConnect();
107 // |this| can be detached here.
108 break;
109 case PendingEvent::DidReceiveTextMessage:
110 channel->handleTextMessage(&event.message);
111 // |this| can be detached here.
112 break;
113 case PendingEvent::DidReceiveBinaryMessage:
114 channel->handleBinaryMessage(&event.message);
115 // |this| can be detached here.
116 break;
117 case PendingEvent::DidReceiveError:
118 channel->handleDidReceiveMessageError();
119 // |this| can be detached here.
120 break;
121 case PendingEvent::DidClose:
122 channel->handleDidClose(event.closingCode, event.closingReason);
123 // |this| can be detached here.
124 break;
125 }
126 }
127 m_events.clear();
128 }
129
60 bool isClean(int code) 130 bool isClean(int code)
61 { 131 {
62 return code == WebSocketChannel::CloseEventCodeNormalClosure 132 return code == WebSocketChannel::CloseEventCodeNormalClosure
63 || (WebSocketChannel::CloseEventCodeMinimumUserDefined <= code 133 || (WebSocketChannel::CloseEventCodeMinimumUserDefined <= code
64 && code <= WebSocketChannel::CloseEventCodeMaximumUserDefined); 134 && code <= WebSocketChannel::CloseEventCodeMaximumUserDefined);
65 } 135 }
66 136
67 } // namespace 137 } // namespace
68 138
139 class NewWebSocketChannelImpl::Resumer {
140 public:
141 explicit Resumer(NewWebSocketChannelImpl*);
142 ~Resumer();
143
144 void append(const PendingEvent&);
145 void suspend();
146 void resumeLater();
147 void abort();
148
149 private:
150 void resumeNow(Timer<Resumer>*);
151
152 NewWebSocketChannelImpl* m_channel;
153 RefPtr<PendingEventProcessor> m_pendingEventProcessor;
154 Timer<Resumer> m_timer;
155 bool m_isAborted;
156 };
157
158 NewWebSocketChannelImpl::Resumer::Resumer(NewWebSocketChannelImpl* channel)
159 : m_channel(channel)
160 , m_pendingEventProcessor(adoptRef(new PendingEventProcessor))
161 , m_timer(this, &Resumer::resumeNow)
162 , m_isAborted(false) { }
163
164 NewWebSocketChannelImpl::Resumer::~Resumer()
165 {
166 abort();
167 }
168
169 void NewWebSocketChannelImpl::Resumer::append(const PendingEvent& event)
170 {
171 if (m_isAborted)
172 return;
173 m_pendingEventProcessor->append(event);
174 }
175
176 void NewWebSocketChannelImpl::Resumer::suspend()
177 {
178 if (m_isAborted)
179 return;
180 m_timer.stop();
181 m_channel->m_isSuspended = true;
182 }
183
184 void NewWebSocketChannelImpl::Resumer::resumeLater()
185 {
186 if (m_isAborted)
187 return;
188 if (!m_timer.isActive())
189 m_timer.startOneShot(0);
190 }
191
192 void NewWebSocketChannelImpl::Resumer::abort()
193 {
194 if (m_isAborted)
195 return;
196 m_isAborted = true;
197 m_timer.stop();
198 m_pendingEventProcessor->abort();
199 m_pendingEventProcessor = 0;
200 }
201
202 void NewWebSocketChannelImpl::Resumer::resumeNow(Timer<Resumer>*)
203 {
204 ASSERT(!m_isAborted);
205 m_channel->m_isSuspended = false;
206
207 ASSERT(m_channel->m_client);
208 if (m_channel->m_handle) {
209 m_channel->sendInternal();
210 m_channel->flowControlIfNecessary();
tyoshino (SeeGerritForStatus) 2013/09/12 05:27:10 these two operations look unnecessary.
yhirano 2013/09/12 06:02:09 Done.
211 }
212 m_pendingEventProcessor->process(m_channel);
213 // |this| can be aborted here.
214 // |this| can be deleted here.
215 }
216
69 NewWebSocketChannelImpl::NewWebSocketChannelImpl(ScriptExecutionContext* context , WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber) 217 NewWebSocketChannelImpl::NewWebSocketChannelImpl(ScriptExecutionContext* context , WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber)
70 : ContextLifecycleObserver(context) 218 : ContextLifecycleObserver(context)
71 , m_handle(adoptPtr(WebKit::Platform::current()->createWebSocketHandle())) 219 , m_handle(adoptPtr(WebKit::Platform::current()->createWebSocketHandle()))
72 , m_client(client) 220 , m_client(client)
221 , m_resumer(adoptPtr(new Resumer(this)))
222 , m_isSuspended(false)
73 , m_sendingQuota(0) 223 , m_sendingQuota(0)
74 , m_receivedDataSizeForFlowControl(receivedDataSizeForFlowControlHighWaterMa rk * 2) // initial quota 224 , m_receivedDataSizeForFlowControl(receivedDataSizeForFlowControlHighWaterMa rk * 2) // initial quota
75 , m_bufferedAmount(0) 225 , m_bufferedAmount(0)
76 , m_sentSizeOfTopMessage(0) 226 , m_sentSizeOfTopMessage(0)
77 { 227 {
78 } 228 }
79 229
230 NewWebSocketChannelImpl::~NewWebSocketChannelImpl()
231 {
232 }
233
80 void NewWebSocketChannelImpl::connect(const KURL& url, const String& protocol) 234 void NewWebSocketChannelImpl::connect(const KURL& url, const String& protocol)
81 { 235 {
82 if (!m_handle) 236 if (!m_handle)
83 return; 237 return;
84 m_url = url; 238 m_url = url;
85 Vector<String> protocols; 239 Vector<String> protocols;
86 // Since protocol is already verified and escaped, we can simply split it. 240 // Since protocol is already verified and escaped, we can simply split it.
87 protocol.split(", ", true, protocols); 241 protocol.split(", ", true, protocols);
88 WebKit::WebVector<WebKit::WebString> webProtocols(protocols.size()); 242 WebKit::WebVector<WebKit::WebString> webProtocols(protocols.size());
89 for (size_t i = 0; i < protocols.size(); ++i) { 243 for (size_t i = 0; i < protocols.size(); ++i) {
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
135 { 289 {
136 ASSERT(m_handle); 290 ASSERT(m_handle);
137 m_handle->close(static_cast<unsigned short>(code), reason); 291 m_handle->close(static_cast<unsigned short>(code), reason);
138 } 292 }
139 293
140 void NewWebSocketChannelImpl::fail(const String& reason, MessageLevel level, con st String& sourceURL, unsigned lineNumber) 294 void NewWebSocketChannelImpl::fail(const String& reason, MessageLevel level, con st String& sourceURL, unsigned lineNumber)
141 { 295 {
142 // m_handle and m_client can be null here. 296 // m_handle and m_client can be null here.
143 if (m_client) 297 if (m_client)
144 m_client->didReceiveMessageError(); 298 m_client->didReceiveMessageError();
145 handleDidClose(CloseEventCodeAbnormalClosure, reason); 299 if (m_isSuspended) {
146 // handleDidClose may delete this object. 300 m_resumer->append(PendingEvent(CloseEventCodeAbnormalClosure, reason));
301 } else {
302 handleDidClose(CloseEventCodeAbnormalClosure, reason);
303 // handleDidClose may delete this object.
304 }
147 } 305 }
148 306
149 void NewWebSocketChannelImpl::disconnect() 307 void NewWebSocketChannelImpl::disconnect()
150 { 308 {
151 if (m_handle) 309 if (m_handle)
152 m_handle->close(CloseEventCodeAbnormalClosure, ""); 310 m_handle->close(CloseEventCodeAbnormalClosure, "");
153 m_handle.clear(); 311 m_handle.clear();
154 m_client = 0; 312 m_client = 0;
313 m_resumer->abort();
155 } 314 }
156 315
157 void NewWebSocketChannelImpl::suspend() 316 void NewWebSocketChannelImpl::suspend()
158 { 317 {
159 notImplemented(); 318 m_resumer->suspend();
160 } 319 }
161 320
162 void NewWebSocketChannelImpl::resume() 321 void NewWebSocketChannelImpl::resume()
163 { 322 {
164 notImplemented(); 323 m_resumer->resumeLater();
165 } 324 }
166 325
326 void NewWebSocketChannelImpl::handleDidConnect()
327 {
328 ASSERT(m_client);
329 ASSERT(!m_isSuspended);
330 m_client->didConnect();
331 }
332
333 void NewWebSocketChannelImpl::handleTextMessage(Vector<char>* messageData)
334 {
335 ASSERT(m_client);
336 ASSERT(!m_isSuspended);
337 ASSERT(messageData);
338 String message = "";
339 if (m_receivingMessageData.size() > 0) {
tyoshino (SeeGerritForStatus) 2013/09/12 05:27:10 wow. not an issue of this CL, but this should be m
yhirano 2013/09/12 06:02:09 You are right. By the way, I confirmed that String
340 message = String::fromUTF8(messageData->data(), messageData->size());
341 }
342 // For consistency with handleBinaryMessage, we clear |messageData|.
343 messageData->clear();
344 if (message.isNull()) {
345 failAsError("Could not decode a text frame as UTF-8.");
346 // failAsError may delete this object.
347 } else {
348 m_client->didReceiveMessage(message);
349 }
350 }
351
352 void NewWebSocketChannelImpl::handleBinaryMessage(Vector<char>* messageData)
353 {
354 ASSERT(m_client);
355 ASSERT(!m_isSuspended);
356 ASSERT(messageData);
357 OwnPtr<Vector<char> > binaryData = adoptPtr(new Vector<char>);
358 messageData->swap(*binaryData);
359 m_client->didReceiveBinaryData(binaryData.release());
360 }
361
362 void NewWebSocketChannelImpl::handleDidReceiveMessageError()
363 {
364 ASSERT(m_client);
365 ASSERT(!m_isSuspended);
366 m_client->didReceiveMessageError();
367 }
368
369 void NewWebSocketChannelImpl::handleDidClose(unsigned short code, const String& reason)
370 {
371 ASSERT(!m_isSuspended);
372 m_handle.clear();
373 m_resumer->abort();
374 if (!m_client) {
375 return;
376 }
377 WebSocketChannelClient* client = m_client;
378 m_client = 0;
379 WebSocketChannelClient::ClosingHandshakeCompletionStatus status =
380 isClean(code) ? WebSocketChannelClient::ClosingHandshakeComplete : WebSo cketChannelClient::ClosingHandshakeIncomplete;
381 client->didClose(m_bufferedAmount, status, code, reason);
382 // client->didClose may delete this object.
383 }
384
385
167 NewWebSocketChannelImpl::Message::Message(const String& text) 386 NewWebSocketChannelImpl::Message::Message(const String& text)
168 : type(MessageTypeText) 387 : type(MessageTypeText)
169 , text(text.utf8(String::StrictConversionReplacingUnpairedSurrogatesWithFFFD )) { } 388 , text(text.utf8(String::StrictConversionReplacingUnpairedSurrogatesWithFFFD )) { }
170 389
171 NewWebSocketChannelImpl::Message::Message(const Blob& blob) 390 NewWebSocketChannelImpl::Message::Message(const Blob& blob)
172 : type(MessageTypeBlob) 391 : type(MessageTypeBlob)
173 , blob(Blob::create(blob.url(), blob.type(), blob.size())) { } 392 , blob(Blob::create(blob.url(), blob.type(), blob.size())) { }
174 393
175 NewWebSocketChannelImpl::Message::Message(PassRefPtr<ArrayBuffer> arrayBuffer) 394 NewWebSocketChannelImpl::Message::Message(PassRefPtr<ArrayBuffer> arrayBuffer)
176 : type(MessageTypeArrayBuffer) 395 : type(MessageTypeArrayBuffer)
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
225 { 444 {
226 if (!m_handle || m_receivedDataSizeForFlowControl < receivedDataSizeForFlowC ontrolHighWaterMark) { 445 if (!m_handle || m_receivedDataSizeForFlowControl < receivedDataSizeForFlowC ontrolHighWaterMark) {
227 return; 446 return;
228 } 447 }
229 m_handle->flowControl(m_receivedDataSizeForFlowControl); 448 m_handle->flowControl(m_receivedDataSizeForFlowControl);
230 m_receivedDataSizeForFlowControl = 0; 449 m_receivedDataSizeForFlowControl = 0;
231 } 450 }
232 451
233 void NewWebSocketChannelImpl::didConnect(WebSocketHandle* handle, bool succeed, const WebKit::WebString& selectedProtocol, const WebKit::WebString& extensions) 452 void NewWebSocketChannelImpl::didConnect(WebSocketHandle* handle, bool succeed, const WebKit::WebString& selectedProtocol, const WebKit::WebString& extensions)
234 { 453 {
235 if (!m_handle) { 454 ASSERT(m_handle);
236 return;
237 }
238 ASSERT(handle == m_handle); 455 ASSERT(handle == m_handle);
239 ASSERT(m_client); 456 ASSERT(m_client);
240 if (!succeed) { 457 if (!succeed) {
241 failAsError("Cannot connect to " + m_url.string() + "."); 458 failAsError("Cannot connect to " + m_url.string() + ".");
242 // failAsError may delete this object. 459 // failAsError may delete this object.
243 return; 460 return;
244 } 461 }
245 m_subprotocol = selectedProtocol; 462 m_subprotocol = selectedProtocol;
246 m_extensions = extensions; 463 m_extensions = extensions;
247 m_client->didConnect(); 464 if (m_isSuspended)
465 m_resumer->append(PendingEvent(PendingEvent::DidConnectComplete));
466 else
467 m_client->didConnect();
248 } 468 }
249 469
250 void NewWebSocketChannelImpl::didReceiveData(WebSocketHandle* handle, WebSocketH andle::MessageType type, const char* data, size_t size, bool fin) 470 void NewWebSocketChannelImpl::didReceiveData(WebSocketHandle* handle, WebSocketH andle::MessageType type, const char* data, size_t size, bool fin)
251 { 471 {
252 if (!m_handle) { 472 ASSERT(m_handle);
253 return;
254 }
255 ASSERT(handle == m_handle); 473 ASSERT(handle == m_handle);
256 ASSERT(m_client); 474 ASSERT(m_client);
257 // Non-final frames cannot be empty. 475 // Non-final frames cannot be empty.
258 ASSERT(fin || size); 476 ASSERT(fin || size);
259 switch (type) { 477 switch (type) {
260 case WebSocketHandle::MessageTypeText: 478 case WebSocketHandle::MessageTypeText:
261 ASSERT(m_receivingMessageData.isEmpty()); 479 ASSERT(m_receivingMessageData.isEmpty());
262 m_receivingMessageTypeIsText = true; 480 m_receivingMessageTypeIsText = true;
263 break; 481 break;
264 case WebSocketHandle::MessageTypeBinary: 482 case WebSocketHandle::MessageTypeBinary:
265 ASSERT(m_receivingMessageData.isEmpty()); 483 ASSERT(m_receivingMessageData.isEmpty());
266 m_receivingMessageTypeIsText = false; 484 m_receivingMessageTypeIsText = false;
267 break; 485 break;
268 case WebSocketHandle::MessageTypeContinuation: 486 case WebSocketHandle::MessageTypeContinuation:
269 ASSERT(!m_receivingMessageData.isEmpty()); 487 ASSERT(!m_receivingMessageData.isEmpty());
270 break; 488 break;
271 } 489 }
272 m_receivingMessageData.append(data, size); 490 m_receivingMessageData.append(data, size);
273 m_receivedDataSizeForFlowControl += size; 491 m_receivedDataSizeForFlowControl += size;
274 flowControlIfNecessary(); 492 flowControlIfNecessary();
275 if (!fin) { 493 if (!fin) {
276 return; 494 return;
277 } 495 }
496 if (m_isSuspended) {
497 PendingEvent::Type type =
498 m_receivingMessageTypeIsText ? PendingEvent::DidReceiveTextMessage : PendingEvent::DidReceiveBinaryMessage;
499 m_resumer->append(PendingEvent(type, &m_receivingMessageData));
500 return;
501 }
278 Vector<char> messageData; 502 Vector<char> messageData;
279 messageData.swap(m_receivingMessageData); 503 messageData.swap(m_receivingMessageData);
280 if (m_receivingMessageTypeIsText) { 504 if (m_receivingMessageTypeIsText) {
281 handleTextMessage(&messageData); 505 handleTextMessage(&messageData);
282 // handleTextMessage may delete this object. 506 // handleTextMessage may delete this object.
283 } else { 507 } else {
284 handleBinaryMessage(&messageData); 508 handleBinaryMessage(&messageData);
285 } 509 }
286 } 510 }
287 511
288 512
289 void NewWebSocketChannelImpl::didClose(WebSocketHandle* handle, unsigned short c ode, const WebKit::WebString& reason) 513 void NewWebSocketChannelImpl::didClose(WebSocketHandle* handle, unsigned short c ode, const WebKit::WebString& reason)
290 { 514 {
515 ASSERT(m_handle);
516 m_handle.clear();
291 // FIXME: Maybe we should notify an error to m_client for some didClose mess ages. 517 // FIXME: Maybe we should notify an error to m_client for some didClose mess ages.
292 handleDidClose(code, reason); 518 if (m_isSuspended) {
293 // handleDidClose may delete this object. 519 m_resumer->append(PendingEvent(code, reason));
294 }
295
296 void NewWebSocketChannelImpl::handleTextMessage(Vector<char>* messageData)
297 {
298 ASSERT(m_client);
299 ASSERT(messageData);
300 String message = "";
301 if (m_receivingMessageData.size() > 0) {
302 message = String::fromUTF8(messageData->data(), messageData->size());
303 }
304 // For consistency with handleBinaryMessage, we clear |messageData|.
305 messageData->clear();
306 if (message.isNull()) {
307 failAsError("Could not decode a text frame as UTF-8.");
308 // failAsError may delete this object.
309 } else { 520 } else {
310 m_client->didReceiveMessage(message); 521 handleDidClose(code, reason);
522 // handleDidClose may delete this object.
311 } 523 }
312 } 524 }
313 525
314 void NewWebSocketChannelImpl::handleBinaryMessage(Vector<char>* messageData)
315 {
316 ASSERT(m_client);
317 ASSERT(messageData);
318 OwnPtr<Vector<char> > binaryData = adoptPtr(new Vector<char>);
319 messageData->swap(*binaryData);
320 m_client->didReceiveBinaryData(binaryData.release());
321 }
322
323 void NewWebSocketChannelImpl::handleDidClose(unsigned short code, const String& reason)
324 {
325 m_handle.clear();
326 if (!m_client) {
327 return;
328 }
329 WebSocketChannelClient* client = m_client;
330 m_client = 0;
331 WebSocketChannelClient::ClosingHandshakeCompletionStatus status =
332 isClean(code) ? WebSocketChannelClient::ClosingHandshakeComplete : WebSo cketChannelClient::ClosingHandshakeIncomplete;
333 client->didClose(m_bufferedAmount, status, code, reason);
334 // client->didClose may delete this object.
335 }
336
337 } // namespace WebCore 526 } // namespace WebCore
OLDNEW
« no previous file with comments | « Source/modules/websockets/NewWebSocketChannelImpl.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698