OLD | NEW |
| (Empty) |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "mojo/edk/system/routed_raw_channel.h" | |
6 | |
7 #include <stddef.h> | |
8 #include <stdint.h> | |
9 | |
10 #include <utility> | |
11 | |
12 #include "base/bind.h" | |
13 #include "base/logging.h" | |
14 #include "mojo/edk/embedder/embedder_internal.h" | |
15 | |
16 namespace mojo { | |
17 namespace edk { | |
18 | |
19 namespace { | |
20 const uint64_t kInternalRouteId = 0; | |
21 } | |
22 | |
23 RoutedRawChannel::PendingMessage::PendingMessage() { | |
24 } | |
25 | |
26 RoutedRawChannel::PendingMessage::~PendingMessage() { | |
27 } | |
28 | |
29 RoutedRawChannel::RoutedRawChannel( | |
30 ScopedPlatformHandle handle, | |
31 const base::Callback<void(RoutedRawChannel*)>& destruct_callback) | |
32 : channel_(RawChannel::Create(std::move(handle))), | |
33 destruct_callback_(destruct_callback) { | |
34 internal::g_io_thread_task_runner->PostTask( | |
35 FROM_HERE, | |
36 base::Bind(&RawChannel::Init, base::Unretained(channel_), this)); | |
37 internal::g_io_thread_task_runner->PostTask( | |
38 FROM_HERE, | |
39 base::Bind(&RawChannel::EnsureLazyInitialized, | |
40 base::Unretained(channel_))); | |
41 } | |
42 | |
43 void RoutedRawChannel::AddRoute(uint64_t route_id, | |
44 RawChannel::Delegate* delegate) { | |
45 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); | |
46 CHECK_NE(route_id, kInternalRouteId) << kInternalRouteId << " is reserved"; | |
47 CHECK(routes_.find(route_id) == routes_.end()); | |
48 routes_[route_id] = delegate; | |
49 | |
50 for (size_t i = 0; i < pending_messages_.size();) { | |
51 MessageInTransit::View view(pending_messages_[i]->message.size(), | |
52 &pending_messages_[i]->message[0]); | |
53 if (view.route_id() == route_id) { | |
54 delegate->OnReadMessage(view, std::move(pending_messages_[i]->handles)); | |
55 pending_messages_.erase(pending_messages_.begin() + i); | |
56 } else { | |
57 ++i; | |
58 } | |
59 } | |
60 | |
61 if (close_routes_.find(route_id) != close_routes_.end()) | |
62 delegate->OnError(ERROR_READ_SHUTDOWN); | |
63 } | |
64 | |
65 void RoutedRawChannel::RemoveRoute(uint64_t route_id) { | |
66 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); | |
67 // We don't check that routes_ contains route_id because it's possible for it | |
68 // to not have been added yet (i.e. it's waiting to be added later down the | |
69 // call stack). | |
70 routes_.erase(route_id); | |
71 | |
72 // Only send a message to the other side to close the route if we hadn't | |
73 // received a close route message. Otherwise they would keep going back and | |
74 // forth. | |
75 if (close_routes_.find(route_id) != close_routes_.end()) { | |
76 close_routes_.erase(route_id); | |
77 } else if (channel_) { | |
78 // Default route id of 0 to reach the other side's RoutedRawChannel. | |
79 char message_data[sizeof(uint64_t)]; | |
80 memcpy(&message_data[0], &route_id, sizeof(uint64_t)); | |
81 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
82 MessageInTransit::Type::MESSAGE, arraysize(message_data), | |
83 message_data)); | |
84 message->set_route_id(kInternalRouteId); | |
85 channel_->WriteMessage(std::move(message)); | |
86 } | |
87 | |
88 if (!channel_ && routes_.empty()) { | |
89 // PostTask to avoid reentrancy since the broker might be calling us. | |
90 base::MessageLoop::current()->DeleteSoon(FROM_HERE, this); | |
91 } | |
92 } | |
93 | |
94 RoutedRawChannel::~RoutedRawChannel() { | |
95 DCHECK(!channel_); | |
96 destruct_callback_.Run(this); | |
97 } | |
98 | |
99 void RoutedRawChannel::OnReadMessage( | |
100 const MessageInTransit::View& message_view, | |
101 ScopedPlatformHandleVectorPtr platform_handles) { | |
102 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); | |
103 // Note: normally, when a message arrives here we should find a corresponding | |
104 // entry for the RawChannel::Delegate with the given route_id. However it is | |
105 // possible that they just connected, and due to race conditions one side has | |
106 // connected and sent a message (and even closed) before the other side had a | |
107 // chance to register with this RoutedRawChannel. In that case, we must buffer | |
108 // all messages. | |
109 uint64_t route_id = message_view.route_id(); | |
110 if (route_id == kInternalRouteId) { | |
111 if (message_view.num_bytes() != sizeof(uint64_t)) { | |
112 NOTREACHED() << "Invalid internal message in RoutedRawChannel." ; | |
113 return; | |
114 } | |
115 uint64_t closed_route = *static_cast<const uint64_t*>(message_view.bytes()); | |
116 if (close_routes_.find(closed_route) != close_routes_.end()) { | |
117 NOTREACHED() << "Should only receive one ROUTE_CLOSED per route."; | |
118 return; | |
119 } | |
120 close_routes_.insert(closed_route); | |
121 if (routes_.find(closed_route) == routes_.end()) | |
122 return; // This side hasn't connected yet. | |
123 | |
124 routes_[closed_route]->OnError(ERROR_READ_SHUTDOWN); | |
125 return; | |
126 } | |
127 | |
128 if (routes_.find(route_id) != routes_.end()) { | |
129 routes_[route_id]->OnReadMessage(message_view, std::move(platform_handles)); | |
130 } else { | |
131 scoped_ptr<PendingMessage> msg(new PendingMessage); | |
132 msg->message.resize(message_view.total_size()); | |
133 memcpy(&msg->message[0], message_view.main_buffer(), | |
134 message_view.total_size()); | |
135 msg->handles = std::move(platform_handles); | |
136 pending_messages_.push_back(std::move(msg)); | |
137 } | |
138 } | |
139 | |
140 void RoutedRawChannel::OnError(Error error) { | |
141 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); | |
142 | |
143 // Note: we must ensure we don't call RawChannel::Shutdown until after we've | |
144 // called OnError on each route's delegate. | |
145 for (auto it = routes_.begin(); it != routes_.end();) { | |
146 // The delegate might call RemoveRoute in their OnError implementation which | |
147 // would invalidate |it|. So increment it first. | |
148 auto cur_it = it++; | |
149 cur_it->second->OnError(error); | |
150 } | |
151 | |
152 if (routes_.empty()) { | |
153 channel_->Shutdown(); | |
154 channel_ = nullptr; | |
155 delete this; | |
156 return; | |
157 } | |
158 } | |
159 | |
160 } // namespace edk | |
161 } // namespace mojo | |
OLD | NEW |