Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2124)

Unified Diff: Source/modules/websockets/NewWebSocketChannelImpl.cpp

Issue 23464050: Implement suspend / resume in NewWebSocketChannelImpl. (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « Source/modules/websockets/NewWebSocketChannelImpl.h ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: Source/modules/websockets/NewWebSocketChannelImpl.cpp
diff --git a/Source/modules/websockets/NewWebSocketChannelImpl.cpp b/Source/modules/websockets/NewWebSocketChannelImpl.cpp
index 7d2fb5c81b2d4f58eb11f8547b6972bb60e3a4df..39dee0bb579628fdc8d831883cf3d26994f658ee 100644
--- a/Source/modules/websockets/NewWebSocketChannelImpl.cpp
+++ b/Source/modules/websockets/NewWebSocketChannelImpl.cpp
@@ -47,10 +47,10 @@
#include "public/platform/WebVector.h"
#include "weborigin/SecurityOrigin.h"
#include "wtf/ArrayBuffer.h"
+#include "wtf/Vector.h"
#include "wtf/text/WTFString.h"
// FIXME: We should implement Inspector notification.
-// FIXME: We should implement suspend / resume.
// FIXME: We should add log messages.
using WebKit::WebSocketHandle;
@@ -59,6 +59,76 @@ namespace WebCore {
namespace {
+struct PendingEvent {
+ enum Type {
+ DidConnectComplete,
+ DidReceiveTextMessage,
+ DidReceiveBinaryMessage,
+ DidReceiveError,
+ DidClose,
+ };
+ Type type;
+ Vector<char> message; // for DidReceiveTextMessage / DidReceiveBinaryMessage
+ int closingCode; // for DidClose
+ String closingReason; // for DidClose
+
+ explicit PendingEvent(Type type) : type(type), closingCode(0) { }
+ PendingEvent(Type, Vector<char>*);
+ PendingEvent(int code, const String& reason) : type(DidClose), closingCode(code), closingReason(reason) { }
+};
+
+class PendingEventProcessor : public RefCounted<PendingEventProcessor> {
+public:
+ PendingEventProcessor() : m_isAborted(false) { }
+ virtual ~PendingEventProcessor() { }
+ void abort() { m_isAborted = true; }
+ void append(const PendingEvent& e) { m_events.append(e); }
+ void process(NewWebSocketChannelImpl*);
+
+private:
+ bool m_isAborted;
+ Vector<PendingEvent> m_events;
+};
+
+PendingEvent::PendingEvent(Type type, Vector<char>* data)
+ : type(type)
+ , closingCode(0)
+{
+ ASSERT(type == DidReceiveTextMessage || type == DidReceiveBinaryMessage);
+ message.swap(*data);
+}
+
+void PendingEventProcessor::process(NewWebSocketChannelImpl* channel)
+{
+ RefPtr<PendingEventProcessor> protect(this);
+ for (size_t i = 0; i < m_events.size() && !m_isAborted; ++i) {
+ PendingEvent& event = m_events[i];
+ switch (event.type) {
+ case PendingEvent::DidConnectComplete:
+ channel->handleDidConnect();
+ // |this| can be detached here.
+ break;
+ case PendingEvent::DidReceiveTextMessage:
+ channel->handleTextMessage(&event.message);
+ // |this| can be detached here.
+ break;
+ case PendingEvent::DidReceiveBinaryMessage:
+ channel->handleBinaryMessage(&event.message);
+ // |this| can be detached here.
+ break;
+ case PendingEvent::DidReceiveError:
+ channel->handleDidReceiveMessageError();
+ // |this| can be detached here.
+ break;
+ case PendingEvent::DidClose:
+ channel->handleDidClose(event.closingCode, event.closingReason);
+ // |this| can be detached here.
+ break;
+ }
+ }
+ m_events.clear();
+}
+
bool isClean(int code)
{
return code == WebSocketChannel::CloseEventCodeNormalClosure
@@ -112,10 +182,86 @@ void NewWebSocketChannelImpl::BlobLoader::didFail(FileError::ErrorCode errorCode
// |this| is deleted here.
}
+class NewWebSocketChannelImpl::Resumer {
+public:
+ explicit Resumer(NewWebSocketChannelImpl*);
+ ~Resumer();
+
+ void append(const PendingEvent&);
+ void suspend();
+ void resumeLater();
+ void abort();
+
+private:
+ void resumeNow(Timer<Resumer>*);
+
+ NewWebSocketChannelImpl* m_channel;
+ RefPtr<PendingEventProcessor> m_pendingEventProcessor;
+ Timer<Resumer> m_timer;
+ bool m_isAborted;
+};
+
+NewWebSocketChannelImpl::Resumer::Resumer(NewWebSocketChannelImpl* channel)
+ : m_channel(channel)
+ , m_pendingEventProcessor(adoptRef(new PendingEventProcessor))
+ , m_timer(this, &Resumer::resumeNow)
+ , m_isAborted(false) { }
+
+NewWebSocketChannelImpl::Resumer::~Resumer()
+{
+ abort();
+}
+
+void NewWebSocketChannelImpl::Resumer::append(const PendingEvent& event)
+{
+ if (m_isAborted)
+ return;
+ m_pendingEventProcessor->append(event);
+}
+
+void NewWebSocketChannelImpl::Resumer::suspend()
+{
+ if (m_isAborted)
+ return;
+ m_timer.stop();
+ m_channel->m_isSuspended = true;
+}
+
+void NewWebSocketChannelImpl::Resumer::resumeLater()
+{
+ if (m_isAborted)
+ return;
+ if (!m_timer.isActive())
+ m_timer.startOneShot(0);
+}
+
+void NewWebSocketChannelImpl::Resumer::abort()
+{
+ if (m_isAborted)
+ return;
+ m_isAborted = true;
+ m_timer.stop();
+ m_pendingEventProcessor->abort();
+ m_pendingEventProcessor = 0;
+}
+
+void NewWebSocketChannelImpl::Resumer::resumeNow(Timer<Resumer>*)
+{
+ ASSERT(!m_isAborted);
+ m_channel->m_isSuspended = false;
+
+ ASSERT(m_channel->m_client);
+ m_pendingEventProcessor->process(m_channel);
+ // |this| can be aborted here.
+ // |this| can be deleted here.
+}
+
NewWebSocketChannelImpl::NewWebSocketChannelImpl(ScriptExecutionContext* context, WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber)
: ContextLifecycleObserver(context)
, m_handle(adoptPtr(WebKit::Platform::current()->createWebSocketHandle()))
, m_client(client)
+ , m_resumer(adoptPtr(new Resumer(this)))
+ , m_isSuspended(false)
, m_sendingQuota(0)
, m_receivedDataSizeForFlowControl(receivedDataSizeForFlowControlHighWaterMark * 2) // initial quota
, m_bufferedAmount(0)
@@ -193,8 +339,12 @@ void NewWebSocketChannelImpl::fail(const String& reason, MessageLevel level, con
// m_handle and m_client can be null here.
if (m_client)
m_client->didReceiveMessageError();
- handleDidClose(CloseEventCodeAbnormalClosure, reason);
- // handleDidClose may delete this object.
+ if (m_isSuspended) {
+ m_resumer->append(PendingEvent(CloseEventCodeAbnormalClosure, reason));
+ } else {
+ handleDidClose(CloseEventCodeAbnormalClosure, reason);
+ // handleDidClose may delete this object.
+ }
}
void NewWebSocketChannelImpl::disconnect()
@@ -208,14 +358,71 @@ void NewWebSocketChannelImpl::disconnect()
void NewWebSocketChannelImpl::suspend()
{
- notImplemented();
+ m_resumer->suspend();
}
void NewWebSocketChannelImpl::resume()
{
- notImplemented();
+ m_resumer->resumeLater();
+}
+
+void NewWebSocketChannelImpl::handleDidConnect()
+{
+ ASSERT(m_client);
+ ASSERT(!m_isSuspended);
+ m_client->didConnect();
+}
+
+void NewWebSocketChannelImpl::handleTextMessage(Vector<char>* messageData)
+{
+ ASSERT(m_client);
+ ASSERT(!m_isSuspended);
+ ASSERT(messageData);
+ String message = String::fromUTF8(messageData->data(), messageData->size());
+ // For consistency with handleBinaryMessage, we clear |messageData|.
+ messageData->clear();
+ if (message.isNull()) {
+ failAsError("Could not decode a text frame as UTF-8.");
+ // failAsError may delete this object.
+ } else {
+ m_client->didReceiveMessage(message);
+ }
+}
+
+void NewWebSocketChannelImpl::handleBinaryMessage(Vector<char>* messageData)
+{
+ ASSERT(m_client);
+ ASSERT(!m_isSuspended);
+ ASSERT(messageData);
+ OwnPtr<Vector<char> > binaryData = adoptPtr(new Vector<char>);
+ messageData->swap(*binaryData);
+ m_client->didReceiveBinaryData(binaryData.release());
+}
+
+void NewWebSocketChannelImpl::handleDidReceiveMessageError()
+{
+ ASSERT(m_client);
+ ASSERT(!m_isSuspended);
+ m_client->didReceiveMessageError();
+}
+
+void NewWebSocketChannelImpl::handleDidClose(unsigned short code, const String& reason)
+{
+ ASSERT(!m_isSuspended);
+ m_handle.clear();
+ abortAsyncOperations();
+ if (!m_client) {
+ return;
+ }
+ WebSocketChannelClient* client = m_client;
+ m_client = 0;
+ WebSocketChannelClient::ClosingHandshakeCompletionStatus status =
+ isClean(code) ? WebSocketChannelClient::ClosingHandshakeComplete : WebSocketChannelClient::ClosingHandshakeIncomplete;
+ client->didClose(m_bufferedAmount, status, code, reason);
+ // client->didClose may delete this object.
}
+
NewWebSocketChannelImpl::Message::Message(const String& text)
: type(MessageTypeText)
, text(text.utf8(String::StrictConversionReplacingUnpairedSurrogatesWithFFFD)) { }
@@ -286,13 +493,12 @@ void NewWebSocketChannelImpl::abortAsyncOperations()
m_blobLoader->cancel();
m_blobLoader.clear();
}
+ m_resumer->abort();
}
void NewWebSocketChannelImpl::didConnect(WebSocketHandle* handle, bool succeed, const WebKit::WebString& selectedProtocol, const WebKit::WebString& extensions)
{
- if (!m_handle) {
- return;
- }
+ ASSERT(m_handle);
ASSERT(handle == m_handle);
ASSERT(m_client);
if (!succeed) {
@@ -302,14 +508,15 @@ void NewWebSocketChannelImpl::didConnect(WebSocketHandle* handle, bool succeed,
}
m_subprotocol = selectedProtocol;
m_extensions = extensions;
- m_client->didConnect();
+ if (m_isSuspended)
+ m_resumer->append(PendingEvent(PendingEvent::DidConnectComplete));
+ else
+ m_client->didConnect();
}
void NewWebSocketChannelImpl::didReceiveData(WebSocketHandle* handle, WebSocketHandle::MessageType type, const char* data, size_t size, bool fin)
{
- if (!m_handle) {
- return;
- }
+ ASSERT(m_handle);
ASSERT(handle == m_handle);
ASSERT(m_client);
// Non-final frames cannot be empty.
@@ -333,6 +540,12 @@ void NewWebSocketChannelImpl::didReceiveData(WebSocketHandle* handle, WebSocketH
if (!fin) {
return;
}
+ if (m_isSuspended) {
+ PendingEvent::Type type =
+ m_receivingMessageTypeIsText ? PendingEvent::DidReceiveTextMessage : PendingEvent::DidReceiveBinaryMessage;
+ m_resumer->append(PendingEvent(type, &m_receivingMessageData));
+ return;
+ }
Vector<char> messageData;
messageData.swap(m_receivingMessageData);
if (m_receivingMessageTypeIsText) {
@@ -346,51 +559,15 @@ void NewWebSocketChannelImpl::didReceiveData(WebSocketHandle* handle, WebSocketH
void NewWebSocketChannelImpl::didClose(WebSocketHandle* handle, unsigned short code, const WebKit::WebString& reason)
{
+ ASSERT(m_handle);
+ m_handle.clear();
// FIXME: Maybe we should notify an error to m_client for some didClose messages.
- handleDidClose(code, reason);
- // handleDidClose may delete this object.
-}
-
-void NewWebSocketChannelImpl::handleTextMessage(Vector<char>* messageData)
-{
- ASSERT(m_client);
- ASSERT(messageData);
- String message = "";
- if (m_receivingMessageData.size() > 0) {
- message = String::fromUTF8(messageData->data(), messageData->size());
- }
- // For consistency with handleBinaryMessage, we clear |messageData|.
- messageData->clear();
- if (message.isNull()) {
- failAsError("Could not decode a text frame as UTF-8.");
- // failAsError may delete this object.
+ if (m_isSuspended) {
+ m_resumer->append(PendingEvent(code, reason));
} else {
- m_client->didReceiveMessage(message);
- }
-}
-
-void NewWebSocketChannelImpl::handleBinaryMessage(Vector<char>* messageData)
-{
- ASSERT(m_client);
- ASSERT(messageData);
- OwnPtr<Vector<char> > binaryData = adoptPtr(new Vector<char>);
- messageData->swap(*binaryData);
- m_client->didReceiveBinaryData(binaryData.release());
-}
-
-void NewWebSocketChannelImpl::handleDidClose(unsigned short code, const String& reason)
-{
- m_handle.clear();
- abortAsyncOperations();
- if (!m_client) {
- return;
+ handleDidClose(code, reason);
+ // handleDidClose may delete this object.
}
- WebSocketChannelClient* client = m_client;
- m_client = 0;
- WebSocketChannelClient::ClosingHandshakeCompletionStatus status =
- isClean(code) ? WebSocketChannelClient::ClosingHandshakeComplete : WebSocketChannelClient::ClosingHandshakeIncomplete;
- client->didClose(m_bufferedAmount, status, code, reason);
- // client->didClose may delete this object.
}
void NewWebSocketChannelImpl::didFinishLoadingBlob(PassRefPtr<ArrayBuffer> buffer)
« no previous file with comments | « Source/modules/websockets/NewWebSocketChannelImpl.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698