| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" | 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 #include <stdint.h> | 8 #include <stdint.h> |
| 9 | 9 |
| 10 #include <algorithm> |
| 10 #include <utility> | 11 #include <utility> |
| 11 | 12 |
| 12 #include "base/bind.h" | 13 #include "base/bind.h" |
| 13 #include "base/logging.h" | 14 #include "base/logging.h" |
| 14 #include "base/message_loop/message_loop.h" | 15 #include "base/message_loop/message_loop.h" |
| 15 #include "mojo/edk/embedder/embedder_internal.h" | 16 #include "mojo/edk/embedder/embedder_internal.h" |
| 16 #include "mojo/edk/embedder/platform_shared_buffer.h" | 17 #include "mojo/edk/embedder/platform_shared_buffer.h" |
| 17 #include "mojo/edk/embedder/platform_support.h" | 18 #include "mojo/edk/embedder/platform_support.h" |
| 18 #include "mojo/edk/system/configuration.h" | 19 #include "mojo/edk/system/configuration.h" |
| 19 #include "mojo/edk/system/data_pipe.h" | 20 #include "mojo/edk/system/transport_data.h" |
| 20 | 21 |
| 21 namespace mojo { | 22 namespace mojo { |
| 22 namespace edk { | 23 namespace edk { |
| 23 | 24 |
| 24 void DataPipeProducerDispatcher::Init( | 25 void DataPipeProducerDispatcher::Init(ScopedPlatformHandle message_pipe, |
| 25 ScopedPlatformHandle message_pipe, | 26 char* serialized_write_buffer, |
| 26 char* serialized_write_buffer, size_t serialized_write_buffer_size) { | 27 size_t serialized_write_buffer_size, |
| 27 if (message_pipe.is_valid()) { | 28 char* serialized_read_buffer, |
| 28 channel_ = RawChannel::Create(std::move(message_pipe)); | 29 size_t serialized_read_buffer_size, |
| 29 channel_->SetSerializedData( | 30 ScopedPlatformHandle shared_buffer_handle, |
| 30 nullptr, 0u, serialized_write_buffer, serialized_write_buffer_size, | 31 size_t ring_buffer_start, |
| 31 nullptr, nullptr); | 32 size_t ring_buffer_size) { |
| 32 internal::g_io_thread_task_runner->PostTask( | 33 if (!message_pipe.is_valid()) { |
| 33 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this)); | 34 peer_closed_ = true; |
| 34 } else { | |
| 35 error_ = true; | |
| 36 } | 35 } |
| 36 |
| 37 data_pipe_->Init(std::move(message_pipe), serialized_write_buffer, |
| 38 serialized_write_buffer_size, serialized_read_buffer, |
| 39 serialized_read_buffer_size, std::move(shared_buffer_handle), |
| 40 ring_buffer_start, ring_buffer_size, true, |
| 41 base::Bind(&DataPipeProducerDispatcher::InitOnIO, this)); |
| 37 } | 42 } |
| 38 | 43 |
| 39 void DataPipeProducerDispatcher::InitOnIO() { | 44 void DataPipeProducerDispatcher::InitOnIO() { |
| 40 base::AutoLock locker(lock()); | 45 base::AutoLock locker(lock()); |
| 41 if (channel_) | 46 calling_init_ = true; |
| 42 channel_->Init(this); | 47 RawChannel* channel = data_pipe_->GetChannel(); |
| 48 if (channel) |
| 49 channel->Init(this); |
| 50 calling_init_ = false; |
| 43 } | 51 } |
| 44 | 52 |
| 45 void DataPipeProducerDispatcher::CloseOnIO() { | 53 void DataPipeProducerDispatcher::CloseOnIO() { |
| 46 base::AutoLock locker(lock()); | 54 base::AutoLock locker(lock()); |
| 47 if (channel_) { | 55 data_pipe_->Shutdown(); |
| 48 channel_->Shutdown(); | |
| 49 channel_ = nullptr; | |
| 50 } | |
| 51 } | 56 } |
| 52 | 57 |
| 53 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { | 58 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { |
| 54 return Type::DATA_PIPE_PRODUCER; | 59 return Type::DATA_PIPE_PRODUCER; |
| 55 } | 60 } |
| 56 | 61 |
| 57 scoped_refptr<DataPipeProducerDispatcher> | 62 scoped_refptr<DataPipeProducerDispatcher> |
| 58 DataPipeProducerDispatcher::Deserialize( | 63 DataPipeProducerDispatcher::Deserialize( |
| 59 const void* source, | 64 const void* source, |
| 60 size_t size, | 65 size_t size, |
| 61 PlatformHandleVector* platform_handles) { | 66 PlatformHandleVector* platform_handles) { |
| 62 MojoCreateDataPipeOptions options; | 67 MojoCreateDataPipeOptions options; |
| 63 ScopedPlatformHandle shared_memory_handle; | 68 ScopedPlatformHandle channel_handle, channel_shared_handle, |
| 64 size_t shared_memory_size = 0; | 69 shared_buffer_handle; |
| 65 ScopedPlatformHandle platform_handle = | 70 size_t serialized_read_buffer_size, serialized_write_buffer_size; |
| 66 DataPipe::Deserialize(source, size, platform_handles, &options, | 71 size_t ring_buffer_start, ring_buffer_size; |
| 67 &shared_memory_handle, &shared_memory_size); | 72 |
| 73 ScopedPlatformHandle platform_handle = DataPipe::Deserialize( |
| 74 source, size, platform_handles, &options, &channel_shared_handle, |
| 75 &serialized_read_buffer_size, &serialized_write_buffer_size, |
| 76 &shared_buffer_handle, &ring_buffer_start, &ring_buffer_size); |
| 68 | 77 |
| 69 scoped_refptr<DataPipeProducerDispatcher> rv(Create(options)); | 78 scoped_refptr<DataPipeProducerDispatcher> rv(Create(options)); |
| 70 | 79 |
| 80 // Create shared buffer and pull out the serialised read and write data |
| 81 // for the channel. |
| 82 size_t sz = serialized_write_buffer_size + serialized_read_buffer_size; |
| 83 char* serialized_read_buffer = nullptr; |
| 71 char* serialized_write_buffer = nullptr; | 84 char* serialized_write_buffer = nullptr; |
| 72 size_t serialized_write_buffer_size = 0; | 85 scoped_refptr<PlatformSharedBuffer> channel_shared_buffer; |
| 73 scoped_refptr<PlatformSharedBuffer> shared_buffer; | |
| 74 scoped_ptr<PlatformSharedBufferMapping> mapping; | 86 scoped_ptr<PlatformSharedBufferMapping> mapping; |
| 75 if (shared_memory_size) { | 87 if (channel_shared_handle.is_valid()) { |
| 76 shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle( | 88 channel_shared_buffer = |
| 77 shared_memory_size, std::move(shared_memory_handle)); | 89 internal::g_platform_support->CreateSharedBufferFromHandle( |
| 78 mapping = shared_buffer->Map(0, shared_memory_size); | 90 sz, std::move(channel_shared_handle)); |
| 79 serialized_write_buffer = static_cast<char*>(mapping->GetBase()); | 91 mapping = channel_shared_buffer->Map(0, sz); |
| 80 serialized_write_buffer_size = shared_memory_size; | 92 |
| 93 serialized_read_buffer = static_cast<char*>(mapping->GetBase()); |
| 94 serialized_write_buffer = |
| 95 static_cast<char*>(mapping->GetBase()) + serialized_read_buffer_size; |
| 81 } | 96 } |
| 82 | 97 |
| 83 rv->Init(std::move(platform_handle), serialized_write_buffer, | 98 rv->Init(std::move(platform_handle), serialized_read_buffer, |
| 84 serialized_write_buffer_size); | 99 serialized_read_buffer_size, serialized_write_buffer, |
| 100 serialized_write_buffer_size, std::move(shared_buffer_handle), |
| 101 ring_buffer_start, ring_buffer_size); |
| 85 return rv; | 102 return rv; |
| 86 } | 103 } |
| 87 | 104 |
| 88 DataPipeProducerDispatcher::DataPipeProducerDispatcher( | 105 DataPipeProducerDispatcher::DataPipeProducerDispatcher( |
| 89 const MojoCreateDataPipeOptions& options) | 106 const MojoCreateDataPipeOptions& options) |
| 90 : options_(options), channel_(nullptr), error_(false), serialized_(false) { | 107 : data_pipe_(new DataPipe(options)), |
| 91 } | 108 calling_init_(false), |
| 109 peer_closed_(false), |
| 110 in_two_phase_write_(false), |
| 111 two_phase_max_bytes_write_(0) {} |
| 92 | 112 |
| 93 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() { | 113 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() { |
| 94 // See comment in ~MessagePipeDispatcher. | 114 // See comment in ~MessagePipeDispatcher. |
| 95 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) | 115 if (internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) |
| 96 channel_->Shutdown(); | 116 data_pipe_->Shutdown(); |
| 97 else | 117 else |
| 98 DCHECK(!channel_); | 118 DCHECK(!data_pipe_->GetChannel()); |
| 99 } | 119 } |
| 100 | 120 |
| 101 void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() { | 121 void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() { |
| 102 lock().AssertAcquired(); | 122 lock().AssertAcquired(); |
| 103 awakable_list_.CancelAll(); | 123 awakable_list_.CancelAll(); |
| 104 } | 124 } |
| 105 | 125 |
| 106 void DataPipeProducerDispatcher::CloseImplNoLock() { | 126 void DataPipeProducerDispatcher::CloseImplNoLock() { |
| 107 lock().AssertAcquired(); | 127 lock().AssertAcquired(); |
| 108 internal::g_io_thread_task_runner->PostTask( | 128 internal::g_io_thread_task_runner->PostTask( |
| 109 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this)); | 129 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this)); |
| 110 } | 130 } |
| 111 | 131 |
| 112 scoped_refptr<Dispatcher> | 132 scoped_refptr<Dispatcher> |
| 113 DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { | 133 DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
| 134 // This function is used by TransportData to make sure there are no references |
| 135 // to the dispatcher it is trying to serialise and transport. |
| 114 lock().AssertAcquired(); | 136 lock().AssertAcquired(); |
| 115 | 137 |
| 116 SerializeInternal(); | 138 scoped_refptr<DataPipeProducerDispatcher> rv = |
| 139 Create(data_pipe_->GetOptions()); |
| 140 data_pipe_->CreateEquivalentAndClose(rv->data_pipe_.get()); |
| 117 | 141 |
| 118 scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_); | 142 DCHECK(!in_two_phase_write_); |
| 119 serialized_write_buffer_.swap(rv->serialized_write_buffer_); | 143 |
| 120 rv->serialized_platform_handle_ = std::move(serialized_platform_handle_); | |
| 121 rv->serialized_ = true; | |
| 122 return scoped_refptr<Dispatcher>(rv.get()); | 144 return scoped_refptr<Dispatcher>(rv.get()); |
| 123 } | 145 } |
| 124 | 146 |
| 125 MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock( | 147 MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock( |
| 126 const void* elements, | 148 const void* elements, |
| 127 uint32_t* num_bytes, | 149 uint32_t* num_bytes, |
| 128 MojoWriteDataFlags flags) { | 150 MojoWriteDataFlags flags) { |
| 129 lock().AssertAcquired(); | 151 lock().AssertAcquired(); |
| 130 if (InTwoPhaseWrite()) | 152 if (in_two_phase_write_) |
| 131 return MOJO_RESULT_BUSY; | 153 return MOJO_RESULT_BUSY; |
| 132 if (error_) | 154 if (peer_closed_) |
| 133 return MOJO_RESULT_FAILED_PRECONDITION; | 155 return MOJO_RESULT_FAILED_PRECONDITION; |
| 134 if (*num_bytes % options_.element_num_bytes != 0) | 156 if (*num_bytes % data_pipe_->GetOptions().element_num_bytes != 0) |
| 135 return MOJO_RESULT_INVALID_ARGUMENT; | 157 return MOJO_RESULT_INVALID_ARGUMENT; |
| 136 if (*num_bytes == 0) | 158 if (*num_bytes == 0) |
| 137 return MOJO_RESULT_OK; // Nothing to do. | 159 return MOJO_RESULT_OK; // Nothing to do. |
| 138 | 160 |
| 139 // For now, we ignore options.capacity_num_bytes as a total of all pending | 161 // Don't write non element sized chunks. |
| 140 // writes (and just treat it per message). We will implement that later if | 162 uint32_t writable = uint32_t(data_pipe_->GetWritableBytes()); |
| 141 // we need to. All current uses want all their data to be sent, and it's not | 163 writable -= writable % data_pipe_->GetOptions().element_num_bytes; |
| 142 // clear that this backpressure should be done at the mojo layer or at a | 164 |
| 143 // higher application layer. | |
| 144 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; | 165 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; |
| 145 uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0; | 166 uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0; |
| 146 if (min_num_bytes_to_write > options_.capacity_num_bytes) { | 167 if (min_num_bytes_to_write > writable) { |
| 147 // Don't return "should wait" since you can't wait for a specified amount of | 168 // Don't return "should wait" since you can't wait for a specified amount of |
| 148 // data. | 169 // data. |
| 149 return MOJO_RESULT_OUT_OF_RANGE; | 170 return MOJO_RESULT_OUT_OF_RANGE; |
| 150 } | 171 } |
| 151 | 172 |
| 152 uint32_t num_bytes_to_write = | 173 if (writable == 0) |
| 153 std::min(*num_bytes, options_.capacity_num_bytes); | |
| 154 if (num_bytes_to_write == 0) | |
| 155 return MOJO_RESULT_SHOULD_WAIT; | 174 return MOJO_RESULT_SHOULD_WAIT; |
| 156 | 175 |
| 157 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); | 176 uint32_t num_bytes_to_write = std::min(*num_bytes, writable); |
| 177 |
| 178 // The failure case for |WriteDataIntoSharedBuffer| is the shared |
| 179 // buffer not existing, so we should wait. |
| 180 if (!data_pipe_->WriteDataIntoSharedBuffer(elements, num_bytes_to_write)) { |
| 181 return MOJO_RESULT_SHOULD_WAIT; |
| 182 } |
| 183 |
| 184 // If we can't tell the other end about the write, pretend this write didn't |
| 185 // happen and mark the other end as closed. We deal with any state changes |
| 186 // due to the other side being closed in OnError. |
| 187 if (!data_pipe_->NotifyWrite(num_bytes_to_write)) { |
| 188 peer_closed_ = true; |
| 189 return MOJO_RESULT_FAILED_PRECONDITION; |
| 190 } |
| 158 | 191 |
| 159 *num_bytes = num_bytes_to_write; | 192 *num_bytes = num_bytes_to_write; |
| 160 WriteDataIntoMessages(elements, num_bytes_to_write); | |
| 161 | 193 |
| 194 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); |
| 195 data_pipe_->UpdateFromWrite(num_bytes_to_write); |
| 162 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); | 196 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); |
| 163 if (!new_state.equals(old_state)) | 197 if (!new_state.equals(old_state)) |
| 164 awakable_list_.AwakeForStateChange(new_state); | 198 awakable_list_.AwakeForStateChange(new_state); |
| 199 |
| 165 return MOJO_RESULT_OK; | 200 return MOJO_RESULT_OK; |
| 166 } | 201 } |
| 167 | 202 |
| 168 MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock( | 203 MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock( |
| 169 void** buffer, | 204 void** buffer, |
| 170 uint32_t* buffer_num_bytes, | 205 uint32_t* buffer_num_bytes, |
| 171 MojoWriteDataFlags flags) { | 206 MojoWriteDataFlags flags) { |
| 172 lock().AssertAcquired(); | 207 lock().AssertAcquired(); |
| 173 if (InTwoPhaseWrite()) | 208 if (in_two_phase_write_) |
| 174 return MOJO_RESULT_BUSY; | 209 return MOJO_RESULT_BUSY; |
| 175 if (error_) | 210 if (peer_closed_) |
| 176 return MOJO_RESULT_FAILED_PRECONDITION; | 211 return MOJO_RESULT_FAILED_PRECONDITION; |
| 177 | 212 |
| 178 // See comment in WriteDataImplNoLock about ignoring capacity_num_bytes. | 213 size_t max_num_bytes_to_write; |
| 214 void* temp_buf = data_pipe_->GetWriteBuffer(&max_num_bytes_to_write); |
| 215 |
| 216 if (max_num_bytes_to_write == 0) |
| 217 return MOJO_RESULT_SHOULD_WAIT; |
| 218 |
| 179 if (*buffer_num_bytes == 0) | 219 if (*buffer_num_bytes == 0) |
| 180 *buffer_num_bytes = options_.capacity_num_bytes; | 220 *buffer_num_bytes = uint32_t(max_num_bytes_to_write); |
| 181 | 221 |
| 182 two_phase_data_.resize(*buffer_num_bytes); | 222 // Don't promise more bytes than we have. |
| 183 *buffer = &two_phase_data_[0]; | 223 *buffer_num_bytes = |
| 224 std::min(uint32_t(max_num_bytes_to_write), *buffer_num_bytes); |
| 184 | 225 |
| 185 // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes | 226 two_phase_max_bytes_write_ = *buffer_num_bytes; |
| 186 // we can construct a MessageInTransit here. But then we need to make | 227 *buffer = temp_buf; |
| 187 // MessageInTransit support changing its data size later. | 228 in_two_phase_write_ = true; |
| 188 | 229 |
| 189 return MOJO_RESULT_OK; | 230 return MOJO_RESULT_OK; |
| 190 } | 231 } |
| 191 | 232 |
| 192 MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock( | 233 MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock( |
| 193 uint32_t num_bytes_written) { | 234 uint32_t num_bytes_written) { |
| 194 lock().AssertAcquired(); | 235 lock().AssertAcquired(); |
| 195 if (!InTwoPhaseWrite()) | 236 if (!in_two_phase_write_) |
| 196 return MOJO_RESULT_FAILED_PRECONDITION; | 237 return MOJO_RESULT_FAILED_PRECONDITION; |
| 197 | 238 |
| 239 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); |
| 240 in_two_phase_write_ = false; |
| 241 |
| 242 if (num_bytes_written > two_phase_max_bytes_write_ || |
| 243 num_bytes_written % data_pipe_->GetOptions().element_num_bytes != 0) { |
| 244 return MOJO_RESULT_INVALID_ARGUMENT; |
| 245 } |
| 246 |
| 247 data_pipe_->UpdateFromWrite(num_bytes_written); |
| 248 |
| 249 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); |
| 250 if (!new_state.equals(old_state)) |
| 251 awakable_list_.AwakeForStateChange(new_state); |
| 252 |
| 198 // Note: Allow successful completion of the two-phase write even if the other | 253 // Note: Allow successful completion of the two-phase write even if the other |
| 199 // side has been closed. | 254 // side has been closed. |
| 200 MojoResult rv = MOJO_RESULT_OK; | 255 // Deal with state changes due to peer being closed in OnError. |
| 201 if (num_bytes_written > two_phase_data_.size() || | 256 if (!data_pipe_->NotifyWrite(num_bytes_written)) |
| 202 num_bytes_written % options_.element_num_bytes != 0) { | 257 peer_closed_ = true; |
| 203 rv = MOJO_RESULT_INVALID_ARGUMENT; | |
| 204 } else if (channel_) { | |
| 205 WriteDataIntoMessages(&two_phase_data_[0], num_bytes_written); | |
| 206 } | |
| 207 | 258 |
| 208 // Two-phase write ended even on failure. | 259 return MOJO_RESULT_OK; |
| 209 two_phase_data_.clear(); | |
| 210 // If we're now writable, we *became* writable (since we weren't writable | |
| 211 // during the two-phase write), so awake producer awakables. | |
| 212 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); | |
| 213 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) | |
| 214 awakable_list_.AwakeForStateChange(new_state); | |
| 215 | |
| 216 return rv; | |
| 217 } | 260 } |
| 218 | 261 |
| 219 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock() | 262 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock() |
| 220 const { | 263 const { |
| 221 lock().AssertAcquired(); | 264 lock().AssertAcquired(); |
| 222 | 265 |
| 223 HandleSignalsState rv; | 266 HandleSignalsState rv; |
| 224 if (!error_) { | 267 if (!peer_closed_) { |
| 225 if (!InTwoPhaseWrite()) | 268 if (!in_two_phase_write_ && data_pipe_->GetWritableBytes()) |
| 226 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | 269 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
| 227 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | 270 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
| 228 } else { | 271 } else { |
| 229 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 272 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 230 } | 273 } |
| 274 |
| 231 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 275 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 232 return rv; | 276 return rv; |
| 233 } | 277 } |
| 234 | 278 |
| 235 MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock( | 279 MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock( |
| 236 Awakable* awakable, | 280 Awakable* awakable, |
| 237 MojoHandleSignals signals, | 281 MojoHandleSignals signals, |
| 238 uintptr_t context, | 282 uintptr_t context, |
| 239 HandleSignalsState* signals_state) { | 283 HandleSignalsState* signals_state) { |
| 240 lock().AssertAcquired(); | 284 lock().AssertAcquired(); |
| 241 if (channel_) | |
| 242 channel_->EnsureLazyInitialized(); | |
| 243 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); | 285 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); |
| 244 if (state.satisfies(signals)) { | 286 if (state.satisfies(signals)) { |
| 245 if (signals_state) | 287 if (signals_state) |
| 246 *signals_state = state; | 288 *signals_state = state; |
| 247 return MOJO_RESULT_ALREADY_EXISTS; | 289 return MOJO_RESULT_ALREADY_EXISTS; |
| 248 } | 290 } |
| 249 if (!state.can_satisfy(signals)) { | 291 if (!state.can_satisfy(signals)) { |
| 250 if (signals_state) | 292 if (signals_state) |
| 251 *signals_state = state; | 293 *signals_state = state; |
| 252 return MOJO_RESULT_FAILED_PRECONDITION; | 294 return MOJO_RESULT_FAILED_PRECONDITION; |
| 253 } | 295 } |
| 254 | 296 |
| 255 awakable_list_.Add(awakable, signals, context); | 297 awakable_list_.Add(awakable, signals, context); |
| 256 return MOJO_RESULT_OK; | 298 return MOJO_RESULT_OK; |
| 257 } | 299 } |
| 258 | 300 |
| 259 void DataPipeProducerDispatcher::RemoveAwakableImplNoLock( | 301 void DataPipeProducerDispatcher::RemoveAwakableImplNoLock( |
| 260 Awakable* awakable, | 302 Awakable* awakable, |
| 261 HandleSignalsState* signals_state) { | 303 HandleSignalsState* signals_state) { |
| 262 lock().AssertAcquired(); | 304 lock().AssertAcquired(); |
| 263 awakable_list_.Remove(awakable); | 305 awakable_list_.Remove(awakable); |
| 264 if (signals_state) | 306 if (signals_state) |
| 265 *signals_state = GetHandleSignalsStateImplNoLock(); | 307 *signals_state = GetHandleSignalsStateImplNoLock(); |
| 266 } | 308 } |
| 267 | 309 |
| 268 void DataPipeProducerDispatcher::StartSerializeImplNoLock( | 310 void DataPipeProducerDispatcher::StartSerializeImplNoLock( |
| 269 size_t* max_size, | 311 size_t* max_size, |
| 270 size_t* max_platform_handles) { | 312 size_t* max_platform_handles) { |
| 271 if (!serialized_) | 313 data_pipe_->StartSerialize(max_size, max_platform_handles); |
| 272 SerializeInternal(); | |
| 273 | |
| 274 DataPipe::StartSerialize(serialized_platform_handle_.is_valid(), | |
| 275 !serialized_write_buffer_.empty(), max_size, | |
| 276 max_platform_handles); | |
| 277 } | 314 } |
| 278 | 315 |
| 279 bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock( | 316 bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock( |
| 280 void* destination, | 317 void* destination, |
| 281 size_t* actual_size, | 318 size_t* actual_size, |
| 282 PlatformHandleVector* platform_handles) { | 319 PlatformHandleVector* platform_handles) { |
| 283 ScopedPlatformHandle shared_memory_handle; | 320 data_pipe_->EndSerialize(destination, actual_size, platform_handles); |
| 284 size_t shared_memory_size = serialized_write_buffer_.size(); | |
| 285 if (shared_memory_size) { | |
| 286 scoped_refptr<PlatformSharedBuffer> shared_buffer( | |
| 287 internal::g_platform_support->CreateSharedBuffer( | |
| 288 shared_memory_size)); | |
| 289 scoped_ptr<PlatformSharedBufferMapping> mapping( | |
| 290 shared_buffer->Map(0, shared_memory_size)); | |
| 291 memcpy(mapping->GetBase(), &serialized_write_buffer_[0], | |
| 292 shared_memory_size); | |
| 293 shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release()); | |
| 294 } | |
| 295 | |
| 296 DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_), | |
| 297 std::move(shared_memory_handle), shared_memory_size, | |
| 298 destination, actual_size, platform_handles); | |
| 299 CloseImplNoLock(); | 321 CloseImplNoLock(); |
| 300 return true; | 322 return true; |
| 301 } | 323 } |
| 302 | 324 |
| 303 void DataPipeProducerDispatcher::TransportStarted() { | 325 void DataPipeProducerDispatcher::TransportStarted() { |
| 304 started_transport_.Acquire(); | 326 started_transport_.Acquire(); |
| 305 } | 327 } |
| 306 | 328 |
| 307 void DataPipeProducerDispatcher::TransportEnded() { | 329 void DataPipeProducerDispatcher::TransportEnded() { |
| 308 started_transport_.Release(); | 330 started_transport_.Release(); |
| 309 } | 331 } |
| 310 | 332 |
| 311 bool DataPipeProducerDispatcher::IsBusyNoLock() const { | 333 bool DataPipeProducerDispatcher::IsBusyNoLock() const { |
| 312 lock().AssertAcquired(); | 334 lock().AssertAcquired(); |
| 313 return InTwoPhaseWrite(); | 335 return in_two_phase_write_; |
| 336 } |
| 337 |
| 338 bool DataPipeProducerDispatcher::ProcessCommand( |
| 339 const DataPipeCommandHeader& command, |
| 340 ScopedPlatformHandleVectorPtr platform_handles) { |
| 341 // Handles write/read case and shared buffer becoming available case. |
| 342 return data_pipe_->ProcessCommand(command, std::move(platform_handles)); |
| 314 } | 343 } |
| 315 | 344 |
| 316 void DataPipeProducerDispatcher::OnReadMessage( | 345 void DataPipeProducerDispatcher::OnReadMessage( |
| 317 const MessageInTransit::View& message_view, | 346 const MessageInTransit::View& message_view, |
| 318 ScopedPlatformHandleVectorPtr platform_handles) { | 347 ScopedPlatformHandleVectorPtr platform_handles) { |
| 319 CHECK(false) << "DataPipeProducerDispatcher shouldn't get any messages."; | 348 const DataPipeCommandHeader* command = |
| 349 static_cast<const DataPipeCommandHeader*>(message_view.bytes()); |
| 350 DCHECK(message_view.num_bytes() == sizeof(DataPipeCommandHeader)); |
| 351 |
| 352 if (started_transport_.Try()) { |
| 353 // We're not in the middle of being sent. |
| 354 |
| 355 // Can get synchronously called back from RawChannel::Init in InitOnIO if |
| 356 // there was initial data. InitOnIO locks, so don't lock twice. |
| 357 scoped_ptr<base::AutoLock> locker; |
| 358 if (!calling_init_) { |
| 359 locker.reset(new base::AutoLock(lock())); |
| 360 } |
| 361 |
| 362 if (ProcessCommand(*command, std::move(platform_handles))) { |
| 363 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| 364 } |
| 365 started_transport_.Release(); |
| 366 } else { |
| 367 // DataPipe::Serialize calls ReleaseHandle on the channel, which |
| 368 // acquires RawChannel's read_lock_. The function OnReadMessage is only |
| 369 // called while read_lock_ is acquired, and not after ReleaseHandle has been |
| 370 // called. This means this function will only be called before Serialize |
| 371 // calls ReleaseHandle, meaning the serialisation will not have started yet. |
| 372 // We only notify awakables if we're not in the process of being |
| 373 // transported. |
| 374 ProcessCommand(*command, std::move(platform_handles)); |
| 375 } |
| 320 } | 376 } |
| 321 | 377 |
| 322 void DataPipeProducerDispatcher::OnError(Error error) { | 378 void DataPipeProducerDispatcher::OnError(Error error) { |
| 323 switch (error) { | 379 switch (error) { |
| 324 case ERROR_READ_BROKEN: | |
| 325 case ERROR_READ_BAD_MESSAGE: | |
| 326 case ERROR_READ_UNKNOWN: | |
| 327 LOG(ERROR) << "DataPipeProducerDispatcher shouldn't get read error."; | |
| 328 break; | |
| 329 case ERROR_READ_SHUTDOWN: | 380 case ERROR_READ_SHUTDOWN: |
| 330 // The other side was cleanly closed, so this isn't actually an error. | 381 // The other side was cleanly closed, so this isn't actually an error. |
| 331 DVLOG(1) << "DataPipeProducerDispatcher read error (shutdown)"; | 382 DVLOG(1) << "DataPipeProducerDispatcher read error (shutdown)"; |
| 332 break; | 383 break; |
| 384 case ERROR_READ_BROKEN: |
| 385 LOG(ERROR) << "DataPipeProducerDispatcher read error (connection broken)"; |
| 386 break; |
| 387 case ERROR_READ_BAD_MESSAGE: |
| 388 // Receiving a bad message means either a bug, data corruption, or |
| 389 // malicious attack (probably due to some other bug). |
| 390 LOG(ERROR) << "DataPipeProducerDispatcher read error (received bad " |
| 391 << "message)"; |
| 392 break; |
| 393 case ERROR_READ_UNKNOWN: |
| 394 LOG(ERROR) << "DataPipeProducerDispatcher read error (unknown)"; |
| 395 break; |
| 333 case ERROR_WRITE: | 396 case ERROR_WRITE: |
| 334 // Write errors are slightly notable: they probably shouldn't happen under | 397 LOG(ERROR) << "DataPipeProducerDispatcher write error"; |
| 335 // normal operation (but maybe the other side crashed). | |
| 336 LOG(WARNING) << "DataPipeProducerDispatcher write error"; | |
| 337 break; | 398 break; |
| 338 } | 399 } |
| 339 | 400 |
| 340 error_ = true; | 401 peer_closed_ = true; |
| 341 if (started_transport_.Try()) { | 402 if (started_transport_.Try()) { |
| 342 base::AutoLock locker(lock()); | 403 base::AutoLock locker(lock()); |
| 343 // We can get two OnError callbacks before the post task below completes. | 404 // We can get two OnError callbacks before the post task below completes. |
| 344 // Although RawChannel still has a pointer to this object until Shutdown is | 405 // Although RawChannel still has a pointer to this object until Shutdown is |
| 345 // called, that is safe since this class always does a PostTask to the IO | 406 // called, that is safe since this class always does a PostTask to the IO |
| 346 // thread to self destruct. | 407 // thread to self destruct. |
| 347 if (channel_) { | 408 if (data_pipe_->GetChannel()) { |
| 348 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | 409 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| 349 channel_->Shutdown(); | 410 data_pipe_->Shutdown(); |
| 350 channel_ = nullptr; | |
| 351 } | 411 } |
| 352 started_transport_.Release(); | 412 started_transport_.Release(); |
| 353 } else { | 413 } else { |
| 354 // We must be waiting to call ReleaseHandle. It will call Shutdown. | 414 // We must be waiting to call ReleaseHandle. It will call Shutdown. |
| 355 } | 415 } |
| 356 } | 416 } |
| 357 | 417 |
| 358 bool DataPipeProducerDispatcher::InTwoPhaseWrite() const { | |
| 359 return !two_phase_data_.empty(); | |
| 360 } | |
| 361 | |
| 362 bool DataPipeProducerDispatcher::WriteDataIntoMessages( | |
| 363 const void* elements, | |
| 364 uint32_t num_bytes) { | |
| 365 // The maximum amount of data to send per message (make it a multiple of the | |
| 366 // element size. | |
| 367 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; | |
| 368 max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes; | |
| 369 DCHECK_GT(max_message_num_bytes, 0u); | |
| 370 | |
| 371 uint32_t offset = 0; | |
| 372 while (offset < num_bytes) { | |
| 373 uint32_t message_num_bytes = | |
| 374 std::min(static_cast<uint32_t>(max_message_num_bytes), | |
| 375 num_bytes - offset); | |
| 376 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
| 377 MessageInTransit::Type::MESSAGE, message_num_bytes, | |
| 378 static_cast<const char*>(elements) + offset)); | |
| 379 if (!channel_->WriteMessage(std::move(message))) { | |
| 380 error_ = true; | |
| 381 return false; | |
| 382 } | |
| 383 | |
| 384 offset += message_num_bytes; | |
| 385 } | |
| 386 | |
| 387 return true; | |
| 388 } | |
| 389 | |
| 390 void DataPipeProducerDispatcher::SerializeInternal() { | |
| 391 // We need to stop watching handle immediately, even though not on IO thread, | |
| 392 // so that other messages aren't read after this. | |
| 393 if (channel_) { | |
| 394 std::vector<char> serialized_read_buffer; | |
| 395 std::vector<int> fds; | |
| 396 bool write_error = false; | |
| 397 serialized_platform_handle_ = channel_->ReleaseHandle( | |
| 398 &serialized_read_buffer, &serialized_write_buffer_, &fds, &fds, | |
| 399 &write_error); | |
| 400 CHECK(serialized_read_buffer.empty()); | |
| 401 CHECK(fds.empty()); | |
| 402 if (write_error) | |
| 403 serialized_platform_handle_.reset(); | |
| 404 channel_ = nullptr; | |
| 405 } | |
| 406 serialized_ = true; | |
| 407 } | |
| 408 | |
| 409 } // namespace edk | 418 } // namespace edk |
| 410 } // namespace mojo | 419 } // namespace mojo |
| OLD | NEW |