Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(872)

Side by Side Diff: mojo/edk/system/data_pipe_producer_dispatcher.cc

Issue 1526923006: [mojo] Implement data pipe using a shared buffer. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <algorithm>
10 #include <utility> 11 #include <utility>
11 12
12 #include "base/bind.h" 13 #include "base/bind.h"
13 #include "base/logging.h" 14 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h" 15 #include "base/message_loop/message_loop.h"
15 #include "mojo/edk/embedder/embedder_internal.h" 16 #include "mojo/edk/embedder/embedder_internal.h"
16 #include "mojo/edk/embedder/platform_shared_buffer.h" 17 #include "mojo/edk/embedder/platform_shared_buffer.h"
17 #include "mojo/edk/embedder/platform_support.h" 18 #include "mojo/edk/embedder/platform_support.h"
18 #include "mojo/edk/system/configuration.h" 19 #include "mojo/edk/system/configuration.h"
19 #include "mojo/edk/system/data_pipe.h" 20 #include "mojo/edk/system/transport_data.h"
20 21
21 namespace mojo { 22 namespace mojo {
22 namespace edk { 23 namespace edk {
23 24
24 void DataPipeProducerDispatcher::Init( 25 void DataPipeProducerDispatcher::Init(ScopedPlatformHandle message_pipe,
25 ScopedPlatformHandle message_pipe, 26 char* serialized_write_buffer,
26 char* serialized_write_buffer, size_t serialized_write_buffer_size) { 27 uint32_t serialized_write_buffer_size,
27 if (message_pipe.is_valid()) { 28 char* serialized_read_buffer,
28 channel_ = RawChannel::Create(std::move(message_pipe)); 29 uint32_t serialized_read_buffer_size,
29 channel_->SetSerializedData( 30 ScopedPlatformHandle shared_buffer_handle,
30 nullptr, 0u, serialized_write_buffer, serialized_write_buffer_size, 31 uint32_t ring_buffer_start,
31 nullptr, nullptr); 32 uint32_t ring_buffer_size) {
32 internal::g_io_thread_task_runner->PostTask( 33 if (!message_pipe.is_valid()) {
33 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this)); 34 peer_closed_ = true;
34 } else {
35 error_ = true;
36 } 35 }
36
37 data_pipe_->Init(std::move(message_pipe), serialized_write_buffer,
38 serialized_write_buffer_size, serialized_read_buffer,
39 serialized_read_buffer_size, std::move(shared_buffer_handle),
40 ring_buffer_start, ring_buffer_size, true /* is_producer */,
41 base::Bind(&DataPipeProducerDispatcher::InitOnIO, this));
37 } 42 }
38 43
39 void DataPipeProducerDispatcher::InitOnIO() { 44 void DataPipeProducerDispatcher::InitOnIO() {
40 base::AutoLock locker(lock()); 45 base::AutoLock locker(lock());
41 if (channel_) 46 calling_init_ = true;
42 channel_->Init(this); 47 RawChannel* channel = data_pipe_->GetChannel();
48 if (channel)
49 channel->Init(this);
50 calling_init_ = false;
43 } 51 }
44 52
45 void DataPipeProducerDispatcher::CloseOnIO() { 53 void DataPipeProducerDispatcher::CloseOnIO() {
46 base::AutoLock locker(lock()); 54 base::AutoLock locker(lock());
47 if (channel_) { 55 data_pipe_->Shutdown();
48 channel_->Shutdown();
49 channel_ = nullptr;
50 }
51 } 56 }
52 57
53 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { 58 Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
54 return Type::DATA_PIPE_PRODUCER; 59 return Type::DATA_PIPE_PRODUCER;
55 } 60 }
56 61
57 scoped_refptr<DataPipeProducerDispatcher> 62 scoped_refptr<DataPipeProducerDispatcher>
58 DataPipeProducerDispatcher::Deserialize( 63 DataPipeProducerDispatcher::Deserialize(
59 const void* source, 64 const void* source,
60 size_t size, 65 size_t size,
61 PlatformHandleVector* platform_handles) { 66 PlatformHandleVector* platform_handles) {
62 MojoCreateDataPipeOptions options; 67 MojoCreateDataPipeOptions options;
63 ScopedPlatformHandle shared_memory_handle; 68 ScopedPlatformHandle channel_handle, channel_shared_handle,
64 size_t shared_memory_size = 0; 69 shared_buffer_handle;
65 ScopedPlatformHandle platform_handle = 70 uint32_t serialized_read_buffer_size, serialized_write_buffer_size;
66 DataPipe::Deserialize(source, size, platform_handles, &options, 71 uint32_t ring_buffer_start, ring_buffer_size;
67 &shared_memory_handle, &shared_memory_size); 72
73 ScopedPlatformHandle platform_handle = DataPipe::Deserialize(
74 source, size, platform_handles, &options, &channel_shared_handle,
75 &serialized_read_buffer_size, &serialized_write_buffer_size,
76 &shared_buffer_handle, &ring_buffer_start, &ring_buffer_size);
68 77
69 scoped_refptr<DataPipeProducerDispatcher> rv(Create(options)); 78 scoped_refptr<DataPipeProducerDispatcher> rv(Create(options));
70 79
80 // Create shared buffer and pull out the serialised read and write data
81 // for the channel.
82 uint32_t buffer_size =
83 serialized_write_buffer_size + serialized_read_buffer_size;
84 char* serialized_read_buffer = nullptr;
71 char* serialized_write_buffer = nullptr; 85 char* serialized_write_buffer = nullptr;
72 size_t serialized_write_buffer_size = 0; 86 scoped_refptr<PlatformSharedBuffer> channel_shared_buffer;
73 scoped_refptr<PlatformSharedBuffer> shared_buffer;
74 scoped_ptr<PlatformSharedBufferMapping> mapping; 87 scoped_ptr<PlatformSharedBufferMapping> mapping;
75 if (shared_memory_size) { 88 if (channel_shared_handle.is_valid()) {
76 shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle( 89 channel_shared_buffer =
77 shared_memory_size, std::move(shared_memory_handle)); 90 internal::g_platform_support->CreateSharedBufferFromHandle(
78 mapping = shared_buffer->Map(0, shared_memory_size); 91 buffer_size, std::move(channel_shared_handle));
79 serialized_write_buffer = static_cast<char*>(mapping->GetBase()); 92 mapping = channel_shared_buffer->Map(0, buffer_size);
80 serialized_write_buffer_size = shared_memory_size; 93
94 serialized_read_buffer = static_cast<char*>(mapping->GetBase());
95 serialized_write_buffer =
96 static_cast<char*>(mapping->GetBase()) + serialized_read_buffer_size;
81 } 97 }
82 98
83 rv->Init(std::move(platform_handle), serialized_write_buffer, 99 rv->Init(std::move(platform_handle), serialized_read_buffer,
84 serialized_write_buffer_size); 100 serialized_read_buffer_size, serialized_write_buffer,
101 serialized_write_buffer_size, std::move(shared_buffer_handle),
102 ring_buffer_start, ring_buffer_size);
85 return rv; 103 return rv;
86 } 104 }
87 105
88 DataPipeProducerDispatcher::DataPipeProducerDispatcher( 106 DataPipeProducerDispatcher::DataPipeProducerDispatcher(
89 const MojoCreateDataPipeOptions& options) 107 const MojoCreateDataPipeOptions& options)
90 : options_(options), channel_(nullptr), error_(false), serialized_(false) { 108 : data_pipe_(new DataPipe(options)),
91 } 109 calling_init_(false),
110 peer_closed_(false),
111 in_two_phase_write_(false),
112 two_phase_max_bytes_write_(0u) {}
92 113
93 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() { 114 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() {
94 // See comment in ~MessagePipeDispatcher. 115 // See comment in ~MessagePipeDispatcher.
95 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) 116 if (internal::g_io_thread_task_runner->RunsTasksOnCurrentThread())
96 channel_->Shutdown(); 117 data_pipe_->Shutdown();
97 else 118 else
98 DCHECK(!channel_); 119 DCHECK(!data_pipe_->GetChannel());
99 } 120 }
100 121
101 void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() { 122 void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() {
102 lock().AssertAcquired(); 123 lock().AssertAcquired();
103 awakable_list_.CancelAll(); 124 awakable_list_.CancelAll();
104 } 125 }
105 126
106 void DataPipeProducerDispatcher::CloseImplNoLock() { 127 void DataPipeProducerDispatcher::CloseImplNoLock() {
107 lock().AssertAcquired(); 128 lock().AssertAcquired();
108 internal::g_io_thread_task_runner->PostTask( 129 internal::g_io_thread_task_runner->PostTask(
109 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this)); 130 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this));
110 } 131 }
111 132
112 scoped_refptr<Dispatcher> 133 scoped_refptr<Dispatcher>
113 DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { 134 DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
135 // This function is used by TransportData to make sure there are no references
136 // to the dispatcher it is trying to serialise and transport.
114 lock().AssertAcquired(); 137 lock().AssertAcquired();
115 138
116 SerializeInternal(); 139 scoped_refptr<DataPipeProducerDispatcher> rv =
140 Create(data_pipe_->GetOptions());
141 data_pipe_->CreateEquivalentAndClose(rv->data_pipe_.get());
117 142
118 scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_); 143 DCHECK(!in_two_phase_write_);
119 serialized_write_buffer_.swap(rv->serialized_write_buffer_); 144
120 rv->serialized_platform_handle_ = std::move(serialized_platform_handle_);
121 rv->serialized_ = true;
122 return scoped_refptr<Dispatcher>(rv.get()); 145 return scoped_refptr<Dispatcher>(rv.get());
123 } 146 }
124 147
125 MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock( 148 MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock(
126 const void* elements, 149 const void* elements,
127 uint32_t* num_bytes, 150 uint32_t* num_bytes,
128 MojoWriteDataFlags flags) { 151 MojoWriteDataFlags flags) {
129 lock().AssertAcquired(); 152 lock().AssertAcquired();
130 if (InTwoPhaseWrite()) 153 if (in_two_phase_write_)
131 return MOJO_RESULT_BUSY; 154 return MOJO_RESULT_BUSY;
132 if (error_) 155 if (peer_closed_)
133 return MOJO_RESULT_FAILED_PRECONDITION; 156 return MOJO_RESULT_FAILED_PRECONDITION;
134 if (*num_bytes % options_.element_num_bytes != 0) 157 if (*num_bytes % data_pipe_->GetOptions().element_num_bytes != 0)
135 return MOJO_RESULT_INVALID_ARGUMENT; 158 return MOJO_RESULT_INVALID_ARGUMENT;
136 if (*num_bytes == 0) 159 if (*num_bytes == 0)
137 return MOJO_RESULT_OK; // Nothing to do. 160 return MOJO_RESULT_OK; // Nothing to do.
138 161
139 // For now, we ignore options.capacity_num_bytes as a total of all pending 162 // Don't write non element sized chunks.
140 // writes (and just treat it per message). We will implement that later if 163 uint32_t writable = data_pipe_->GetWritableBytes();
141 // we need to. All current uses want all their data to be sent, and it's not 164 writable -= writable % data_pipe_->GetOptions().element_num_bytes;
142 // clear that this backpressure should be done at the mojo layer or at a 165
143 // higher application layer.
144 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; 166 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE;
145 uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0; 167 uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0;
146 if (min_num_bytes_to_write > options_.capacity_num_bytes) { 168 if (min_num_bytes_to_write > writable) {
147 // Don't return "should wait" since you can't wait for a specified amount of 169 // Don't return "should wait" since you can't wait for a specified amount of
148 // data. 170 // data.
149 return MOJO_RESULT_OUT_OF_RANGE; 171 return MOJO_RESULT_OUT_OF_RANGE;
150 } 172 }
151 173
152 uint32_t num_bytes_to_write = 174 if (writable == 0)
153 std::min(*num_bytes, options_.capacity_num_bytes);
154 if (num_bytes_to_write == 0)
155 return MOJO_RESULT_SHOULD_WAIT; 175 return MOJO_RESULT_SHOULD_WAIT;
156 176
157 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); 177 uint32_t num_bytes_to_write = std::min(*num_bytes, writable);
178
179 // The failure case for |WriteDataIntoSharedBuffer| is the shared
180 // buffer not existing, so we should wait.
181 if (!data_pipe_->WriteDataIntoSharedBuffer(elements, num_bytes_to_write)) {
182 return MOJO_RESULT_SHOULD_WAIT;
183 }
184
185 // If we can't tell the other end about the write, pretend this write didn't
186 // happen and mark the other end as closed. We deal with any state changes
187 // due to the other side being closed in OnError.
188 if (!data_pipe_->NotifyWrite(num_bytes_to_write)) {
189 peer_closed_ = true;
190 return MOJO_RESULT_FAILED_PRECONDITION;
191 }
158 192
159 *num_bytes = num_bytes_to_write; 193 *num_bytes = num_bytes_to_write;
160 WriteDataIntoMessages(elements, num_bytes_to_write);
161 194
195 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
196 data_pipe_->UpdateFromWrite(num_bytes_to_write);
162 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); 197 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
163 if (!new_state.equals(old_state)) 198 if (!new_state.equals(old_state))
164 awakable_list_.AwakeForStateChange(new_state); 199 awakable_list_.AwakeForStateChange(new_state);
200
165 return MOJO_RESULT_OK; 201 return MOJO_RESULT_OK;
166 } 202 }
167 203
168 MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock( 204 MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock(
169 void** buffer, 205 void** buffer,
170 uint32_t* buffer_num_bytes, 206 uint32_t* buffer_num_bytes,
171 MojoWriteDataFlags flags) { 207 MojoWriteDataFlags flags) {
172 lock().AssertAcquired(); 208 lock().AssertAcquired();
173 if (InTwoPhaseWrite()) 209 if (in_two_phase_write_)
174 return MOJO_RESULT_BUSY; 210 return MOJO_RESULT_BUSY;
175 if (error_) 211 if (peer_closed_)
176 return MOJO_RESULT_FAILED_PRECONDITION; 212 return MOJO_RESULT_FAILED_PRECONDITION;
177 213
178 // See comment in WriteDataImplNoLock about ignoring capacity_num_bytes. 214 uint32_t max_num_bytes_to_write;
215 void* temp_buf = data_pipe_->GetWriteBuffer(&max_num_bytes_to_write);
216
217 if (max_num_bytes_to_write == 0)
218 return MOJO_RESULT_SHOULD_WAIT;
219
179 if (*buffer_num_bytes == 0) 220 if (*buffer_num_bytes == 0)
180 *buffer_num_bytes = options_.capacity_num_bytes; 221 *buffer_num_bytes = max_num_bytes_to_write;
181 222
182 two_phase_data_.resize(*buffer_num_bytes); 223 // Don't promise more bytes than we have.
183 *buffer = &two_phase_data_[0]; 224 *buffer_num_bytes = std::min(max_num_bytes_to_write, *buffer_num_bytes);
184 225
185 // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes 226 two_phase_max_bytes_write_ = *buffer_num_bytes;
186 // we can construct a MessageInTransit here. But then we need to make 227 *buffer = temp_buf;
187 // MessageInTransit support changing its data size later. 228 in_two_phase_write_ = true;
188 229
189 return MOJO_RESULT_OK; 230 return MOJO_RESULT_OK;
190 } 231 }
191 232
192 MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock( 233 MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock(
193 uint32_t num_bytes_written) { 234 uint32_t num_bytes_written) {
194 lock().AssertAcquired(); 235 lock().AssertAcquired();
195 if (!InTwoPhaseWrite()) 236 if (!in_two_phase_write_)
196 return MOJO_RESULT_FAILED_PRECONDITION; 237 return MOJO_RESULT_FAILED_PRECONDITION;
197 238
239 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
240 in_two_phase_write_ = false;
241
242 if (num_bytes_written > two_phase_max_bytes_write_ ||
243 num_bytes_written % data_pipe_->GetOptions().element_num_bytes != 0) {
244 return MOJO_RESULT_INVALID_ARGUMENT;
245 }
246
247 data_pipe_->UpdateFromWrite(num_bytes_written);
248
249 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
250 if (!new_state.equals(old_state))
251 awakable_list_.AwakeForStateChange(new_state);
252
198 // Note: Allow successful completion of the two-phase write even if the other 253 // Note: Allow successful completion of the two-phase write even if the other
199 // side has been closed. 254 // side has been closed.
200 MojoResult rv = MOJO_RESULT_OK; 255 // Deal with state changes due to peer being closed in OnError.
201 if (num_bytes_written > two_phase_data_.size() || 256 if (!data_pipe_->NotifyWrite(num_bytes_written))
202 num_bytes_written % options_.element_num_bytes != 0) { 257 peer_closed_ = true;
203 rv = MOJO_RESULT_INVALID_ARGUMENT;
204 } else if (channel_) {
205 WriteDataIntoMessages(&two_phase_data_[0], num_bytes_written);
206 }
207 258
208 // Two-phase write ended even on failure. 259 return MOJO_RESULT_OK;
209 two_phase_data_.clear();
210 // If we're now writable, we *became* writable (since we weren't writable
211 // during the two-phase write), so awake producer awakables.
212 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
213 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
214 awakable_list_.AwakeForStateChange(new_state);
215
216 return rv;
217 } 260 }
218 261
219 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock() 262 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock()
220 const { 263 const {
221 lock().AssertAcquired(); 264 lock().AssertAcquired();
222 265
223 HandleSignalsState rv; 266 HandleSignalsState rv;
224 if (!error_) { 267 if (!peer_closed_) {
225 if (!InTwoPhaseWrite()) 268 if (!in_two_phase_write_ && data_pipe_->GetWritableBytes())
226 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 269 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
227 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 270 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
228 } else { 271 } else {
229 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 272 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
230 } 273 }
274
231 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 275 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
232 return rv; 276 return rv;
233 } 277 }
234 278
235 MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock( 279 MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock(
236 Awakable* awakable, 280 Awakable* awakable,
237 MojoHandleSignals signals, 281 MojoHandleSignals signals,
238 uintptr_t context, 282 uintptr_t context,
239 HandleSignalsState* signals_state) { 283 HandleSignalsState* signals_state) {
240 lock().AssertAcquired(); 284 lock().AssertAcquired();
241 if (channel_)
242 channel_->EnsureLazyInitialized();
243 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); 285 HandleSignalsState state = GetHandleSignalsStateImplNoLock();
244 if (state.satisfies(signals)) { 286 if (state.satisfies(signals)) {
245 if (signals_state) 287 if (signals_state)
246 *signals_state = state; 288 *signals_state = state;
247 return MOJO_RESULT_ALREADY_EXISTS; 289 return MOJO_RESULT_ALREADY_EXISTS;
248 } 290 }
249 if (!state.can_satisfy(signals)) { 291 if (!state.can_satisfy(signals)) {
250 if (signals_state) 292 if (signals_state)
251 *signals_state = state; 293 *signals_state = state;
252 return MOJO_RESULT_FAILED_PRECONDITION; 294 return MOJO_RESULT_FAILED_PRECONDITION;
253 } 295 }
254 296
255 awakable_list_.Add(awakable, signals, context); 297 awakable_list_.Add(awakable, signals, context);
256 return MOJO_RESULT_OK; 298 return MOJO_RESULT_OK;
257 } 299 }
258 300
259 void DataPipeProducerDispatcher::RemoveAwakableImplNoLock( 301 void DataPipeProducerDispatcher::RemoveAwakableImplNoLock(
260 Awakable* awakable, 302 Awakable* awakable,
261 HandleSignalsState* signals_state) { 303 HandleSignalsState* signals_state) {
262 lock().AssertAcquired(); 304 lock().AssertAcquired();
263 awakable_list_.Remove(awakable); 305 awakable_list_.Remove(awakable);
264 if (signals_state) 306 if (signals_state)
265 *signals_state = GetHandleSignalsStateImplNoLock(); 307 *signals_state = GetHandleSignalsStateImplNoLock();
266 } 308 }
267 309
268 void DataPipeProducerDispatcher::StartSerializeImplNoLock( 310 void DataPipeProducerDispatcher::StartSerializeImplNoLock(
269 size_t* max_size, 311 size_t* max_size,
270 size_t* max_platform_handles) { 312 size_t* max_platform_handles) {
271 if (!serialized_) 313 data_pipe_->StartSerialize(max_size, max_platform_handles);
272 SerializeInternal();
273
274 DataPipe::StartSerialize(serialized_platform_handle_.is_valid(),
275 !serialized_write_buffer_.empty(), max_size,
276 max_platform_handles);
277 } 314 }
278 315
279 bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock( 316 bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock(
280 void* destination, 317 void* destination,
281 size_t* actual_size, 318 size_t* actual_size,
282 PlatformHandleVector* platform_handles) { 319 PlatformHandleVector* platform_handles) {
283 ScopedPlatformHandle shared_memory_handle; 320 data_pipe_->EndSerialize(destination, actual_size, platform_handles);
284 size_t shared_memory_size = serialized_write_buffer_.size();
285 if (shared_memory_size) {
286 scoped_refptr<PlatformSharedBuffer> shared_buffer(
287 internal::g_platform_support->CreateSharedBuffer(
288 shared_memory_size));
289 scoped_ptr<PlatformSharedBufferMapping> mapping(
290 shared_buffer->Map(0, shared_memory_size));
291 memcpy(mapping->GetBase(), &serialized_write_buffer_[0],
292 shared_memory_size);
293 shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release());
294 }
295
296 DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_),
297 std::move(shared_memory_handle), shared_memory_size,
298 destination, actual_size, platform_handles);
299 CloseImplNoLock(); 321 CloseImplNoLock();
300 return true; 322 return true;
301 } 323 }
302 324
303 void DataPipeProducerDispatcher::TransportStarted() { 325 void DataPipeProducerDispatcher::TransportStarted() {
304 started_transport_.Acquire(); 326 started_transport_.Acquire();
305 } 327 }
306 328
307 void DataPipeProducerDispatcher::TransportEnded() { 329 void DataPipeProducerDispatcher::TransportEnded() {
308 started_transport_.Release(); 330 started_transport_.Release();
309 } 331 }
310 332
311 bool DataPipeProducerDispatcher::IsBusyNoLock() const { 333 bool DataPipeProducerDispatcher::IsBusyNoLock() const {
312 lock().AssertAcquired(); 334 lock().AssertAcquired();
313 return InTwoPhaseWrite(); 335 return in_two_phase_write_;
336 }
337
338 bool DataPipeProducerDispatcher::ProcessCommand(
339 const DataPipeCommandHeader& command,
340 ScopedPlatformHandleVectorPtr platform_handles) {
341 // Handles write/read case and shared buffer becoming available case.
342 return data_pipe_->ProcessCommand(command, std::move(platform_handles));
314 } 343 }
315 344
316 void DataPipeProducerDispatcher::OnReadMessage( 345 void DataPipeProducerDispatcher::OnReadMessage(
317 const MessageInTransit::View& message_view, 346 const MessageInTransit::View& message_view,
318 ScopedPlatformHandleVectorPtr platform_handles) { 347 ScopedPlatformHandleVectorPtr platform_handles) {
319 CHECK(false) << "DataPipeProducerDispatcher shouldn't get any messages."; 348 const DataPipeCommandHeader* command =
349 static_cast<const DataPipeCommandHeader*>(message_view.bytes());
350 DCHECK(message_view.num_bytes() == sizeof(DataPipeCommandHeader));
351
352 if (started_transport_.Try()) {
353 // We're not in the middle of being sent.
354
355 // Can get synchronously called back from RawChannel::Init in InitOnIO if
356 // there was initial data. InitOnIO locks, so don't lock twice.
357 scoped_ptr<base::AutoLock> locker;
358 if (!calling_init_) {
359 locker.reset(new base::AutoLock(lock()));
360 }
361
362 if (ProcessCommand(*command, std::move(platform_handles))) {
363 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
364 }
365 started_transport_.Release();
366 } else {
367 // DataPipe::Serialize calls ReleaseHandle on the channel, which
368 // acquires RawChannel's read_lock_. The function OnReadMessage is only
369 // called while read_lock_ is acquired, and not after ReleaseHandle has been
370 // called. This means this function will only be called before Serialize
371 // calls ReleaseHandle, meaning the serialisation will not have started yet.
372 // We only notify awakables if we're not in the process of being
373 // transported.
374 ProcessCommand(*command, std::move(platform_handles));
375 }
320 } 376 }
321 377
322 void DataPipeProducerDispatcher::OnError(Error error) { 378 void DataPipeProducerDispatcher::OnError(Error error) {
323 switch (error) { 379 switch (error) {
324 case ERROR_READ_BROKEN:
325 case ERROR_READ_BAD_MESSAGE:
326 case ERROR_READ_UNKNOWN:
327 LOG(ERROR) << "DataPipeProducerDispatcher shouldn't get read error.";
328 break;
329 case ERROR_READ_SHUTDOWN: 380 case ERROR_READ_SHUTDOWN:
330 // The other side was cleanly closed, so this isn't actually an error. 381 // The other side was cleanly closed, so this isn't actually an error.
331 DVLOG(1) << "DataPipeProducerDispatcher read error (shutdown)"; 382 DVLOG(1) << "DataPipeProducerDispatcher read error (shutdown)";
332 break; 383 break;
384 case ERROR_READ_BROKEN:
385 LOG(ERROR) << "DataPipeProducerDispatcher read error (connection broken)";
386 break;
387 case ERROR_READ_BAD_MESSAGE:
388 // Receiving a bad message means either a bug, data corruption, or
389 // malicious attack (probably due to some other bug).
390 LOG(ERROR) << "DataPipeProducerDispatcher read error (received bad "
391 << "message)";
392 break;
393 case ERROR_READ_UNKNOWN:
394 LOG(ERROR) << "DataPipeProducerDispatcher read error (unknown)";
395 break;
333 case ERROR_WRITE: 396 case ERROR_WRITE:
334 // Write errors are slightly notable: they probably shouldn't happen under 397 LOG(ERROR) << "DataPipeProducerDispatcher write error";
335 // normal operation (but maybe the other side crashed).
336 LOG(WARNING) << "DataPipeProducerDispatcher write error";
337 break; 398 break;
338 } 399 }
339 400
340 error_ = true; 401 peer_closed_ = true;
341 if (started_transport_.Try()) { 402 if (started_transport_.Try()) {
342 base::AutoLock locker(lock()); 403 base::AutoLock locker(lock());
343 // We can get two OnError callbacks before the post task below completes. 404 // We can get two OnError callbacks before the post task below completes.
344 // Although RawChannel still has a pointer to this object until Shutdown is 405 // Although RawChannel still has a pointer to this object until Shutdown is
345 // called, that is safe since this class always does a PostTask to the IO 406 // called, that is safe since this class always does a PostTask to the IO
346 // thread to self destruct. 407 // thread to self destruct.
347 if (channel_) { 408 if (data_pipe_->GetChannel()) {
348 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); 409 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
349 channel_->Shutdown(); 410 data_pipe_->Shutdown();
350 channel_ = nullptr;
351 } 411 }
352 started_transport_.Release(); 412 started_transport_.Release();
353 } else { 413 } else {
354 // We must be waiting to call ReleaseHandle. It will call Shutdown. 414 // We must be waiting to call ReleaseHandle. It will call Shutdown.
355 } 415 }
356 } 416 }
357 417
358 bool DataPipeProducerDispatcher::InTwoPhaseWrite() const {
359 return !two_phase_data_.empty();
360 }
361
362 bool DataPipeProducerDispatcher::WriteDataIntoMessages(
363 const void* elements,
364 uint32_t num_bytes) {
365 // The maximum amount of data to send per message (make it a multiple of the
366 // element size.
367 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes;
368 max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes;
369 DCHECK_GT(max_message_num_bytes, 0u);
370
371 uint32_t offset = 0;
372 while (offset < num_bytes) {
373 uint32_t message_num_bytes =
374 std::min(static_cast<uint32_t>(max_message_num_bytes),
375 num_bytes - offset);
376 scoped_ptr<MessageInTransit> message(new MessageInTransit(
377 MessageInTransit::Type::MESSAGE, message_num_bytes,
378 static_cast<const char*>(elements) + offset));
379 if (!channel_->WriteMessage(std::move(message))) {
380 error_ = true;
381 return false;
382 }
383
384 offset += message_num_bytes;
385 }
386
387 return true;
388 }
389
390 void DataPipeProducerDispatcher::SerializeInternal() {
391 // We need to stop watching handle immediately, even though not on IO thread,
392 // so that other messages aren't read after this.
393 if (channel_) {
394 std::vector<char> serialized_read_buffer;
395 std::vector<int> fds;
396 bool write_error = false;
397 serialized_platform_handle_ = channel_->ReleaseHandle(
398 &serialized_read_buffer, &serialized_write_buffer_, &fds, &fds,
399 &write_error);
400 CHECK(serialized_read_buffer.empty());
401 CHECK(fds.empty());
402 if (write_error)
403 serialized_platform_handle_.reset();
404 channel_ = nullptr;
405 }
406 serialized_ = true;
407 }
408
409 } // namespace edk 418 } // namespace edk
410 } // namespace mojo 419 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698