Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(519)

Side by Side Diff: blimp/net/grpc_connection.cc

Issue 2462183002: GRPC Stream implementation of HeliumStream
Patch Set: Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "blimp/net/grpc_connection.h"
6
7 #include "blimp/common/proto/blimp_message.pb.h"
8
9 #include "base/callback.h"
10 #include "base/callback_helpers.h"
11 #include "base/macros.h"
12 #include "base/memory/ptr_util.h"
13 #include "base/threading/thread_task_runner_handle.h"
14 #include "blimp/common/logging.h"
15 #include "blimp/net/grpc_stream.h"
16 #include "content/public/browser/browser_thread.h"
17 #include "net/base/net_errors.h"
18 #include "blimp/net/connection_error_observer.h"
19
20 namespace blimp {
21
22 // NOTE: This entire class/file is going away soon.
23 // This set of classes is a thin wrapper around the new HeliumStream-like
24 // GrpcStream.
25
26 // Forwards a |BlimpMessage| from the client to the Engine via gRPC after
27 // converting it to a |HeliumWrapper| first.
28 class GrpcOutgoingForwarder : public BlimpMessageProcessor {
29 public:
30 GrpcOutgoingForwarder(GrpcStream* stream,
31 ConnectionErrorObserver* error_observer)
32 : stream_(stream) {}
33 void ProcessMessage(std::unique_ptr<BlimpMessage> message,
34 const net::CompletionCallback& callback) override;
35
36 private:
37 GrpcStream* stream_;
38 };
39
40 void GrpcOutgoingForwarder::ProcessMessage(
41 std::unique_ptr<BlimpMessage> message,
42 const net::CompletionCallback& callback) {
43 DVLOG(3) << "Sending message: " << *message
44 << "; type = " << message->message_id();
45 // DCHECK(message->has_message_id());
46 if (stream_ != nullptr) {
47 auto helium_message = base::MakeUnique<HeliumWrapper>();
48
49 if (message->SerializeToString(
50 helium_message->mutable_serialized_helium_message())) {
51 stream_->SendMessage(std::move(helium_message), callback);
52 } else {
53 LOG(FATAL) << "Unable to serialize message " << *message;
54 }
55 }
56 };
57
58 // Callback for the gRPC layer that has a new |HeliumWrapper| message to forward
59 // to the client's 'incoming message processor' that accepts |BlimpMessage|.
60 class GrpcIncomingForwarder {
61 public:
62 GrpcIncomingForwarder(GrpcStream* stream,
63 ConnectionErrorObserver* error_observer)
64 : stream_(stream),
65 error_observer_(error_observer),
66 incoming_msg_processor_(nullptr),
67 incoming_forwarder_(this) {}
68
69 void set_incoming_message_processor(BlimpMessageProcessor* processor) {
70 auto is_receive_message_setup = (incoming_msg_processor_ != nullptr);
71 incoming_msg_processor_ = processor;
72
73 if (!is_receive_message_setup) {
74 TriggerReceiveMessage();
75 }
76 }
77
78 private:
79 void TriggerReceiveMessage();
80
81 void ReceiveMessage();
82
83 void OnMessageReceived(std::unique_ptr<HeliumWrapper> helium_message,
84 int result);
85
86 void ProcessResult(int result);
87
88 GrpcStream* stream_;
89 ConnectionErrorObserver* error_observer_;
90 BlimpMessageProcessor* incoming_msg_processor_;
91 base::WeakPtrFactory<GrpcIncomingForwarder> incoming_forwarder_;
92 };
93
94 void GrpcIncomingForwarder::ProcessResult(int result) {
95 // No-op.
96 }
97
98 void GrpcIncomingForwarder::TriggerReceiveMessage() {
99 base::ThreadTaskRunnerHandle::Get()->PostTask(
100 FROM_HERE, base::Bind(&GrpcIncomingForwarder::ReceiveMessage,
101 incoming_forwarder_.GetWeakPtr()));
102 }
103
104 void GrpcIncomingForwarder::ReceiveMessage() {
105 if (stream_ != nullptr) {
106 stream_->ReceiveMessage(
107 base::Bind(&GrpcIncomingForwarder::OnMessageReceived,
108 incoming_forwarder_.GetWeakPtr()));
109 }
110 }
111
112 void GrpcIncomingForwarder::OnMessageReceived(
113 std::unique_ptr<HeliumWrapper> helium_message,
114 int result) {
115 if (result != grpc::OK) {
116 DVLOG(3) << "Received message with error " << result;
117 error_observer_->OnConnectionError(result);
118 return;
119 }
120
121 DVLOG(3) << "Processing message : " << helium_message->GetTypeName();
122 auto blimp_message = base::MakeUnique<BlimpMessage>();
123
124 if (blimp_message->ParseFromString(
125 helium_message->serialized_helium_message())) {
126 if (incoming_msg_processor_ != nullptr) {
127 DVLOG(3) << "Parsed BlimpMessage: " << *blimp_message;
128 incoming_msg_processor_->ProcessMessage(
129 std::move(blimp_message),
130 base::Bind(&GrpcIncomingForwarder::ProcessResult,
131 incoming_forwarder_.GetWeakPtr()));
132 TriggerReceiveMessage();
133 }
134 } else {
135 LOG(FATAL) << "Unable to deserialize message.";
136 }
137 }
138
139 // We rely on the gRPC and our //net code to agree on the 'success' code alone.
140 // Again, this is interim while we use the current //net as well as gRPC but
141 // will go away once we fully move to gRPC alone with strongly-typed results.
142 static_assert(static_cast<int>(grpc::OK) == static_cast<int>(net::OK),
143 "Mismatch in error code definitions.");
144
145 GrpcConnection::GrpcConnection(std::unique_ptr<GrpcStream> stream)
146 : stream_(std::move(stream)),
147 outgoing_forwarder_(
148 base::MakeUnique<GrpcOutgoingForwarder>(stream_.get(), this)),
149 incoming_forwarder_(
150 base::MakeUnique<GrpcIncomingForwarder>(stream_.get(), this)) {}
151
152 BlimpMessageProcessor* GrpcConnection::GetOutgoingMessageProcessor() {
153 return outgoing_forwarder_.get();
154 }
155
156 void GrpcConnection::SetIncomingMessageProcessor(
157 BlimpMessageProcessor* processor) {
158 DVLOG(3) << "Setting incoming message processor " << processor;
159 incoming_forwarder_->set_incoming_message_processor(processor);
160 }
161
162 GrpcConnection::~GrpcConnection() {
163 DVLOG(1) << "Ending connection.";
164 }
165
166 } // namespace blimp
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698