OLD | NEW |
(Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include <grpc++/channel.h> |
| 6 #include <grpc++/security/credentials.h> |
| 7 #include <grpc++/security/server_credentials.h> |
| 8 #include <grpc++/server.h> |
| 9 #include <grpc++/server_builder.h> |
| 10 #include <grpc++/server_context.h> |
| 11 #include <grpc/grpc.h> |
| 12 |
| 13 #include <utility> |
| 14 |
| 15 #include "base/bind_helpers.h" |
| 16 #include "base/callback.h" |
| 17 #include "blimp/common/logging.h" |
| 18 #include "blimp/common/proto/helium_service.grpc.pb.h" |
| 19 #include "blimp/common/public/session/assignment_options.h" |
| 20 #include "blimp/net/grpc_connection.h" |
| 21 #include "blimp/net/grpc_engine_stream.h" |
| 22 |
| 23 namespace blimp { |
| 24 |
| 25 // Data-only struct for handling cross-thread gRPC objects. |
| 26 struct GrpcEngineStream::EngineSharedData : public GrpcStream::SharedData { |
| 27 public: |
| 28 // The gRPC service that provides the streaming, asynchronous API. |
| 29 HeliumService::AsyncService service; |
| 30 |
| 31 // The gRPC server that listens to a specified port/file-descriptor. |
| 32 std::unique_ptr<grpc::Server> server; |
| 33 |
| 34 // The context that is used by the service/server as well as the completion |
| 35 // queue. |
| 36 grpc::ServerContext context; |
| 37 |
| 38 // This is the actual asynchronous API used to stream bidirectional |
| 39 // |HeliumMessage|s to and from a client. |
| 40 std::unique_ptr<grpc::ServerAsyncReaderWriter<HeliumWrapper, HeliumWrapper>> |
| 41 stream; |
| 42 |
| 43 protected: |
| 44 ~EngineSharedData() override { |
| 45 DVLOG(3) << "Engine stream shared data is now destroyed."; |
| 46 }; |
| 47 friend class base::RefCountedThreadSafe<EngineSharedData>; |
| 48 }; |
| 49 |
| 50 GrpcEngineStream::GrpcEngineStream( |
| 51 const AssignmentOptions& assignment_options, |
| 52 const net::CompletionCallback& connection_callback) |
| 53 : GrpcStream(), assignment_options_(assignment_options) { |
| 54 // First start the gRPC server and add a completion queue. |
| 55 shared_data_ = scoped_refptr<EngineSharedData>(new EngineSharedData()); |
| 56 grpc::ServerBuilder builder; |
| 57 |
| 58 int selected_port = -1; |
| 59 builder.AddListeningPort(assignment_options.engine_endpoint.ToString(), |
| 60 grpc::InsecureServerCredentials(), &selected_port); |
| 61 builder.RegisterService(&shared_data_->service); |
| 62 |
| 63 std::unique_ptr<grpc::ServerCompletionQueue> completion_queue = |
| 64 builder.AddCompletionQueue(); |
| 65 |
| 66 // TODO(perumaal): Add file-descriptor support. See crbug.com/661366 |
| 67 shared_data_->server = builder.BuildAndStart(); |
| 68 |
| 69 // Update our actual listening engine ip-address/port number. |
| 70 assignment_options_.engine_endpoint = net::IPEndPoint( |
| 71 assignment_options_.engine_endpoint.address(), selected_port); |
| 72 |
| 73 // Now setup a completion queue thread that processes tasks (tags) from the |
| 74 // completion queue. |
| 75 shared_data_->stream = base::MakeUnique< |
| 76 grpc::ServerAsyncReaderWriter<HeliumWrapper, HeliumWrapper>>( |
| 77 &shared_data_->context); |
| 78 shared_data_->service.RequestStream( |
| 79 &shared_data_->context, shared_data_->stream.get(), |
| 80 completion_queue.get(), completion_queue.get(), |
| 81 reinterpret_cast<void*>(ConnectTag(connection_callback))); |
| 82 shared_data_->completion_queue = std::move(completion_queue); |
| 83 StartCompletionQueueThread(shared_data_); |
| 84 DVLOG(3) << "Starting engine stream @ " |
| 85 << assignment_options_.engine_endpoint.ToString(); |
| 86 } |
| 87 |
| 88 const AssignmentOptions& GrpcEngineStream::GetAssignmentOptions() const { |
| 89 return assignment_options_; |
| 90 } |
| 91 |
| 92 void GrpcEngineStream::SendMessage( |
| 93 std::unique_ptr<HeliumWrapper> helium_message, |
| 94 const Stream::SendMessageCallback& sent_cb) { |
| 95 DCHECK(thread_checker_.CalledOnValidThread()); |
| 96 DVLOG(3) << "Sending: " << helium_message->ByteSize() << " bytes"; |
| 97 DCHECK_GT(helium_message->serialized_helium_message().size(), 0U); |
| 98 |
| 99 shared_data_->stream->Write(*helium_message.get(), |
| 100 reinterpret_cast<void*>(WriteTag(sent_cb))); |
| 101 } |
| 102 |
| 103 void GrpcEngineStream::ReceiveMessage( |
| 104 const Stream::ReceiveMessageCallback& received_cb) { |
| 105 DCHECK(thread_checker_.CalledOnValidThread()); |
| 106 DVLOG(3) << "Receiving (waiting for callback)"; |
| 107 HeliumWrapper* received_msg = nullptr; |
| 108 void* tag = reinterpret_cast<void*>(ReadTag(received_cb, &received_msg)); |
| 109 DCHECK(received_msg != nullptr); |
| 110 shared_data_->stream->Read(received_msg, tag); |
| 111 } |
| 112 |
| 113 GrpcEngineStream::~GrpcEngineStream() { |
| 114 DCHECK(thread_checker_.CalledOnValidThread()); |
| 115 |
| 116 // The order here is important! Completion queue *must* be shutdown *after* |
| 117 // the server shutdown. The unit-tests verify this aspect by tearing down the |
| 118 // engine stream explicitly. |
| 119 DVLOG(3) << "Engine stream shutting down."; |
| 120 |
| 121 // It's important to not block the IO thread simply because the server is |
| 122 // shutting down. |
| 123 gpr_timespec deadline = gpr_time_from_millis(0, GPR_CLOCK_MONOTONIC); |
| 124 shared_data_->server->Shutdown(deadline); |
| 125 shared_data_->completion_queue->Shutdown(); |
| 126 DVLOG(3) << "Engine stream shut down finished."; |
| 127 } |
| 128 } // namespace blimp |
OLD | NEW |