Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "blimp/net/grpc_stream.h" | |
| 6 | |
| 7 #include "base/bind_helpers.h" | |
| 8 #include "base/callback.h" | |
| 9 #include "base/callback_helpers.h" | |
| 10 #include "base/command_line.h" | |
| 11 #include "base/memory/ptr_util.h" | |
| 12 #include "base/synchronization/lock.h" | |
| 13 #include "base/threading/thread.h" | |
| 14 #include "base/threading/thread_restrictions.h" | |
| 15 #include "base/threading/thread_task_runner_handle.h" | |
| 16 | |
| 17 #include "content/public/browser/browser_thread.h" | |
| 18 | |
| 19 namespace blimp { | |
| 20 | |
| 21 GrpcStream::GrpcStream(const net::CompletionCallback& connection_callback) | |
| 22 : callback_task_runner_(base::ThreadTaskRunnerHandle::Get()), | |
| 23 connection_callback_(connection_callback), | |
| 24 grpc_thread_(base::MakeUnique<base::Thread>("GrpcThread")), | |
| 25 weak_factory_(this) {} | |
| 26 | |
| 27 GrpcStream::GrpcTag* GrpcStream::GrpcConnectTag() { | |
| 28 return GrpcTag::Connect(connection_callback_); | |
| 29 } | |
| 30 | |
| 31 GrpcStream::GrpcTag* GrpcStream::GrpcReadTag( | |
| 32 const HeliumMessageReceivedCb& received_cb) { | |
| 33 return GrpcTag::Read(received_cb); | |
| 34 } | |
| 35 | |
| 36 GrpcStream::GrpcTag* GrpcStream::GrpcWriteTag( | |
| 37 const HeliumMessageSentCb& sent_cb) { | |
| 38 return GrpcTag::Write(sent_cb); | |
| 39 } | |
| 40 | |
| 41 GrpcStream::GrpcTag* GrpcStream::GrpcWritesDoneTag() { | |
| 42 return GrpcTag::WritesDone(); | |
| 43 } | |
| 44 | |
| 45 void GrpcStream::StartCompletionQueueThread( | |
| 46 grpc::CompletionQueue* completion_q) { | |
| 47 if (grpc_thread_ == nullptr) { | |
| 48 LOG(FATAL) << "Completion queue thread can only be started exactly once."; | |
| 49 return; | |
| 50 } | |
| 51 | |
| 52 base::Thread::Options options; | |
| 53 // options.joinable = false; | |
| 54 grpc_thread_->StartWithOptions(options); | |
| 55 grpc_thread_.release()->task_runner()->PostTask( | |
| 56 FROM_HERE, base::Bind(&GrpcStream::CompletionQueueThread, completion_q, | |
| 57 callback_task_runner_)); | |
| 58 } | |
| 59 | |
| 60 void GrpcStream::CompletionQueueThread( | |
| 61 grpc::CompletionQueue* completion_q, | |
| 62 scoped_refptr<base::SingleThreadTaskRunner> callback_task_runner) { | |
| 63 base::ThreadRestrictions::SetIOAllowed(true); | |
| 64 | |
| 65 DVLOG(3) << "Starting Completion Queue thread."; | |
| 66 static int event_count = 0; | |
| 67 bool ok = false; | |
| 68 void* tag = nullptr; | |
| 69 DVLOG(3) << "Waiting for next event (grpc stream; count so far = " | |
| 70 << event_count << ")."; | |
| 71 ++event_count; | |
| 72 | |
| 73 if (completion_q->Next(reinterpret_cast<void**>(&tag), &ok)) { | |
| 74 if (tag != nullptr) { | |
| 75 auto grpc_tag = reinterpret_cast<GrpcTag*>(tag); | |
| 76 if (callback_task_runner != nullptr) { | |
| 77 DVLOG(3) << "Triggering GrpcTag (from gRPC thread): " | |
| 78 << static_cast<int>(grpc_tag->tag_type_) | |
| 79 << "; Result: " << (ok ? "OK" : "ERROR"); | |
| 80 callback_task_runner->PostTask( | |
| 81 FROM_HERE, | |
| 82 base::Bind(&GrpcStream::GrpcTag::ApplyCallbackOnCbThread, | |
| 83 base::Passed(base::WrapUnique(grpc_tag)), ok)); | |
| 84 } else { | |
| 85 LOG(ERROR) << "Unable to trigger gRPC callbacks."; | |
| 86 delete grpc_tag; | |
| 87 } | |
| 88 } else { | |
| 89 LOG(ERROR) << "Completion queue shutting down; tag = " << tag; | |
| 90 } | |
| 91 | |
| 92 if (ok) { | |
| 93 auto task_runner = base::ThreadTaskRunnerHandle::Get(); | |
| 94 if (task_runner != nullptr) { | |
| 95 task_runner->PostTask(FROM_HERE, | |
| 96 base::Bind(&GrpcStream::CompletionQueueThread, | |
| 97 completion_q, callback_task_runner)); | |
| 98 } | |
| 99 } else { | |
| 100 LOG(INFO) << "Tag " << tag << " had problems. Exiting."; | |
| 101 } | |
| 102 } else { | |
| 103 LOG(ERROR) << "Completion queue thread has no more events."; | |
| 104 } | |
| 105 } | |
| 106 | |
| 107 GrpcStream::~GrpcStream() { | |
| 108 callback_task_runner_ = nullptr; | |
| 109 } | |
| 110 | |
| 111 GrpcStream::GrpcTag::GrpcTag() : tag_type_(TagType::UNKNOWN) { | |
| 112 DVLOG(3) << "Created Tag " << this; | |
| 113 } | |
| 114 | |
| 115 GrpcStream::GrpcTag* GrpcStream::GrpcTag::Connect( | |
| 116 const net::CompletionCallback& connection_cb) { | |
| 117 auto tag = new GrpcStream::GrpcTag(); | |
| 118 tag->tag_type_ = TagType::CONNECT; | |
| 119 tag->connection_cb_ = connection_cb; | |
| 120 return tag; | |
| 121 } | |
| 122 | |
| 123 GrpcStream::GrpcTag* GrpcStream::GrpcTag::Write( | |
| 124 const GrpcStream::HeliumMessageSentCb& callback) { | |
| 125 auto tag = new GrpcStream::GrpcTag(); | |
| 126 tag->tag_type_ = TagType::WRITE; | |
| 127 tag->sent_cb_ = callback; | |
| 128 return tag; | |
| 129 } | |
| 130 | |
| 131 GrpcStream::GrpcTag* GrpcStream::GrpcTag::Read( | |
| 132 const GrpcStream::HeliumMessageReceivedCb& callback) { | |
| 133 auto tag = new GrpcStream::GrpcTag(); | |
| 134 tag->tag_type_ = TagType::READ; | |
| 135 tag->received_msg_ = base::MakeUnique<HeliumWrapper>(); | |
| 136 tag->received_cb_ = callback; | |
| 137 return tag; | |
| 138 } | |
| 139 | |
| 140 GrpcStream::GrpcTag* GrpcStream::GrpcTag::WritesDone() { | |
| 141 auto tag = new GrpcStream::GrpcTag(); | |
| 142 tag->tag_type_ = TagType::WRITES_DONE; | |
| 143 return tag; | |
| 144 } | |
| 145 | |
| 146 HeliumWrapper* GrpcStream::GrpcTag::GetReceivedMsg() const { | |
| 147 CHECK_EQ(TagType::READ, tag_type_); | |
| 148 return received_msg_.get(); | |
| 149 } | |
| 150 | |
| 151 void GrpcStream::GrpcTag::ApplyCallbackOnCbThread(bool ok) { | |
| 152 auto status_code = (ok ? grpc::OK : -grpc::UNKNOWN); | |
| 153 DVLOG(3) << "Processing GrpcTag (on IO thread): " | |
| 154 << static_cast<int>(tag_type_) | |
| 155 << " status_code = " << static_cast<int>(status_code); | |
| 156 switch (tag_type_) { | |
| 157 case TagType::CONNECT: | |
| 158 DVLOG(1) << "gRPC Stream is now setup."; | |
| 159 if (!connection_cb_.is_null()) { | |
| 160 base::ResetAndReturn(&connection_cb_).Run(status_code); | |
| 161 } | |
| 162 break; | |
| 163 case TagType::READ: | |
| 164 DVLOG(1) << "Received Helium Message = " | |
| 165 << "; size = " << received_msg_->ByteSize(); | |
| 166 | |
| 167 base::ResetAndReturn(&received_cb_) | |
| 168 .Run(std::move(received_msg_), status_code); | |
| 169 break; | |
| 170 case TagType::WRITE: | |
| 171 // TODO(perumaal): Apply correct helium results here. | |
| 172 base::ResetAndReturn(&sent_cb_).Run(status_code); | |
| 173 break; | |
| 174 case TagType::WRITES_DONE: | |
| 175 break; | |
| 176 default: | |
|
Kevin M
2016/10/31 21:33:25
General point: don't use default, so that accident
| |
| 177 LOG(FATAL) << "GrpcTag has invalid tag type : " | |
| 178 << static_cast<int>(tag_type_) << "; this = " << this; | |
| 179 } | |
| 180 } | |
| 181 | |
| 182 GrpcStream::GrpcTag::~GrpcTag() { | |
| 183 DVLOG(3) << "Deleting tag " << ((void*)this); | |
| 184 } | |
| 185 | |
| 186 } // namespace blimp | |
| OLD | NEW |