OLD | NEW |
(Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "blimp/net/grpc_stream.h" |
| 6 |
| 7 #include <memory> |
| 8 #include <utility> |
| 9 |
| 10 #include "base/bind_helpers.h" |
| 11 #include "base/callback.h" |
| 12 #include "base/callback_helpers.h" |
| 13 #include "base/command_line.h" |
| 14 #include "base/memory/ptr_util.h" |
| 15 #include "base/memory/ref_counted.h" |
| 16 #include "base/synchronization/lock.h" |
| 17 #include "base/threading/thread.h" |
| 18 #include "base/threading/thread_restrictions.h" |
| 19 #include "base/threading/thread_task_runner_handle.h" |
| 20 |
| 21 namespace blimp { |
| 22 |
| 23 // GrpcStream::SharedData code. It's intended to be used as a simple |
| 24 // data-structure. |
| 25 GrpcStream::SharedData::SharedData() |
| 26 : callback_task_runner(base::ThreadTaskRunnerHandle::Get()), |
| 27 event_count(0), |
| 28 grpc_thread(new base::Thread("GrpcThread")) {} |
| 29 |
| 30 GrpcStream::SharedData::~SharedData() { |
| 31 // When the shared data is being destructed, the gRPC thread object is being |
| 32 // explicitly relinquished beforehand so as to not have any circular |
| 33 // references. |
| 34 DCHECK_EQ(nullptr, grpc_thread); |
| 35 |
| 36 // The thread checker is purely for verification inside the completion queue |
| 37 // thread. |
| 38 DCHECK_EQ(nullptr, grpc_thread_checker.get()); |
| 39 } |
| 40 |
| 41 // A wrapper class around a gRPC tag targeted for use by the completion queue. |
| 42 struct GrpcTag { |
| 43 public: |
| 44 GrpcTag(); |
| 45 ~GrpcTag(); |
| 46 |
| 47 // This method calls the correct callback based on the given tag-type. |
| 48 void ApplyCallback(bool ok); |
| 49 |
| 50 GrpcStream::TagType tag_type; |
| 51 |
| 52 // For initial connection. |
| 53 net::CompletionCallback connection_cb; |
| 54 |
| 55 // For sending. |
| 56 Stream::SendMessageCallback sent_cb; |
| 57 |
| 58 // For receiving. |
| 59 std::unique_ptr<HeliumWrapper> received_msg; |
| 60 Stream::ReceiveMessageCallback received_cb; |
| 61 |
| 62 base::ThreadChecker thread_checker; |
| 63 |
| 64 friend class GrpcStream; |
| 65 }; |
| 66 |
| 67 GrpcTag::GrpcTag() : tag_type(GrpcStream::TagType::UNKNOWN) { |
| 68 DVLOG(3) << "Created Tag " << this; |
| 69 } |
| 70 |
| 71 // Applies the callback (on the IO thread). |
| 72 void GrpcTag::ApplyCallback(bool ok) { |
| 73 DCHECK(thread_checker.CalledOnValidThread()); |
| 74 Result status_code = |
| 75 (ok ? helium::Result::SUCCESS : helium::Result::ERR_PROTOCOL_ERROR); |
| 76 DVLOG(3) << "Processing GrpcTag (on IO thread): " |
| 77 << static_cast<int>(tag_type) |
| 78 << " status_code = " << static_cast<int>(status_code); |
| 79 switch (tag_type) { |
| 80 case GrpcStream::TagType::CONNECT: |
| 81 DVLOG(1) << "gRPC Stream is now setup."; |
| 82 if (!connection_cb.is_null()) { |
| 83 base::ResetAndReturn(&connection_cb).Run(status_code); |
| 84 } |
| 85 break; |
| 86 case GrpcStream::TagType::READ: |
| 87 DVLOG(1) << "Received Helium Message = " |
| 88 << "; size = " << received_msg->ByteSize(); |
| 89 |
| 90 base::ResetAndReturn(&received_cb) |
| 91 .Run(std::move(received_msg), status_code); |
| 92 break; |
| 93 case GrpcStream::TagType::WRITE: |
| 94 // TODO(perumaal): Apply correct helium results here. |
| 95 base::ResetAndReturn(&sent_cb).Run(status_code); |
| 96 break; |
| 97 default: |
| 98 LOG(FATAL) << "GrpcTag has invalid tag type : " |
| 99 << static_cast<int>(tag_type) << "; this = " << this; |
| 100 } |
| 101 } |
| 102 |
| 103 GrpcTag::~GrpcTag() { |
| 104 DVLOG(3) << "Deleting tag " << (reinterpret_cast<void*>(this)); |
| 105 } |
| 106 |
| 107 // GrpcStream class. |
| 108 GrpcStream::GrpcStream() : weak_factory_(this) {} |
| 109 |
| 110 GrpcStream::~GrpcStream() {} |
| 111 |
| 112 // Various ways of creating tags. Note that the tag is an opaque structure for |
| 113 // the users of GrpcStream including the sub-classes so all the implementation |
| 114 // is self-contained in the cc file without exposing the details. All the tags |
| 115 // must be created on the same thread as the |GrpcStream| - i.e. IO thread. |
| 116 |
| 117 // Tag used during initial connection setup. |
| 118 GrpcTag* GrpcStream::ConnectTag( |
| 119 const net::CompletionCallback& connection_callback) { |
| 120 DCHECK(thread_checker_.CalledOnValidThread()); |
| 121 GrpcTag* tag = new GrpcTag(); |
| 122 tag->tag_type = GrpcStream::TagType::CONNECT; |
| 123 tag->connection_cb = connection_callback; |
| 124 return tag; |
| 125 } |
| 126 |
| 127 // Tag that delivers a read-message from the completion queue. |
| 128 GrpcTag* GrpcStream::ReadTag(const Stream::ReceiveMessageCallback& received_cb, |
| 129 HeliumWrapper** received_msg) { |
| 130 DCHECK(thread_checker_.CalledOnValidThread()); |
| 131 GrpcTag* tag = new GrpcTag(); |
| 132 tag->tag_type = GrpcStream::TagType::READ; |
| 133 tag->received_msg = base::MakeUnique<HeliumWrapper>(); |
| 134 tag->received_cb = received_cb; |
| 135 *received_msg = tag->received_msg.get(); |
| 136 return tag; |
| 137 } |
| 138 |
| 139 // Tag that indicates when a message has been sent (i.e. in the completion |
| 140 // queue). |
| 141 GrpcTag* GrpcStream::WriteTag(const Stream::SendMessageCallback& sent_cb) { |
| 142 DCHECK(thread_checker_.CalledOnValidThread()); |
| 143 GrpcTag* tag = new GrpcTag(); |
| 144 tag->tag_type = GrpcStream::TagType::WRITE; |
| 145 tag->sent_cb = sent_cb; |
| 146 return tag; |
| 147 } |
| 148 |
| 149 // Starts the completion queue with the provided |shared_data| that is owned by |
| 150 // both the completion queue thread and the IO thread. |
| 151 void GrpcStream::StartCompletionQueueThread( |
| 152 scoped_refptr<SharedData> shared_data) { |
| 153 if (shared_data->grpc_thread == nullptr) { |
| 154 LOG(FATAL) << "Completion queue thread can only be started exactly once."; |
| 155 return; |
| 156 } |
| 157 |
| 158 base::Thread::Options options; |
| 159 options.message_loop_type = base::MessageLoop::TYPE_IO; |
| 160 shared_data->grpc_thread->StartWithOptions(options); |
| 161 shared_data->grpc_thread->task_runner()->PostTask( |
| 162 FROM_HERE, |
| 163 base::Bind(&GrpcStream::CompletionQueueThread, |
| 164 base::Passed(std::move(shared_data)), TagType::UNKNOWN)); |
| 165 } |
| 166 |
| 167 // Ensures that the given "tag-type to tag-type transition" is OK. |
| 168 /* static */ |
| 169 GrpcStream::TagType GrpcStream::CheckTagType( |
| 170 GrpcStream::TagType current_tag_type, |
| 171 GrpcStream::TagType previous_tag_type) { |
| 172 switch (current_tag_type) { |
| 173 case TagType::CONNECT: |
| 174 DCHECK_EQ(TagType::UNKNOWN, previous_tag_type); |
| 175 break; |
| 176 case TagType::WRITE: |
| 177 case TagType::READ: |
| 178 DCHECK(previous_tag_type != TagType::UNKNOWN); |
| 179 break; |
| 180 default: |
| 181 LOG(FATAL) << "Unrecognized tag-type transition; current: " |
| 182 << static_cast<int>(current_tag_type) |
| 183 << "; previous: " << static_cast<int>(previous_tag_type); |
| 184 break; |
| 185 } |
| 186 return current_tag_type; |
| 187 } |
| 188 |
| 189 // This is the core Completion queue thread that processes gRPC tasks and |
| 190 // delivers callbacks in the right callback thread (i.e. IO thread). This will |
| 191 // continue scheduling further tasks until (a) either the completion queue is no |
| 192 // longer active or (b) the completion queue encounters a problem (i.e. it's |
| 193 // shutting down). |
| 194 /* static */ |
| 195 void GrpcStream::CompletionQueueThread(scoped_refptr<SharedData> shared_data, |
| 196 TagType previous_tag_type) { |
| 197 base::ThreadRestrictions::SetIOAllowed(true); |
| 198 |
| 199 if (shared_data->grpc_thread_checker == nullptr) { |
| 200 shared_data->grpc_thread_checker = base::MakeUnique<base::ThreadChecker>(); |
| 201 } |
| 202 |
| 203 // Create once in the completion queue thread and ensure we always get called |
| 204 // into the same thread until the completion queue is shutdown. |
| 205 DCHECK(shared_data->grpc_thread_checker->CalledOnValidThread()); |
| 206 bool ok = false; |
| 207 void* tag = nullptr; |
| 208 DVLOG(3) << "Waiting for next event (grpc stream; count so far = " |
| 209 << shared_data->event_count << ")."; |
| 210 ++shared_data->event_count; |
| 211 scoped_refptr<base::SingleThreadTaskRunner> task_runner = |
| 212 base::ThreadTaskRunnerHandle::Get(); |
| 213 |
| 214 // This is a blocking call. Waits until (a) either a tag is available for |
| 215 // processing or (b) when the completion queue is being shutdown. |
| 216 if (shared_data->completion_queue->Next(reinterpret_cast<void**>(&tag), |
| 217 &ok)) { |
| 218 if (tag != nullptr) { |
| 219 std::unique_ptr<GrpcTag> grpc_tag = |
| 220 base::WrapUnique<GrpcTag>(reinterpret_cast<GrpcTag*>(tag)); |
| 221 if (shared_data->callback_task_runner != nullptr) { |
| 222 DVLOG(3) << "Triggering GrpcTag (from gRPC thread): " |
| 223 << static_cast<int>(grpc_tag->tag_type) |
| 224 << "; Result: " << (ok ? "OK" : "ERROR"); |
| 225 previous_tag_type = |
| 226 GrpcStream::CheckTagType(grpc_tag->tag_type, previous_tag_type); |
| 227 shared_data->callback_task_runner->PostTask( |
| 228 FROM_HERE, |
| 229 base::Bind(&GrpcTag::ApplyCallback, std::move(grpc_tag), ok)); |
| 230 } else { |
| 231 LOG(ERROR) << "Unable to trigger gRPC callbacks."; |
| 232 } |
| 233 } |
| 234 |
| 235 if (!ok) { |
| 236 DVLOG(3) << "Tag " << tag |
| 237 << " had problems. Continuing to process remaining tags."; |
| 238 } |
| 239 task_runner->PostTask( |
| 240 FROM_HERE, |
| 241 base::Bind(&GrpcStream::CompletionQueueThread, |
| 242 base::Passed(std::move(shared_data)), previous_tag_type)); |
| 243 return; |
| 244 } else { |
| 245 LOG(ERROR) << "Completion queue thread has no more events."; |
| 246 } |
| 247 |
| 248 // We reach here once the completion queue has shutdown or when there is an |
| 249 // unrecoverable error. Destroy the thread and the shared_data now. |
| 250 |
| 251 // Ensure that the |shared_data| thread checker is destructed in the same |
| 252 // thread as it was created in. |
| 253 shared_data->grpc_thread_checker = nullptr; |
| 254 |
| 255 // TODO(perumaal): Figure out how to clean up the thread cleanly. |
| 256 shared_data->grpc_thread = nullptr; |
| 257 } |
| 258 |
| 259 } // namespace blimp |
OLD | NEW |