OLD | NEW |
(Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "mojo/edk/system/routed_raw_channel.h" |
| 6 |
| 7 #include "base/bind.h" |
| 8 #include "base/logging.h" |
| 9 #include "mojo/edk/embedder/embedder_internal.h" |
| 10 #include "mojo/edk/system/message_pipe_dispatcher.h" |
| 11 |
| 12 namespace mojo { |
| 13 namespace edk { |
| 14 |
| 15 namespace { |
| 16 const uint64_t kInternalRoutingId = 0; |
| 17 |
| 18 // These are messages sent over our internal routing id above, meant for the |
| 19 // other side's RoutedRawChannel to dispatch. |
| 20 enum InternalMessages { |
| 21 ROUTE_CLOSED = 0, |
| 22 }; |
| 23 } |
| 24 |
| 25 RoutedRawChannel::PendingMessage::PendingMessage() { |
| 26 } |
| 27 |
| 28 RoutedRawChannel::PendingMessage::~PendingMessage() { |
| 29 } |
| 30 |
| 31 RoutedRawChannel::RoutedRawChannel( |
| 32 ScopedPlatformHandle handle, |
| 33 const base::Callback<void(RoutedRawChannel*)>& destruct_callback) |
| 34 : channel_(RawChannel::Create(handle.Pass())), |
| 35 destruct_callback_(destruct_callback) { |
| 36 internal::g_io_thread_task_runner->PostTask( |
| 37 FROM_HERE, |
| 38 base::Bind(&RawChannel::Init, base::Unretained(channel_), this)); |
| 39 internal::g_io_thread_task_runner->PostTask( |
| 40 FROM_HERE, |
| 41 base::Bind(&RawChannel::EnsureLazyInitialized, |
| 42 base::Unretained(channel_))); |
| 43 } |
| 44 |
| 45 void RoutedRawChannel::AddRoute(uint64_t pipe_id, MessagePipeDispatcher* pipe) { |
| 46 CHECK_NE(pipe_id, kInternalRoutingId) << kInternalRoutingId << " is reserved"; |
| 47 base::AutoLock auto_lock(lock_); |
| 48 CHECK(routes_.find(pipe_id) == routes_.end()); |
| 49 routes_[pipe_id] = pipe; |
| 50 |
| 51 for (size_t i = 0; i < pending_messages_.size();) { |
| 52 MessageInTransit::View view(pending_messages_[i]->message.size(), |
| 53 &pending_messages_[i]->message[0]); |
| 54 if (view.route_id() == pipe_id) { |
| 55 pipe->OnReadMessage(view, pending_messages_[i]->handles.Pass()); |
| 56 pending_messages_.erase(pending_messages_.begin() + i); |
| 57 } else { |
| 58 ++i; |
| 59 } |
| 60 } |
| 61 |
| 62 if (close_routes_.find(pipe_id) != close_routes_.end()) |
| 63 pipe->OnError(ERROR_READ_SHUTDOWN); |
| 64 } |
| 65 |
| 66 void RoutedRawChannel::RemoveRoute(uint64_t pipe_id, |
| 67 MessagePipeDispatcher* pipe) { |
| 68 base::AutoLock auto_lock(lock_); |
| 69 CHECK(routes_.find(pipe_id) != routes_.end()); |
| 70 CHECK_EQ(routes_[pipe_id], pipe); |
| 71 routes_.erase(pipe_id); |
| 72 |
| 73 // Only send a message to the other side to close the route if we hadn't |
| 74 // received a close route message. Otherwise they would keep going back and |
| 75 // forth. |
| 76 if (close_routes_.find(pipe_id) != close_routes_.end()) { |
| 77 close_routes_.erase(pipe_id); |
| 78 } else if (channel_) { |
| 79 // Default route id of 0 to reach the other side's RoutedRawChannel. |
| 80 char message_data[sizeof(char) + sizeof(uint64_t)]; |
| 81 message_data[0] = ROUTE_CLOSED; |
| 82 memcpy(&message_data[1], &pipe_id, sizeof(uint64_t)); |
| 83 scoped_ptr<MessageInTransit> message(new MessageInTransit( |
| 84 MessageInTransit::Type::MESSAGE, arraysize(message_data), |
| 85 message_data)); |
| 86 message->set_route_id(kInternalRoutingId); |
| 87 channel_->WriteMessage(message.Pass()); |
| 88 } |
| 89 |
| 90 if (!channel_ && routes_.empty()) { |
| 91 // PostTask to avoid reentrancy since the broker might be calling us. |
| 92 base::MessageLoop::current()->DeleteSoon(FROM_HERE, this); |
| 93 } |
| 94 } |
| 95 |
| 96 RoutedRawChannel::~RoutedRawChannel() { |
| 97 destruct_callback_.Run(this); |
| 98 } |
| 99 |
| 100 void RoutedRawChannel::OnReadMessage( |
| 101 const MessageInTransit::View& message_view, |
| 102 ScopedPlatformHandleVectorPtr platform_handles) { |
| 103 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); |
| 104 // Note: normally, when a message arrives here we should find a corresponding |
| 105 // entry for the MessagePipeDispatcher with the given route_id. However it is |
| 106 // possible that they just connected, and due to race conditions one side has |
| 107 // connected and sent a message (and even closed) before the other side had a |
| 108 // chance to register with this RoutedRawChannel. In that case, we must buffer |
| 109 // all messages. |
| 110 base::AutoLock auto_lock(lock_); |
| 111 uint64_t route_id = message_view.route_id(); |
| 112 if (route_id == kInternalRoutingId) { |
| 113 CHECK_EQ(message_view.num_bytes(), sizeof(char) + sizeof(uint64_t)); |
| 114 const char* bytes = static_cast<const char*>(message_view.bytes()); |
| 115 CHECK_EQ(bytes[0], ROUTE_CLOSED); |
| 116 uint64_t closed_route = *reinterpret_cast<const uint64_t*>(&bytes[1]); |
| 117 if (close_routes_.find(closed_route) != close_routes_.end()) { |
| 118 NOTREACHED() << "Should only receive one ROUTE_CLOSED per route."; |
| 119 return; |
| 120 } |
| 121 close_routes_.insert(closed_route); |
| 122 if (routes_.find(closed_route) == routes_.end()) |
| 123 return; // This side hasn't connected yet. |
| 124 |
| 125 routes_[closed_route]->OnError(ERROR_READ_SHUTDOWN); |
| 126 return; |
| 127 } |
| 128 |
| 129 if (routes_.find(route_id) != routes_.end()) { |
| 130 routes_[route_id]->OnReadMessage(message_view, platform_handles.Pass()); |
| 131 } else { |
| 132 scoped_ptr<PendingMessage> msg(new PendingMessage); |
| 133 msg->message.resize(message_view.total_size()); |
| 134 memcpy(&msg->message[0], message_view.main_buffer(), |
| 135 message_view.total_size()); |
| 136 msg->handles = platform_handles.Pass(); |
| 137 pending_messages_.push_back(msg.Pass()); |
| 138 } |
| 139 } |
| 140 |
| 141 void RoutedRawChannel::OnError(Error error) { |
| 142 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); |
| 143 bool destruct = false; |
| 144 { |
| 145 base::AutoLock auto_lock(lock_); |
| 146 |
| 147 channel_->Shutdown(); |
| 148 channel_ = nullptr; |
| 149 if (routes_.empty()) { |
| 150 destruct = true; |
| 151 } else { |
| 152 for (auto it = routes_.begin(); it != routes_.end(); ++it) |
| 153 it->second->OnError(error); |
| 154 } |
| 155 } |
| 156 |
| 157 if (destruct) |
| 158 delete this; |
| 159 } |
| 160 |
| 161 } // namespace edk |
| 162 } // namespace mojo |
OLD | NEW |