Index: components/cronet/android/cronet_bidirectional_stream.cc |
diff --git a/components/cronet/android/cronet_bidirectional_stream.cc b/components/cronet/android/cronet_bidirectional_stream.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..f1b1e91edb0946f404f223f16339b6f6eb5bf108 |
--- /dev/null |
+++ b/components/cronet/android/cronet_bidirectional_stream.cc |
@@ -0,0 +1,355 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "cronet_bidirectional_stream.h" |
+ |
+#include <limits> |
xunjieli
2015/12/18 19:11:11
nit: not used?
mef
2015/12/29 20:36:54
Done.
|
+#include <vector> |
xunjieli
2015/12/18 19:11:11
nit: also #include <string>
mef
2015/12/29 20:36:54
Done.
|
+ |
+#include "base/bind.h" |
+#include "base/location.h" |
+#include "base/logging.h" |
+#include "base/strings/string_number_conversions.h" |
+#include "components/cronet/android/cronet_url_request_context_adapter.h" |
+#include "jni/CronetBidirectionalStream_jni.h" |
+#include "net/base/io_buffer.h" |
+#include "net/base/net_errors.h" |
+#include "net/base/request_priority.h" |
+#include "net/cert/cert_status_flags.h" |
+#include "net/http/bidirectional_stream_request_info.h" |
+#include "net/http/http_network_session.h" |
+#include "net/http/http_response_headers.h" |
+#include "net/http/http_status_code.h" |
+#include "net/http/http_transaction_factory.h" |
+#include "net/http/http_util.h" |
+#include "net/spdy/spdy_header_block.h" |
+#include "net/ssl/ssl_info.h" |
+#include "net/url_request/redirect_info.h" |
+#include "net/url_request/url_request_context.h" |
+ |
+using base::android::ConvertUTF8ToJavaString; |
+ |
+namespace cronet { |
+ |
+// Explicitly register static JNI functions. |
+bool CronetBidirectionalStreamRegisterJni(JNIEnv* env) { |
+ return RegisterNativesImpl(env); |
+} |
+ |
+static jlong CreateBidirectionalStream( |
+ JNIEnv* env, |
+ const JavaParamRef<jobject>& jbidi_stream, |
+ jlong jurl_request_context_adapter) { |
+ CronetURLRequestContextAdapter* context_adapter = |
+ reinterpret_cast<CronetURLRequestContextAdapter*>( |
+ jurl_request_context_adapter); |
+ DCHECK(context_adapter); |
+ |
+ CronetBidirectionalStream* adapter = |
xunjieli
2015/12/18 19:49:25
nit: maybe s/adapter/stream, or rename this class
mef
2015/12/29 20:36:54
I'll rename CronetBidirectionalStream -> CronetBid
|
+ new CronetBidirectionalStream(context_adapter, env, jbidi_stream); |
+ |
+ return reinterpret_cast<jlong>(adapter); |
+} |
+ |
+// TODO(mef): Extract this and its original from cronet_url_request_adapter.cc |
+// into separate module. |
+// net::WrappedIOBuffer subclass for a buffer owned by a Java ByteBuffer. Keeps |
+// the ByteBuffer alive until destroyed. Uses WrappedIOBuffer because data() is |
+// owned by the embedder. |
+class CronetBidirectionalStream::IOBufferWithByteBuffer |
+ : public net::WrappedIOBuffer { |
+ public: |
+ // Creates a buffer wrapping the Java ByteBuffer |jbyte_buffer|. |data| points |
+ // to the memory backed by the ByteBuffer, and position is the location to |
+ // start writing. |
+ IOBufferWithByteBuffer(JNIEnv* env, |
+ jobject jbyte_buffer, |
xunjieli
2015/12/18 19:11:11
nit: the new recommendation is not to pass plain j
mef
2015/12/29 20:36:54
Done.
|
+ void* data, |
+ int position) |
+ : net::WrappedIOBuffer(static_cast<char*>(data) + position), |
+ initial_position_(position) { |
+ DCHECK(data); |
+ DCHECK_EQ(env->GetDirectBufferAddress(jbyte_buffer), data); |
+ byte_buffer_.Reset(env, jbyte_buffer); |
+ } |
+ |
+ int initial_position() const { return initial_position_; } |
+ |
+ jobject byte_buffer() const { return byte_buffer_.obj(); } |
+ |
+ private: |
+ ~IOBufferWithByteBuffer() override {} |
+ |
+ base::android::ScopedJavaGlobalRef<jobject> byte_buffer_; |
+ |
+ const int initial_position_; |
+}; |
+ |
+CronetBidirectionalStream::CronetBidirectionalStream( |
+ CronetURLRequestContextAdapter* context, |
+ JNIEnv* env, |
+ jobject jbidi_stream) |
+ : context_(context) { |
+ DCHECK(!context_->IsOnNetworkThread()); |
+ owner_.Reset(env, jbidi_stream); |
+} |
+ |
+CronetBidirectionalStream::~CronetBidirectionalStream() { |
+ DCHECK(context_->IsOnNetworkThread()); |
+} |
+ |
+jint CronetBidirectionalStream::Start(JNIEnv* env, |
+ jobject jcaller, |
+ jstring jurl, |
+ jstring jmethod, |
+ jobjectArray jheaders, |
+ jboolean jend_of_stream) { |
+ DCHECK(!context_->IsOnNetworkThread()); |
+ // Prepare request info here to be able to return the error. |
+ scoped_ptr<net::BidirectionalStreamRequestInfo> request_info( |
+ new net::BidirectionalStreamRequestInfo()); |
+ request_info->url = GURL(base::android::ConvertJavaStringToUTF8(env, jurl)); |
xunjieli
2015/12/18 19:11:11
nit: We have "using base::android::ConvertUTF8ToJa
mef
2015/12/29 20:36:53
Done.
|
+ // Http method is a token, just as header name. |
+ request_info->method = base::android::ConvertJavaStringToUTF8(env, jmethod); |
+ if (!net::HttpUtil::IsValidHeaderName(request_info->method)) |
+ return -1; |
+ |
+ std::vector<std::string> headers; |
+ base::android::AppendJavaStringArrayToStringVector(env, jheaders, &headers); |
+ for (size_t i = 0; i < headers.size(); i += 2) { |
+ std::string name(headers[i]); |
+ std::string value(headers[i + 1]); |
+ if (!net::HttpUtil::IsValidHeaderName(name) || |
+ !net::HttpUtil::IsValidHeaderValue(value)) { |
+ return i + 1; |
+ } |
+ |
+ request_info->extra_headers.SetHeader(name, value); |
+ } |
+ request_info->end_stream_on_headers = jend_of_stream; |
+ |
+ context_->PostTaskToNetworkThread( |
+ FROM_HERE, |
+ base::Bind(&CronetBidirectionalStream::StartOnNetworkThread, |
+ base::Unretained(this), base::Passed(&request_info))); |
+ return 0; |
+} |
+ |
+void CronetBidirectionalStream::StartOnNetworkThread( |
+ scoped_ptr<net::BidirectionalStreamRequestInfo> request_info) { |
+ DCHECK(context_->IsOnNetworkThread()); |
+ |
+ VLOG(1) << "Starting bidirectional stream: " |
+ << request_info->url.possibly_invalid_spec().c_str(); |
+ |
+ bidi_stream_.reset(new net::BidirectionalStream( |
+ *request_info, net::DEFAULT_PRIORITY, context_->GetURLRequestContext() |
xunjieli
2015/12/18 19:11:11
Priority is now folded into RequestInfo.
mef
2015/12/29 20:36:54
Done.
|
+ ->http_transaction_factory() |
+ ->GetSession(), |
+ this)); |
+} |
+ |
+jboolean CronetBidirectionalStream::ReadData(JNIEnv* env, |
+ jobject jcaller, |
+ jobject jbyte_buffer, |
+ jint jposition, |
+ jint jcapacity) { |
+ DCHECK(!context_->IsOnNetworkThread()); |
+ DCHECK_LT(jposition, jcapacity); |
+ |
+ void* data = env->GetDirectBufferAddress(jbyte_buffer); |
+ if (!data) |
+ return JNI_FALSE; |
+ |
+ scoped_refptr<IOBufferWithByteBuffer> read_buffer( |
+ new IOBufferWithByteBuffer(env, jbyte_buffer, data, jposition)); |
+ |
+ int remaining_capacity = jcapacity - jposition; |
+ |
+ context_->PostTaskToNetworkThread( |
+ FROM_HERE, |
+ base::Bind(&CronetBidirectionalStream::ReadDataOnNetworkThread, |
+ base::Unretained(this), read_buffer, remaining_capacity)); |
+ return JNI_TRUE; |
+} |
+ |
+jboolean CronetBidirectionalStream::WriteData(JNIEnv* env, |
+ jobject jcaller, |
+ jobject jbyte_buffer, |
+ jint jposition, |
+ jint jcapacity, |
+ jboolean jend_of_stream) { |
+ DCHECK(!context_->IsOnNetworkThread()); |
+ DCHECK_LT(jposition, jcapacity); |
+ |
+ void* data = env->GetDirectBufferAddress(jbyte_buffer); |
+ if (!data) |
+ return JNI_FALSE; |
+ |
+ scoped_refptr<IOBufferWithByteBuffer> write_buffer( |
+ new IOBufferWithByteBuffer(env, jbyte_buffer, data, jposition)); |
+ |
+ int remaining_capacity = jcapacity - jposition; |
+ |
+ context_->PostTaskToNetworkThread( |
+ FROM_HERE, |
+ base::Bind(&CronetBidirectionalStream::WriteDataOnNetworkThread, |
+ base::Unretained(this), write_buffer, remaining_capacity, |
+ jend_of_stream)); |
+ return JNI_TRUE; |
+} |
+ |
+void CronetBidirectionalStream::Destroy(JNIEnv* env, |
+ jobject jcaller, |
+ jboolean jsend_on_canceled) { |
+ // Destroy could be called from any thread, including network thread (if |
+ // posting task to executor throws an exception), but is posted, so |this| |
+ // is valid until calling task is complete. Destroy() is always called from |
+ // within a synchronized java block that guarantees no future posts to the |
+ // network thread with the adapter pointer. |
+ context_->PostTaskToNetworkThread( |
+ FROM_HERE, base::Bind(&CronetBidirectionalStream::DestroyOnNetworkThread, |
+ base::Unretained(this), jsend_on_canceled)); |
+} |
+ |
+base::android::ScopedJavaLocalRef<jstring> |
+CronetBidirectionalStream::GetNegotiatedProtocol(JNIEnv* env, |
+ jobject jcaller) const { |
+ DCHECK(context_->IsOnNetworkThread()); |
+ switch (bidi_stream_->GetProtocol()) { |
+ case net::kProtoHTTP2: |
+ return ConvertUTF8ToJavaString(env, "h2"); |
+ default: |
+ break; |
+ } |
+ NOTREACHED(); |
+ return ConvertUTF8ToJavaString(env, ""); |
+} |
+ |
+// net::BidirectionalStream::Delegate overrides (called on network thread). |
+ |
+void CronetBidirectionalStream::OnHeadersSent() { |
+ VLOG(1) << "OnHeadersSent"; |
+ DCHECK(context_->IsOnNetworkThread()); |
+ JNIEnv* env = base::android::AttachCurrentThread(); |
+ cronet::Java_CronetBidirectionalStream_onRequestHeadersSent(env, |
+ owner_.obj()); |
+} |
+ |
+void CronetBidirectionalStream::OnHeadersReceived( |
+ const net::SpdyHeaderBlock& response_headers) { |
+ VLOG(1) << "OnHeadersReceived"; |
+ DCHECK(context_->IsOnNetworkThread()); |
+ JNIEnv* env = base::android::AttachCurrentThread(); |
+ // Get http status code from response headers. |
+ jint http_status_code = 0; |
+ const auto http_status_header = response_headers.find(":status"); |
+ if (http_status_header != response_headers.end()) |
+ base::StringToInt(http_status_header->second, &http_status_code); |
+ |
+ cronet::Java_CronetBidirectionalStream_onResponseHeadersReceived( |
+ env, owner_.obj(), http_status_code, |
+ GetHeadersArray(env, response_headers).obj()); |
+} |
+ |
+void CronetBidirectionalStream::OnDataRead(int bytes_read) { |
+ VLOG(1) << "OnDataRead:" << bytes_read; |
+ DCHECK(context_->IsOnNetworkThread()); |
+ jlong received_bytes_count = bidi_stream_->GetTotalReceivedBytes(); |
+ JNIEnv* env = base::android::AttachCurrentThread(); |
+ cronet::Java_CronetBidirectionalStream_onReadCompleted( |
+ env, owner_.obj(), read_buffer_->byte_buffer(), bytes_read, |
+ read_buffer_->initial_position(), received_bytes_count); |
+ // Free the read buffer. This lets the Java ByteBuffer be freed, if the |
+ // embedder releases it, too. |
+ read_buffer_ = nullptr; |
+} |
+ |
+void CronetBidirectionalStream::OnDataSent() { |
+ DCHECK(context_->IsOnNetworkThread()); |
+ JNIEnv* env = base::android::AttachCurrentThread(); |
+ cronet::Java_CronetBidirectionalStream_onWriteCompleted( |
+ env, owner_.obj(), write_buffer_->byte_buffer(), |
+ write_buffer_->initial_position()); |
+ // Free the write buffer. This lets the Java ByteBuffer be freed, if the |
+ // embedder releases it, too. |
+ write_buffer_ = nullptr; |
+} |
+ |
+void CronetBidirectionalStream::OnTrailersReceived( |
+ const net::SpdyHeaderBlock& response_trailers) { |
+ DCHECK(context_->IsOnNetworkThread()); |
+ JNIEnv* env = base::android::AttachCurrentThread(); |
+ cronet::Java_CronetBidirectionalStream_onResponseTrailersReceived( |
+ env, owner_.obj(), GetHeadersArray(env, response_trailers).obj()); |
+} |
+ |
+void CronetBidirectionalStream::OnFailed(int error) { |
+ DCHECK(context_->IsOnNetworkThread()); |
+ VLOG(1) << "OnFailed:" << error; |
+ jlong received_bytes_count = bidi_stream_->GetTotalReceivedBytes(); |
+ JNIEnv* env = base::android::AttachCurrentThread(); |
+ cronet::Java_CronetBidirectionalStream_onError( |
+ env, owner_.obj(), error, |
+ ConvertUTF8ToJavaString(env, net::ErrorToString(error)).obj(), |
+ received_bytes_count); |
+} |
+ |
+base::android::ScopedJavaLocalRef<jobjectArray> |
+CronetBidirectionalStream::GetHeadersArray( |
+ JNIEnv* env, |
+ const net::SpdyHeaderBlock& header_block) { |
+ DCHECK(context_->IsOnNetworkThread()); |
+ |
+ std::vector<std::string> headers; |
+ for (const auto& header : header_block) { |
+ headers.push_back(header.first.as_string()); |
+ headers.push_back(header.second.as_string()); |
+ } |
+ return base::android::ToJavaArrayOfStrings(env, headers); |
+} |
+ |
+void CronetBidirectionalStream::ReadDataOnNetworkThread( |
+ scoped_refptr<IOBufferWithByteBuffer> read_buffer, |
+ int buffer_size) { |
+ DCHECK(context_->IsOnNetworkThread()); |
+ DCHECK(read_buffer); |
+ DCHECK(!read_buffer_); |
+ |
+ read_buffer_ = read_buffer; |
+ |
+ int bytes_read = bidi_stream_->ReadData(read_buffer_.get(), buffer_size); |
+ // If IO is pending, wait for the BidirectionalStream to call OnDataRead. |
+ if (bytes_read == net::ERR_IO_PENDING) |
+ return; |
+ |
+ if (bytes_read < 0) { |
+ OnFailed(bytes_read); |
+ return; |
+ } |
+ OnDataRead(bytes_read); |
+} |
+ |
+void CronetBidirectionalStream::WriteDataOnNetworkThread( |
+ scoped_refptr<IOBufferWithByteBuffer> write_buffer, |
+ int buffer_size, |
+ bool end_of_stream) { |
+ DCHECK(context_->IsOnNetworkThread()); |
+ DCHECK(write_buffer); |
+ DCHECK(!write_buffer_); |
+ |
+ write_buffer_ = write_buffer; |
+ bidi_stream_->SendData(write_buffer_.get(), buffer_size, end_of_stream); |
+} |
+ |
+void CronetBidirectionalStream::DestroyOnNetworkThread(bool send_on_canceled) { |
+ DCHECK(context_->IsOnNetworkThread()); |
+ if (send_on_canceled) { |
+ JNIEnv* env = base::android::AttachCurrentThread(); |
+ cronet::Java_CronetBidirectionalStream_onCanceled(env, owner_.obj()); |
+ } |
+ delete this; |
+} |
+ |
+} // namespace cronet |