OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "mojo/edk/system/routed_raw_channel.h" | |
6 | |
7 #include "base/bind.h" | |
8 #include "base/logging.h" | |
9 #include "mojo/edk/embedder/embedder_internal.h" | |
10 #include "mojo/edk/system/message_pipe_dispatcher.h" | |
11 | |
12 namespace mojo { | |
13 namespace edk { | |
14 | |
15 namespace { | |
16 const uint64_t kInternalRoutingId = 0; | |
17 | |
18 // These are messages sent over our internal routing id above, meant for the | |
19 // other side's RoutedRawChannel to dispatch. | |
20 enum InternalMessages { | |
21 ROUTE_CLOSED = 0, | |
22 }; | |
23 } | |
24 | |
25 RoutedRawChannel::PendingMessage::PendingMessage() { | |
26 } | |
27 | |
28 RoutedRawChannel::PendingMessage::~PendingMessage() { | |
29 } | |
30 | |
31 RoutedRawChannel::RoutedRawChannel( | |
32 ScopedPlatformHandle handle, | |
33 const base::Callback<void(RoutedRawChannel*)>& destruct_callback) | |
34 : channel_(RawChannel::Create(handle.Pass())), | |
35 destruct_callback_(destruct_callback) { | |
36 internal::g_io_thread_task_runner->PostTask( | |
37 FROM_HERE, | |
38 base::Bind(&RawChannel::Init, base::Unretained(channel_), this)); | |
39 internal::g_io_thread_task_runner->PostTask( | |
40 FROM_HERE, | |
41 base::Bind(&RawChannel::EnsureLazyInitialized, | |
42 base::Unretained(channel_))); | |
43 } | |
44 | |
45 void RoutedRawChannel::AddRoute(uint64_t pipe_id, MessagePipeDispatcher* pipe) { | |
46 CHECK_NE(pipe_id, kInternalRoutingId) << kInternalRoutingId << " is reserved"; | |
47 base::AutoLock auto_lock(lock_); | |
48 CHECK(routes_.find(pipe_id) == routes_.end()); | |
49 routes_[pipe_id] = pipe; | |
50 | |
51 for (size_t i = 0; i < pending_messages_.size();) { | |
52 MessageInTransit::View view(pending_messages_[i]->message.size(), | |
53 &pending_messages_[i]->message[0]); | |
54 if (view.route_id() == pipe_id) { | |
55 pipe->OnReadMessage(view, pending_messages_[i]->handles.Pass()); | |
56 pending_messages_.erase(pending_messages_.begin() + i); | |
57 } else { | |
58 ++i; | |
59 } | |
60 } | |
61 | |
62 if (close_routes_.find(pipe_id) != close_routes_.end()) | |
63 pipe->OnError(ERROR_READ_SHUTDOWN); | |
64 } | |
65 | |
66 void RoutedRawChannel::RemoveRoute(uint64_t pipe_id, | |
67 MessagePipeDispatcher* pipe) { | |
68 base::AutoLock auto_lock(lock_); | |
69 CHECK(routes_.find(pipe_id) != routes_.end()); | |
70 CHECK_EQ(routes_[pipe_id], pipe); | |
71 routes_.erase(pipe_id); | |
72 | |
73 // Only send a message to the other side to close the route if we hadn't | |
74 // received a close route message. Otherwise they would keep going back and | |
75 // forth. | |
76 if (close_routes_.find(pipe_id) != close_routes_.end()) { | |
77 close_routes_.erase(pipe_id); | |
78 } else if (channel_) { | |
79 // Default route id of 0 to reach the other side's RoutedRawChannel. | |
80 char message_data[sizeof(char) + sizeof(uint64_t)]; | |
81 message_data[0] = ROUTE_CLOSED; | |
82 memcpy(&message_data[1], &pipe_id, sizeof(uint64_t)); | |
83 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
84 MessageInTransit::Type::MESSAGE, arraysize(message_data), | |
85 message_data)); | |
86 message->set_route_id(kInternalRoutingId); | |
87 channel_->WriteMessage(message.Pass()); | |
88 } | |
89 | |
90 if (!channel_ && routes_.empty()) { | |
91 // PostTask to avoid reentrancy since the broker might be calling us. | |
92 base::MessageLoop::current()->DeleteSoon(FROM_HERE, this); | |
93 } | |
94 } | |
95 | |
96 RoutedRawChannel::~RoutedRawChannel() { | |
97 destruct_callback_.Run(this); | |
98 } | |
99 | |
100 void RoutedRawChannel::OnReadMessage( | |
101 const MessageInTransit::View& message_view, | |
102 ScopedPlatformHandleVectorPtr platform_handles) { | |
103 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); | |
104 // Note: normally, when a message arrives here we should find a corresponding | |
105 // entry for the MessagePipeDispatcher with the given route_id. However it is | |
106 // possible that they just connected, and due to race conditions one side has | |
107 // connected and sent a message (and even closed) before the other side had a | |
108 // chance to register with this RoutedRawChannel. In that case, we must buffer | |
109 // all messages. | |
110 base::AutoLock auto_lock(lock_); | |
111 uint64_t route_id = message_view.route_id(); | |
112 if (route_id == kInternalRoutingId) { | |
113 CHECK_EQ(message_view.num_bytes(), sizeof(char) + sizeof(uint64_t)); | |
Tom Sepez
2015/12/04 17:59:43
Again, can a rouge child cause us to abort there?
jam
2015/12/05 00:09:34
yep, added if statement
| |
114 const char* bytes = static_cast<const char*>(message_view.bytes()); | |
115 CHECK_EQ(bytes[0], ROUTE_CLOSED); | |
116 uint64_t closed_route = *reinterpret_cast<const uint64_t*>(&bytes[1]); | |
117 if (close_routes_.find(closed_route) != close_routes_.end()) { | |
118 NOTREACHED() << "Should only receive one ROUTE_CLOSED per route."; | |
119 return; | |
120 } | |
121 close_routes_.insert(closed_route); | |
122 if (routes_.find(closed_route) == routes_.end()) | |
123 return; // This side hasn't connected yet. | |
124 | |
125 routes_[closed_route]->OnError(ERROR_READ_SHUTDOWN); | |
126 return; | |
127 } | |
128 | |
129 if (routes_.find(route_id) != routes_.end()) { | |
130 routes_[route_id]->OnReadMessage(message_view, platform_handles.Pass()); | |
131 } else { | |
132 scoped_ptr<PendingMessage> msg(new PendingMessage); | |
133 msg->message.resize(message_view.total_size()); | |
134 memcpy(&msg->message[0], message_view.main_buffer(), | |
135 message_view.total_size()); | |
136 msg->handles = platform_handles.Pass(); | |
137 pending_messages_.push_back(msg.Pass()); | |
138 } | |
139 } | |
140 | |
141 void RoutedRawChannel::OnError(Error error) { | |
142 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); | |
143 bool destruct = false; | |
144 { | |
145 base::AutoLock auto_lock(lock_); | |
146 | |
147 channel_->Shutdown(); | |
148 channel_ = nullptr; | |
149 if (routes_.empty()) { | |
150 destruct = true; | |
151 } else { | |
152 for (auto it = routes_.begin(); it != routes_.end(); ++it) | |
153 it->second->OnError(error); | |
154 } | |
155 } | |
156 | |
157 if (destruct) | |
158 delete this; | |
159 } | |
160 | |
161 } // namespace edk | |
162 } // namespace mojo | |
OLD | NEW |