Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(59)

Side by Side Diff: mojo/edk/system/raw_channel.cc

Issue 1350023003: Add a Mojo EDK for Chrome that uses one OS pipe per message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: more cleanup Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "third_party/mojo/src/mojo/edk/system/raw_channel.h" 5 #include "mojo/edk/system/raw_channel.h"
6 6
7 #include <string.h> 7 #include <string.h>
8 8
9 #include <algorithm> 9 #include <algorithm>
10 10
11 #include "base/bind.h" 11 #include "base/bind.h"
12 #include "base/location.h" 12 #include "base/location.h"
13 #include "base/logging.h" 13 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h" 14 #include "base/message_loop/message_loop.h"
15 #include "third_party/mojo/src/mojo/edk/system/message_in_transit.h" 15 #include "mojo/edk/embedder/embedder_internal.h"
16 #include "third_party/mojo/src/mojo/edk/system/transport_data.h" 16 #include "mojo/edk/system/message_in_transit.h"
17 #include "mojo/edk/system/transport_data.h"
17 18
18 namespace mojo { 19 namespace mojo {
19 namespace system { 20 namespace edk {
20 21
21 const size_t kReadSize = 4096; 22 const size_t kReadSize = 4096;
22 23
23 // RawChannel::ReadBuffer ------------------------------------------------------ 24 // RawChannel::ReadBuffer ------------------------------------------------------
24 25
25 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) { 26 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
26 } 27 }
27 28
28 RawChannel::ReadBuffer::~ReadBuffer() { 29 RawChannel::ReadBuffer::~ReadBuffer() {
29 } 30 }
30 31
31 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) { 32 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
32 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); 33 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
33 *addr = &buffer_[0] + num_valid_bytes_; 34 *addr = &buffer_[0] + num_valid_bytes_;
34 *size = kReadSize; 35 *size = kReadSize;
35 } 36 }
36 37
37 // RawChannel::WriteBuffer ----------------------------------------------------- 38 // RawChannel::WriteBuffer -----------------------------------------------------
38 39
39 RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size) 40 RawChannel::WriteBuffer::WriteBuffer()
40 : serialized_platform_handle_size_(serialized_platform_handle_size), 41 : serialized_platform_handle_size_(0),
41 platform_handles_offset_(0), 42 platform_handles_offset_(0),
42 data_offset_(0) { 43 data_offset_(0) {
43 } 44 }
44 45
45 RawChannel::WriteBuffer::~WriteBuffer() { 46 RawChannel::WriteBuffer::~WriteBuffer() {
46 message_queue_.Clear(); 47 message_queue_.Clear();
47 } 48 }
48 49
49 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const { 50 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const {
50 if (message_queue_.IsEmpty()) 51 if (message_queue_.IsEmpty())
51 return false; 52 return false;
52 53
53 const TransportData* transport_data = 54 const TransportData* transport_data =
54 message_queue_.PeekMessage()->transport_data(); 55 message_queue_.PeekMessage()->transport_data();
55 if (!transport_data) 56 if (!transport_data)
56 return false; 57 return false;
57 58
58 const embedder::PlatformHandleVector* all_platform_handles = 59 const PlatformHandleVector* all_platform_handles =
59 transport_data->platform_handles(); 60 transport_data->platform_handles();
60 if (!all_platform_handles) { 61 if (!all_platform_handles) {
61 DCHECK_EQ(platform_handles_offset_, 0u); 62 DCHECK_EQ(platform_handles_offset_, 0u);
62 return false; 63 return false;
63 } 64 }
64 if (platform_handles_offset_ >= all_platform_handles->size()) { 65 if (platform_handles_offset_ >= all_platform_handles->size()) {
65 DCHECK_EQ(platform_handles_offset_, all_platform_handles->size()); 66 DCHECK_EQ(platform_handles_offset_, all_platform_handles->size());
66 return false; 67 return false;
67 } 68 }
68 69
69 return true; 70 return true;
70 } 71 }
71 72
72 void RawChannel::WriteBuffer::GetPlatformHandlesToSend( 73 void RawChannel::WriteBuffer::GetPlatformHandlesToSend(
73 size_t* num_platform_handles, 74 size_t* num_platform_handles,
74 embedder::PlatformHandle** platform_handles, 75 PlatformHandle** platform_handles,
75 void** serialization_data) { 76 void** serialization_data) {
76 DCHECK(HavePlatformHandlesToSend()); 77 DCHECK(HavePlatformHandlesToSend());
77 78
78 MessageInTransit* message = message_queue_.PeekMessage(); 79 MessageInTransit* message = message_queue_.PeekMessage();
79 TransportData* transport_data = message->transport_data(); 80 TransportData* transport_data = message->transport_data();
80 embedder::PlatformHandleVector* all_platform_handles = 81 PlatformHandleVector* all_platform_handles =
81 transport_data->platform_handles(); 82 transport_data->platform_handles();
82 *num_platform_handles = 83 *num_platform_handles =
83 all_platform_handles->size() - platform_handles_offset_; 84 all_platform_handles->size() - platform_handles_offset_;
84 *platform_handles = &(*all_platform_handles)[platform_handles_offset_]; 85 *platform_handles = &(*all_platform_handles)[platform_handles_offset_];
85 86
86 if (serialized_platform_handle_size_ > 0) { 87 if (serialized_platform_handle_size_ > 0) {
87 size_t serialization_data_offset = 88 size_t serialization_data_offset =
88 transport_data->platform_handle_table_offset(); 89 transport_data->platform_handle_table_offset();
89 serialization_data_offset += 90 serialization_data_offset +=
90 platform_handles_offset_ * serialized_platform_handle_size_; 91 platform_handles_offset_ * serialized_platform_handle_size_;
(...skipping 17 matching lines...) Expand all
108 size_t transport_data_buffer_size = 109 size_t transport_data_buffer_size =
109 message->transport_data() ? message->transport_data()->buffer_size() : 0; 110 message->transport_data() ? message->transport_data()->buffer_size() : 0;
110 111
111 if (!transport_data_buffer_size) { 112 if (!transport_data_buffer_size) {
112 // Only write from the main buffer. 113 // Only write from the main buffer.
113 DCHECK_LT(data_offset_, message->main_buffer_size()); 114 DCHECK_LT(data_offset_, message->main_buffer_size());
114 DCHECK_LE(bytes_to_write, message->main_buffer_size()); 115 DCHECK_LE(bytes_to_write, message->main_buffer_size());
115 Buffer buffer = { 116 Buffer buffer = {
116 static_cast<const char*>(message->main_buffer()) + data_offset_, 117 static_cast<const char*>(message->main_buffer()) + data_offset_,
117 bytes_to_write}; 118 bytes_to_write};
119
118 buffers->push_back(buffer); 120 buffers->push_back(buffer);
119 return; 121 return;
120 } 122 }
121 123
122 if (data_offset_ >= message->main_buffer_size()) { 124 if (data_offset_ >= message->main_buffer_size()) {
123 // Only write from the transport data buffer. 125 // Only write from the transport data buffer.
124 DCHECK_LT(data_offset_ - message->main_buffer_size(), 126 DCHECK_LT(data_offset_ - message->main_buffer_size(),
125 transport_data_buffer_size); 127 transport_data_buffer_size);
126 DCHECK_LE(bytes_to_write, transport_data_buffer_size); 128 DCHECK_LE(bytes_to_write, transport_data_buffer_size);
127 Buffer buffer = { 129 Buffer buffer = {
128 static_cast<const char*>(message->transport_data()->buffer()) + 130 static_cast<const char*>(message->transport_data()->buffer()) +
129 (data_offset_ - message->main_buffer_size()), 131 (data_offset_ - message->main_buffer_size()),
130 bytes_to_write}; 132 bytes_to_write};
133
131 buffers->push_back(buffer); 134 buffers->push_back(buffer);
132 return; 135 return;
133 } 136 }
134 137
135 // TODO(vtl): We could actually send out buffers from multiple messages, with 138 // TODO(vtl): We could actually send out buffers from multiple messages, with
136 // the "stopping" condition being reaching a message with platform handles 139 // the "stopping" condition being reaching a message with platform handles
137 // attached. 140 // attached.
138 141
139 // Write from both buffers. 142 // Write from both buffers.
140 DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ + 143 DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ +
141 transport_data_buffer_size); 144 transport_data_buffer_size);
142 Buffer buffer1 = { 145 Buffer buffer1 = {
143 static_cast<const char*>(message->main_buffer()) + data_offset_, 146 static_cast<const char*>(message->main_buffer()) + data_offset_,
144 message->main_buffer_size() - data_offset_}; 147 message->main_buffer_size() - data_offset_};
145 buffers->push_back(buffer1); 148 buffers->push_back(buffer1);
146 Buffer buffer2 = { 149 Buffer buffer2 = {
147 static_cast<const char*>(message->transport_data()->buffer()), 150 static_cast<const char*>(message->transport_data()->buffer()),
148 transport_data_buffer_size}; 151 transport_data_buffer_size};
149 buffers->push_back(buffer2); 152 buffers->push_back(buffer2);
150 } 153 }
151 154
152 // RawChannel ------------------------------------------------------------------ 155 // RawChannel ------------------------------------------------------------------
153 156
154 RawChannel::RawChannel() 157 RawChannel::RawChannel()
155 : message_loop_for_io_(nullptr), 158 : message_loop_for_io_(nullptr),
156 delegate_(nullptr), 159 delegate_(nullptr),
157 set_on_shutdown_(nullptr), 160 write_ready_(false),
158 write_stopped_(false), 161 write_stopped_(false),
162 error_occurred_(false),
159 weak_ptr_factory_(this) { 163 weak_ptr_factory_(this) {
164 read_buffer_.reset(new ReadBuffer);
165 write_buffer_.reset(new WriteBuffer());
160 } 166 }
161 167
162 RawChannel::~RawChannel() { 168 RawChannel::~RawChannel() {
163 DCHECK(!read_buffer_); 169 DCHECK(!read_buffer_);
164 DCHECK(!write_buffer_); 170 DCHECK(!write_buffer_);
165 171
166 // No need to take the |write_lock_| here -- if there are still weak pointers 172 // Only want to decrement counter if Init was called.
167 // outstanding, then we're hosed anyway (since we wouldn't be able to 173 if (message_loop_for_io_) {
168 // invalidate them cleanly, since we might not be on the I/O thread). 174 // No need to take the |write_lock_| here -- if there are still weak
169 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); 175 // pointers outstanding, then we're hosed anyway (since we wouldn't be able
176 // to invalidate them cleanly, since we might not be on the I/O thread).
177 // DCHECK(!weak_ptr_factory_.HasWeakPtrs());
178 internal::ChannelShutdown();
179 }
170 } 180 }
171 181
172 void RawChannel::Init(Delegate* delegate) { 182 void RawChannel::Init(Delegate* delegate) {
183 internal::ChannelStarted();
173 DCHECK(delegate); 184 DCHECK(delegate);
174 185
186 base::AutoLock read_locker(read_lock_);
187 // solves race where initialiing on io thread while main thread is serializing
188 // this channel and releases handle.
189 base::AutoLock locker(write_lock_);
190
175 DCHECK(!delegate_); 191 DCHECK(!delegate_);
176 delegate_ = delegate; 192 delegate_ = delegate;
177 193
178 CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO);
179 DCHECK(!message_loop_for_io_); 194 DCHECK(!message_loop_for_io_);
180 message_loop_for_io_ = 195 message_loop_for_io_ =
181 static_cast<base::MessageLoopForIO*>(base::MessageLoop::current()); 196 static_cast<base::MessageLoopForIO*>(base::MessageLoop::current());
182 197
183 // No need to take the lock. No one should be using us yet. 198 OnInit();
184 DCHECK(!read_buffer_);
185 read_buffer_.reset(new ReadBuffer);
186 DCHECK(!write_buffer_);
187 write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize()));
188 199
189 OnInit(); 200 if (read_buffer_->num_valid_bytes()) {
201 // We had serialized read buffer data through SetInitialReadBufferData call.
202 // Make sure we read messages out of it now, otherwise the delegate won't
203 // get notified if no other data gets written to the pipe.
204 // Although this means that we can call back synchronously into the caller,
205 // that's easier than posting a task to do this. That is because if we post
206 // a task, a pending read could have started and we wouldn't be able to move
207 // the read buffer since it can be in use by the OS in an async operation.
208 bool did_dispatch_message = false;
209 bool stop_dispatching = false;
210 DispatchMessages(&did_dispatch_message, &stop_dispatching);
211 }
190 212
191 IOResult io_result = ScheduleRead(); 213 IOResult io_result = ScheduleRead();
192 if (io_result != IO_PENDING) { 214 if (io_result != IO_PENDING) {
193 // This will notify the delegate about the read failure. Although we're on 215 // This will notify the delegate about the read failure. Although we're on
194 // the I/O thread, don't call it in the nested context. 216 // the I/O thread, don't call it in the nested context.
195 message_loop_for_io_->PostTask( 217 message_loop_for_io_->PostTask(
196 FROM_HERE, base::Bind(&RawChannel::OnReadCompleted, 218 FROM_HERE, base::Bind(&RawChannel::OnReadCompleted,
197 weak_ptr_factory_.GetWeakPtr(), io_result, 0)); 219 weak_ptr_factory_.GetWeakPtr(), io_result, 0));
198 } 220 }
199 // Note: |ScheduleRead()| failure is treated as a read failure (by notifying 221 // Note: |ScheduleRead()| failure is treated as a read failure (by notifying
200 // the delegate), not an initialization failure. 222 // the delegate), not an initialization failure.
223
224 write_ready_ = true;
225 write_buffer_->serialized_platform_handle_size_ =
226 GetSerializedPlatformHandleSize();
227 if (!write_buffer_->message_queue_.IsEmpty())
228 SendQueuedMessagesNoLock();
201 } 229 }
202 230
203 void RawChannel::Shutdown() { 231 void RawChannel::Shutdown() {
204 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 232 weak_ptr_factory_.InvalidateWeakPtrs();
205 233
234 // Normally, we want to flush any pending writes before shutting down. This
235 // doesn't apply when 1) we don't have a handle (for obvious reasons) or
236 // 2) when the other side already quit and asked us to close the handle to
237 // ensure that we read everything out of the pipe first.
238 if (!HandleForDebuggingNoLock().is_valid() || error_occurred_) {
239 {
240 base::AutoLock read_locker(read_lock_);
241 base::AutoLock locker(write_lock_);
242 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
243 }
244 delete this;
245 return;
246 }
247
248 base::AutoLock read_locker(read_lock_);
206 base::AutoLock locker(write_lock_); 249 base::AutoLock locker(write_lock_);
250 DCHECK(read_buffer_->num_valid_bytes() == 0) <<
251 "RawChannel::Shutdown called but there is pending data to be read";
207 252
208 LOG_IF(WARNING, !write_buffer_->message_queue_.IsEmpty()) 253 // happens on shutdown if didn't call init when doing createduplicate
209 << "Shutting down RawChannel with write buffer nonempty"; 254 if (message_loop_for_io()) {
255 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
256 }
210 257
211 // Reset the delegate so that it won't receive further calls. 258 // Reset the delegate so that it won't receive further calls.
212 delegate_ = nullptr; 259 delegate_ = nullptr;
213 if (set_on_shutdown_) { 260
214 *set_on_shutdown_ = true; 261 bool empty = write_buffer_->message_queue_.IsEmpty();
215 set_on_shutdown_ = nullptr; 262
263 // We may have no messages to write. However just because our end of the pipe
264 // wrote everything doesn't mean that the other end read it. We don't want to
265 // call FlushFileBuffers since a) that only works for server end of the pipe,
266 // and b) it pauses this thread (which can block a process on another, or
267 // worse hang if both pipes are in the same process).
268 scoped_ptr<MessageInTransit> quit_message(new MessageInTransit(
269 MessageInTransit::Type::RAW_CHANNEL_QUIT, 0, nullptr));
270 EnqueueMessageNoLock(quit_message.Pass());
271 write_stopped_ = true;
272
273 if (empty)
274 SendQueuedMessagesNoLock();
275 }
276
277 ScopedPlatformHandle RawChannel::ReleaseHandle(
278 std::vector<char>* read_buffer) {
279 ScopedPlatformHandle rv;
280 {
281 base::AutoLock read_locker(read_lock_);
282 base::AutoLock locker(write_lock_);
283 rv = ReleaseHandleNoLock(read_buffer);
284
285 // TODO(jam); if we use these, use nolock versions of these methods that are
286 // copied.
287 if (!write_buffer_->message_queue_.IsEmpty()) {
288 NOTREACHED() << "TODO(JAM)";
289 }
290
291 delegate_ = nullptr;
292
293 // The Unretained is safe because above cancelled IO so we shouldn't get any
294 // channel errors.
295 // |message_loop_for_io_| might not be set yet
296 internal::g_io_thread_task_runner->PostTask(
297 FROM_HERE,
298 base::Bind(&RawChannel::Shutdown, base::Unretained(this)));
216 } 299 }
217 write_stopped_ = true;
218 weak_ptr_factory_.InvalidateWeakPtrs();
219 300
220 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); 301 return rv;
221 } 302 }
222 303
223 // Reminder: This must be thread-safe. 304 // Reminder: This must be thread-safe.
224 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { 305 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
225 DCHECK(message); 306 DCHECK(message);
226
227 base::AutoLock locker(write_lock_); 307 base::AutoLock locker(write_lock_);
228 if (write_stopped_) 308 if (write_stopped_)
229 return false; 309 return false;
230 310
231 if (!write_buffer_->message_queue_.IsEmpty()) { 311 bool queue_was_empty = write_buffer_->message_queue_.IsEmpty();
232 EnqueueMessageNoLock(message.Pass()); 312 EnqueueMessageNoLock(message.Pass());
233 return true; 313 if (queue_was_empty && write_ready_)
234 } 314 return SendQueuedMessagesNoLock();
235 315
236 EnqueueMessageNoLock(message.Pass()); 316 return true;
317 }
318
319 bool RawChannel::SendQueuedMessagesNoLock() {
237 DCHECK_EQ(write_buffer_->data_offset_, 0u); 320 DCHECK_EQ(write_buffer_->data_offset_, 0u);
238 321
239 size_t platform_handles_written = 0; 322 size_t platform_handles_written = 0;
240 size_t bytes_written = 0; 323 size_t bytes_written = 0;
241 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); 324 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
242 if (io_result == IO_PENDING) 325 if (io_result == IO_PENDING)
243 return true; 326 return true;
244 327
245 bool result = OnWriteCompletedNoLock(io_result, platform_handles_written, 328 bool result = OnWriteCompletedNoLock(io_result, platform_handles_written,
246 bytes_written); 329 bytes_written);
247 if (!result) { 330 if (!result) {
248 // Even if we're on the I/O thread, don't call |OnError()| in the nested 331 // Even if we're on the I/O thread, don't call |OnError()| in the nested
249 // context. 332 // context.
250 message_loop_for_io_->PostTask( 333 message_loop_for_io_->PostTask(
251 FROM_HERE, 334 FROM_HERE,
252 base::Bind(&RawChannel::CallOnError, weak_ptr_factory_.GetWeakPtr(), 335 base::Bind(&RawChannel::LockAndCallOnError,
336 weak_ptr_factory_.GetWeakPtr(),
253 Delegate::ERROR_WRITE)); 337 Delegate::ERROR_WRITE));
254 } 338 }
255 339
256 return result; 340 return result;
257 } 341 }
258 342
259 // Reminder: This must be thread-safe. 343 // Reminder: This must be thread-safe.
260 bool RawChannel::IsWriteBufferEmpty() { 344 bool RawChannel::IsWriteBufferEmpty() {
261 base::AutoLock locker(write_lock_); 345 base::AutoLock locker(write_lock_);
262 return write_buffer_->message_queue_.IsEmpty(); 346 return write_buffer_->message_queue_.IsEmpty();
263 } 347 }
264 348
349 bool RawChannel::IsReadBufferEmpty() {
350 base::AutoLock locker(read_lock_);
351 return read_buffer_->num_valid_bytes_ != 0;
352 }
353
354 void RawChannel::SetInitialReadBufferData(char* data, size_t size) {
355 base::AutoLock locker(read_lock_);
356 // TODO(jam): copy power of 2 algorithm below? or share.
357 read_buffer_->buffer_.resize(size+kReadSize);
358 memcpy(&read_buffer_->buffer_[0], data, size);
359 read_buffer_->num_valid_bytes_ = size;
360 }
361
265 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) { 362 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) {
266 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 363 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
267 364
365 base::AutoLock locker(read_lock_);
366
268 // Keep reading data in a loop, and dispatch messages if enough data is 367 // Keep reading data in a loop, and dispatch messages if enough data is
269 // received. Exit the loop if any of the following happens: 368 // received. Exit the loop if any of the following happens:
270 // - one or more messages were dispatched; 369 // - one or more messages were dispatched;
271 // - the last read failed, was a partial read or would block; 370 // - the last read failed, was a partial read or would block;
272 // - |Shutdown()| was called. 371 // - |Shutdown()| was called.
273 do { 372 do {
274 switch (io_result) { 373 switch (io_result) {
275 case IO_SUCCEEDED: 374 case IO_SUCCEEDED:
276 break; 375 break;
277 case IO_FAILED_SHUTDOWN: 376 case IO_FAILED_SHUTDOWN:
278 case IO_FAILED_BROKEN: 377 case IO_FAILED_BROKEN:
279 case IO_FAILED_UNKNOWN: 378 case IO_FAILED_UNKNOWN:
280 CallOnError(ReadIOResultToError(io_result)); 379 CallOnError(ReadIOResultToError(io_result));
281 return; // |this| may have been destroyed in |CallOnError()|. 380 return; // |this| may have been destroyed in |CallOnError()|.
282 case IO_PENDING: 381 case IO_PENDING:
283 NOTREACHED(); 382 NOTREACHED();
284 return; 383 return;
285 } 384 }
286 385
287 read_buffer_->num_valid_bytes_ += bytes_read; 386 read_buffer_->num_valid_bytes_ += bytes_read;
288 387
289 // Dispatch all the messages that we can. 388 // Dispatch all the messages that we can.
290 bool did_dispatch_message = false; 389 bool did_dispatch_message = false;
291 // Tracks the offset of the first undispatched message in |read_buffer_|. 390 bool stop_dispatching = false;
292 // Currently, we copy data to ensure that this is zero at the beginning. 391 DispatchMessages(&did_dispatch_message, &stop_dispatching);
293 size_t read_buffer_start = 0; 392 if (stop_dispatching)
294 size_t remaining_bytes = read_buffer_->num_valid_bytes_; 393 return;
295 size_t message_size;
296 // Note that we rely on short-circuit evaluation here:
297 // - |read_buffer_start| may be an invalid index into
298 // |read_buffer_->buffer_| if |remaining_bytes| is zero.
299 // - |message_size| is only valid if |GetNextMessageSize()| returns true.
300 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
301 // next read).
302 // TODO(vtl): Validate that |message_size| is sane.
303 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize(
304 &read_buffer_->buffer_[read_buffer_start],
305 remaining_bytes, &message_size) &&
306 remaining_bytes >= message_size) {
307 MessageInTransit::View message_view(
308 message_size, &read_buffer_->buffer_[read_buffer_start]);
309 DCHECK_EQ(message_view.total_size(), message_size);
310
311 const char* error_message = nullptr;
312 if (!message_view.IsValid(GetSerializedPlatformHandleSize(),
313 &error_message)) {
314 DCHECK(error_message);
315 LOG(ERROR) << "Received invalid message: " << error_message;
316 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
317 return; // |this| may have been destroyed in |CallOnError()|.
318 }
319
320 if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL) {
321 if (!OnReadMessageForRawChannel(message_view)) {
322 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
323 return; // |this| may have been destroyed in |CallOnError()|.
324 }
325 } else {
326 embedder::ScopedPlatformHandleVectorPtr platform_handles;
327 if (message_view.transport_data_buffer()) {
328 size_t num_platform_handles;
329 const void* platform_handle_table;
330 TransportData::GetPlatformHandleTable(
331 message_view.transport_data_buffer(), &num_platform_handles,
332 &platform_handle_table);
333
334 if (num_platform_handles > 0) {
335 platform_handles =
336 GetReadPlatformHandles(num_platform_handles,
337 platform_handle_table).Pass();
338 if (!platform_handles) {
339 LOG(ERROR) << "Invalid number of platform handles received";
340 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
341 return; // |this| may have been destroyed in |CallOnError()|.
342 }
343 }
344 }
345
346 // TODO(vtl): In the case that we aren't expecting any platform handles,
347 // for the POSIX implementation, we should confirm that none are stored.
348
349 // Dispatch the message.
350 // Detect the case when |Shutdown()| is called; subsequent destruction
351 // is also permitted then.
352 bool shutdown_called = false;
353 DCHECK(!set_on_shutdown_);
354 set_on_shutdown_ = &shutdown_called;
355 DCHECK(delegate_);
356 delegate_->OnReadMessage(message_view, platform_handles.Pass());
357 if (shutdown_called)
358 return;
359 set_on_shutdown_ = nullptr;
360 }
361
362 did_dispatch_message = true;
363
364 // Update our state.
365 read_buffer_start += message_size;
366 remaining_bytes -= message_size;
367 }
368
369 if (read_buffer_start > 0) {
370 // Move data back to start.
371 read_buffer_->num_valid_bytes_ = remaining_bytes;
372 if (read_buffer_->num_valid_bytes_ > 0) {
373 memmove(&read_buffer_->buffer_[0],
374 &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
375 }
376 read_buffer_start = 0;
377 }
378 394
379 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ < 395 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
380 kReadSize) { 396 kReadSize) {
381 // Use power-of-2 buffer sizes. 397 // Use power-of-2 buffer sizes.
382 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the 398 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
383 // maximum message size to whatever extent necessary). 399 // maximum message size to whatever extent necessary).
384 // TODO(vtl): We may often be able to peek at the header and get the real 400 // TODO(vtl): We may often be able to peek at the header and get the real
385 // required extra space (which may be much bigger than |kReadSize|). 401 // required extra space (which may be much bigger than |kReadSize|).
386 size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize); 402 size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
387 while (new_size < read_buffer_->num_valid_bytes_ + kReadSize) 403 while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
(...skipping 18 matching lines...) Expand all
406 422
407 void RawChannel::OnWriteCompleted(IOResult io_result, 423 void RawChannel::OnWriteCompleted(IOResult io_result,
408 size_t platform_handles_written, 424 size_t platform_handles_written,
409 size_t bytes_written) { 425 size_t bytes_written) {
410 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 426 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
411 DCHECK_NE(io_result, IO_PENDING); 427 DCHECK_NE(io_result, IO_PENDING);
412 428
413 bool did_fail = false; 429 bool did_fail = false;
414 { 430 {
415 base::AutoLock locker(write_lock_); 431 base::AutoLock locker(write_lock_);
416 DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.IsEmpty());
417
418 if (write_stopped_) {
419 NOTREACHED();
420 return;
421 }
422
423 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written, 432 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written,
424 bytes_written); 433 bytes_written);
425 } 434 }
426 435
427 if (did_fail) { 436 if (did_fail) {
428 CallOnError(Delegate::ERROR_WRITE); 437 LockAndCallOnError(Delegate::ERROR_WRITE);
429 return; // |this| may have been destroyed in |CallOnError()|. 438 return; // |this| may have been destroyed in |CallOnError()|.
430 } 439 }
431 } 440 }
432 441
433 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { 442 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) {
434 write_lock_.AssertAcquired(); 443 write_lock_.AssertAcquired();
444 DCHECK(HandleForDebuggingNoLock().is_valid());
435 write_buffer_->message_queue_.AddMessage(message.Pass()); 445 write_buffer_->message_queue_.AddMessage(message.Pass());
436 } 446 }
437 447
438 bool RawChannel::OnReadMessageForRawChannel( 448 bool RawChannel::OnReadMessageForRawChannel(
439 const MessageInTransit::View& message_view) { 449 const MessageInTransit::View& message_view) {
450 if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL_QUIT) {
451 message_loop_for_io_->PostTask(
452 FROM_HERE, base::Bind(&RawChannel::LockAndCallOnError,
453 weak_ptr_factory_.GetWeakPtr(),
454 Delegate::ERROR_READ_SHUTDOWN));
455 return true;
456 }
457
440 // No non-implementation specific |RawChannel| control messages. 458 // No non-implementation specific |RawChannel| control messages.
441 LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype() 459 LOG(ERROR) << "Invalid control message (type " << message_view.type()
442 << ")"; 460 << ")";
443 return false; 461 return false;
444 } 462 }
445 463
446 // static
447 RawChannel::Delegate::Error RawChannel::ReadIOResultToError( 464 RawChannel::Delegate::Error RawChannel::ReadIOResultToError(
448 IOResult io_result) { 465 IOResult io_result) {
449 switch (io_result) { 466 switch (io_result) {
450 case IO_FAILED_SHUTDOWN: 467 case IO_FAILED_SHUTDOWN:
451 return Delegate::ERROR_READ_SHUTDOWN; 468 return Delegate::ERROR_READ_SHUTDOWN;
452 case IO_FAILED_BROKEN: 469 case IO_FAILED_BROKEN:
453 return Delegate::ERROR_READ_BROKEN; 470 return Delegate::ERROR_READ_BROKEN;
454 case IO_FAILED_UNKNOWN: 471 case IO_FAILED_UNKNOWN:
455 return Delegate::ERROR_READ_UNKNOWN; 472 return Delegate::ERROR_READ_UNKNOWN;
456 case IO_SUCCEEDED: 473 case IO_SUCCEEDED:
457 case IO_PENDING: 474 case IO_PENDING:
458 NOTREACHED(); 475 NOTREACHED();
459 break; 476 break;
460 } 477 }
461 return Delegate::ERROR_READ_UNKNOWN; 478 return Delegate::ERROR_READ_UNKNOWN;
462 } 479 }
463 480
464 void RawChannel::CallOnError(Delegate::Error error) { 481 void RawChannel::CallOnError(Delegate::Error error) {
465 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 482 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
466 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? 483 read_lock_.AssertAcquired();
484 error_occurred_ = true;
467 if (delegate_) { 485 if (delegate_) {
468 delegate_->OnError(error); 486 delegate_->OnError(error);
469 return; // |this| may have been destroyed in |OnError()|. 487 } else {
488 // We depend on delegate to delete since it could be waiting to call
489 // ReleaseHandle.
490 base::MessageLoop::current()->PostTask(
491 FROM_HERE,
492 base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr()));
470 } 493 }
471 } 494 }
472 495
496 void RawChannel::LockAndCallOnError(Delegate::Error error) {
497 base::AutoLock locker(read_lock_);
498 CallOnError(error);
499 }
500
473 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result, 501 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result,
474 size_t platform_handles_written, 502 size_t platform_handles_written,
475 size_t bytes_written) { 503 size_t bytes_written) {
476 write_lock_.AssertAcquired(); 504 write_lock_.AssertAcquired();
477 505
478 DCHECK(!write_stopped_);
479 DCHECK(!write_buffer_->message_queue_.IsEmpty()); 506 DCHECK(!write_buffer_->message_queue_.IsEmpty());
480 507
481 if (io_result == IO_SUCCEEDED) { 508 if (io_result == IO_SUCCEEDED) {
482 write_buffer_->platform_handles_offset_ += platform_handles_written; 509 write_buffer_->platform_handles_offset_ += platform_handles_written;
483 write_buffer_->data_offset_ += bytes_written; 510 write_buffer_->data_offset_ += bytes_written;
484 511
485 MessageInTransit* message = write_buffer_->message_queue_.PeekMessage(); 512 MessageInTransit* message = write_buffer_->message_queue_.PeekMessage();
486 if (write_buffer_->data_offset_ >= message->total_size()) { 513 if (write_buffer_->data_offset_ >= message->total_size()) {
487 // Complete write. 514 // Complete write.
488 CHECK_EQ(write_buffer_->data_offset_, message->total_size()); 515 CHECK_EQ(write_buffer_->data_offset_, message->total_size());
(...skipping 12 matching lines...) Expand all
501 DCHECK_NE(io_result, IO_SUCCEEDED); 528 DCHECK_NE(io_result, IO_SUCCEEDED);
502 } 529 }
503 530
504 write_stopped_ = true; 531 write_stopped_ = true;
505 write_buffer_->message_queue_.Clear(); 532 write_buffer_->message_queue_.Clear();
506 write_buffer_->platform_handles_offset_ = 0; 533 write_buffer_->platform_handles_offset_ = 0;
507 write_buffer_->data_offset_ = 0; 534 write_buffer_->data_offset_ = 0;
508 return false; 535 return false;
509 } 536 }
510 537
511 } // namespace system 538 void RawChannel::DispatchMessages(bool* did_dispatch_message,
539 bool* stop_dispatching) {
540 *did_dispatch_message = false;
541 *stop_dispatching = false;
542 // Tracks the offset of the first undispatched message in |read_buffer_|.
543 // Currently, we copy data to ensure that this is zero at the beginning.
544 size_t read_buffer_start = 0;
545 size_t remaining_bytes = read_buffer_->num_valid_bytes_;
546 size_t message_size;
547 // Note that we rely on short-circuit evaluation here:
548 // - |read_buffer_start| may be an invalid index into
549 // |read_buffer_->buffer_| if |remaining_bytes| is zero.
550 // - |message_size| is only valid if |GetNextMessageSize()| returns true.
551 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
552 // next read).
553 // TODO(vtl): Validate that |message_size| is sane.
554 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize(
555 &read_buffer_->buffer_[read_buffer_start],
556 remaining_bytes, &message_size) &&
557 remaining_bytes >= message_size) {
558 MessageInTransit::View message_view(
559 message_size, &read_buffer_->buffer_[read_buffer_start]);
560 DCHECK_EQ(message_view.total_size(), message_size);
561
562 const char* error_message = nullptr;
563 if (!message_view.IsValid(GetSerializedPlatformHandleSize(),
564 &error_message)) {
565 DCHECK(error_message);
566 LOG(ERROR) << "Received invalid message: " << error_message;
567 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
568 *stop_dispatching = true;
569 return; // |this| may have been destroyed in |CallOnError()|.
570 }
571
572 if (message_view.type() != MessageInTransit::Type::MESSAGE) {
573 if (!OnReadMessageForRawChannel(message_view)) {
574 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
575 *stop_dispatching = true;
576 return; // |this| may have been destroyed in |CallOnError()|.
577 }
578 } else {
579 ScopedPlatformHandleVectorPtr platform_handles;
580 if (message_view.transport_data_buffer()) {
581 size_t num_platform_handles;
582 const void* platform_handle_table;
583 TransportData::GetPlatformHandleTable(
584 message_view.transport_data_buffer(), &num_platform_handles,
585 &platform_handle_table);
586
587 if (num_platform_handles > 0) {
588 platform_handles =
589 GetReadPlatformHandles(num_platform_handles,
590 platform_handle_table).Pass();
591 if (!platform_handles) {
592 LOG(ERROR) << "Invalid number of platform handles received";
593 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
594 *stop_dispatching = true;
595 return; // |this| may have been destroyed in |CallOnError()|.
596 }
597 }
598 }
599
600 // TODO(vtl): In the case that we aren't expecting any platform handles,
601 // for the POSIX implementation, we should confirm that none are stored.
602
603 // Dispatch the message.
604 // Note: it's valid to get here without a delegate. i.e. after Shutdown
605 // is called, if this object still has a valid handle we keep it alive
606 // until the other side closes it in response to the RAW_CHANNEL_QUIT
607 // message. In the meantime the sender could have sent us a message.
608 if (delegate_)
609 delegate_->OnReadMessage(message_view, platform_handles.Pass());
610 }
611
612 *did_dispatch_message = true;
613
614 // Update our state.
615 read_buffer_start += message_size;
616 remaining_bytes -= message_size;
617 }
618
619 if (read_buffer_start > 0) {
620 // Move data back to start.
621 read_buffer_->num_valid_bytes_ = remaining_bytes;
622 if (read_buffer_->num_valid_bytes_ > 0) {
623 memmove(&read_buffer_->buffer_[0],
624 &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
625 }
626 read_buffer_start = 0;
627 }
628 }
629
630 } // namespace edk
512 } // namespace mojo 631 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698