Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2228)

Unified Diff: Source/modules/websockets/NewWebSocketChannelImpl.cpp

Issue 23464050: Implement suspend / resume in NewWebSocketChannelImpl. (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « Source/modules/websockets/NewWebSocketChannelImpl.h ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: Source/modules/websockets/NewWebSocketChannelImpl.cpp
diff --git a/Source/modules/websockets/NewWebSocketChannelImpl.cpp b/Source/modules/websockets/NewWebSocketChannelImpl.cpp
index abb783fe14bee2c2cece8d92ebf65ed578929217..4bc63f430a4fa4d1ca57d5d69e6ff67b13b0a2da 100644
--- a/Source/modules/websockets/NewWebSocketChannelImpl.cpp
+++ b/Source/modules/websockets/NewWebSocketChannelImpl.cpp
@@ -44,11 +44,11 @@
#include "public/platform/WebVector.h"
#include "weborigin/SecurityOrigin.h"
#include "wtf/ArrayBuffer.h"
+#include "wtf/Vector.h"
#include "wtf/text/WTFString.h"
// FIXME: We should implement Inspector notification.
// FIXME: We should implement send(Blob).
-// FIXME: We should implement suspend / resume.
// FIXME: We should add log messages.
using WebKit::WebSocketHandle;
@@ -66,10 +66,158 @@ bool isClean(int code)
} // namespace
+class NewWebSocketChannelImpl::Resumer {
+public:
+ struct PendingEvent {
abarth-chromium 2013/09/10 16:24:15 I would move this class out from inside Resumer in
yhirano 2013/09/12 01:35:49 Done.
+ enum Type {
+ DidConnectComplete,
+ DidReceiveTextMessage,
+ DidReceiveBinaryMessage,
+ DidReceiveError,
+ DidClose,
+ };
+ Type type;
+ Vector<char> message; // for DidReceiveTextMessage / DidReceiveBinaryMessage
+ int closingCode; // for DidClose
+ String closingReason; // for DidClose
+
+ PendingEvent(Type type) : type(type), closingCode(0) { }
+ PendingEvent(Type, Vector<char>*);
+ PendingEvent(int code, const String& reason) : type(DidClose), closingCode(code), closingReason(reason) { }
+ };
+
+ Resumer(NewWebSocketChannelImpl*);
abarth-chromium 2013/09/10 16:24:15 Please mark one-argument constructors explicit.
yhirano 2013/09/12 01:35:49 Done.
+ ~Resumer();
+
+ void append(const PendingEvent&);
+ void suspend();
+ void resumeLater();
+ void abort();
+
+private:
+ class PendingEventProcessor : public RefCounted<PendingEventProcessor> {
abarth-chromium 2013/09/10 16:24:15 I'd also move this class into the anonymous namesp
yhirano 2013/09/12 01:35:49 Done.
+ public:
+ PendingEventProcessor() : m_isAborted(false) { }
+ virtual ~PendingEventProcessor() { }
+ void abort() { m_isAborted = true; }
+ void append(const PendingEvent& e) { m_events.append(e); }
+ void process(NewWebSocketChannelImpl*);
+
+ private:
+ bool m_isAborted;
+ Vector<PendingEvent> m_events;
+ };
+
+ void resumeNow(Timer<Resumer>*);
+
+ NewWebSocketChannelImpl* m_channel;
+ RefPtr<PendingEventProcessor> m_pendingEventProcessor;
+ Timer<Resumer> m_timer;
+ bool m_isAborted;
+};
+
+NewWebSocketChannelImpl::Resumer::PendingEvent::PendingEvent(Type type, Vector<char>* data) : type(type), closingCode(0)
abarth-chromium 2013/09/10 16:24:15 These initializers go on separate lines.
yhirano 2013/09/12 01:35:49 Done.
+{
+ ASSERT(type == DidReceiveTextMessage || type == DidReceiveBinaryMessage);
+ message.swap(*data);
+}
+
+NewWebSocketChannelImpl::Resumer::Resumer(NewWebSocketChannelImpl* channel)
+ : m_channel(channel)
+ , m_pendingEventProcessor(adoptRef(new PendingEventProcessor))
+ , m_timer(this, &Resumer::resumeNow)
+ , m_isAborted(false) { }
+
+NewWebSocketChannelImpl::Resumer::~Resumer()
+{
+ abort();
+}
+
+void NewWebSocketChannelImpl::Resumer::append(const PendingEvent& event)
+{
+ if (m_isAborted)
+ return;
+ m_pendingEventProcessor->append(event);
+}
+
+void NewWebSocketChannelImpl::Resumer::suspend()
+{
+ if (m_isAborted)
+ return;
+ m_timer.stop();
+ m_channel->m_isSuspended = true;
+}
+
+void NewWebSocketChannelImpl::Resumer::resumeLater()
+{
+ if (m_isAborted)
+ return;
+ if (!m_timer.isActive())
+ m_timer.startOneShot(0);
+}
+
+void NewWebSocketChannelImpl::Resumer::abort()
+{
+ if (m_isAborted)
+ return;
+ m_isAborted = true;
+ m_timer.stop();
+ m_pendingEventProcessor->abort();
+ m_pendingEventProcessor = 0;
+}
+
+void NewWebSocketChannelImpl::Resumer::resumeNow(Timer<Resumer>*)
+{
+ ASSERT(!m_isAborted);
+ m_channel->m_isSuspended = false;
+
+ ASSERT(m_channel->m_client);
+ if (m_channel->m_handle) {
+ m_channel->sendInternal();
+ m_channel->flowControlIfNecessary();
+ }
+ m_pendingEventProcessor->process(m_channel);
+ // |this| can be aborted here.
+ // |this| can be deleted here.
+}
+
+void NewWebSocketChannelImpl::Resumer::PendingEventProcessor::process(NewWebSocketChannelImpl* channel)
+{
+ RefPtr<PendingEventProcessor> protect(this);
+ for (size_t i = 0; i < m_events.size() && !m_isAborted; ++i) {
+ PendingEvent& event = m_events[i];
+ switch (event.type) {
+ case PendingEvent::DidConnectComplete:
+ channel->handleDidConnect();
+ // |this| can be detached here.
+ break;
+ case PendingEvent::DidReceiveTextMessage:
+ channel->handleTextMessage(&event.message);
+ // |this| can be detached here.
+ break;
+ case PendingEvent::DidReceiveBinaryMessage:
+ channel->handleBinaryMessage(&event.message);
+ // |this| can be detached here.
+ break;
+ case PendingEvent::DidReceiveError:
+ channel->handleDidReceiveMessageError();
+ // |this| can be detached here.
+ break;
+ case PendingEvent::DidClose:
+ channel->handleDidClose(event.closingCode, event.closingReason);
+ // |this| can be detached here.
+ break;
+ }
+ }
+ m_events.clear();
+}
+
NewWebSocketChannelImpl::NewWebSocketChannelImpl(ScriptExecutionContext* context, WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber)
: ContextLifecycleObserver(context)
, m_handle(adoptPtr(WebKit::Platform::current()->createWebSocketHandle()))
, m_client(client)
+ , m_resumer(adoptPtr(new Resumer(this)))
+ , m_isSuspended(false)
, m_sendingQuota(0)
, m_receivedDataSizeForFlowControl(receivedDataSizeForFlowControlHighWaterMark * 2) // initial quota
, m_bufferedAmount(0)
@@ -77,6 +225,10 @@ NewWebSocketChannelImpl::NewWebSocketChannelImpl(ScriptExecutionContext* context
{
}
+NewWebSocketChannelImpl::~NewWebSocketChannelImpl()
+{
+}
+
void NewWebSocketChannelImpl::connect(const KURL& url, const String& protocol)
{
if (!m_handle)
@@ -142,8 +294,12 @@ void NewWebSocketChannelImpl::fail(const String& reason, MessageLevel level, con
// m_handle and m_client can be null here.
if (m_client)
m_client->didReceiveMessageError();
- handleDidClose(CloseEventCodeAbnormalClosure, reason);
- // handleDidClose may delete this object.
+ if (m_isSuspended) {
+ m_resumer->append(Resumer::PendingEvent(CloseEventCodeAbnormalClosure, reason));
+ } else {
+ handleDidClose(CloseEventCodeAbnormalClosure, reason);
+ // handleDidClose may delete this object.
+ }
}
void NewWebSocketChannelImpl::disconnect()
@@ -152,16 +308,17 @@ void NewWebSocketChannelImpl::disconnect()
m_handle->close(CloseEventCodeAbnormalClosure, "");
m_handle.clear();
m_client = 0;
+ m_resumer->abort();
}
void NewWebSocketChannelImpl::suspend()
{
- notImplemented();
+ m_resumer->suspend();
}
void NewWebSocketChannelImpl::resume()
{
- notImplemented();
+ m_resumer->resumeLater();
}
NewWebSocketChannelImpl::Message::Message(const String& text)
@@ -232,9 +389,7 @@ void NewWebSocketChannelImpl::flowControlIfNecessary()
void NewWebSocketChannelImpl::didConnect(WebSocketHandle* handle, bool succeed, const WebKit::WebString& selectedProtocol, const WebKit::WebString& extensions)
{
- if (!m_handle) {
- return;
- }
+ ASSERT(m_handle);
ASSERT(handle == m_handle);
ASSERT(m_client);
if (!succeed) {
@@ -244,14 +399,15 @@ void NewWebSocketChannelImpl::didConnect(WebSocketHandle* handle, bool succeed,
}
m_subprotocol = selectedProtocol;
m_extensions = extensions;
- m_client->didConnect();
+ if (m_isSuspended)
+ m_resumer->append(Resumer::PendingEvent(Resumer::PendingEvent::DidConnectComplete));
+ else
+ m_client->didConnect();
}
void NewWebSocketChannelImpl::didReceiveData(WebSocketHandle* handle, WebSocketHandle::MessageType type, const char* data, size_t size, bool fin)
{
- if (!m_handle) {
- return;
- }
+ ASSERT(m_handle);
ASSERT(handle == m_handle);
ASSERT(m_client);
// Non-final frames cannot be empty.
@@ -275,6 +431,12 @@ void NewWebSocketChannelImpl::didReceiveData(WebSocketHandle* handle, WebSocketH
if (!fin) {
return;
}
+ if (m_isSuspended) {
+ Resumer::PendingEvent::Type type =
+ m_receivingMessageTypeIsText ? Resumer::PendingEvent::DidReceiveTextMessage : Resumer::PendingEvent::DidReceiveBinaryMessage;
+ m_resumer->append(Resumer::PendingEvent(type, &m_receivingMessageData));
+ return;
+ }
Vector<char> messageData;
messageData.swap(m_receivingMessageData);
if (m_receivingMessageTypeIsText) {
@@ -288,14 +450,28 @@ void NewWebSocketChannelImpl::didReceiveData(WebSocketHandle* handle, WebSocketH
void NewWebSocketChannelImpl::didClose(WebSocketHandle* handle, unsigned short code, const WebKit::WebString& reason)
{
+ ASSERT(m_handle);
+ m_handle.clear();
// FIXME: Maybe we should notify an error to m_client for some didClose messages.
- handleDidClose(code, reason);
- // handleDidClose may delete this object.
+ if (m_isSuspended) {
+ m_resumer->append(Resumer::PendingEvent(code, reason));
+ } else {
+ handleDidClose(code, reason);
+ // handleDidClose may delete this object.
+ }
+}
+
+void NewWebSocketChannelImpl::handleDidConnect()
+{
+ ASSERT(m_client);
+ ASSERT(!m_isSuspended);
+ m_client->didConnect();
}
void NewWebSocketChannelImpl::handleTextMessage(Vector<char>* messageData)
{
ASSERT(m_client);
+ ASSERT(!m_isSuspended);
ASSERT(messageData);
String message = "";
if (m_receivingMessageData.size() > 0) {
@@ -314,15 +490,25 @@ void NewWebSocketChannelImpl::handleTextMessage(Vector<char>* messageData)
void NewWebSocketChannelImpl::handleBinaryMessage(Vector<char>* messageData)
{
ASSERT(m_client);
+ ASSERT(!m_isSuspended);
ASSERT(messageData);
OwnPtr<Vector<char> > binaryData = adoptPtr(new Vector<char>);
messageData->swap(*binaryData);
m_client->didReceiveBinaryData(binaryData.release());
}
+void NewWebSocketChannelImpl::handleDidReceiveMessageError()
+{
+ ASSERT(m_client);
+ ASSERT(!m_isSuspended);
+ m_client->didReceiveMessageError();
+}
+
void NewWebSocketChannelImpl::handleDidClose(unsigned short code, const String& reason)
{
+ ASSERT(!m_isSuspended);
m_handle.clear();
+ m_resumer->abort();
if (!m_client) {
return;
}
« no previous file with comments | « Source/modules/websockets/NewWebSocketChannelImpl.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698