Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(252)

Side by Side Diff: blimp/net/grpc_connection.cc

Issue 2462183002: GRPC Stream implementation of HeliumStream
Patch Set: Implement helium::Stream Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "blimp/net/grpc_connection.h"
6
7 #include "blimp/common/proto/blimp_message.pb.h"
8
9 #include "base/callback.h"
10 #include "base/callback_helpers.h"
11 #include "base/macros.h"
12 #include "base/memory/ptr_util.h"
13 #include "base/threading/thread_task_runner_handle.h"
14 #include "blimp/common/logging.h"
15 #include "blimp/net/connection_error_observer.h"
16 #include "blimp/net/grpc_stream.h"
17 #include "content/public/browser/browser_thread.h"
18 #include "net/base/net_errors.h"
19
20 namespace blimp {
21
22 int ConvertHeliumResult(helium::Result result) {
23 return (result == helium::Result::SUCCESS ? 0 : -1);
24 }
25
26 // NOTE: This entire class/file is going away soon.
27 // This set of classes is a thin wrapper around the new HeliumStream-like
28 // GrpcStream.
29
30 // Forwards a |BlimpMessage| from the client to the Engine via gRPC after
31 // converting it to a |HeliumWrapper| first.
32 class GrpcOutgoingForwarder : public BlimpMessageProcessor {
33 public:
34 GrpcOutgoingForwarder(GrpcStream* stream,
35 ConnectionErrorObserver* error_observer)
36 : stream_(stream) {}
37 void ProcessMessage(std::unique_ptr<BlimpMessage> message,
38 const net::CompletionCallback& callback) override;
39
40 private:
41 static void SendMessageCb(const base::Callback<void(int)>& callback,
42 helium::Result result) {
43 callback.Run(ConvertHeliumResult(result));
44 }
45
46 GrpcStream* stream_;
47 };
48
49 void GrpcOutgoingForwarder::ProcessMessage(
50 std::unique_ptr<BlimpMessage> message,
51 const net::CompletionCallback& callback) {
52 DVLOG(3) << "Sending message: " << *message
53 << "; type = " << message->message_id();
54 // DCHECK(message->has_message_id());
55 if (stream_ != nullptr) {
56 std::unique_ptr<HeliumWrapper> helium_message = base::MakeUnique<HeliumWrapp er>();
57
58 if (message->SerializeToString(
59 helium_message->mutable_serialized_helium_message())) {
60 stream_->SendMessage(
61 std::move(helium_message),
62 base::Bind(&GrpcOutgoingForwarder::SendMessageCb, callback));
63 } else {
64 LOG(FATAL) << "Unable to serialize message " << *message;
65 }
66 }
67 };
68
69 // Callback for the gRPC layer that has a new |HeliumWrapper| message to forward
70 // to the client's 'incoming message processor' that accepts |BlimpMessage|.
71 class GrpcIncomingForwarder {
72 public:
73 GrpcIncomingForwarder(GrpcStream* stream,
74 ConnectionErrorObserver* error_observer)
75 : stream_(stream),
76 error_observer_(error_observer),
77 incoming_msg_processor_(nullptr),
78 incoming_forwarder_(this) {}
79
80 void set_incoming_message_processor(BlimpMessageProcessor* processor) {
81 bool is_receive_message_setup = (incoming_msg_processor_ != nullptr);
82 incoming_msg_processor_ = processor;
83
84 if (!is_receive_message_setup) {
85 TriggerReceiveMessage();
86 }
87 }
88
89 private:
90 void TriggerReceiveMessage();
91
92 void ReceiveMessage();
93
94 void OnMessageReceived(std::unique_ptr<HeliumWrapper> helium_message,
95 helium::Result result);
96
97 void ProcessResult(int result);
98
99 GrpcStream* stream_;
100 ConnectionErrorObserver* error_observer_;
101 BlimpMessageProcessor* incoming_msg_processor_;
102 base::WeakPtrFactory<GrpcIncomingForwarder> incoming_forwarder_;
103 };
104
105 void GrpcIncomingForwarder::ProcessResult(int result) {
106 // No-op.
107 }
108
109 void GrpcIncomingForwarder::TriggerReceiveMessage() {
110 base::ThreadTaskRunnerHandle::Get()->PostTask(
111 FROM_HERE, base::Bind(&GrpcIncomingForwarder::ReceiveMessage,
112 incoming_forwarder_.GetWeakPtr()));
113 }
114
115 void GrpcIncomingForwarder::ReceiveMessage() {
116 if (stream_ != nullptr) {
117 stream_->ReceiveMessage(
118 base::Bind(&GrpcIncomingForwarder::OnMessageReceived,
119 incoming_forwarder_.GetWeakPtr()));
120 }
121 }
122
123 void GrpcIncomingForwarder::OnMessageReceived(
124 std::unique_ptr<HeliumWrapper> helium_message, helium::Result result) {
125 int result_code = ConvertHeliumResult(result);
126 if (result != helium::Result::SUCCESS) {
127 DVLOG(3) << "Received message with error " << result;
128 error_observer_->OnConnectionError(result_code);
129 return;
130 }
131
132 DVLOG(3) << "Processing message : " << helium_message->GetTypeName();
133 std::unique_ptr<BlimpMessage> blimp_message = base::MakeUnique<BlimpMessage>() ;
134
135 if (blimp_message->ParseFromString(
136 helium_message->serialized_helium_message())) {
137 if (incoming_msg_processor_ != nullptr) {
138 DVLOG(3) << "Parsed BlimpMessage: " << *blimp_message;
139 incoming_msg_processor_->ProcessMessage(
140 std::move(blimp_message),
141 base::Bind(&GrpcIncomingForwarder::ProcessResult,
142 incoming_forwarder_.GetWeakPtr()));
143 TriggerReceiveMessage();
144 }
145 } else {
146 LOG(FATAL) << "Unable to deserialize message.";
147 }
148 }
149
150 // We rely on the gRPC and our //net code to agree on the 'success' code alone.
151 // Again, this is interim while we use the current //net as well as gRPC but
152 // will go away once we fully move to gRPC alone with strongly-typed results.
153 static_assert(static_cast<int>(grpc::OK) == static_cast<int>(net::OK),
154 "Mismatch in error code definitions.");
155
156 GrpcConnection::GrpcConnection(std::unique_ptr<GrpcStream> stream)
157 : stream_(std::move(stream)),
158 outgoing_forwarder_(
159 base::MakeUnique<GrpcOutgoingForwarder>(stream_.get(), this)),
160 incoming_forwarder_(
161 base::MakeUnique<GrpcIncomingForwarder>(stream_.get(), this)) {}
162
163 BlimpMessageProcessor* GrpcConnection::GetOutgoingMessageProcessor() {
164 return outgoing_forwarder_.get();
165 }
166
167 void GrpcConnection::SetIncomingMessageProcessor(
168 BlimpMessageProcessor* processor) {
169 DVLOG(3) << "Setting incoming message processor " << processor;
170 incoming_forwarder_->set_incoming_message_processor(processor);
171 }
172
173 GrpcConnection::~GrpcConnection() { DVLOG(1) << "Ending connection."; }
174
175 } // namespace blimp
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698