OLD | NEW |
(Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "blimp/net/grpc_stream.h" |
| 6 |
| 7 #include "base/bind_helpers.h" |
| 8 #include "base/callback.h" |
| 9 #include "base/callback_helpers.h" |
| 10 #include "base/command_line.h" |
| 11 #include "base/memory/ptr_util.h" |
| 12 #include "base/memory/ref_counted.h" |
| 13 #include "base/synchronization/lock.h" |
| 14 #include "base/threading/thread.h" |
| 15 #include "base/threading/thread_restrictions.h" |
| 16 #include "base/threading/thread_task_runner_handle.h" |
| 17 |
| 18 #include "content/public/browser/browser_thread.h" |
| 19 |
| 20 namespace blimp { |
| 21 |
| 22 GrpcStream::GrpcStream(const net::CompletionCallback& connection_callback) |
| 23 : callback_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
| 24 connection_callback_(connection_callback), |
| 25 grpc_thread_(base::MakeUnique<base::Thread>("GrpcThread")), |
| 26 weak_factory_(this) {} |
| 27 |
| 28 GrpcStream::GrpcTag* GrpcStream::GrpcConnectTag() { |
| 29 return GrpcTag::Connect(connection_callback_); |
| 30 } |
| 31 |
| 32 GrpcStream::GrpcTag* GrpcStream::GrpcReadTag( |
| 33 const Stream::ReceiveMessageCallback& received_cb) { |
| 34 return GrpcTag::Read(received_cb); |
| 35 } |
| 36 |
| 37 GrpcStream::GrpcTag* GrpcStream::GrpcWriteTag( |
| 38 const Stream::SendMessageCallback& sent_cb) { |
| 39 return GrpcTag::Write(sent_cb); |
| 40 } |
| 41 |
| 42 GrpcStream::GrpcTag* GrpcStream::GrpcWritesDoneTag() { |
| 43 return GrpcTag::WritesDone(); |
| 44 } |
| 45 |
| 46 void GrpcStream::StartCompletionQueueThread( |
| 47 grpc::CompletionQueue* completion_q) { |
| 48 if (grpc_thread_ == nullptr) { |
| 49 LOG(FATAL) << "Completion queue thread can only be started exactly once."; |
| 50 return; |
| 51 } |
| 52 |
| 53 base::Thread::Options options; |
| 54 grpc_thread_->StartWithOptions(options); |
| 55 grpc_thread_.release()->task_runner()->PostTask( |
| 56 FROM_HERE, base::Bind(&GrpcStream::CompletionQueueThread, completion_q, |
| 57 callback_task_runner_)); |
| 58 } |
| 59 |
| 60 void GrpcStream::CompletionQueueThread( |
| 61 grpc::CompletionQueue* completion_q, |
| 62 scoped_refptr<base::SingleThreadTaskRunner> callback_task_runner) { |
| 63 base::ThreadRestrictions::SetIOAllowed(true); |
| 64 |
| 65 DVLOG(3) << "Starting Completion Queue thread."; |
| 66 static int event_count = 0; |
| 67 bool ok = false; |
| 68 void* tag = nullptr; |
| 69 DVLOG(3) << "Waiting for next event (grpc stream; count so far = " |
| 70 << event_count << ")."; |
| 71 ++event_count; |
| 72 |
| 73 if (completion_q->Next(reinterpret_cast<void**>(&tag), &ok)) { |
| 74 if (tag != nullptr) { |
| 75 GrpcTag* grpc_tag = reinterpret_cast<GrpcTag*>(tag); |
| 76 if (callback_task_runner != nullptr) { |
| 77 DVLOG(3) << "Triggering GrpcTag (from gRPC thread): " |
| 78 << static_cast<int>(grpc_tag->tag_type_) |
| 79 << "; Result: " << (ok ? "OK" : "ERROR"); |
| 80 callback_task_runner->PostTask( |
| 81 FROM_HERE, |
| 82 base::Bind(&GrpcStream::GrpcTag::ApplyCallbackOnCbThread, |
| 83 base::Passed(base::WrapUnique(grpc_tag)), ok)); |
| 84 } else { |
| 85 LOG(ERROR) << "Unable to trigger gRPC callbacks."; |
| 86 delete grpc_tag; |
| 87 } |
| 88 } else { |
| 89 LOG(ERROR) << "Completion queue shutting down; tag = " << tag; |
| 90 } |
| 91 |
| 92 if (ok) { |
| 93 scoped_refptr<base::SingleThreadTaskRunner> task_runner = |
| 94 base::ThreadTaskRunnerHandle::Get(); |
| 95 if (task_runner != nullptr) { |
| 96 task_runner->PostTask(FROM_HERE, |
| 97 base::Bind(&GrpcStream::CompletionQueueThread, |
| 98 completion_q, callback_task_runner)); |
| 99 } |
| 100 } else { |
| 101 LOG(INFO) << "Tag " << tag << " had problems. Exiting."; |
| 102 } |
| 103 } else { |
| 104 LOG(ERROR) << "Completion queue thread has no more events."; |
| 105 } |
| 106 } |
| 107 |
| 108 GrpcStream::~GrpcStream() { callback_task_runner_ = nullptr; } |
| 109 |
| 110 GrpcStream::GrpcTag::GrpcTag() : tag_type_(TagType::UNKNOWN) { |
| 111 DVLOG(3) << "Created Tag " << this; |
| 112 } |
| 113 |
| 114 GrpcStream::GrpcTag* GrpcStream::GrpcTag::Connect( |
| 115 const net::CompletionCallback& connection_cb) { |
| 116 GrpcTag* tag = new GrpcStream::GrpcTag(); |
| 117 tag->tag_type_ = TagType::CONNECT; |
| 118 tag->connection_cb_ = connection_cb; |
| 119 return tag; |
| 120 } |
| 121 |
| 122 GrpcStream::GrpcTag* GrpcStream::GrpcTag::Write( |
| 123 const Stream::SendMessageCallback& callback) { |
| 124 GrpcTag* tag = new GrpcStream::GrpcTag(); |
| 125 tag->tag_type_ = TagType::WRITE; |
| 126 tag->sent_cb_ = callback; |
| 127 return tag; |
| 128 } |
| 129 |
| 130 GrpcStream::GrpcTag* GrpcStream::GrpcTag::Read( |
| 131 const Stream::ReceiveMessageCallback& callback) { |
| 132 GrpcTag* tag = new GrpcStream::GrpcTag(); |
| 133 tag->tag_type_ = TagType::READ; |
| 134 tag->received_msg_ = base::MakeUnique<HeliumWrapper>(); |
| 135 tag->received_cb_ = callback; |
| 136 return tag; |
| 137 } |
| 138 |
| 139 GrpcStream::GrpcTag* GrpcStream::GrpcTag::WritesDone() { |
| 140 GrpcTag* tag = new GrpcStream::GrpcTag(); |
| 141 tag->tag_type_ = TagType::WRITES_DONE; |
| 142 return tag; |
| 143 } |
| 144 |
| 145 HeliumWrapper* GrpcStream::GrpcTag::GetReceivedMsg() const { |
| 146 CHECK_EQ(TagType::READ, tag_type_); |
| 147 return received_msg_.get(); |
| 148 } |
| 149 |
| 150 void GrpcStream::GrpcTag::ApplyCallbackOnCbThread(bool ok) { |
| 151 Result status_code = |
| 152 (ok ? helium::Result::SUCCESS : helium::Result::ERR_PROTOCOL_ERROR); |
| 153 DVLOG(3) << "Processing GrpcTag (on IO thread): " |
| 154 << static_cast<int>(tag_type_) |
| 155 << " status_code = " << static_cast<int>(status_code); |
| 156 switch (tag_type_) { |
| 157 case TagType::CONNECT: |
| 158 DVLOG(1) << "gRPC Stream is now setup."; |
| 159 if (!connection_cb_.is_null()) { |
| 160 base::ResetAndReturn(&connection_cb_).Run(status_code); |
| 161 } |
| 162 break; |
| 163 case TagType::READ: |
| 164 DVLOG(1) << "Received Helium Message = " |
| 165 << "; size = " << received_msg_->ByteSize(); |
| 166 |
| 167 base::ResetAndReturn(&received_cb_) |
| 168 .Run(std::move(received_msg_), status_code); |
| 169 break; |
| 170 case TagType::WRITE: |
| 171 // TODO(perumaal): Apply correct helium results here. |
| 172 base::ResetAndReturn(&sent_cb_).Run(status_code); |
| 173 break; |
| 174 case TagType::WRITES_DONE: |
| 175 break; |
| 176 default: |
| 177 LOG(FATAL) << "GrpcTag has invalid tag type : " |
| 178 << static_cast<int>(tag_type_) << "; this = " << this; |
| 179 } |
| 180 } |
| 181 |
| 182 GrpcStream::GrpcTag::~GrpcTag() { |
| 183 DVLOG(3) << "Deleting tag " << ((void*)this); |
| 184 } |
| 185 |
| 186 } // namespace blimp |
OLD | NEW |