Chromium Code Reviews| Index: Source/modules/websockets/NewWebSocketChannelImpl.cpp |
| diff --git a/Source/modules/websockets/NewWebSocketChannelImpl.cpp b/Source/modules/websockets/NewWebSocketChannelImpl.cpp |
| index abb783fe14bee2c2cece8d92ebf65ed578929217..4bc63f430a4fa4d1ca57d5d69e6ff67b13b0a2da 100644 |
| --- a/Source/modules/websockets/NewWebSocketChannelImpl.cpp |
| +++ b/Source/modules/websockets/NewWebSocketChannelImpl.cpp |
| @@ -44,11 +44,11 @@ |
| #include "public/platform/WebVector.h" |
| #include "weborigin/SecurityOrigin.h" |
| #include "wtf/ArrayBuffer.h" |
| +#include "wtf/Vector.h" |
| #include "wtf/text/WTFString.h" |
| // FIXME: We should implement Inspector notification. |
| // FIXME: We should implement send(Blob). |
| -// FIXME: We should implement suspend / resume. |
| // FIXME: We should add log messages. |
| using WebKit::WebSocketHandle; |
| @@ -66,10 +66,158 @@ bool isClean(int code) |
| } // namespace |
| +class NewWebSocketChannelImpl::Resumer { |
| +public: |
| + struct PendingEvent { |
|
abarth-chromium
2013/09/10 16:24:15
I would move this class out from inside Resumer in
yhirano
2013/09/12 01:35:49
Done.
|
| + enum Type { |
| + DidConnectComplete, |
| + DidReceiveTextMessage, |
| + DidReceiveBinaryMessage, |
| + DidReceiveError, |
| + DidClose, |
| + }; |
| + Type type; |
| + Vector<char> message; // for DidReceiveTextMessage / DidReceiveBinaryMessage |
| + int closingCode; // for DidClose |
| + String closingReason; // for DidClose |
| + |
| + PendingEvent(Type type) : type(type), closingCode(0) { } |
| + PendingEvent(Type, Vector<char>*); |
| + PendingEvent(int code, const String& reason) : type(DidClose), closingCode(code), closingReason(reason) { } |
| + }; |
| + |
| + Resumer(NewWebSocketChannelImpl*); |
|
abarth-chromium
2013/09/10 16:24:15
Please mark one-argument constructors explicit.
yhirano
2013/09/12 01:35:49
Done.
|
| + ~Resumer(); |
| + |
| + void append(const PendingEvent&); |
| + void suspend(); |
| + void resumeLater(); |
| + void abort(); |
| + |
| +private: |
| + class PendingEventProcessor : public RefCounted<PendingEventProcessor> { |
|
abarth-chromium
2013/09/10 16:24:15
I'd also move this class into the anonymous namesp
yhirano
2013/09/12 01:35:49
Done.
|
| + public: |
| + PendingEventProcessor() : m_isAborted(false) { } |
| + virtual ~PendingEventProcessor() { } |
| + void abort() { m_isAborted = true; } |
| + void append(const PendingEvent& e) { m_events.append(e); } |
| + void process(NewWebSocketChannelImpl*); |
| + |
| + private: |
| + bool m_isAborted; |
| + Vector<PendingEvent> m_events; |
| + }; |
| + |
| + void resumeNow(Timer<Resumer>*); |
| + |
| + NewWebSocketChannelImpl* m_channel; |
| + RefPtr<PendingEventProcessor> m_pendingEventProcessor; |
| + Timer<Resumer> m_timer; |
| + bool m_isAborted; |
| +}; |
| + |
| +NewWebSocketChannelImpl::Resumer::PendingEvent::PendingEvent(Type type, Vector<char>* data) : type(type), closingCode(0) |
|
abarth-chromium
2013/09/10 16:24:15
These initializers go on separate lines.
yhirano
2013/09/12 01:35:49
Done.
|
| +{ |
| + ASSERT(type == DidReceiveTextMessage || type == DidReceiveBinaryMessage); |
| + message.swap(*data); |
| +} |
| + |
| +NewWebSocketChannelImpl::Resumer::Resumer(NewWebSocketChannelImpl* channel) |
| + : m_channel(channel) |
| + , m_pendingEventProcessor(adoptRef(new PendingEventProcessor)) |
| + , m_timer(this, &Resumer::resumeNow) |
| + , m_isAborted(false) { } |
| + |
| +NewWebSocketChannelImpl::Resumer::~Resumer() |
| +{ |
| + abort(); |
| +} |
| + |
| +void NewWebSocketChannelImpl::Resumer::append(const PendingEvent& event) |
| +{ |
| + if (m_isAborted) |
| + return; |
| + m_pendingEventProcessor->append(event); |
| +} |
| + |
| +void NewWebSocketChannelImpl::Resumer::suspend() |
| +{ |
| + if (m_isAborted) |
| + return; |
| + m_timer.stop(); |
| + m_channel->m_isSuspended = true; |
| +} |
| + |
| +void NewWebSocketChannelImpl::Resumer::resumeLater() |
| +{ |
| + if (m_isAborted) |
| + return; |
| + if (!m_timer.isActive()) |
| + m_timer.startOneShot(0); |
| +} |
| + |
| +void NewWebSocketChannelImpl::Resumer::abort() |
| +{ |
| + if (m_isAborted) |
| + return; |
| + m_isAborted = true; |
| + m_timer.stop(); |
| + m_pendingEventProcessor->abort(); |
| + m_pendingEventProcessor = 0; |
| +} |
| + |
| +void NewWebSocketChannelImpl::Resumer::resumeNow(Timer<Resumer>*) |
| +{ |
| + ASSERT(!m_isAborted); |
| + m_channel->m_isSuspended = false; |
| + |
| + ASSERT(m_channel->m_client); |
| + if (m_channel->m_handle) { |
| + m_channel->sendInternal(); |
| + m_channel->flowControlIfNecessary(); |
| + } |
| + m_pendingEventProcessor->process(m_channel); |
| + // |this| can be aborted here. |
| + // |this| can be deleted here. |
| +} |
| + |
| +void NewWebSocketChannelImpl::Resumer::PendingEventProcessor::process(NewWebSocketChannelImpl* channel) |
| +{ |
| + RefPtr<PendingEventProcessor> protect(this); |
| + for (size_t i = 0; i < m_events.size() && !m_isAborted; ++i) { |
| + PendingEvent& event = m_events[i]; |
| + switch (event.type) { |
| + case PendingEvent::DidConnectComplete: |
| + channel->handleDidConnect(); |
| + // |this| can be detached here. |
| + break; |
| + case PendingEvent::DidReceiveTextMessage: |
| + channel->handleTextMessage(&event.message); |
| + // |this| can be detached here. |
| + break; |
| + case PendingEvent::DidReceiveBinaryMessage: |
| + channel->handleBinaryMessage(&event.message); |
| + // |this| can be detached here. |
| + break; |
| + case PendingEvent::DidReceiveError: |
| + channel->handleDidReceiveMessageError(); |
| + // |this| can be detached here. |
| + break; |
| + case PendingEvent::DidClose: |
| + channel->handleDidClose(event.closingCode, event.closingReason); |
| + // |this| can be detached here. |
| + break; |
| + } |
| + } |
| + m_events.clear(); |
| +} |
| + |
| NewWebSocketChannelImpl::NewWebSocketChannelImpl(ScriptExecutionContext* context, WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber) |
| : ContextLifecycleObserver(context) |
| , m_handle(adoptPtr(WebKit::Platform::current()->createWebSocketHandle())) |
| , m_client(client) |
| + , m_resumer(adoptPtr(new Resumer(this))) |
| + , m_isSuspended(false) |
| , m_sendingQuota(0) |
| , m_receivedDataSizeForFlowControl(receivedDataSizeForFlowControlHighWaterMark * 2) // initial quota |
| , m_bufferedAmount(0) |
| @@ -77,6 +225,10 @@ NewWebSocketChannelImpl::NewWebSocketChannelImpl(ScriptExecutionContext* context |
| { |
| } |
| +NewWebSocketChannelImpl::~NewWebSocketChannelImpl() |
| +{ |
| +} |
| + |
| void NewWebSocketChannelImpl::connect(const KURL& url, const String& protocol) |
| { |
| if (!m_handle) |
| @@ -142,8 +294,12 @@ void NewWebSocketChannelImpl::fail(const String& reason, MessageLevel level, con |
| // m_handle and m_client can be null here. |
| if (m_client) |
| m_client->didReceiveMessageError(); |
| - handleDidClose(CloseEventCodeAbnormalClosure, reason); |
| - // handleDidClose may delete this object. |
| + if (m_isSuspended) { |
| + m_resumer->append(Resumer::PendingEvent(CloseEventCodeAbnormalClosure, reason)); |
| + } else { |
| + handleDidClose(CloseEventCodeAbnormalClosure, reason); |
| + // handleDidClose may delete this object. |
| + } |
| } |
| void NewWebSocketChannelImpl::disconnect() |
| @@ -152,16 +308,17 @@ void NewWebSocketChannelImpl::disconnect() |
| m_handle->close(CloseEventCodeAbnormalClosure, ""); |
| m_handle.clear(); |
| m_client = 0; |
| + m_resumer->abort(); |
| } |
| void NewWebSocketChannelImpl::suspend() |
| { |
| - notImplemented(); |
| + m_resumer->suspend(); |
| } |
| void NewWebSocketChannelImpl::resume() |
| { |
| - notImplemented(); |
| + m_resumer->resumeLater(); |
| } |
| NewWebSocketChannelImpl::Message::Message(const String& text) |
| @@ -232,9 +389,7 @@ void NewWebSocketChannelImpl::flowControlIfNecessary() |
| void NewWebSocketChannelImpl::didConnect(WebSocketHandle* handle, bool succeed, const WebKit::WebString& selectedProtocol, const WebKit::WebString& extensions) |
| { |
| - if (!m_handle) { |
| - return; |
| - } |
| + ASSERT(m_handle); |
| ASSERT(handle == m_handle); |
| ASSERT(m_client); |
| if (!succeed) { |
| @@ -244,14 +399,15 @@ void NewWebSocketChannelImpl::didConnect(WebSocketHandle* handle, bool succeed, |
| } |
| m_subprotocol = selectedProtocol; |
| m_extensions = extensions; |
| - m_client->didConnect(); |
| + if (m_isSuspended) |
| + m_resumer->append(Resumer::PendingEvent(Resumer::PendingEvent::DidConnectComplete)); |
| + else |
| + m_client->didConnect(); |
| } |
| void NewWebSocketChannelImpl::didReceiveData(WebSocketHandle* handle, WebSocketHandle::MessageType type, const char* data, size_t size, bool fin) |
| { |
| - if (!m_handle) { |
| - return; |
| - } |
| + ASSERT(m_handle); |
| ASSERT(handle == m_handle); |
| ASSERT(m_client); |
| // Non-final frames cannot be empty. |
| @@ -275,6 +431,12 @@ void NewWebSocketChannelImpl::didReceiveData(WebSocketHandle* handle, WebSocketH |
| if (!fin) { |
| return; |
| } |
| + if (m_isSuspended) { |
| + Resumer::PendingEvent::Type type = |
| + m_receivingMessageTypeIsText ? Resumer::PendingEvent::DidReceiveTextMessage : Resumer::PendingEvent::DidReceiveBinaryMessage; |
| + m_resumer->append(Resumer::PendingEvent(type, &m_receivingMessageData)); |
| + return; |
| + } |
| Vector<char> messageData; |
| messageData.swap(m_receivingMessageData); |
| if (m_receivingMessageTypeIsText) { |
| @@ -288,14 +450,28 @@ void NewWebSocketChannelImpl::didReceiveData(WebSocketHandle* handle, WebSocketH |
| void NewWebSocketChannelImpl::didClose(WebSocketHandle* handle, unsigned short code, const WebKit::WebString& reason) |
| { |
| + ASSERT(m_handle); |
| + m_handle.clear(); |
| // FIXME: Maybe we should notify an error to m_client for some didClose messages. |
| - handleDidClose(code, reason); |
| - // handleDidClose may delete this object. |
| + if (m_isSuspended) { |
| + m_resumer->append(Resumer::PendingEvent(code, reason)); |
| + } else { |
| + handleDidClose(code, reason); |
| + // handleDidClose may delete this object. |
| + } |
| +} |
| + |
| +void NewWebSocketChannelImpl::handleDidConnect() |
| +{ |
| + ASSERT(m_client); |
| + ASSERT(!m_isSuspended); |
| + m_client->didConnect(); |
| } |
| void NewWebSocketChannelImpl::handleTextMessage(Vector<char>* messageData) |
| { |
| ASSERT(m_client); |
| + ASSERT(!m_isSuspended); |
| ASSERT(messageData); |
| String message = ""; |
| if (m_receivingMessageData.size() > 0) { |
| @@ -314,15 +490,25 @@ void NewWebSocketChannelImpl::handleTextMessage(Vector<char>* messageData) |
| void NewWebSocketChannelImpl::handleBinaryMessage(Vector<char>* messageData) |
| { |
| ASSERT(m_client); |
| + ASSERT(!m_isSuspended); |
| ASSERT(messageData); |
| OwnPtr<Vector<char> > binaryData = adoptPtr(new Vector<char>); |
| messageData->swap(*binaryData); |
| m_client->didReceiveBinaryData(binaryData.release()); |
| } |
| +void NewWebSocketChannelImpl::handleDidReceiveMessageError() |
| +{ |
| + ASSERT(m_client); |
| + ASSERT(!m_isSuspended); |
| + m_client->didReceiveMessageError(); |
| +} |
| + |
| void NewWebSocketChannelImpl::handleDidClose(unsigned short code, const String& reason) |
| { |
| + ASSERT(!m_isSuspended); |
| m_handle.clear(); |
| + m_resumer->abort(); |
| if (!m_client) { |
| return; |
| } |