Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(303)

Side by Side Diff: mojo/edk/system/routed_raw_channel.cc

Issue 1488853002: Add multiplexing of message pipes in the new EDK. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: review comments Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/routed_raw_channel.h"
6
7 #include "base/bind.h"
8 #include "base/logging.h"
9 #include "mojo/edk/embedder/embedder_internal.h"
10 #include "mojo/edk/system/message_pipe_dispatcher.h"
11
12 namespace mojo {
13 namespace edk {
14
15 namespace {
16 const uint64_t kInternalRoutingId = 0;
17
18 // These are messages sent over our internal routing id above, meant for the
19 // other side's RoutedRawChannel to dispatch.
20 enum InternalMessages {
21 ROUTE_CLOSED = 0,
22 };
23 }
24
25 RoutedRawChannel::PendingMessage::PendingMessage() {
26 }
27
28 RoutedRawChannel::PendingMessage::~PendingMessage() {
29 }
30
31 RoutedRawChannel::RoutedRawChannel(
32 ScopedPlatformHandle handle,
33 const base::Callback<void(RoutedRawChannel*)>& destruct_callback)
34 : channel_(RawChannel::Create(handle.Pass())),
35 destruct_callback_(destruct_callback) {
36 internal::g_io_thread_task_runner->PostTask(
37 FROM_HERE,
38 base::Bind(&RawChannel::Init, base::Unretained(channel_), this));
39 internal::g_io_thread_task_runner->PostTask(
40 FROM_HERE,
41 base::Bind(&RawChannel::EnsureLazyInitialized,
42 base::Unretained(channel_)));
43 }
44
45 void RoutedRawChannel::AddRoute(uint64_t pipe_id, MessagePipeDispatcher* pipe) {
46 CHECK_NE(pipe_id, kInternalRoutingId) << kInternalRoutingId << " is reserved";
47 base::AutoLock auto_lock(lock_);
48 CHECK(routes_.find(pipe_id) == routes_.end());
49 routes_[pipe_id] = pipe;
50
51 for (size_t i = 0; i < pending_messages_.size();) {
52 MessageInTransit::View view(pending_messages_[i]->message.size(),
53 &pending_messages_[i]->message[0]);
54 if (view.route_id() == pipe_id) {
55 pipe->OnReadMessage(view, pending_messages_[i]->handles.Pass());
56 pending_messages_.erase(pending_messages_.begin() + i);
57 } else {
58 ++i;
59 }
60 }
61
62 if (close_routes_.find(pipe_id) != close_routes_.end())
63 pipe->OnError(ERROR_READ_SHUTDOWN);
64 }
65
66 void RoutedRawChannel::RemoveRoute(uint64_t pipe_id,
67 MessagePipeDispatcher* pipe) {
68 base::AutoLock auto_lock(lock_);
69 CHECK(routes_.find(pipe_id) != routes_.end());
70 CHECK_EQ(routes_[pipe_id], pipe);
71 routes_.erase(pipe_id);
72
73 // Only send a message to the other side to close the route if we hadn't
74 // received a close route message. Otherwise they would keep going back and
75 // forth.
76 if (close_routes_.find(pipe_id) != close_routes_.end()) {
77 close_routes_.erase(pipe_id);
78 } else if (channel_) {
79 // Default route id of 0 to reach the other side's RoutedRawChannel.
80 char message_data[sizeof(char) + sizeof(uint64_t)];
81 message_data[0] = ROUTE_CLOSED;
82 memcpy(&message_data[1], &pipe_id, sizeof(uint64_t));
83 scoped_ptr<MessageInTransit> message(new MessageInTransit(
84 MessageInTransit::Type::MESSAGE, arraysize(message_data),
85 message_data));
86 message->set_route_id(kInternalRoutingId);
87 channel_->WriteMessage(message.Pass());
88 }
89
90 if (!channel_ && routes_.empty()) {
91 // PostTask to avoid reentrancy since the broker might be calling us.
92 base::MessageLoop::current()->DeleteSoon(FROM_HERE, this);
93 }
94 }
95
96 RoutedRawChannel::~RoutedRawChannel() {
97 destruct_callback_.Run(this);
98 }
99
100 void RoutedRawChannel::OnReadMessage(
101 const MessageInTransit::View& message_view,
102 ScopedPlatformHandleVectorPtr platform_handles) {
103 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
104 // Note: normally, when a message arrives here we should find a corresponding
105 // entry for the MessagePipeDispatcher with the given route_id. However it is
106 // possible that they just connected, and due to race conditions one side has
107 // connected and sent a message (and even closed) before the other side had a
108 // chance to register with this RoutedRawChannel. In that case, we must buffer
109 // all messages.
110 base::AutoLock auto_lock(lock_);
111 uint64_t route_id = message_view.route_id();
112 if (route_id == kInternalRoutingId) {
113 CHECK_EQ(message_view.num_bytes(), sizeof(char) + sizeof(uint64_t));
Tom Sepez 2015/12/04 17:59:43 Again, can a rouge child cause us to abort there?
jam 2015/12/05 00:09:34 yep, added if statement
114 const char* bytes = static_cast<const char*>(message_view.bytes());
115 CHECK_EQ(bytes[0], ROUTE_CLOSED);
116 uint64_t closed_route = *reinterpret_cast<const uint64_t*>(&bytes[1]);
117 if (close_routes_.find(closed_route) != close_routes_.end()) {
118 NOTREACHED() << "Should only receive one ROUTE_CLOSED per route.";
119 return;
120 }
121 close_routes_.insert(closed_route);
122 if (routes_.find(closed_route) == routes_.end())
123 return; // This side hasn't connected yet.
124
125 routes_[closed_route]->OnError(ERROR_READ_SHUTDOWN);
126 return;
127 }
128
129 if (routes_.find(route_id) != routes_.end()) {
130 routes_[route_id]->OnReadMessage(message_view, platform_handles.Pass());
131 } else {
132 scoped_ptr<PendingMessage> msg(new PendingMessage);
133 msg->message.resize(message_view.total_size());
134 memcpy(&msg->message[0], message_view.main_buffer(),
135 message_view.total_size());
136 msg->handles = platform_handles.Pass();
137 pending_messages_.push_back(msg.Pass());
138 }
139 }
140
141 void RoutedRawChannel::OnError(Error error) {
142 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
143 bool destruct = false;
144 {
145 base::AutoLock auto_lock(lock_);
146
147 channel_->Shutdown();
148 channel_ = nullptr;
149 if (routes_.empty()) {
150 destruct = true;
151 } else {
152 for (auto it = routes_.begin(); it != routes_.end(); ++it)
153 it->second->OnError(error);
154 }
155 }
156
157 if (destruct)
158 delete this;
159 }
160
161 } // namespace edk
162 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698