Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(537)

Side by Side Diff: mojo/edk/system/raw_channel.cc

Issue 1350023003: Add a Mojo EDK for Chrome that uses one OS pipe per message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/raw_channel.h" 5 #include "mojo/edk/system/raw_channel.h"
6 6
7 #include <string.h> 7 #include <string.h>
8 8
9 #include <algorithm> 9 #include <algorithm>
10 10
11 #include "base/bind.h" 11 #include "base/bind.h"
12 #include "base/location.h" 12 #include "base/location.h"
13 #include "base/logging.h" 13 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h" 14 #include "base/message_loop/message_loop.h"
15 #include "mojo/edk/embedder/embedder_internal.h"
15 #include "mojo/edk/system/message_in_transit.h" 16 #include "mojo/edk/system/message_in_transit.h"
16 #include "mojo/edk/system/transport_data.h" 17 #include "mojo/edk/system/transport_data.h"
17 18
18 namespace mojo { 19 namespace mojo {
19 namespace system { 20 namespace system {
20 21
21 const size_t kReadSize = 4096; 22 const size_t kReadSize = 4096;
22 23
23 // RawChannel::ReadBuffer ------------------------------------------------------ 24 // RawChannel::ReadBuffer ------------------------------------------------------
24 25
25 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) { 26 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
26 } 27 }
27 28
28 RawChannel::ReadBuffer::~ReadBuffer() { 29 RawChannel::ReadBuffer::~ReadBuffer() {
29 } 30 }
30 31
31 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) { 32 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
32 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); 33 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
33 *addr = &buffer_[0] + num_valid_bytes_; 34 *addr = &buffer_[0] + num_valid_bytes_;
34 *size = kReadSize; 35 *size = kReadSize;
35 } 36 }
36 37
37 // RawChannel::WriteBuffer ----------------------------------------------------- 38 // RawChannel::WriteBuffer -----------------------------------------------------
38 39
39 RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size) 40 RawChannel::WriteBuffer::WriteBuffer()
40 : serialized_platform_handle_size_(serialized_platform_handle_size), 41 : serialized_platform_handle_size_(0),
41 platform_handles_offset_(0), 42 platform_handles_offset_(0),
42 data_offset_(0) { 43 data_offset_(0) {
43 } 44 }
44 45
45 RawChannel::WriteBuffer::~WriteBuffer() { 46 RawChannel::WriteBuffer::~WriteBuffer() {
46 message_queue_.Clear(); 47 message_queue_.Clear();
47 } 48 }
48 49
49 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const { 50 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const {
50 if (message_queue_.IsEmpty()) 51 if (message_queue_.IsEmpty())
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
108 size_t transport_data_buffer_size = 109 size_t transport_data_buffer_size =
109 message->transport_data() ? message->transport_data()->buffer_size() : 0; 110 message->transport_data() ? message->transport_data()->buffer_size() : 0;
110 111
111 if (!transport_data_buffer_size) { 112 if (!transport_data_buffer_size) {
112 // Only write from the main buffer. 113 // Only write from the main buffer.
113 DCHECK_LT(data_offset_, message->main_buffer_size()); 114 DCHECK_LT(data_offset_, message->main_buffer_size());
114 DCHECK_LE(bytes_to_write, message->main_buffer_size()); 115 DCHECK_LE(bytes_to_write, message->main_buffer_size());
115 Buffer buffer = { 116 Buffer buffer = {
116 static_cast<const char*>(message->main_buffer()) + data_offset_, 117 static_cast<const char*>(message->main_buffer()) + data_offset_,
117 bytes_to_write}; 118 bytes_to_write};
119
118 buffers->push_back(buffer); 120 buffers->push_back(buffer);
119 return; 121 return;
120 } 122 }
121 123
122 if (data_offset_ >= message->main_buffer_size()) { 124 if (data_offset_ >= message->main_buffer_size()) {
123 // Only write from the transport data buffer. 125 // Only write from the transport data buffer.
124 DCHECK_LT(data_offset_ - message->main_buffer_size(), 126 DCHECK_LT(data_offset_ - message->main_buffer_size(),
125 transport_data_buffer_size); 127 transport_data_buffer_size);
126 DCHECK_LE(bytes_to_write, transport_data_buffer_size); 128 DCHECK_LE(bytes_to_write, transport_data_buffer_size);
127 Buffer buffer = { 129 Buffer buffer = {
128 static_cast<const char*>(message->transport_data()->buffer()) + 130 static_cast<const char*>(message->transport_data()->buffer()) +
129 (data_offset_ - message->main_buffer_size()), 131 (data_offset_ - message->main_buffer_size()),
130 bytes_to_write}; 132 bytes_to_write};
133
131 buffers->push_back(buffer); 134 buffers->push_back(buffer);
132 return; 135 return;
133 } 136 }
134 137
135 // TODO(vtl): We could actually send out buffers from multiple messages, with 138 // TODO(vtl): We could actually send out buffers from multiple messages, with
136 // the "stopping" condition being reaching a message with platform handles 139 // the "stopping" condition being reaching a message with platform handles
137 // attached. 140 // attached.
138 141
139 // Write from both buffers. 142 // Write from both buffers.
140 DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ + 143 DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ +
141 transport_data_buffer_size); 144 transport_data_buffer_size);
142 Buffer buffer1 = { 145 Buffer buffer1 = {
143 static_cast<const char*>(message->main_buffer()) + data_offset_, 146 static_cast<const char*>(message->main_buffer()) + data_offset_,
144 message->main_buffer_size() - data_offset_}; 147 message->main_buffer_size() - data_offset_};
145 buffers->push_back(buffer1); 148 buffers->push_back(buffer1);
146 Buffer buffer2 = { 149 Buffer buffer2 = {
147 static_cast<const char*>(message->transport_data()->buffer()), 150 static_cast<const char*>(message->transport_data()->buffer()),
148 transport_data_buffer_size}; 151 transport_data_buffer_size};
149 buffers->push_back(buffer2); 152 buffers->push_back(buffer2);
150 } 153 }
151 154
152 // RawChannel ------------------------------------------------------------------ 155 // RawChannel ------------------------------------------------------------------
153 156
154 RawChannel::RawChannel() 157 RawChannel::RawChannel()
155 : message_loop_for_io_(nullptr), 158 : message_loop_for_io_(nullptr),
159 set_on_shutdown_(nullptr),
156 delegate_(nullptr), 160 delegate_(nullptr),
157 set_on_shutdown_(nullptr), 161 write_ready_(false),
158 write_stopped_(false), 162 write_stopped_(false),
163 debug_started_sending_(false),
164 error_occurred_(false),
159 weak_ptr_factory_(this) { 165 weak_ptr_factory_(this) {
166 read_buffer_.reset(new ReadBuffer);
167 write_buffer_.reset(new WriteBuffer());
160 } 168 }
161 169
162 RawChannel::~RawChannel() { 170 RawChannel::~RawChannel() {
163 DCHECK(!read_buffer_); 171 DCHECK(!read_buffer_);
164 DCHECK(!write_buffer_); 172 DCHECK(!write_buffer_);
165 173
166 // No need to take the |write_lock_| here -- if there are still weak pointers 174 // Only want to decrement counter if Init was called.
167 // outstanding, then we're hosed anyway (since we wouldn't be able to 175 if (message_loop_for_io_) {
168 // invalidate them cleanly, since we might not be on the I/O thread). 176 // No need to take the |write_lock_| here -- if there are still weak
169 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); 177 // pointers outstanding, then we're hosed anyway (since we wouldn't be able
178 // to invalidate them cleanly, since we might not be on the I/O thread).
179 // DCHECK(!weak_ptr_factory_.HasWeakPtrs());
180 embedder::internal::ChannelShutdown();
181 }
170 } 182 }
171 183
172 void RawChannel::Init(Delegate* delegate) { 184 void RawChannel::Init(Delegate* delegate) {
185 embedder::internal::ChannelStarted();
173 DCHECK(delegate); 186 DCHECK(delegate);
174 187
188 base::AutoLock read_locker(read_lock_);
189 // solves race where initialiing on io thread while main thread is serializing
190 // this channel and releases handle.
191 base::AutoLock locker(write_lock_);
192
175 DCHECK(!delegate_); 193 DCHECK(!delegate_);
176 delegate_ = delegate; 194 delegate_ = delegate;
177 195
178 CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO); 196 //CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO);
179 DCHECK(!message_loop_for_io_); 197 DCHECK(!message_loop_for_io_);
180 message_loop_for_io_ = 198 message_loop_for_io_ =
181 static_cast<base::MessageLoopForIO*>(base::MessageLoop::current()); 199 static_cast<base::MessageLoopForIO*>(base::MessageLoop::current());
182 200
183 // No need to take the lock. No one should be using us yet. 201 OnInit();
184 DCHECK(!read_buffer_);
185 read_buffer_.reset(new ReadBuffer);
186 DCHECK(!write_buffer_);
187 write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize()));
188 202
189 OnInit(); 203 // Although this means that we can call back sync into the caller, that's
204 // easier than posting a task to do this, because there might also be pending
205 // read calls and we can't modify the buffer.
yzshen1 2015/09/23 22:47:08 I haven't understood this part yet: before Schedul
206 if (read_buffer_->num_valid_bytes()) {
207 // We had serialized read buffer data through SetInitialReadBufferData call.
208 // Make sure we read messages out of it now, otherwise the delegate won't
209 // get notified if no other data gets written to the pipe.
210 bool did_dispatch_message = false;
211 bool stop_dispatching = false;
212 DispatchMessages(&did_dispatch_message, &stop_dispatching);
213 }
190 214
191 IOResult io_result = ScheduleRead(); 215 IOResult io_result = ScheduleRead();
192 if (io_result != IO_PENDING) { 216 if (io_result != IO_PENDING) {
193 // This will notify the delegate about the read failure. Although we're on 217 // This will notify the delegate about the read failure. Although we're on
194 // the I/O thread, don't call it in the nested context. 218 // the I/O thread, don't call it in the nested context.
195 message_loop_for_io_->PostTask( 219 message_loop_for_io_->PostTask(
196 FROM_HERE, base::Bind(&RawChannel::OnReadCompleted, 220 FROM_HERE, base::Bind(&RawChannel::OnReadCompleted,
197 weak_ptr_factory_.GetWeakPtr(), io_result, 0)); 221 weak_ptr_factory_.GetWeakPtr(), io_result, 0));
198 } 222 }
199 // Note: |ScheduleRead()| failure is treated as a read failure (by notifying 223 // Note: |ScheduleRead()| failure is treated as a read failure (by notifying
200 // the delegate), not an initialization failure. 224 // the delegate), not an initialization failure.
225
226 write_ready_ = true;
227 write_buffer_->serialized_platform_handle_size_ =
228 GetSerializedPlatformHandleSize();
229 if (!write_buffer_->message_queue_.IsEmpty())
230 SendQueuedMessagesNoLock();
201 } 231 }
202 232
203 void RawChannel::Shutdown() { 233 void RawChannel::Shutdown() {
204 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 234 weak_ptr_factory_.InvalidateWeakPtrs();
205 235
236 // Normally, we want to flush any pending writes before shutting down. This
237 // doesn't apply when 1) we don't have a handle (for obvious reasons) or
238 // 2) when the other side already quit and asked us to close the handle to
239 // ensure that we read everything out of the pipe first.
240 if (!HandleForDebuggingNoLock().is_valid() || error_occurred_) {
241 {
242 base::AutoLock read_locker(read_lock_);
243 base::AutoLock locker(write_lock_);
244 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
245 }
246 delete this;
247 return;
248 }
249
250 base::AutoLock read_locker(read_lock_);
206 base::AutoLock locker(write_lock_); 251 base::AutoLock locker(write_lock_);
252 DCHECK(read_buffer_->num_valid_bytes() == 0) <<
253 "RawChannel::Shutdown called but there is pending data to be read";
207 254
208 LOG_IF(WARNING, !write_buffer_->message_queue_.IsEmpty()) 255 // happens on shutdown if didn't call init when doing createduplicate
209 << "Shutting down RawChannel with write buffer nonempty"; 256 if (message_loop_for_io()) {
257 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
258 }
210 259
211 // Reset the delegate so that it won't receive further calls. 260 // Reset the delegate so that it won't receive further calls.
212 delegate_ = nullptr; 261 delegate_ = nullptr;
213 if (set_on_shutdown_) { 262 if (set_on_shutdown_) {
214 *set_on_shutdown_ = true; 263 *set_on_shutdown_ = true;
215 set_on_shutdown_ = nullptr; 264 set_on_shutdown_ = nullptr;
216 } 265 }
217 write_stopped_ = true;
218 weak_ptr_factory_.InvalidateWeakPtrs();
219 266
220 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); 267 // TODO(jam): probably remove this since it doesn't make sense now that we
268 // wait and flush pending messages.
269 // write_stopped_ = true;
270
271
272 bool empty = write_buffer_->message_queue_.IsEmpty();
273
274 // We may have no messages to write. However just because our end of the pipe
275 // wrote everything doesn't mean that the other end read it. We don't want to
276 // call FlushFileBuffers since a) that only works for server end of the pipe,
277 // and b) it pauses this thread (which can block a process on another, or
278 // worse hang if both pipes are in the same process).
279 scoped_ptr<MessageInTransit> quit_message(new MessageInTransit(
280 MessageInTransit::Type::RAW_CHANNEL_QUIT, 0, nullptr));
281 EnqueueMessageNoLock(quit_message.Pass());
282
283 if (empty)
284 SendQueuedMessagesNoLock();
285 }
286
287 embedder::ScopedPlatformHandle RawChannel::ReleaseHandle(
288 std::vector<char>* read_buffer) {
289 //LOG(ERROR) << "RawChannel::ReleaseHandle( " << this;
290
291 embedder::ScopedPlatformHandle rv;
292 {
293 base::AutoLock read_locker(read_lock_);
294 base::AutoLock locker(write_lock_);
295 rv = ReleaseHandleNoLock(read_buffer);
296
297 // TODO(jam); if we use these, use nolock versions of these methods that are
298 // copied.
299 if (write_buffer_.get() && !write_buffer_->message_queue_.IsEmpty()) {
300 NOTREACHED() << "TODO(JAM)";
301 }
302
303 delegate_ = nullptr;
304
305 // The Unretained is safe because above cancelled IO so we shouldn't get any
306 // channel errors.
307 // |message_loop_for_io_| might not be set yet
308 embedder::internal::g_io_thread_task_runner->PostTask(
309 FROM_HERE,
310 base::Bind(&RawChannel::Shutdown, base::Unretained(this)));
311 }
312
313 return rv;
221 } 314 }
222 315
223 // Reminder: This must be thread-safe. 316 // Reminder: This must be thread-safe.
224 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { 317 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
225 DCHECK(message); 318 DCHECK(message);
226
227 base::AutoLock locker(write_lock_); 319 base::AutoLock locker(write_lock_);
320 DCHECK(!debug_started_sending_);
228 if (write_stopped_) 321 if (write_stopped_)
229 return false; 322 return false;
230 323
231 if (!write_buffer_->message_queue_.IsEmpty()) { 324 bool queue_was_empty = write_buffer_->message_queue_.IsEmpty();
232 EnqueueMessageNoLock(message.Pass()); 325 EnqueueMessageNoLock(message.Pass());
233 return true; 326 if (queue_was_empty && write_ready_)
234 } 327 SendQueuedMessagesNoLock();
235 328
236 EnqueueMessageNoLock(message.Pass()); 329 return true;
330 }
331
332 void RawChannel::SendQueuedMessagesNoLock() {
237 DCHECK_EQ(write_buffer_->data_offset_, 0u); 333 DCHECK_EQ(write_buffer_->data_offset_, 0u);
238 334
239 size_t platform_handles_written = 0; 335 size_t platform_handles_written = 0;
240 size_t bytes_written = 0; 336 size_t bytes_written = 0;
241 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); 337 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
242 if (io_result == IO_PENDING) 338 if (io_result == IO_PENDING)
243 return true; 339 return;
244 340
245 bool result = OnWriteCompletedNoLock(io_result, platform_handles_written, 341 bool result = OnWriteCompletedNoLock(io_result, platform_handles_written,
246 bytes_written); 342 bytes_written);
247 if (!result) { 343 if (!result) {
248 // Even if we're on the I/O thread, don't call |OnError()| in the nested 344 // Even if we're on the I/O thread, don't call |OnError()| in the nested
249 // context. 345 // context.
250 message_loop_for_io_->PostTask( 346 message_loop_for_io_->PostTask(
251 FROM_HERE, 347 FROM_HERE,
252 base::Bind(&RawChannel::CallOnError, weak_ptr_factory_.GetWeakPtr(), 348 base::Bind(&RawChannel::LockAndCallOnError,
349 weak_ptr_factory_.GetWeakPtr(),
253 Delegate::ERROR_WRITE)); 350 Delegate::ERROR_WRITE));
254 } 351 }
255
256 return result;
257 } 352 }
258 353
259 // Reminder: This must be thread-safe. 354 // Reminder: This must be thread-safe.
260 bool RawChannel::IsWriteBufferEmpty() { 355 bool RawChannel::IsWriteBufferEmpty() {
261 base::AutoLock locker(write_lock_); 356 base::AutoLock locker(write_lock_);
262 return write_buffer_->message_queue_.IsEmpty(); 357 return write_buffer_->message_queue_.IsEmpty();
263 } 358 }
264 359
360 bool RawChannel::IsReadBufferEmpty() {
361 base::AutoLock locker(read_lock_);
362 return read_buffer_->num_valid_bytes_ != 0;
363 }
364
365 void RawChannel::SetInitialReadBufferData(char* data, size_t size) {
366 base::AutoLock locker(read_lock_);
367 // TODO(jam): copy power of 2 algorithm below? or share.
368 read_buffer_->buffer_.resize(size+kReadSize);
369 memcpy(&read_buffer_->buffer_[0], data, size);
370 read_buffer_->num_valid_bytes_ = size;
371 }
372
265 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) { 373 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) {
266 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 374 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
267 375
376 base::AutoLock locker(read_lock_);
377
268 // Keep reading data in a loop, and dispatch messages if enough data is 378 // Keep reading data in a loop, and dispatch messages if enough data is
269 // received. Exit the loop if any of the following happens: 379 // received. Exit the loop if any of the following happens:
270 // - one or more messages were dispatched; 380 // - one or more messages were dispatched;
271 // - the last read failed, was a partial read or would block; 381 // - the last read failed, was a partial read or would block;
272 // - |Shutdown()| was called. 382 // - |Shutdown()| was called.
273 do { 383 do {
274 switch (io_result) { 384 switch (io_result) {
275 case IO_SUCCEEDED: 385 case IO_SUCCEEDED:
276 break; 386 break;
277 case IO_FAILED_SHUTDOWN: 387 case IO_FAILED_SHUTDOWN:
278 case IO_FAILED_BROKEN: 388 case IO_FAILED_BROKEN:
279 case IO_FAILED_UNKNOWN: 389 case IO_FAILED_UNKNOWN:
280 CallOnError(ReadIOResultToError(io_result)); 390 CallOnError(ReadIOResultToError(io_result));
281 return; // |this| may have been destroyed in |CallOnError()|. 391 return; // |this| may have been destroyed in |CallOnError()|.
282 case IO_PENDING: 392 case IO_PENDING:
283 NOTREACHED(); 393 NOTREACHED();
284 return; 394 return;
285 } 395 }
286 396
287 read_buffer_->num_valid_bytes_ += bytes_read; 397 read_buffer_->num_valid_bytes_ += bytes_read;
288 398
289 // Dispatch all the messages that we can. 399 // Dispatch all the messages that we can.
290 bool did_dispatch_message = false; 400 bool did_dispatch_message = false;
291 // Tracks the offset of the first undispatched message in |read_buffer_|. 401 bool stop_dispatching = false;
292 // Currently, we copy data to ensure that this is zero at the beginning. 402 DispatchMessages(&did_dispatch_message, &stop_dispatching);
293 size_t read_buffer_start = 0; 403 if (stop_dispatching)
294 size_t remaining_bytes = read_buffer_->num_valid_bytes_; 404 return;
295 size_t message_size;
296 // Note that we rely on short-circuit evaluation here:
297 // - |read_buffer_start| may be an invalid index into
298 // |read_buffer_->buffer_| if |remaining_bytes| is zero.
299 // - |message_size| is only valid if |GetNextMessageSize()| returns true.
300 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
301 // next read).
302 // TODO(vtl): Validate that |message_size| is sane.
303 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize(
304 &read_buffer_->buffer_[read_buffer_start],
305 remaining_bytes, &message_size) &&
306 remaining_bytes >= message_size) {
307 MessageInTransit::View message_view(
308 message_size, &read_buffer_->buffer_[read_buffer_start]);
309 DCHECK_EQ(message_view.total_size(), message_size);
310
311 const char* error_message = nullptr;
312 if (!message_view.IsValid(GetSerializedPlatformHandleSize(),
313 &error_message)) {
314 DCHECK(error_message);
315 LOG(ERROR) << "Received invalid message: " << error_message;
316 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
317 return; // |this| may have been destroyed in |CallOnError()|.
318 }
319
320 if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL) {
321 if (!OnReadMessageForRawChannel(message_view)) {
322 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
323 return; // |this| may have been destroyed in |CallOnError()|.
324 }
325 } else {
326 embedder::ScopedPlatformHandleVectorPtr platform_handles;
327 if (message_view.transport_data_buffer()) {
328 size_t num_platform_handles;
329 const void* platform_handle_table;
330 TransportData::GetPlatformHandleTable(
331 message_view.transport_data_buffer(), &num_platform_handles,
332 &platform_handle_table);
333
334 if (num_platform_handles > 0) {
335 platform_handles =
336 GetReadPlatformHandles(num_platform_handles,
337 platform_handle_table).Pass();
338 if (!platform_handles) {
339 LOG(ERROR) << "Invalid number of platform handles received";
340 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
341 return; // |this| may have been destroyed in |CallOnError()|.
342 }
343 }
344 }
345
346 // TODO(vtl): In the case that we aren't expecting any platform handles,
347 // for the POSIX implementation, we should confirm that none are stored.
348
349 // Dispatch the message.
350 // Detect the case when |Shutdown()| is called; subsequent destruction
351 // is also permitted then.
352 bool shutdown_called = false;
353 DCHECK(!set_on_shutdown_);
354 set_on_shutdown_ = &shutdown_called;
355 DCHECK(delegate_);
356 delegate_->OnReadMessage(message_view, platform_handles.Pass());
357 if (shutdown_called)
358 return;
359 set_on_shutdown_ = nullptr;
360 }
361
362 did_dispatch_message = true;
363
364 // Update our state.
365 read_buffer_start += message_size;
366 remaining_bytes -= message_size;
367 }
368
369 if (read_buffer_start > 0) {
370 // Move data back to start.
371 read_buffer_->num_valid_bytes_ = remaining_bytes;
372 if (read_buffer_->num_valid_bytes_ > 0) {
373 memmove(&read_buffer_->buffer_[0],
374 &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
375 }
376 read_buffer_start = 0;
377 }
378 405
379 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ < 406 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
380 kReadSize) { 407 kReadSize) {
381 // Use power-of-2 buffer sizes. 408 // Use power-of-2 buffer sizes.
382 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the 409 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
383 // maximum message size to whatever extent necessary). 410 // maximum message size to whatever extent necessary).
384 // TODO(vtl): We may often be able to peek at the header and get the real 411 // TODO(vtl): We may often be able to peek at the header and get the real
385 // required extra space (which may be much bigger than |kReadSize|). 412 // required extra space (which may be much bigger than |kReadSize|).
386 size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize); 413 size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
387 while (new_size < read_buffer_->num_valid_bytes_ + kReadSize) 414 while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
(...skipping 30 matching lines...) Expand all
418 if (write_stopped_) { 445 if (write_stopped_) {
419 NOTREACHED(); 446 NOTREACHED();
420 return; 447 return;
421 } 448 }
422 449
423 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written, 450 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written,
424 bytes_written); 451 bytes_written);
425 } 452 }
426 453
427 if (did_fail) { 454 if (did_fail) {
455 base::AutoLock locker(read_lock_);
428 CallOnError(Delegate::ERROR_WRITE); 456 CallOnError(Delegate::ERROR_WRITE);
429 return; // |this| may have been destroyed in |CallOnError()|. 457 return; // |this| may have been destroyed in |CallOnError()|.
430 } 458 }
431 } 459 }
432 460
433 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { 461 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) {
434 write_lock_.AssertAcquired(); 462 write_lock_.AssertAcquired();
463 DCHECK(HandleForDebuggingNoLock().is_valid());
435 write_buffer_->message_queue_.AddMessage(message.Pass()); 464 write_buffer_->message_queue_.AddMessage(message.Pass());
436 } 465 }
437 466
438 bool RawChannel::OnReadMessageForRawChannel( 467 bool RawChannel::OnReadMessageForRawChannel(
439 const MessageInTransit::View& message_view) { 468 const MessageInTransit::View& message_view) {
469 if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL_QUIT) {
470 base::MessageLoop::current()->PostTask(
471 FROM_HERE,
472 base::Bind(&RawChannel::HandleQuitMessage, base::Unretained(this)));
473 return true;
474 }
475
440 // No non-implementation specific |RawChannel| control messages. 476 // No non-implementation specific |RawChannel| control messages.
441 LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype() 477 LOG(ERROR) << "Invalid control message (type " << message_view.type()
442 << ")"; 478 << ")";
443 return false; 479 return false;
444 } 480 }
445 481
446 // static
447 RawChannel::Delegate::Error RawChannel::ReadIOResultToError( 482 RawChannel::Delegate::Error RawChannel::ReadIOResultToError(
448 IOResult io_result) { 483 IOResult io_result) {
449 switch (io_result) { 484 switch (io_result) {
450 case IO_FAILED_SHUTDOWN: 485 case IO_FAILED_SHUTDOWN:
451 return Delegate::ERROR_READ_SHUTDOWN; 486 return Delegate::ERROR_READ_SHUTDOWN;
452 case IO_FAILED_BROKEN: 487 case IO_FAILED_BROKEN:
453 return Delegate::ERROR_READ_BROKEN; 488 return Delegate::ERROR_READ_BROKEN;
454 case IO_FAILED_UNKNOWN: 489 case IO_FAILED_UNKNOWN:
455 return Delegate::ERROR_READ_UNKNOWN; 490 return Delegate::ERROR_READ_UNKNOWN;
456 case IO_SUCCEEDED: 491 case IO_SUCCEEDED:
457 case IO_PENDING: 492 case IO_PENDING:
458 NOTREACHED(); 493 NOTREACHED();
459 break; 494 break;
460 } 495 }
461 return Delegate::ERROR_READ_UNKNOWN; 496 return Delegate::ERROR_READ_UNKNOWN;
462 } 497 }
463 498
464 void RawChannel::CallOnError(Delegate::Error error) { 499 void RawChannel::CallOnError(Delegate::Error error) {
465 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 500 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
466 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? 501 read_lock_.AssertAcquired();
502 error_occurred_ = true;
467 if (delegate_) { 503 if (delegate_) {
468 delegate_->OnError(error); 504 delegate_->OnError(error);
469 return; // |this| may have been destroyed in |OnError()|. 505 } else {
506 // We depend on delegate to delete since it could be waiting to call
507 // ReleaseHandle.
508 base::MessageLoop::current()->PostTask(
509 FROM_HERE,
510 base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr()));
470 } 511 }
471 } 512 }
472 513
514 void RawChannel::LockAndCallOnError(Delegate::Error error) {
515 base::AutoLock locker(read_lock_);
516 CallOnError(error);
517 }
518
473 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result, 519 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result,
474 size_t platform_handles_written, 520 size_t platform_handles_written,
475 size_t bytes_written) { 521 size_t bytes_written) {
476 write_lock_.AssertAcquired(); 522 write_lock_.AssertAcquired();
477 523
478 DCHECK(!write_stopped_); 524 DCHECK(!write_stopped_);
479 DCHECK(!write_buffer_->message_queue_.IsEmpty()); 525 DCHECK(!write_buffer_->message_queue_.IsEmpty());
480 526
481 if (io_result == IO_SUCCEEDED) { 527 if (io_result == IO_SUCCEEDED) {
482 write_buffer_->platform_handles_offset_ += platform_handles_written; 528 write_buffer_->platform_handles_offset_ += platform_handles_written;
(...skipping 18 matching lines...) Expand all
501 DCHECK_NE(io_result, IO_SUCCEEDED); 547 DCHECK_NE(io_result, IO_SUCCEEDED);
502 } 548 }
503 549
504 write_stopped_ = true; 550 write_stopped_ = true;
505 write_buffer_->message_queue_.Clear(); 551 write_buffer_->message_queue_.Clear();
506 write_buffer_->platform_handles_offset_ = 0; 552 write_buffer_->platform_handles_offset_ = 0;
507 write_buffer_->data_offset_ = 0; 553 write_buffer_->data_offset_ = 0;
508 return false; 554 return false;
509 } 555 }
510 556
557 void RawChannel::HandleQuitMessage() {
558 base::AutoLock locker(read_lock_);
559 CallOnError(Delegate::ERROR_READ_SHUTDOWN);
560 }
561
562 void RawChannel::DispatchMessages(bool* did_dispatch_message,
563 bool* stop_dispatching) {
564 *did_dispatch_message = false;
565 *stop_dispatching = false;
566 // Tracks the offset of the first undispatched message in |read_buffer_|.
567 // Currently, we copy data to ensure that this is zero at the beginning.
568 size_t read_buffer_start = 0;
569 size_t remaining_bytes = read_buffer_->num_valid_bytes_;
570 size_t message_size;
571 // Note that we rely on short-circuit evaluation here:
572 // - |read_buffer_start| may be an invalid index into
573 // |read_buffer_->buffer_| if |remaining_bytes| is zero.
574 // - |message_size| is only valid if |GetNextMessageSize()| returns true.
575 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
576 // next read).
577 // TODO(vtl): Validate that |message_size| is sane.
578 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize(
579 &read_buffer_->buffer_[read_buffer_start],
580 remaining_bytes, &message_size) &&
581 remaining_bytes >= message_size) {
582 MessageInTransit::View message_view(
583 message_size, &read_buffer_->buffer_[read_buffer_start]);
584 DCHECK_EQ(message_view.total_size(), message_size);
585
586 const char* error_message = nullptr;
587 if (!message_view.IsValid(GetSerializedPlatformHandleSize(),
588 &error_message)) {
589 DCHECK(error_message);
590 LOG(ERROR) << "Received invalid message: " << error_message;
591 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
592 *stop_dispatching = true;
593 return; // |this| may have been destroyed in |CallOnError()|.
594 }
595
596 if (message_view.type() != MessageInTransit::Type::MESSAGE) {
597 if (!OnReadMessageForRawChannel(message_view)) {
598 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
599 *stop_dispatching = true;
600 return; // |this| may have been destroyed in |CallOnError()|.
601 }
602 } else {
603 embedder::ScopedPlatformHandleVectorPtr platform_handles;
604 if (message_view.transport_data_buffer()) {
605 size_t num_platform_handles;
606 const void* platform_handle_table;
607 TransportData::GetPlatformHandleTable(
608 message_view.transport_data_buffer(), &num_platform_handles,
609 &platform_handle_table);
610
611 if (num_platform_handles > 0) {
612 platform_handles =
613 GetReadPlatformHandles(num_platform_handles,
614 platform_handle_table).Pass();
615 if (!platform_handles) {
616 LOG(ERROR) << "Invalid number of platform handles received";
617 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
618 *stop_dispatching = true;
619 return; // |this| may have been destroyed in |CallOnError()|.
620 }
621 }
622 }
623
624 // TODO(vtl): In the case that we aren't expecting any platform handles,
625 // for the POSIX implementation, we should confirm that none are stored.
626
627 // Dispatch the message.
628 // Detect the case when |Shutdown()| is called; subsequent destruction
629 // is also permitted then.
630 bool shutdown_called = false;
631 DCHECK(!set_on_shutdown_);
632 set_on_shutdown_ = &shutdown_called;
633 // Note: it's valid to get here without a delegate. i.e. after Shutdown
634 // is called, if this object still has a valid handle we keep it alive
635 // until the other side closes it in response to the RAW_CHANNEL_QUIT
636 // message. In the meantime the sender could have sent us a message.
637 if (delegate_)
638 delegate_->OnReadMessage(message_view, platform_handles.Pass());
639 if (shutdown_called) {
640 *stop_dispatching = true;
641 return;
642 }
643 set_on_shutdown_ = nullptr;
644 }
645
646 *did_dispatch_message = true;
647
648 // Update our state.
649 read_buffer_start += message_size;
650 remaining_bytes -= message_size;
651 }
652
653 if (read_buffer_start > 0) {
654 // Move data back to start.
655 read_buffer_->num_valid_bytes_ = remaining_bytes;
656 if (read_buffer_->num_valid_bytes_ > 0) {
657 memmove(&read_buffer_->buffer_[0],
658 &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
659 }
660 read_buffer_start = 0;
661 }
662 }
663
511 } // namespace system 664 } // namespace system
512 } // namespace mojo 665 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698