Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1133)

Unified Diff: Source/modules/websockets/NewWebSocketChannelImpl.cpp

Issue 22914026: [ABANDONED] Introduce blink-side bridges for the new WebSocket implementation. (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Created 7 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « Source/modules/websockets/NewWebSocketChannelImpl.h ('k') | public/platform/Platform.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: Source/modules/websockets/NewWebSocketChannelImpl.cpp
diff --git a/Source/modules/websockets/NewWebSocketChannelImpl.cpp b/Source/modules/websockets/NewWebSocketChannelImpl.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..b637e5597c984f3044bbe5c39cbf36aa2b0d8c19
--- /dev/null
+++ b/Source/modules/websockets/NewWebSocketChannelImpl.cpp
@@ -0,0 +1,569 @@
+/*
+ * Copyright (C) 2013 Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "modules/websockets/NewWebSocketChannelImpl.h"
+
+#include "bindings/v8/ScriptCallStackFactory.h"
+#include "core/dom/ScriptExecutionContext.h"
+#include "core/fileapi/Blob.h"
+#include "core/fileapi/FileReaderLoader.h"
+#include "core/inspector/InspectorInstrumentation.h"
+#include "core/inspector/ScriptCallStack.h"
+#include "core/loader/UniqueIdentifier.h"
+#include "core/platform/Logging.h"
+#include "modules/websockets/WebSocketChannel.h"
+#include "modules/websockets/WebSocketChannelClient.h"
+#include "public/platform/Platform.h"
+#include "public/platform/WebData.h"
+#include "public/platform/WebSocketHandle.h"
+#include "public/platform/WebString.h"
+#include "public/platform/WebURL.h"
+#include "public/platform/WebVector.h"
+#include "weborigin/SecurityOrigin.h"
+#include "wtf/ArrayBuffer.h"
+#include "wtf/OwnPtr.h"
+#include "wtf/PassRefPtr.h"
+#include "wtf/RefPtr.h"
+#include "wtf/Vector.h"
+#include "wtf/text/WTFString.h"
+
+// FIXME: The following notifications are not implemented:
+// InspectorInstrument::willSendWebSocketHandshake
+// InspectorInstrument::didReceiveWebSocketHandshakeResponse
+// InspectorInstrument::didReceiveWebSocketFrame
+// InspectorInstrument::didSendWebSocketFrame
+
+using WebKit::WebSocketHandle;
+
+namespace WebCore {
+
+namespace {
+
+bool isClean(int code)
+{
+ return code == WebSocketChannel::CloseEventCodeNormalClosure
+ || (WebSocketChannel::CloseEventCodeMinimumUserDefined <= code
+ && code <= WebSocketChannel::CloseEventCodeMaximumUserDefined);
+}
+
+} // namespace
+
+NewWebSocketChannelImpl::NewWebSocketChannelImpl(ScriptExecutionContext* context, WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber)
+ : m_context(context)
+ , m_handle(adoptPtr(WebKit::Platform::current()->createWebSocketHandle()))
+ , m_client(client)
+ , m_identifier(0)
+ , m_state(NotConnected)
+ , m_sendingQuota(0)
+ , m_receivedDataSizeForFlowControl(0)
+ , m_bufferedAmount(0)
+ , m_sentSizeOfTopMessage(0)
+ , m_hasAlreadyFailed(false)
+ , m_suspendState(Active)
+ , m_resumeTimer(this, &NewWebSocketChannelImpl::resumeTimerFired)
+ , m_sourceURLAtConnection(sourceURL)
+ , m_lineNumberAtConnection(lineNumber)
+{
+ if (context->isDocument() && toDocument(context)->page()) {
+ m_identifier = createUniqueIdentifier();
+ }
+}
+
+void NewWebSocketChannelImpl::connect(const KURL& url, const String& protocol)
+{
+ ASSERT(m_state == NotConnected);
+ LOG(Network, "NewWebSocketChannelImpl %p connect()", this);
+ if (m_identifier) {
+ InspectorInstrumentation::didCreateWebSocket(toDocument(m_context), m_identifier, url, protocol);
+ }
+ m_state = Connecting;
+ m_url = url;
+ Vector<String> protocols;
+ // Since protocol is already verified and escaped, we can simply split it.
+ protocol.split(", ", true, protocols);
+ WebKit::WebVector<WebKit::WebString> webProtocols(protocols.size());
+ for (size_t i = 0; i < protocols.size(); ++i) {
+ webProtocols[i] = protocols[i];
+ }
+ m_handle->connect(url, webProtocols, m_context->securityOrigin()->toString(), this);
+
+ RefPtr<ScriptCallStack> callStack = createScriptCallStack(1, true);
+ if (callStack && callStack->size()) {
+ m_sourceURLAtConnection = callStack->at(0).sourceURL();
+ m_lineNumberAtConnection = callStack->at(0).lineNumber();
+ }
+}
+
+String NewWebSocketChannelImpl::subprotocol()
+{
+ LOG(Network, "NewWebSocketChannelImpl %p subprotocol()", this);
+ return m_subprotocol;
+}
+
+String NewWebSocketChannelImpl::extensions()
+{
+ LOG(Network, "NewWebSocketChannelImpl %p extensions()", this);
+ return m_extensions;
+}
+
+WebSocketChannel::SendResult NewWebSocketChannelImpl::send(const String& message)
+{
+ LOG(Network, "NewWebSocketChannelImpl %p sendText(%s)", this, message.utf8().data());
+ if (m_state != Open) {
+ return SendFail;
+ }
+ m_messages.append(Message(message));
+ sendInternal();
+ return SendSuccess;
+}
+
+WebSocketChannel::SendResult NewWebSocketChannelImpl::send(const Blob& blob)
+{
+ LOG(Network, "NewWebSocketChannelImpl %p sendBlob()", this);
+ if (m_state != Open) {
+ return SendFail;
+ }
+ m_messages.append(Message(blob));
+ sendInternal();
+ return SendSuccess;
+}
+
+WebSocketChannel::SendResult NewWebSocketChannelImpl::send(const ArrayBuffer& buffer, unsigned byteOffset, unsigned byteLength)
+{
+ LOG(Network, "NewWebSocketChannelImpl %p sendArrayBuffer(%p, %u, %u)", this, buffer.data(), byteOffset, byteLength);
+ if (m_state != Open) {
+ return SendFail;
+ }
+ // buffer.slice copies its contents.
+ m_messages.append(buffer.slice(byteOffset, byteOffset + byteLength));
+ sendInternal();
+ return SendSuccess;
+}
+
+unsigned long NewWebSocketChannelImpl::bufferedAmount() const
+{
+ LOG(Network, "NewWebSocketChannelImpl %p bufferedAmount()", this);
+ return m_bufferedAmount;
+}
+
+void NewWebSocketChannelImpl::close(int code, const String& reason)
+{
+ LOG(Network, "NewWebSocketChannelImpl %p close(%d, %s)", this, code, reason.utf8().data());
+ if (m_state == Closing || m_state == Closed) {
+ return;
+ }
+ ASSERT(m_handle);
+ if (m_state == Open) {
+ m_handle->close(static_cast<unsigned short>(code), reason);
+ }
+ m_state = Closing;
+}
+
+void NewWebSocketChannelImpl::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
+{
+ LOG(Network, "NewWebSocketChannelImpl %p fail(%s)", this, reason.utf8().data());
+ // m_handle and m_client can be null here.
+ const String message = "WebSocket connection to '" + m_url.elidedString() + "' failed: " + reason;
+ m_context->addConsoleMessage(JSMessageSource, level, message, sourceURL, lineNumber);
+ if (m_identifier) {
+ InspectorInstrumentation::didReceiveWebSocketFrameError(toDocument(m_context), m_identifier, reason);
+ }
+
+ if (m_client && !m_hasAlreadyFailed) {
+ if (m_suspendState == Active) {
+ m_client->didReceiveMessageError();
+ } else {
+ m_pendingEvents.append(PendingEvent(PendingEvent::DidReceiveError));
+ }
+ }
+ m_hasAlreadyFailed = true;
+ if (m_state == Closing || m_state == Closed) {
+ return;
+ }
+ m_state = Closed;
+ m_handle = 0;
+ unsigned short code = CloseEventCodeAbnormalClosure;
+ if (m_suspendState != Active) {
+ m_pendingEvents.append(PendingEvent(code, reason));
+ return;
+ }
+ handleDidClose(code, reason);
+ // handleDidClose can delete this object.
+}
+
+void NewWebSocketChannelImpl::disconnect()
+{
+ LOG(Network, "NewWebSocketChannelImpl %p disconnect()", this);
+ if (m_state == Closed) {
+ return;
+ }
+ if (m_identifier) {
+ InspectorInstrumentation::didCloseWebSocket(toDocument(m_context), m_identifier);
+ }
+ ASSERT(m_handle);
+ if (m_state != Closing) {
+ m_handle->close(CloseEventCodeAbnormalClosure, "");
+ }
+ m_state = Closed;
+ m_handle = 0;
+ m_client = 0;
+ m_pendingEvents.clear();
+}
+
+void NewWebSocketChannelImpl::suspend()
+{
+ LOG(Network, "NewWebSocketChannelImpl %p suspend()", this);
+ m_suspendState = Suspended;
+}
+
+void NewWebSocketChannelImpl::resume()
+{
+ LOG(Network, "NewWebSocketChannelImpl %p resume()", this);
+ m_suspendState = Resuming;
+ // Use a timer to finish this function quickly.
+ if (!m_resumeTimer.isActive()) {
+ // Protect this object until the timer fires.
+ ref();
+ m_resumeTimer.startOneShot(0);
+ }
+}
+
+NewWebSocketChannelImpl::Message::Message(const String& text)
+ : type(MessageTypeText)
+ , text(text.utf8(String::StrictConversionReplacingUnpairedSurrogatesWithFFFD)) { }
+
+NewWebSocketChannelImpl::Message::Message(const Blob& blob)
+ : type(MessageTypeBlob)
+ , blob(Blob::create(blob.url(), blob.type(), blob.size())) { }
+
+NewWebSocketChannelImpl::Message::Message(PassRefPtr<ArrayBuffer> arrayBuffer)
+ : type(MessageTypeArrayBuffer)
+ , arrayBuffer(arrayBuffer) { }
+
+void NewWebSocketChannelImpl::sendInternal()
+{
+ if (m_state != Open || m_blobLoader || !m_sendingQuota) {
+ return;
+ }
+ ASSERT(m_handle);
+ ASSERT(m_client);
+ unsigned long bufferedAmount = m_bufferedAmount;
+ while (!m_messages.isEmpty()) {
+ if (!m_sendingQuota) {
+ break;
+ }
+ bool final = false;
+ const Message& message = m_messages.first();
+ switch (message.type) {
+ case MessageTypeText: {
+ WebSocketHandle::MessageType type =
+ m_sentSizeOfTopMessage ? WebSocketHandle::MessageTypeContinuation : WebSocketHandle::MessageTypeText;
+ size_t size = std::min(static_cast<size_t>(m_sendingQuota), message.text.length() - m_sentSizeOfTopMessage);
+ final = (m_sendingQuota == size);
+ m_handle->send(type, message.text.data() + m_sentSizeOfTopMessage, size, final);
+ m_sentSizeOfTopMessage += size;
+ m_sendingQuota -= size;
+ break;
+ }
+ case MessageTypeBlob:
+ startLoadingBlob(*message.blob);
+ break;
+ case MessageTypeArrayBuffer: {
+ WebSocketHandle::MessageType type =
+ m_sentSizeOfTopMessage ? WebSocketHandle::MessageTypeContinuation : WebSocketHandle::MessageTypeBinary;
+ size_t size = std::min(static_cast<size_t>(m_sendingQuota), message.arrayBuffer->byteLength() - m_sentSizeOfTopMessage);
+ final = (m_sendingQuota == size);
+ m_handle->send(type, static_cast<const char*>(message.arrayBuffer->data()) + m_sentSizeOfTopMessage, size, final);
+ m_sentSizeOfTopMessage += size;
+ m_sendingQuota -= size;
+ break;
+ }
+ default:
+ ASSERT_NOT_REACHED();
+ }
+ if (m_blobLoader) {
+ break;
+ }
+ ASSERT(final || !m_sendingQuota);
+ if (final) {
+ m_messages.removeFirst();
+ m_sentSizeOfTopMessage = 0;
+ }
+ }
+ if (m_suspendState == Active && m_bufferedAmount != bufferedAmount) {
+ m_client->didUpdateBufferedAmount(m_bufferedAmount);
+ }
+}
+
+void NewWebSocketChannelImpl::flowControlIfNecessary()
+{
+ if (m_state != Open) {
+ return;
+ }
+ ASSERT(m_handle);
+ if (m_receivedDataSizeForFlowControl < receivedDataSizeForFlowControlHighWaterMark) {
+ return;
+ }
+ m_handle->flowControl(m_receivedDataSizeForFlowControl);
+ m_receivedDataSizeForFlowControl = 0;
+}
+
+void NewWebSocketChannelImpl::didConnect(WebSocketHandle* handle, bool succeed, const WebKit::WebString& selectedProtocol, const WebKit::WebString& extensions)
+{
+ LOG(Network, "NewWebSocketChannelImpl %p didConnect(%p, %d, %s, %s)", this, handle, succeed, selectedProtocol.utf8().data(), extensions.utf8().data());
+ if (m_state != Connecting) {
+ return;
+ }
+ ASSERT(handle == m_handle);
+ ASSERT(m_client);
+ if (!succeed) {
+ failAsError("Cannot connect to " + m_url.string() + ".");
+ // failAsError can delete this object.
+ return;
+ }
+ m_state = Open;
+ m_subprotocol = selectedProtocol;
+ m_extensions = extensions;
+ if (m_suspendState == Active) {
+ m_client->didConnect();
+ } else {
+ m_pendingEvents.append(PendingEvent(PendingEvent::DidConnectComplete));
+ }
+}
+
+void NewWebSocketChannelImpl::didReceiveData(WebSocketHandle* handle, WebSocketHandle::MessageType type, const char* data, size_t size, bool fin)
+{
+ LOG(Network, "NewWebSocketChannelImpl %p didReceiveData(%p, %d, (%p, %zu), %d)", this, handle, type, data, size, fin);
+ if (m_state != Open) {
+ return;
+ }
+ ASSERT(handle == m_handle);
+ ASSERT(m_client);
+ // Non-final frames cannot be empty.
+ ASSERT(fin || size);
+ switch (type) {
+ case WebSocketHandle::MessageTypeText:
+ ASSERT(m_receivingMessageData.isEmpty());
+ m_receivingMessageTypeIsText = true;
+ break;
+ case WebSocketHandle::MessageTypeBinary:
+ ASSERT(m_receivingMessageData.isEmpty());
+ m_receivingMessageTypeIsText = false;
+ break;
+ case WebSocketHandle::MessageTypeContinuation:
+ ASSERT(!m_receivingMessageData.isEmpty());
+ break;
+ default:
+ ASSERT_NOT_REACHED();
+ break;
+ }
+ m_receivingMessageData.append(data, size);
+ m_receivedDataSizeForFlowControl += size;
+ flowControlIfNecessary();
+ if (!fin) {
+ return;
+ }
+ if (m_suspendState != Active) {
+ m_pendingEvents.append(PendingEvent(m_receivingMessageTypeIsText ? PendingEvent::DidReceiveTextMessage : PendingEvent::DidReceiveBinaryMessage));
+ m_pendingEvents.last().message.swap(m_receivingMessageData);
+ return;
+ }
+ Vector<char> messageData;
+ messageData.swap(m_receivingMessageData);
+ if (m_receivingMessageTypeIsText) {
+ handleTextMessage(&messageData);
+ // handleTextMessage can delete this object.
+ } else {
+ handleBinaryMessage(&messageData);
+ }
+}
+
+
+void NewWebSocketChannelImpl::didClose(WebSocketHandle* handle, unsigned short code, const WebKit::WebString& reason)
+{
+ // FIXME: Maybe we should notify an error to m_client for some didClose messages.
+ LOG(Network, "NewWebSocketChannelImpl %p didClose(%p, %d, %s)", this, code, String(reason).utf8().data());
+ if (m_state == Closed) {
+ return;
+ }
+ if (m_identifier) {
+ InspectorInstrumentation::didCloseWebSocket(toDocument(m_context), m_identifier);
+ }
+ ASSERT(handle == m_handle);
+ m_handle = 0;
+ m_state = Closed;
+ if (m_suspendState != Active) {
+ m_pendingEvents.append(PendingEvent(code, reason));
+ return;
+ }
+ handleDidClose(code, reason);
+ // handleDidClose may delete this object.
+}
+
+void NewWebSocketChannelImpl::didFinishLoading()
+{
+ // m_client can be invalid here.
+ LOG(Network, "NewWebSocketChannelImpl %p didFinishLoading()", this);
+ if (m_state == Open) {
+ ASSERT(m_handle);
+ // The loaded blob is always placed on m_messages[0].
+ ASSERT(m_messages.size() > 0 && m_messages.first().type == MessageTypeBlob);
+ // We replace it with the loaded blob.
+ m_messages.first() = Message(m_blobLoader->arrayBufferResult());
+ sendInternal();
+ }
+ m_blobLoader.clear();
+
+ deref();
+ // deref() may delete this object.
+}
+
+void NewWebSocketChannelImpl::didFail(FileError::ErrorCode errorCode)
+{
+ // m_client can be invalid here.
+ LOG(Network, "NewWebSocketChannelImpl %p didFail(%d)", this, errorCode);
+ m_blobLoader.clear();
+ failAsError("Failed to load Blob: error code = " + String::number(errorCode)); // FIXME: Generate human-friendly reason message.
+ deref();
+ // deref() may delete this object.
+}
+
+void NewWebSocketChannelImpl::resumeTimerFired(Timer<NewWebSocketChannelImpl>*)
+{
+ RefPtr<NewWebSocketChannelImpl> protect(this);
+ deref();
+
+ if (!m_client) {
+ ASSERT(m_state == Closed);
+ return;
+ }
+ if (m_suspendState == Suspended) {
+ return;
+ }
+ ASSERT(m_suspendState == Resuming);
+ m_suspendState = Active;
+ sendInternal();
+ flowControlIfNecessary();
+ processPendingEvents();
+}
+
+void NewWebSocketChannelImpl::handleTextMessage(Vector<char>* messageData)
+{
+ ASSERT(m_suspendState == Active);
+ ASSERT(messageData);
+ String message = "";
+ if (m_receivingMessageData.size() > 0) {
+ message = String::fromUTF8(m_receivingMessageData.data(), m_receivingMessageData.size());
+ }
+ if (message.isNull()) {
+ failAsError("Could not decode a text frame as UTF-8.");
+ } else {
+ m_client->didReceiveMessage(message);
+ }
+}
+
+void NewWebSocketChannelImpl::handleBinaryMessage(Vector<char>* messageData)
+{
+ ASSERT(m_suspendState == Active);
+ ASSERT(messageData);
+ OwnPtr<Vector<char> > binaryData = adoptPtr(new Vector<char>);
+ messageData->swap(*binaryData);
+ m_client->didReceiveBinaryData(binaryData.release());
+}
+
+void NewWebSocketChannelImpl::handleDidClose(unsigned short code, const String& reason)
+{
+ ASSERT(m_state == Closed);
+ ASSERT(m_suspendState == Active);
+ ASSERT(m_client);
+ WebSocketChannelClient* client = m_client;
+ m_client = 0;
+ WebSocketChannelClient::ClosingHandshakeCompletionStatus status =
+ isClean(code) ? WebSocketChannelClient::ClosingHandshakeComplete : WebSocketChannelClient::ClosingHandshakeIncomplete;
+ client->didClose(m_bufferedAmount, status, code, reason);
+ // client->didClose may delete this object.
+}
+
+void NewWebSocketChannelImpl::startLoadingBlob(const Blob& blob)
+{
+ LOG(Network, "NewWebSocketChannelImpl %p startLoadingBlob(%s)", this, blob.url().string().utf8().data());
+ ASSERT(!m_blobLoader);
+ // Protect this object until the loading completes or fails.
+ ref();
+
+ m_blobLoader = adoptPtr(new FileReaderLoader(FileReaderLoader::ReadAsArrayBuffer, this));
+ m_blobLoader->start(m_context, blob);
+}
+
+void NewWebSocketChannelImpl::processPendingEvents()
+{
+ RefPtr<NewWebSocketChannelImpl> protect(this);
+ ASSERT(m_suspendState == Active);
+
+ for (size_t i = 0; i < m_pendingEvents.size(); ++i) {
+ ASSERT(m_client);
+ PendingEvent& event = m_pendingEvents[i];
+ switch (event.type) {
+ case PendingEvent::DidConnectComplete:
+ m_client->didConnect();
+ break;
+ case PendingEvent::DidReceiveTextMessage:
+ handleTextMessage(&event.message);
+ // m_client can be invalid here.
+ break;
+ case PendingEvent::DidReceiveBinaryMessage:
+ handleBinaryMessage(&event.message);
+ break;
+ case PendingEvent::DidReceiveError:
+ m_client->didReceiveMessageError();
+ break;
+ case PendingEvent::DidClose: {
+ ASSERT(m_state == Closed);
+ ASSERT(!m_handle);
+ handleDidClose(event.closingCode, event.closingReason);
+ // m_client can be invalid here.
+ m_client = 0;
+ break;
+ }
+ default:
+ ASSERT_NOT_REACHED();
+ break;
+ }
+
+ if (event.type == PendingEvent::DidClose || !m_client) {
+ // Drop remaining messages.
+ break;
+ }
+ }
+ m_pendingEvents.clear();
+}
+
+} // namespace WebCore
« no previous file with comments | « Source/modules/websockets/NewWebSocketChannelImpl.h ('k') | public/platform/Platform.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698