Chromium Code Reviews| Index: Source/modules/websockets/NewWebSocketChannelImpl.cpp |
| diff --git a/Source/modules/websockets/NewWebSocketChannelImpl.cpp b/Source/modules/websockets/NewWebSocketChannelImpl.cpp |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..afb72c4a5fff3e35d165319d33d1b433637f29fa |
| --- /dev/null |
| +++ b/Source/modules/websockets/NewWebSocketChannelImpl.cpp |
| @@ -0,0 +1,508 @@ |
| +/* |
| + * Copyright (C) 2013 Google Inc. All rights reserved. |
| + * |
| + * Redistribution and use in source and binary forms, with or without |
| + * modification, are permitted provided that the following conditions are |
| + * met: |
| + * |
| + * * Redistributions of source code must retain the above copyright |
| + * notice, this list of conditions and the following disclaimer. |
| + * * Redistributions in binary form must reproduce the above |
| + * copyright notice, this list of conditions and the following disclaimer |
| + * in the documentation and/or other materials provided with the |
| + * distribution. |
| + * * Neither the name of Google Inc. nor the names of its |
| + * contributors may be used to endorse or promote products derived from |
| + * this software without specific prior written permission. |
| + * |
| + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| + */ |
| + |
| +#include "config.h" |
| +#include "modules/websockets/NewWebSocketChannelImpl.h" |
| + |
| +#include "core/dom/ScriptExecutionContext.h" |
| +#include "core/fileapi/Blob.h" |
| +#include "core/fileapi/FileReaderLoader.h" |
| +#include "core/inspector/InspectorInstrumentation.h" |
| +#include "core/loader/UniqueIdentifier.h" |
| +#include "core/platform/Logging.h" |
| +#include "modules/websockets/WebSocketChannel.h" |
| +#include "modules/websockets/WebSocketChannelClient.h" |
| +#include "public/platform/Platform.h" |
| +#include "public/platform/WebData.h" |
| +#include "public/platform/WebSocketHandle.h" |
| +#include "public/platform/WebString.h" |
| +#include "public/platform/WebURL.h" |
| +#include "public/platform/WebVector.h" |
| +#include "weborigin/SecurityOrigin.h" |
| +#include "wtf/ArrayBuffer.h" |
| +#include "wtf/OwnPtr.h" |
| +#include "wtf/PassRefPtr.h" |
| +#include "wtf/RefPtr.h" |
| +#include "wtf/Vector.h" |
| +#include "wtf/text/WTFString.h" |
| + |
| +// FIXME: The following notifications are not implemented: |
| +// InspectorInstrument::willSendWebSocketHandshake |
| +// InspectorInstrument::didReceiveWebSocketHandshakeResponse |
| +// InspectorInstrument::didReceiveWebSocketFrame |
| +// InspectorInstrument::didSendWebSocketFrame |
| + |
| +namespace WebCore { |
| + |
| +namespace { |
| + |
| +bool isClean(int code) |
| +{ |
| + return code == WebSocketChannel::CloseEventCodeNormalClosure |
| + || (WebSocketChannel::CloseEventCodeMinimumUserDefined <= code |
| + && code <= WebSocketChannel::CloseEventCodeMaximumUserDefined); |
| +} |
| + |
| +} // namespace |
| + |
| +NewWebSocketChannelImpl::NewWebSocketChannelImpl(ScriptExecutionContext* context, WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber) |
| + : m_context(context) |
| + , m_handle(adoptPtr(WebKit::Platform::current()->createWebSocketHandle())) |
| + , m_client(client) |
| + , m_identifier(0) |
| + , m_state(NotConnected) |
| + , m_sendingQuota(0) |
| + , m_receivedDataSizeForFlowControl(0) |
| + , m_bufferedAmount(0) |
| + , m_sentSizeOfTopMessage(0) |
| + , m_hasAlreadyFailed(false) |
| + , m_isSuspended(false) |
| + , m_sourceURLAtConnection(sourceURL) |
| + , m_lineNumberAtConnection(lineNumber) |
| +{ |
| + if (context->isDocument() && toDocument(context)->page()) { |
| + m_identifier = createUniqueIdentifier(); |
| + } |
| +} |
| + |
| +void NewWebSocketChannelImpl::connect(const KURL& url, const String& protocol) |
| +{ |
| + ASSERT(m_state == NotConnected); |
| + LOG(Network, "NewWebSocketChannelImpl %p connect()", this); |
| + if (m_identifier) { |
| + InspectorInstrumentation::didCreateWebSocket(toDocument(m_context), m_identifier, url, protocol); |
| + } |
| + m_state = Connecting; |
| + m_url = url; |
| + Vector<String> protocols; |
| + // Since protocol is already verified and escaped, we can simply split it. |
| + protocol.split(", ", true, protocols); |
| + WebKit::WebVector<WebKit::WebString> webProtocols(protocols.size()); |
| + for (size_t i = 0; i < protocols.size(); ++i) { |
| + webProtocols[i] = protocols[i]; |
| + } |
| + m_handle->connect(url, webProtocols, m_context->securityOrigin()->toString(), this); |
|
tyoshino (SeeGerritForStatus)
2013/08/26 08:04:11
TODO to remember porting callStack copying code?
yhirano
2013/08/26 08:36:18
Thanks, I added an implementation.
|
| +} |
| + |
| +String NewWebSocketChannelImpl::subprotocol() |
| +{ |
| + LOG(Network, "NewWebSocketChannelImpl %p subprotocol()", this); |
| + return m_subprotocol; |
| +} |
| + |
| +String NewWebSocketChannelImpl::extensions() |
| +{ |
| + LOG(Network, "NewWebSocketChannelImpl %p extensions()", this); |
| + return m_extensions; |
| +} |
| + |
| +WebSocketChannel::SendResult NewWebSocketChannelImpl::send(const String& message) |
| +{ |
| + LOG(Network, "NewWebSocketChannelImpl %p sendText(%s)", this, message.utf8().data()); |
| + if (m_state != Open) { |
| + return SendFail; |
| + } |
| + m_messages.append(Message(message)); |
| + sendInternal(); |
| + return SendSuccess; |
| +} |
| + |
| +WebSocketChannel::SendResult NewWebSocketChannelImpl::send(const Blob& blob) |
| +{ |
| + LOG(Network, "NewWebSocketChannelImpl %p sendBlob()", this); |
| + if (m_state != Open) { |
| + return SendFail; |
| + } |
| + m_messages.append(Message(blob)); |
| + sendInternal(); |
| + return SendSuccess; |
| +} |
| + |
| +WebSocketChannel::SendResult NewWebSocketChannelImpl::send(const ArrayBuffer& buffer, unsigned byteOffset, unsigned byteLength) |
| +{ |
| + LOG(Network, "NewWebSocketChannelImpl %p sendArrayBuffer(%p, %u, %u)", this, buffer.data(), byteOffset, byteLength); |
| + if (m_state != Open) { |
| + return SendFail; |
| + } |
| + m_messages.append(buffer.slice(byteOffset, byteOffset + byteLength)); |
| + sendInternal(); |
| + return SendSuccess; |
| +} |
| + |
| +unsigned long NewWebSocketChannelImpl::bufferedAmount() const |
| +{ |
| + LOG(Network, "NewWebSocketChannelImpl %p bufferedAmount()", this); |
| + return m_bufferedAmount; |
| +} |
| + |
| +void NewWebSocketChannelImpl::close(int code, const String& reason) |
| +{ |
| + LOG(Network, "NewWebSocketChannelImpl %p close(%d, %s)", this, code, reason.utf8().data()); |
| + if (m_state == Closing || m_state == Closed) { |
| + return; |
| + } |
| + ASSERT(m_handle); |
| + if (m_state == Open) { |
| + m_handle->close(static_cast<unsigned short>(code), reason); |
|
tyoshino (SeeGerritForStatus)
2013/08/26 08:04:11
closing handshake timeout will be handled by lower
yhirano
2013/08/26 08:36:18
Yes.
|
| + } |
| + m_state = Closing; |
| +} |
| + |
| +void NewWebSocketChannelImpl::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber) |
| +{ |
| + LOG(Network, "NewWebSocketChannelImpl %p fail(%s)", this, reason.utf8().data()); |
| + // m_handle and m_client can be null here. |
| + const String message = "WebSocket connection to '" + m_url.elidedString() + "' failed: " + reason; |
| + m_context->addConsoleMessage(JSMessageSource, level, message, sourceURL, lineNumber); |
| + if (m_identifier) { |
| + InspectorInstrumentation::didReceiveWebSocketFrameError(toDocument(m_context), m_identifier, reason); |
| + } |
| + |
| + if (m_client && !m_hasAlreadyFailed) { |
| + if (m_isSuspended) { |
| + m_pendingEvents.append(PendingEvent(PendingEvent::DidReceiveError)); |
| + } else { |
| + m_client->didReceiveMessageError(); |
| + } |
| + } |
| + m_hasAlreadyFailed = true; |
| + if (m_state != Closing && m_state != Closed) { |
| + disconnect(); |
| + } |
| +} |
| + |
| +void NewWebSocketChannelImpl::disconnect() |
| +{ |
| + LOG(Network, "NewWebSocketChannelImpl %p disconnect()", this); |
| + if (m_state == Closed) { |
| + return; |
| + } |
| + if (m_identifier) { |
| + InspectorInstrumentation::didCloseWebSocket(toDocument(m_context), m_identifier); |
| + } |
| + ASSERT(m_handle); |
| + if (m_state != Closing) { |
| + m_handle->close(CloseEventCodeAbnormalClosure, ""); |
| + } |
| + m_state = Closed; |
| + m_handle = 0; |
| + m_client = 0; |
| +} |
| + |
| +void NewWebSocketChannelImpl::suspend() |
| +{ |
| + LOG(Network, "NewWebSocketChannelImpl %p suspend()", this); |
| + m_isSuspended = true; |
| +} |
| + |
| +void NewWebSocketChannelImpl::resume() |
| +{ |
| + LOG(Network, "NewWebSocketChannelImpl %p resume()", this); |
| + m_isSuspended = false; |
| + if (!m_client) { |
| + ASSERT(m_state == Closed); |
| + ASSERT(m_pendingEvents.isEmpty()); |
| + return; |
| + } |
| + sendInternal(); |
| + flowControlIfNecessary(); |
| + processPendingEvents(); |
| + // processPendingEvents may delete this object. |
| +} |
| + |
| +NewWebSocketChannelImpl::Message::Message(const String& text) |
| + : type(MessageTypeText) |
| + , text(text.utf8(String::StrictConversionReplacingUnpairedSurrogatesWithFFFD)) { } |
| + |
| +NewWebSocketChannelImpl::Message::Message(const Blob& blob) |
| + : type(MessageTypeBlob) |
| + , blob(Blob::create(blob.url(), blob.type(), blob.size())) { } |
| + |
| +NewWebSocketChannelImpl::Message::Message(PassRefPtr<ArrayBuffer> arrayBuffer) |
| + : type(MessageTypeArrayBuffer) |
| + , arrayBuffer(arrayBuffer) { } |
| + |
| +void NewWebSocketChannelImpl::sendInternal() |
| +{ |
| + if (m_state != Open || m_blobLoader || !m_sendingQuota) { |
| + return; |
| + } |
| + ASSERT(m_handle); |
| + ASSERT(m_client); |
| + unsigned long bufferedAmount = m_bufferedAmount; |
| + size_t i; |
| + for (i = 0; i < m_messages.size(); ++i) { |
| + if (!m_sendingQuota) { |
| + break; |
| + } |
| + const Message& message = m_messages[i]; |
| + switch (message.type) { |
| + case MessageTypeText: { |
| + WebKit::WebSocketHandle::MessageType type = |
| + m_sentSizeOfTopMessage ? WebKit::WebSocketHandle::MessageTypeContinuation : WebKit::WebSocketHandle::MessageTypeText; |
| + size_t size = std::min(static_cast<size_t>(m_sendingQuota), message.text.length() - m_sentSizeOfTopMessage); |
| + m_handle->send(type, message.text.data() + m_sentSizeOfTopMessage, size, m_sendingQuota == size); |
| + m_sentSizeOfTopMessage += size; |
| + m_sendingQuota -= size; |
| + break; |
| + } |
| + case MessageTypeBlob: |
| + startLoadingBlob(*message.blob); |
| + break; |
| + case MessageTypeArrayBuffer: { |
| + WebKit::WebSocketHandle::MessageType type = |
| + m_sentSizeOfTopMessage ? WebKit::WebSocketHandle::MessageTypeContinuation : WebKit::WebSocketHandle::MessageTypeBinary; |
| + size_t size = std::min(static_cast<size_t>(m_sendingQuota), message.arrayBuffer->byteLength() - m_sentSizeOfTopMessage); |
| + m_handle->send(type, static_cast<const char*>(message.arrayBuffer->data()) + m_sentSizeOfTopMessage, size, m_sendingQuota == size); |
| + m_sentSizeOfTopMessage += size; |
| + m_sendingQuota -= size; |
| + break; |
| + } |
| + default: |
| + ASSERT_NOT_REACHED(); |
| + } |
| + if (m_blobLoader || !m_sendingQuota) { |
| + break; |
| + } |
| + |
| + m_sentSizeOfTopMessage = 0; |
| + } |
| + // Drop consumed messages. |
| + m_messages.remove(0, i); |
| + if (!m_isSuspended && m_bufferedAmount != bufferedAmount) { |
| + m_client->didUpdateBufferedAmount(m_bufferedAmount); |
| + } |
| +} |
| + |
| +void NewWebSocketChannelImpl::flowControlIfNecessary() |
| +{ |
| + if (m_state != Open) { |
| + return; |
| + } |
| + ASSERT(m_handle); |
| + if (m_receivedDataSizeForFlowControl < receivedDataSizeForFlowControlHighWaterMark) { |
| + return; |
| + } |
| + m_handle->flowControl(m_receivedDataSizeForFlowControl); |
| + m_receivedDataSizeForFlowControl = 0; |
| +} |
| + |
| +void NewWebSocketChannelImpl::didConnect(WebKit::WebSocketHandle* handle, bool succeed, const WebKit::WebString& selectedProtocol, const WebKit::WebString& extensions) |
| +{ |
| + LOG(Network, "NewWebSocketChannelImpl %p didConnect(%p, %d, %s, %s)", this, handle, succeed, selectedProtocol.utf8().data(), extensions.utf8().data()); |
| + if (m_state != Connecting) { |
| + return; |
| + } |
| + ASSERT(handle == m_handle); |
| + ASSERT(m_client); |
| + if (!succeed) { |
| + failAsError("Cannot connect to " + m_url.string() + "."); |
| + return; |
| + } |
| + m_state = Open; |
| + m_subprotocol = selectedProtocol; |
| + m_extensions = extensions; |
| + if (m_isSuspended) { |
| + m_pendingEvents.append(PendingEvent(PendingEvent::DidConnectComplete)); |
| + } else { |
| + m_client->didConnect(); |
| + } |
| +} |
| + |
| +void NewWebSocketChannelImpl::didReceiveData(WebKit::WebSocketHandle* handle, WebKit::WebSocketHandle::MessageType type, const char* data, size_t size, bool fin) |
| +{ |
| + LOG(Network, "NewWebSocketChannelImpl %p didReceiveData(%p, (%p, %zu), %d, %d)", this, handle, data.data(), data.size(), type, fin); |
| + if (m_state != Open) { |
| + return; |
| + } |
| + ASSERT(handle == m_handle); |
| + ASSERT(m_client); |
| + if (type == WebKit::WebSocketHandle::MessageTypeText) { |
| + ASSERT(m_receivingMessageData.isEmpty()); |
| + m_receivingMessageTypeIsText = true; |
| + } else if (type == WebKit::WebSocketHandle::MessageTypeBinary) { |
| + ASSERT(m_receivingMessageData.isEmpty()); |
| + m_receivingMessageTypeIsText = false; |
| + } else { |
| + ASSERT(type == WebKit::WebSocketHandle::MessageTypeContinuation); |
| + ASSERT(!m_receivingMessageData.isEmpty()); |
| + } |
| + m_receivingMessageData.append(data, size); |
| + m_receivedDataSizeForFlowControl += size; |
| + flowControlIfNecessary(); |
| + if (fin) { |
| + if (m_isSuspended) { |
| + m_pendingEvents.append(PendingEvent(m_receivingMessageTypeIsText ? PendingEvent::DidReceiveTextMessage : PendingEvent::DidReceiveBinaryMessage)); |
| + m_pendingEvents.last().message.swap(m_receivingMessageData); |
| + } else { |
| + if (m_receivingMessageTypeIsText) { |
| + String message = ""; |
| + if (m_receivingMessageData.size() > 0) { |
| + message = String::fromUTF8(m_receivingMessageData.data(), m_receivingMessageData.size()); |
| + } |
| + if (message.isNull()) { |
| + failAsError("Could not decode a text frame as UTF-8."); |
| + } else { |
| + m_client->didReceiveMessage(message); |
| + } |
| + } else { |
| + OwnPtr<Vector<char> > binaryData = adoptPtr(new Vector<char>); |
| + m_receivingMessageData.swap(*binaryData); |
| + m_client->didReceiveBinaryData(binaryData.release()); |
| + } |
| + } |
| + m_receivingMessageData.clear(); |
| + } |
| +} |
| + |
| +void NewWebSocketChannelImpl::didClose(WebKit::WebSocketHandle* handle, unsigned short code, const WebKit::WebString& reason) |
| +{ |
| + // FIXME: Maybe we should notify an error to m_client for some didClose messages. |
| + LOG(Network, "NewWebSocketChannelImpl %p didClose(%p, %d, %s)", this, code, String(reason).utf8().data()); |
| + if (m_state == Closed) { |
| + return; |
| + } |
| + if (m_identifier) { |
| + InspectorInstrumentation::didCloseWebSocket(toDocument(m_context), m_identifier); |
| + } |
| + ASSERT(handle == m_handle); |
| + m_handle = 0; |
| + m_state = Closed; |
| + if (m_isSuspended) { |
| + m_pendingEvents.append(PendingEvent(code, reason)); |
| + return; |
| + } |
| + WebSocketChannelClient* client = m_client; |
| + m_client = 0; |
| + ASSERT(client); |
| + client->didClose(m_bufferedAmount, |
| + isClean(code) ? WebSocketChannelClient::ClosingHandshakeComplete : WebSocketChannelClient::ClosingHandshakeIncomplete, |
| + code, |
| + reason); |
| + // client->didClose may delete this object. |
| +} |
| + |
| +void NewWebSocketChannelImpl::didFinishLoading() |
| +{ |
| + // m_client can be invalid here. |
| + LOG(Network, "NewWebSocketChannelImpl %p didFinishLoading()", this); |
| + if (m_state == Open) { |
| + ASSERT(m_handle); |
| + // The loaded blob is always placed on m_messages[0]. |
| + ASSERT(m_messages.size() > 0 && m_messages[0].type == MessageTypeBlob); |
| + // We replace it with the loaded blob. |
| + m_messages[0] = Message(m_blobLoader->arrayBufferResult()); |
| + sendInternal(); |
| + } |
| + m_blobLoader.clear(); |
| + |
| + deref(); |
| + // deref() may delete this object. |
| +} |
| + |
| +void NewWebSocketChannelImpl::didFail(FileError::ErrorCode errorCode) |
| +{ |
| + // m_client can be invalid here. |
| + LOG(Network, "NewWebSocketChannelImpl %p didFail(%d)", this, errorCode); |
| + m_blobLoader.clear(); |
| + failAsError("Failed to load Blob: error code = " + String::number(errorCode)); // FIXME: Generate human-friendly reason message. |
| + deref(); |
| + // deref() may delete this object. |
| +} |
| + |
| +void NewWebSocketChannelImpl::startLoadingBlob(const Blob& blob) |
| +{ |
| + LOG(Network, "NewWebSocketChannelImpl %p startLoadingBlob(%s)", this, blob.url().string().utf8().data()); |
| + ASSERT(!m_blobLoader); |
| + // Protect this object until the loading completes or fails. |
| + ref(); |
| + |
| + m_blobLoader = adoptPtr(new FileReaderLoader(FileReaderLoader::ReadAsArrayBuffer, this)); |
| + m_blobLoader->start(m_context, blob); |
| +} |
| + |
| +void NewWebSocketChannelImpl::processPendingEvents() |
| +{ |
| + RefPtr<NewWebSocketChannelImpl> protect(this); |
| + ASSERT(!m_isSuspended); |
| + |
| + for (size_t i = 0; i < m_pendingEvents.size(); ++i) { |
| + ASSERT(m_client); |
| + PendingEvent& event = m_pendingEvents[i]; |
| + switch (event.type) { |
| + case PendingEvent::DidConnectComplete: |
| + m_client->didConnect(); |
| + break; |
| + case PendingEvent::DidReceiveTextMessage: { |
| + String message = ""; |
| + if (event.message.size() > 0) { |
| + message = String::fromUTF8(event.message.data(), event.message.size()); |
| + } |
| + if (message.isNull()) { |
| + failAsError("Could not decode a text frame as UTF-8."); |
| + // m_client can be null here. |
| + } else { |
| + m_client->didReceiveMessage(message); |
| + } |
| + break; |
| + } |
| + case PendingEvent::DidReceiveBinaryMessage: { |
| + OwnPtr<Vector<char> > binaryData = adoptPtr(new Vector<char>); |
| + event.message.swap(*binaryData); |
| + m_client->didReceiveBinaryData(binaryData.release()); |
| + break; |
| + } |
| + case PendingEvent::DidReceiveError: |
| + m_client->didReceiveMessageError(); |
| + break; |
| + case PendingEvent::DidClose: |
| + ASSERT(m_state == Closed); |
| + ASSERT(!m_handle); |
| + m_client->didClose(m_bufferedAmount, |
| + isClean(event.closingCode) ? WebSocketChannelClient::ClosingHandshakeComplete : WebSocketChannelClient::ClosingHandshakeIncomplete, |
| + event.closingCode, |
| + event.closingReason); |
| + // m_client can be invalid here. |
| + m_client = 0; |
| + break; |
| + default: |
| + ASSERT_NOT_REACHED(); |
| + break; |
| + } |
| + |
| + if (event.type == PendingEvent::DidClose || !m_client) { |
| + // Drop remaining messages. |
| + break; |
| + } |
| + } |
| + m_pendingEvents.clear(); |
| +} |
| + |
| +} // namespace WebCore |