Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(778)

Side by Side Diff: blimp/net/grpc_connection.cc

Issue 2462183002: GRPC Stream implementation of HeliumStream
Patch Set: Address gcasto comments Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « blimp/net/grpc_connection.h ('k') | blimp/net/grpc_connection_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <utility>
6
7 #include "base/callback.h"
8 #include "base/callback_helpers.h"
9 #include "base/macros.h"
10 #include "base/memory/ptr_util.h"
11 #include "base/threading/thread_task_runner_handle.h"
12 #include "blimp/common/logging.h"
13 #include "blimp/common/proto/blimp_message.pb.h"
14 #include "blimp/net/blimp_message_processor.h"
15 #include "blimp/net/connection_error_observer.h"
16 #include "blimp/net/grpc_connection.h"
17 #include "blimp/net/grpc_stream.h"
18 #include "net/base/net_errors.h"
19
20 namespace blimp {
21
22 int ConvertHeliumResult(helium::Result result) {
23 return (result == helium::Result::SUCCESS ? 0 : -1);
24 }
25
26 // NOTE: This entire class/file is going away soon.
27 // This set of classes is a thin wrapper around the new HeliumStream-like
28 // GrpcStream.
29
30 // Forwards a |BlimpMessage| from the client to the Engine via gRPC after
31 // converting it to a |HeliumWrapper| first.
32 class GrpcOutgoingForwarder : public BlimpMessageProcessor {
33 public:
34 GrpcOutgoingForwarder(GrpcStream* stream,
35 ConnectionErrorObserver* error_observer)
36 : stream_(stream) {}
37 void ProcessMessage(std::unique_ptr<BlimpMessage> message,
38 const net::CompletionCallback& callback) override;
39
40 private:
41 static void SendMessageCb(const base::Callback<void(int)>& callback,
42 helium::Result result) {
43 callback.Run(ConvertHeliumResult(result));
44 }
45
46 GrpcStream* stream_;
47 };
48
49 void GrpcOutgoingForwarder::ProcessMessage(
50 std::unique_ptr<BlimpMessage> message,
51 const net::CompletionCallback& callback) {
52 DVLOG(3) << "Sending message: " << *message
53 << "; type = " << message->message_id();
54
55 if (stream_ != nullptr) {
56 std::unique_ptr<HeliumWrapper> helium_message =
57 base::MakeUnique<HeliumWrapper>();
58
59 if (message->SerializeToString(
60 helium_message->mutable_serialized_helium_message())) {
61 stream_->SendMessage(
62 std::move(helium_message),
63 base::Bind(&GrpcOutgoingForwarder::SendMessageCb, callback));
64 } else {
65 LOG(FATAL) << "Unable to serialize message " << *message;
66 }
67 }
68 }
69
70 // Callback for the gRPC layer that has a new |HeliumWrapper| message to forward
71 // to the client's 'incoming message processor' that accepts |BlimpMessage|.
72 class GrpcIncomingForwarder {
73 public:
74 GrpcIncomingForwarder(GrpcStream* stream,
75 ConnectionErrorObserver* error_observer)
76 : stream_(stream),
77 error_observer_(error_observer),
78 incoming_msg_processor_(nullptr),
79 incoming_forwarder_(this) {}
80
81 void set_incoming_message_processor(BlimpMessageProcessor* processor) {
82 bool is_receive_message_setup = (incoming_msg_processor_ != nullptr);
83 incoming_msg_processor_ = processor;
84
85 if (!is_receive_message_setup) {
86 TriggerReceiveMessage();
87 }
88 }
89
90 private:
91 void TriggerReceiveMessage();
92 void ReceiveMessage();
93 void OnMessageReceived(std::unique_ptr<HeliumWrapper> helium_message,
94 helium::Result result);
95 void ProcessResult(int result);
96
97 GrpcStream* stream_;
98 ConnectionErrorObserver* error_observer_;
99 BlimpMessageProcessor* incoming_msg_processor_;
100 base::WeakPtrFactory<GrpcIncomingForwarder> incoming_forwarder_;
101 };
102
103 void GrpcIncomingForwarder::ProcessResult(int result) {
104 // No-op.
105 }
106
107 void GrpcIncomingForwarder::TriggerReceiveMessage() {
108 base::ThreadTaskRunnerHandle::Get()->PostTask(
109 FROM_HERE, base::Bind(&GrpcIncomingForwarder::ReceiveMessage,
110 incoming_forwarder_.GetWeakPtr()));
111 }
112
113 void GrpcIncomingForwarder::ReceiveMessage() {
114 if (stream_ != nullptr) {
115 stream_->ReceiveMessage(
116 base::Bind(&GrpcIncomingForwarder::OnMessageReceived,
117 incoming_forwarder_.GetWeakPtr()));
118 }
119 }
120
121 void GrpcIncomingForwarder::OnMessageReceived(
122 std::unique_ptr<HeliumWrapper> helium_message, helium::Result result) {
123 int result_code = ConvertHeliumResult(result);
124 if (result != helium::Result::SUCCESS) {
125 DVLOG(3) << "Received message with error " << result;
126 error_observer_->OnConnectionError(result_code);
127 return;
128 }
129
130 DVLOG(3) << "Processing message : " << helium_message->GetTypeName();
131 std::unique_ptr<BlimpMessage> blimp_message =
132 base::MakeUnique<BlimpMessage>();
133
134 if (blimp_message->ParseFromString(
135 helium_message->serialized_helium_message())) {
136 if (incoming_msg_processor_ != nullptr) {
137 DVLOG(3) << "Parsed BlimpMessage: " << *blimp_message;
138 incoming_msg_processor_->ProcessMessage(
139 std::move(blimp_message),
140 base::Bind(&GrpcIncomingForwarder::ProcessResult,
141 incoming_forwarder_.GetWeakPtr()));
142 TriggerReceiveMessage();
143 }
144 } else {
145 LOG(FATAL) << "Unable to deserialize message.";
146 }
147 }
148
149 // We rely on the gRPC and our //net code to agree on the 'success' code alone.
150 // Again, this is interim while we use the current //net as well as gRPC but
151 // will go away once we fully move to gRPC alone with strongly-typed results.
152 static_assert(static_cast<int>(grpc::OK) == static_cast<int>(net::OK),
153 "Mismatch in error code definitions.");
154
155 GrpcConnection::GrpcConnection(std::unique_ptr<GrpcStream> stream)
156 : stream_(std::move(stream)),
157 outgoing_forwarder_(
158 base::MakeUnique<GrpcOutgoingForwarder>(stream_.get(), this)),
159 incoming_forwarder_(
160 base::MakeUnique<GrpcIncomingForwarder>(stream_.get(), this)) {}
161
162 BlimpMessageProcessor* GrpcConnection::GetOutgoingMessageProcessor() {
163 return outgoing_forwarder_.get();
164 }
165
166 void GrpcConnection::SetIncomingMessageProcessor(
167 BlimpMessageProcessor* processor) {
168 DVLOG(3) << "Setting incoming message processor " << processor;
169 incoming_forwarder_->set_incoming_message_processor(processor);
170 }
171
172 GrpcConnection::~GrpcConnection() { DVLOG(1) << "Ending connection."; }
173
174 } // namespace blimp
OLDNEW
« no previous file with comments | « blimp/net/grpc_connection.h ('k') | blimp/net/grpc_connection_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698