| Index: Source/modules/websockets/NewWebSocketChannelImpl.cpp
|
| diff --git a/Source/modules/websockets/NewWebSocketChannelImpl.cpp b/Source/modules/websockets/NewWebSocketChannelImpl.cpp
|
| index 7d2fb5c81b2d4f58eb11f8547b6972bb60e3a4df..39dee0bb579628fdc8d831883cf3d26994f658ee 100644
|
| --- a/Source/modules/websockets/NewWebSocketChannelImpl.cpp
|
| +++ b/Source/modules/websockets/NewWebSocketChannelImpl.cpp
|
| @@ -47,10 +47,10 @@
|
| #include "public/platform/WebVector.h"
|
| #include "weborigin/SecurityOrigin.h"
|
| #include "wtf/ArrayBuffer.h"
|
| +#include "wtf/Vector.h"
|
| #include "wtf/text/WTFString.h"
|
|
|
| // FIXME: We should implement Inspector notification.
|
| -// FIXME: We should implement suspend / resume.
|
| // FIXME: We should add log messages.
|
|
|
| using WebKit::WebSocketHandle;
|
| @@ -59,6 +59,76 @@ namespace WebCore {
|
|
|
| namespace {
|
|
|
| +struct PendingEvent {
|
| + enum Type {
|
| + DidConnectComplete,
|
| + DidReceiveTextMessage,
|
| + DidReceiveBinaryMessage,
|
| + DidReceiveError,
|
| + DidClose,
|
| + };
|
| + Type type;
|
| + Vector<char> message; // for DidReceiveTextMessage / DidReceiveBinaryMessage
|
| + int closingCode; // for DidClose
|
| + String closingReason; // for DidClose
|
| +
|
| + explicit PendingEvent(Type type) : type(type), closingCode(0) { }
|
| + PendingEvent(Type, Vector<char>*);
|
| + PendingEvent(int code, const String& reason) : type(DidClose), closingCode(code), closingReason(reason) { }
|
| +};
|
| +
|
| +class PendingEventProcessor : public RefCounted<PendingEventProcessor> {
|
| +public:
|
| + PendingEventProcessor() : m_isAborted(false) { }
|
| + virtual ~PendingEventProcessor() { }
|
| + void abort() { m_isAborted = true; }
|
| + void append(const PendingEvent& e) { m_events.append(e); }
|
| + void process(NewWebSocketChannelImpl*);
|
| +
|
| +private:
|
| + bool m_isAborted;
|
| + Vector<PendingEvent> m_events;
|
| +};
|
| +
|
| +PendingEvent::PendingEvent(Type type, Vector<char>* data)
|
| + : type(type)
|
| + , closingCode(0)
|
| +{
|
| + ASSERT(type == DidReceiveTextMessage || type == DidReceiveBinaryMessage);
|
| + message.swap(*data);
|
| +}
|
| +
|
| +void PendingEventProcessor::process(NewWebSocketChannelImpl* channel)
|
| +{
|
| + RefPtr<PendingEventProcessor> protect(this);
|
| + for (size_t i = 0; i < m_events.size() && !m_isAborted; ++i) {
|
| + PendingEvent& event = m_events[i];
|
| + switch (event.type) {
|
| + case PendingEvent::DidConnectComplete:
|
| + channel->handleDidConnect();
|
| + // |this| can be detached here.
|
| + break;
|
| + case PendingEvent::DidReceiveTextMessage:
|
| + channel->handleTextMessage(&event.message);
|
| + // |this| can be detached here.
|
| + break;
|
| + case PendingEvent::DidReceiveBinaryMessage:
|
| + channel->handleBinaryMessage(&event.message);
|
| + // |this| can be detached here.
|
| + break;
|
| + case PendingEvent::DidReceiveError:
|
| + channel->handleDidReceiveMessageError();
|
| + // |this| can be detached here.
|
| + break;
|
| + case PendingEvent::DidClose:
|
| + channel->handleDidClose(event.closingCode, event.closingReason);
|
| + // |this| can be detached here.
|
| + break;
|
| + }
|
| + }
|
| + m_events.clear();
|
| +}
|
| +
|
| bool isClean(int code)
|
| {
|
| return code == WebSocketChannel::CloseEventCodeNormalClosure
|
| @@ -112,10 +182,86 @@ void NewWebSocketChannelImpl::BlobLoader::didFail(FileError::ErrorCode errorCode
|
| // |this| is deleted here.
|
| }
|
|
|
| +class NewWebSocketChannelImpl::Resumer {
|
| +public:
|
| + explicit Resumer(NewWebSocketChannelImpl*);
|
| + ~Resumer();
|
| +
|
| + void append(const PendingEvent&);
|
| + void suspend();
|
| + void resumeLater();
|
| + void abort();
|
| +
|
| +private:
|
| + void resumeNow(Timer<Resumer>*);
|
| +
|
| + NewWebSocketChannelImpl* m_channel;
|
| + RefPtr<PendingEventProcessor> m_pendingEventProcessor;
|
| + Timer<Resumer> m_timer;
|
| + bool m_isAborted;
|
| +};
|
| +
|
| +NewWebSocketChannelImpl::Resumer::Resumer(NewWebSocketChannelImpl* channel)
|
| + : m_channel(channel)
|
| + , m_pendingEventProcessor(adoptRef(new PendingEventProcessor))
|
| + , m_timer(this, &Resumer::resumeNow)
|
| + , m_isAborted(false) { }
|
| +
|
| +NewWebSocketChannelImpl::Resumer::~Resumer()
|
| +{
|
| + abort();
|
| +}
|
| +
|
| +void NewWebSocketChannelImpl::Resumer::append(const PendingEvent& event)
|
| +{
|
| + if (m_isAborted)
|
| + return;
|
| + m_pendingEventProcessor->append(event);
|
| +}
|
| +
|
| +void NewWebSocketChannelImpl::Resumer::suspend()
|
| +{
|
| + if (m_isAborted)
|
| + return;
|
| + m_timer.stop();
|
| + m_channel->m_isSuspended = true;
|
| +}
|
| +
|
| +void NewWebSocketChannelImpl::Resumer::resumeLater()
|
| +{
|
| + if (m_isAborted)
|
| + return;
|
| + if (!m_timer.isActive())
|
| + m_timer.startOneShot(0);
|
| +}
|
| +
|
| +void NewWebSocketChannelImpl::Resumer::abort()
|
| +{
|
| + if (m_isAborted)
|
| + return;
|
| + m_isAborted = true;
|
| + m_timer.stop();
|
| + m_pendingEventProcessor->abort();
|
| + m_pendingEventProcessor = 0;
|
| +}
|
| +
|
| +void NewWebSocketChannelImpl::Resumer::resumeNow(Timer<Resumer>*)
|
| +{
|
| + ASSERT(!m_isAborted);
|
| + m_channel->m_isSuspended = false;
|
| +
|
| + ASSERT(m_channel->m_client);
|
| + m_pendingEventProcessor->process(m_channel);
|
| + // |this| can be aborted here.
|
| + // |this| can be deleted here.
|
| +}
|
| +
|
| NewWebSocketChannelImpl::NewWebSocketChannelImpl(ScriptExecutionContext* context, WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber)
|
| : ContextLifecycleObserver(context)
|
| , m_handle(adoptPtr(WebKit::Platform::current()->createWebSocketHandle()))
|
| , m_client(client)
|
| + , m_resumer(adoptPtr(new Resumer(this)))
|
| + , m_isSuspended(false)
|
| , m_sendingQuota(0)
|
| , m_receivedDataSizeForFlowControl(receivedDataSizeForFlowControlHighWaterMark * 2) // initial quota
|
| , m_bufferedAmount(0)
|
| @@ -193,8 +339,12 @@ void NewWebSocketChannelImpl::fail(const String& reason, MessageLevel level, con
|
| // m_handle and m_client can be null here.
|
| if (m_client)
|
| m_client->didReceiveMessageError();
|
| - handleDidClose(CloseEventCodeAbnormalClosure, reason);
|
| - // handleDidClose may delete this object.
|
| + if (m_isSuspended) {
|
| + m_resumer->append(PendingEvent(CloseEventCodeAbnormalClosure, reason));
|
| + } else {
|
| + handleDidClose(CloseEventCodeAbnormalClosure, reason);
|
| + // handleDidClose may delete this object.
|
| + }
|
| }
|
|
|
| void NewWebSocketChannelImpl::disconnect()
|
| @@ -208,14 +358,71 @@ void NewWebSocketChannelImpl::disconnect()
|
|
|
| void NewWebSocketChannelImpl::suspend()
|
| {
|
| - notImplemented();
|
| + m_resumer->suspend();
|
| }
|
|
|
| void NewWebSocketChannelImpl::resume()
|
| {
|
| - notImplemented();
|
| + m_resumer->resumeLater();
|
| +}
|
| +
|
| +void NewWebSocketChannelImpl::handleDidConnect()
|
| +{
|
| + ASSERT(m_client);
|
| + ASSERT(!m_isSuspended);
|
| + m_client->didConnect();
|
| +}
|
| +
|
| +void NewWebSocketChannelImpl::handleTextMessage(Vector<char>* messageData)
|
| +{
|
| + ASSERT(m_client);
|
| + ASSERT(!m_isSuspended);
|
| + ASSERT(messageData);
|
| + String message = String::fromUTF8(messageData->data(), messageData->size());
|
| + // For consistency with handleBinaryMessage, we clear |messageData|.
|
| + messageData->clear();
|
| + if (message.isNull()) {
|
| + failAsError("Could not decode a text frame as UTF-8.");
|
| + // failAsError may delete this object.
|
| + } else {
|
| + m_client->didReceiveMessage(message);
|
| + }
|
| +}
|
| +
|
| +void NewWebSocketChannelImpl::handleBinaryMessage(Vector<char>* messageData)
|
| +{
|
| + ASSERT(m_client);
|
| + ASSERT(!m_isSuspended);
|
| + ASSERT(messageData);
|
| + OwnPtr<Vector<char> > binaryData = adoptPtr(new Vector<char>);
|
| + messageData->swap(*binaryData);
|
| + m_client->didReceiveBinaryData(binaryData.release());
|
| +}
|
| +
|
| +void NewWebSocketChannelImpl::handleDidReceiveMessageError()
|
| +{
|
| + ASSERT(m_client);
|
| + ASSERT(!m_isSuspended);
|
| + m_client->didReceiveMessageError();
|
| +}
|
| +
|
| +void NewWebSocketChannelImpl::handleDidClose(unsigned short code, const String& reason)
|
| +{
|
| + ASSERT(!m_isSuspended);
|
| + m_handle.clear();
|
| + abortAsyncOperations();
|
| + if (!m_client) {
|
| + return;
|
| + }
|
| + WebSocketChannelClient* client = m_client;
|
| + m_client = 0;
|
| + WebSocketChannelClient::ClosingHandshakeCompletionStatus status =
|
| + isClean(code) ? WebSocketChannelClient::ClosingHandshakeComplete : WebSocketChannelClient::ClosingHandshakeIncomplete;
|
| + client->didClose(m_bufferedAmount, status, code, reason);
|
| + // client->didClose may delete this object.
|
| }
|
|
|
| +
|
| NewWebSocketChannelImpl::Message::Message(const String& text)
|
| : type(MessageTypeText)
|
| , text(text.utf8(String::StrictConversionReplacingUnpairedSurrogatesWithFFFD)) { }
|
| @@ -286,13 +493,12 @@ void NewWebSocketChannelImpl::abortAsyncOperations()
|
| m_blobLoader->cancel();
|
| m_blobLoader.clear();
|
| }
|
| + m_resumer->abort();
|
| }
|
|
|
| void NewWebSocketChannelImpl::didConnect(WebSocketHandle* handle, bool succeed, const WebKit::WebString& selectedProtocol, const WebKit::WebString& extensions)
|
| {
|
| - if (!m_handle) {
|
| - return;
|
| - }
|
| + ASSERT(m_handle);
|
| ASSERT(handle == m_handle);
|
| ASSERT(m_client);
|
| if (!succeed) {
|
| @@ -302,14 +508,15 @@ void NewWebSocketChannelImpl::didConnect(WebSocketHandle* handle, bool succeed,
|
| }
|
| m_subprotocol = selectedProtocol;
|
| m_extensions = extensions;
|
| - m_client->didConnect();
|
| + if (m_isSuspended)
|
| + m_resumer->append(PendingEvent(PendingEvent::DidConnectComplete));
|
| + else
|
| + m_client->didConnect();
|
| }
|
|
|
| void NewWebSocketChannelImpl::didReceiveData(WebSocketHandle* handle, WebSocketHandle::MessageType type, const char* data, size_t size, bool fin)
|
| {
|
| - if (!m_handle) {
|
| - return;
|
| - }
|
| + ASSERT(m_handle);
|
| ASSERT(handle == m_handle);
|
| ASSERT(m_client);
|
| // Non-final frames cannot be empty.
|
| @@ -333,6 +540,12 @@ void NewWebSocketChannelImpl::didReceiveData(WebSocketHandle* handle, WebSocketH
|
| if (!fin) {
|
| return;
|
| }
|
| + if (m_isSuspended) {
|
| + PendingEvent::Type type =
|
| + m_receivingMessageTypeIsText ? PendingEvent::DidReceiveTextMessage : PendingEvent::DidReceiveBinaryMessage;
|
| + m_resumer->append(PendingEvent(type, &m_receivingMessageData));
|
| + return;
|
| + }
|
| Vector<char> messageData;
|
| messageData.swap(m_receivingMessageData);
|
| if (m_receivingMessageTypeIsText) {
|
| @@ -346,51 +559,15 @@ void NewWebSocketChannelImpl::didReceiveData(WebSocketHandle* handle, WebSocketH
|
|
|
| void NewWebSocketChannelImpl::didClose(WebSocketHandle* handle, unsigned short code, const WebKit::WebString& reason)
|
| {
|
| + ASSERT(m_handle);
|
| + m_handle.clear();
|
| // FIXME: Maybe we should notify an error to m_client for some didClose messages.
|
| - handleDidClose(code, reason);
|
| - // handleDidClose may delete this object.
|
| -}
|
| -
|
| -void NewWebSocketChannelImpl::handleTextMessage(Vector<char>* messageData)
|
| -{
|
| - ASSERT(m_client);
|
| - ASSERT(messageData);
|
| - String message = "";
|
| - if (m_receivingMessageData.size() > 0) {
|
| - message = String::fromUTF8(messageData->data(), messageData->size());
|
| - }
|
| - // For consistency with handleBinaryMessage, we clear |messageData|.
|
| - messageData->clear();
|
| - if (message.isNull()) {
|
| - failAsError("Could not decode a text frame as UTF-8.");
|
| - // failAsError may delete this object.
|
| + if (m_isSuspended) {
|
| + m_resumer->append(PendingEvent(code, reason));
|
| } else {
|
| - m_client->didReceiveMessage(message);
|
| - }
|
| -}
|
| -
|
| -void NewWebSocketChannelImpl::handleBinaryMessage(Vector<char>* messageData)
|
| -{
|
| - ASSERT(m_client);
|
| - ASSERT(messageData);
|
| - OwnPtr<Vector<char> > binaryData = adoptPtr(new Vector<char>);
|
| - messageData->swap(*binaryData);
|
| - m_client->didReceiveBinaryData(binaryData.release());
|
| -}
|
| -
|
| -void NewWebSocketChannelImpl::handleDidClose(unsigned short code, const String& reason)
|
| -{
|
| - m_handle.clear();
|
| - abortAsyncOperations();
|
| - if (!m_client) {
|
| - return;
|
| + handleDidClose(code, reason);
|
| + // handleDidClose may delete this object.
|
| }
|
| - WebSocketChannelClient* client = m_client;
|
| - m_client = 0;
|
| - WebSocketChannelClient::ClosingHandshakeCompletionStatus status =
|
| - isClean(code) ? WebSocketChannelClient::ClosingHandshakeComplete : WebSocketChannelClient::ClosingHandshakeIncomplete;
|
| - client->didClose(m_bufferedAmount, status, code, reason);
|
| - // client->didClose may delete this object.
|
| }
|
|
|
| void NewWebSocketChannelImpl::didFinishLoadingBlob(PassRefPtr<ArrayBuffer> buffer)
|
|
|