OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include <grpc++/channel.h> | |
6 #include <grpc++/client_context.h> | |
7 #include <grpc++/create_channel.h> | |
8 #include <grpc++/security/credentials.h> | |
9 #include <grpc/grpc.h> | |
10 | |
11 #include "base/bind_helpers.h" | |
12 #include "blimp/net/grpc_client_stream.h" | |
13 | |
14 namespace blimp { | |
15 | |
16 // Data-only struct for handling cross-thread gRPC objects. | |
17 struct GrpcClientStream::ClientSharedData : public GrpcStream::SharedData { | |
18 public: | |
19 // This stub implements the gRPC service. We use this to strema data to | |
Garrett Casto
2016/11/11 00:15:53
Nit: strema -> stream.
| |
20 // and from the server (engine). | |
21 std::unique_ptr<HeliumService::Stub> stub; | |
22 | |
23 // This is the specific Stream API exposed as part of our gRPC service. | |
24 std::unique_ptr<grpc::ClientAsyncReaderWriter<HeliumWrapper, HeliumWrapper>> | |
25 stream; | |
26 | |
27 // This is the context bound to both the stub and the stream. This must be | |
28 // destroyed only after the stream is no longer in use. | |
29 grpc::ClientContext context; | |
30 | |
31 protected: | |
32 ~ClientSharedData() override { | |
33 // The destructor can be called from either the IO thread or the completion | |
34 // queue thread. | |
35 DVLOG(3) << "Client stream shared data is now destroyed."; | |
36 }; | |
37 friend class base::RefCountedThreadSafe<ClientSharedData>; | |
38 }; | |
39 | |
40 // Sets up the gRPC client stream and connects to the specified assignment | |
41 // options. | |
42 GrpcClientStream::GrpcClientStream( | |
43 const AssignmentOptions& assignment_options, | |
44 const net::CompletionCallback& connection_callback) | |
45 : GrpcStream(), | |
46 assignment_options_(assignment_options), | |
47 shared_data_(scoped_refptr<ClientSharedData>(new ClientSharedData())) { | |
48 // First establish a stream to the engine (asynchronously establishing a | |
49 // connection as well). | |
50 shared_data_->completion_queue = base::MakeUnique<grpc::CompletionQueue>(); | |
51 // TODO(perumaal): Add file-descriptor support. See crbug.com/661366 | |
52 shared_data_->stub = HeliumService::NewStub( | |
53 grpc::CreateChannel(assignment_options.engine_endpoint.ToString(), | |
54 grpc::InsecureChannelCredentials())); | |
55 shared_data_->stream = shared_data_->stub->AsyncStream( | |
56 &shared_data_->context, shared_data_->completion_queue.get(), | |
57 ConnectTag(connection_callback)); | |
58 | |
59 // Now start the completion queue passing on a shared ownership of the | |
60 // |shared_data_| until one of the two threads are killed. | |
61 StartCompletionQueueThread(shared_data_); | |
62 | |
63 DVLOG(3) << "Starting client stream @ " | |
64 << assignment_options.engine_endpoint.ToString(); | |
65 } | |
66 | |
67 void GrpcClientStream::SendMessage( | |
68 std::unique_ptr<HeliumWrapper> helium_message, | |
69 const Stream::SendMessageCallback& sent_cb) { | |
70 DCHECK(thread_checker_.CalledOnValidThread()); | |
71 DVLOG(3) << "Sending: " << helium_message->ByteSize() << " bytes"; | |
72 DCHECK_GT(helium_message->serialized_helium_message().size(), 0U); | |
73 shared_data_->stream->Write(*helium_message.get(), | |
74 reinterpret_cast<void*>(WriteTag(sent_cb))); | |
75 } | |
76 | |
77 void GrpcClientStream::ReceiveMessage( | |
78 const Stream::ReceiveMessageCallback& received_cb) { | |
79 DCHECK(thread_checker_.CalledOnValidThread()); | |
80 HeliumWrapper* received_msg = nullptr; | |
81 auto tag = ReadTag(received_cb, &received_msg); | |
82 DVLOG(3) << "Receiving (waiting for callback)"; | |
83 DCHECK(received_msg != nullptr); | |
84 shared_data_->stream->Read(received_msg, reinterpret_cast<void*>(tag)); | |
85 } | |
86 | |
87 // The destructor for the client stream should be called from the IO thread. | |
88 GrpcClientStream::~GrpcClientStream() { | |
89 DCHECK(thread_checker_.CalledOnValidThread()); | |
90 DVLOG(3) << "Destroying GrpcClientStream"; | |
91 shared_data_->completion_queue->Shutdown(); | |
92 } | |
93 | |
94 } // namespace blimp | |
OLD | NEW |