Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(179)

Side by Side Diff: trunk/src/mojo/system/raw_channel.cc

Issue 449063003: Revert 286239 "Reland r285211: "Debugging RawChannelWin: replace..." (Closed) Base URL: svn://svn.chromium.org/chrome/
Patch Set: Created 6 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | trunk/src/mojo/system/raw_channel_win.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/system/raw_channel.h" 5 #include "mojo/system/raw_channel.h"
6 6
7 #include <string.h> 7 #include <string.h>
8 8
9 #include <algorithm> 9 #include <algorithm>
10 10
(...skipping 12 matching lines...) Expand all
23 23
24 // RawChannel::ReadBuffer ------------------------------------------------------ 24 // RawChannel::ReadBuffer ------------------------------------------------------
25 25
26 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) { 26 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
27 } 27 }
28 28
29 RawChannel::ReadBuffer::~ReadBuffer() { 29 RawChannel::ReadBuffer::~ReadBuffer() {
30 } 30 }
31 31
32 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) { 32 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
33 CHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); 33 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
34 *addr = &buffer_[0] + num_valid_bytes_; 34 *addr = &buffer_[0] + num_valid_bytes_;
35 *size = kReadSize; 35 *size = kReadSize;
36 } 36 }
37 37
38 // RawChannel::WriteBuffer ----------------------------------------------------- 38 // RawChannel::WriteBuffer -----------------------------------------------------
39 39
40 RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size) 40 RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size)
41 : serialized_platform_handle_size_(serialized_platform_handle_size), 41 : serialized_platform_handle_size_(serialized_platform_handle_size),
42 platform_handles_offset_(0), 42 platform_handles_offset_(0),
43 data_offset_(0) { 43 data_offset_(0) {
44 } 44 }
45 45
46 RawChannel::WriteBuffer::~WriteBuffer() { 46 RawChannel::WriteBuffer::~WriteBuffer() {
47 STLDeleteElements(&message_queue_); 47 STLDeleteElements(&message_queue_);
48 } 48 }
49 49
50 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const { 50 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const {
51 if (message_queue_.empty()) 51 if (message_queue_.empty())
52 return false; 52 return false;
53 53
54 const TransportData* transport_data = 54 const TransportData* transport_data =
55 message_queue_.front()->transport_data(); 55 message_queue_.front()->transport_data();
56 if (!transport_data) 56 if (!transport_data)
57 return false; 57 return false;
58 58
59 const embedder::PlatformHandleVector* all_platform_handles = 59 const embedder::PlatformHandleVector* all_platform_handles =
60 transport_data->platform_handles(); 60 transport_data->platform_handles();
61 if (!all_platform_handles) { 61 if (!all_platform_handles) {
62 CHECK_EQ(platform_handles_offset_, 0u); 62 DCHECK_EQ(platform_handles_offset_, 0u);
63 return false; 63 return false;
64 } 64 }
65 if (platform_handles_offset_ >= all_platform_handles->size()) { 65 if (platform_handles_offset_ >= all_platform_handles->size()) {
66 CHECK_EQ(platform_handles_offset_, all_platform_handles->size()); 66 DCHECK_EQ(platform_handles_offset_, all_platform_handles->size());
67 return false; 67 return false;
68 } 68 }
69 69
70 return true; 70 return true;
71 } 71 }
72 72
73 void RawChannel::WriteBuffer::GetPlatformHandlesToSend( 73 void RawChannel::WriteBuffer::GetPlatformHandlesToSend(
74 size_t* num_platform_handles, 74 size_t* num_platform_handles,
75 embedder::PlatformHandle** platform_handles, 75 embedder::PlatformHandle** platform_handles,
76 void** serialization_data) { 76 void** serialization_data) {
77 CHECK(HavePlatformHandlesToSend()); 77 DCHECK(HavePlatformHandlesToSend());
78 78
79 TransportData* transport_data = message_queue_.front()->transport_data(); 79 TransportData* transport_data = message_queue_.front()->transport_data();
80 embedder::PlatformHandleVector* all_platform_handles = 80 embedder::PlatformHandleVector* all_platform_handles =
81 transport_data->platform_handles(); 81 transport_data->platform_handles();
82 *num_platform_handles = 82 *num_platform_handles =
83 all_platform_handles->size() - platform_handles_offset_; 83 all_platform_handles->size() - platform_handles_offset_;
84 *platform_handles = &(*all_platform_handles)[platform_handles_offset_]; 84 *platform_handles = &(*all_platform_handles)[platform_handles_offset_];
85 size_t serialization_data_offset = 85 size_t serialization_data_offset =
86 transport_data->platform_handle_table_offset(); 86 transport_data->platform_handle_table_offset();
87 CHECK_GT(serialization_data_offset, 0u); 87 DCHECK_GT(serialization_data_offset, 0u);
88 serialization_data_offset += 88 serialization_data_offset +=
89 platform_handles_offset_ * serialized_platform_handle_size_; 89 platform_handles_offset_ * serialized_platform_handle_size_;
90 *serialization_data = 90 *serialization_data =
91 static_cast<char*>(transport_data->buffer()) + serialization_data_offset; 91 static_cast<char*>(transport_data->buffer()) + serialization_data_offset;
92 } 92 }
93 93
94 void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const { 94 void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
95 buffers->clear(); 95 buffers->clear();
96 96
97 if (message_queue_.empty()) 97 if (message_queue_.empty())
98 return; 98 return;
99 99
100 MessageInTransit* message = message_queue_.front(); 100 MessageInTransit* message = message_queue_.front();
101 CHECK_LT(data_offset_, message->total_size()); 101 DCHECK_LT(data_offset_, message->total_size());
102 size_t bytes_to_write = message->total_size() - data_offset_; 102 size_t bytes_to_write = message->total_size() - data_offset_;
103 103
104 size_t transport_data_buffer_size = 104 size_t transport_data_buffer_size =
105 message->transport_data() ? message->transport_data()->buffer_size() : 0; 105 message->transport_data() ? message->transport_data()->buffer_size() : 0;
106 106
107 if (!transport_data_buffer_size) { 107 if (!transport_data_buffer_size) {
108 // Only write from the main buffer. 108 // Only write from the main buffer.
109 CHECK_LT(data_offset_, message->main_buffer_size()); 109 DCHECK_LT(data_offset_, message->main_buffer_size());
110 CHECK_LE(bytes_to_write, message->main_buffer_size()); 110 DCHECK_LE(bytes_to_write, message->main_buffer_size());
111 Buffer buffer = { 111 Buffer buffer = {
112 static_cast<const char*>(message->main_buffer()) + data_offset_, 112 static_cast<const char*>(message->main_buffer()) + data_offset_,
113 bytes_to_write}; 113 bytes_to_write};
114 buffers->push_back(buffer); 114 buffers->push_back(buffer);
115 return; 115 return;
116 } 116 }
117 117
118 if (data_offset_ >= message->main_buffer_size()) { 118 if (data_offset_ >= message->main_buffer_size()) {
119 // Only write from the transport data buffer. 119 // Only write from the transport data buffer.
120 CHECK_LT(data_offset_ - message->main_buffer_size(), 120 DCHECK_LT(data_offset_ - message->main_buffer_size(),
121 transport_data_buffer_size); 121 transport_data_buffer_size);
122 CHECK_LE(bytes_to_write, transport_data_buffer_size); 122 DCHECK_LE(bytes_to_write, transport_data_buffer_size);
123 Buffer buffer = { 123 Buffer buffer = {
124 static_cast<const char*>(message->transport_data()->buffer()) + 124 static_cast<const char*>(message->transport_data()->buffer()) +
125 (data_offset_ - message->main_buffer_size()), 125 (data_offset_ - message->main_buffer_size()),
126 bytes_to_write}; 126 bytes_to_write};
127 buffers->push_back(buffer); 127 buffers->push_back(buffer);
128 return; 128 return;
129 } 129 }
130 130
131 // TODO(vtl): We could actually send out buffers from multiple messages, with 131 // TODO(vtl): We could actually send out buffers from multiple messages, with
132 // the "stopping" condition being reaching a message with platform handles 132 // the "stopping" condition being reaching a message with platform handles
133 // attached. 133 // attached.
134 134
135 // Write from both buffers. 135 // Write from both buffers.
136 CHECK_EQ( 136 DCHECK_EQ(
137 bytes_to_write, 137 bytes_to_write,
138 message->main_buffer_size() - data_offset_ + transport_data_buffer_size); 138 message->main_buffer_size() - data_offset_ + transport_data_buffer_size);
139 Buffer buffer1 = { 139 Buffer buffer1 = {
140 static_cast<const char*>(message->main_buffer()) + data_offset_, 140 static_cast<const char*>(message->main_buffer()) + data_offset_,
141 message->main_buffer_size() - data_offset_}; 141 message->main_buffer_size() - data_offset_};
142 buffers->push_back(buffer1); 142 buffers->push_back(buffer1);
143 Buffer buffer2 = { 143 Buffer buffer2 = {
144 static_cast<const char*>(message->transport_data()->buffer()), 144 static_cast<const char*>(message->transport_data()->buffer()),
145 transport_data_buffer_size}; 145 transport_data_buffer_size};
146 buffers->push_back(buffer2); 146 buffers->push_back(buffer2);
147 } 147 }
148 148
149 // RawChannel ------------------------------------------------------------------ 149 // RawChannel ------------------------------------------------------------------
150 150
151 RawChannel::RawChannel() 151 RawChannel::RawChannel()
152 : message_loop_for_io_(NULL), 152 : message_loop_for_io_(NULL),
153 delegate_(NULL), 153 delegate_(NULL),
154 read_stopped_(false), 154 read_stopped_(false),
155 write_stopped_(false), 155 write_stopped_(false),
156 weak_ptr_factory_(this) { 156 weak_ptr_factory_(this) {
157 } 157 }
158 158
159 RawChannel::~RawChannel() { 159 RawChannel::~RawChannel() {
160 CHECK(!read_buffer_); 160 DCHECK(!read_buffer_);
161 CHECK(!write_buffer_); 161 DCHECK(!write_buffer_);
162 162
163 // No need to take the |write_lock_| here -- if there are still weak pointers 163 // No need to take the |write_lock_| here -- if there are still weak pointers
164 // outstanding, then we're hosed anyway (since we wouldn't be able to 164 // outstanding, then we're hosed anyway (since we wouldn't be able to
165 // invalidate them cleanly, since we might not be on the I/O thread). 165 // invalidate them cleanly, since we might not be on the I/O thread).
166 CHECK(!weak_ptr_factory_.HasWeakPtrs()); 166 DCHECK(!weak_ptr_factory_.HasWeakPtrs());
167 } 167 }
168 168
169 bool RawChannel::Init(Delegate* delegate) { 169 bool RawChannel::Init(Delegate* delegate) {
170 CHECK(delegate); 170 DCHECK(delegate);
171 171
172 CHECK(!delegate_); 172 DCHECK(!delegate_);
173 delegate_ = delegate; 173 delegate_ = delegate;
174 174
175 CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO); 175 CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO);
176 CHECK(!message_loop_for_io_); 176 DCHECK(!message_loop_for_io_);
177 message_loop_for_io_ = 177 message_loop_for_io_ =
178 static_cast<base::MessageLoopForIO*>(base::MessageLoop::current()); 178 static_cast<base::MessageLoopForIO*>(base::MessageLoop::current());
179 179
180 // No need to take the lock. No one should be using us yet. 180 // No need to take the lock. No one should be using us yet.
181 CHECK(!read_buffer_); 181 DCHECK(!read_buffer_);
182 read_buffer_.reset(new ReadBuffer); 182 read_buffer_.reset(new ReadBuffer);
183 CHECK(!write_buffer_); 183 DCHECK(!write_buffer_);
184 write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize())); 184 write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize()));
185 185
186 if (!OnInit()) { 186 if (!OnInit()) {
187 delegate_ = NULL; 187 delegate_ = NULL;
188 message_loop_for_io_ = NULL; 188 message_loop_for_io_ = NULL;
189 read_buffer_.reset(); 189 read_buffer_.reset();
190 write_buffer_.reset(); 190 write_buffer_.reset();
191 return false; 191 return false;
192 } 192 }
193 193
194 if (ScheduleRead() != IO_PENDING) { 194 if (ScheduleRead() != IO_PENDING) {
195 // This will notify the delegate about the read failure. Although we're on 195 // This will notify the delegate about the read failure. Although we're on
196 // the I/O thread, don't call it in the nested context. 196 // the I/O thread, don't call it in the nested context.
197 message_loop_for_io_->PostTask(FROM_HERE, 197 message_loop_for_io_->PostTask(FROM_HERE,
198 base::Bind(&RawChannel::OnReadCompleted, 198 base::Bind(&RawChannel::OnReadCompleted,
199 weak_ptr_factory_.GetWeakPtr(), 199 weak_ptr_factory_.GetWeakPtr(),
200 false, 200 false,
201 0)); 201 0));
202 } 202 }
203 203
204 // ScheduleRead() failure is treated as a read failure (by notifying the 204 // ScheduleRead() failure is treated as a read failure (by notifying the
205 // delegate), not as an init failure. 205 // delegate), not as an init failure.
206 return true; 206 return true;
207 } 207 }
208 208
209 void RawChannel::Shutdown() { 209 void RawChannel::Shutdown() {
210 CHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 210 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
211 211
212 base::AutoLock locker(write_lock_); 212 base::AutoLock locker(write_lock_);
213 213
214 LOG_IF(WARNING, !write_buffer_->message_queue_.empty()) 214 LOG_IF(WARNING, !write_buffer_->message_queue_.empty())
215 << "Shutting down RawChannel with write buffer nonempty"; 215 << "Shutting down RawChannel with write buffer nonempty";
216 216
217 // Reset the delegate so that it won't receive further calls. 217 // Reset the delegate so that it won't receive further calls.
218 delegate_ = NULL; 218 delegate_ = NULL;
219 read_stopped_ = true; 219 read_stopped_ = true;
220 write_stopped_ = true; 220 write_stopped_ = true;
221 weak_ptr_factory_.InvalidateWeakPtrs(); 221 weak_ptr_factory_.InvalidateWeakPtrs();
222 222
223 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); 223 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
224 } 224 }
225 225
226 // Reminder: This must be thread-safe. 226 // Reminder: This must be thread-safe.
227 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { 227 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
228 CHECK(message); 228 DCHECK(message);
229 229
230 base::AutoLock locker(write_lock_); 230 base::AutoLock locker(write_lock_);
231 if (write_stopped_) 231 if (write_stopped_)
232 return false; 232 return false;
233 233
234 if (!write_buffer_->message_queue_.empty()) { 234 if (!write_buffer_->message_queue_.empty()) {
235 EnqueueMessageNoLock(message.Pass()); 235 EnqueueMessageNoLock(message.Pass());
236 return true; 236 return true;
237 } 237 }
238 238
239 EnqueueMessageNoLock(message.Pass()); 239 EnqueueMessageNoLock(message.Pass());
240 CHECK_EQ(write_buffer_->data_offset_, 0u); 240 DCHECK_EQ(write_buffer_->data_offset_, 0u);
241 241
242 size_t platform_handles_written = 0; 242 size_t platform_handles_written = 0;
243 size_t bytes_written = 0; 243 size_t bytes_written = 0;
244 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); 244 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
245 if (io_result == IO_PENDING) 245 if (io_result == IO_PENDING)
246 return true; 246 return true;
247 247
248 bool result = OnWriteCompletedNoLock( 248 bool result = OnWriteCompletedNoLock(
249 io_result == IO_SUCCEEDED, platform_handles_written, bytes_written); 249 io_result == IO_SUCCEEDED, platform_handles_written, bytes_written);
250 if (!result) { 250 if (!result) {
251 // Even if we're on the I/O thread, don't call |OnFatalError()| in the 251 // Even if we're on the I/O thread, don't call |OnFatalError()| in the
252 // nested context. 252 // nested context.
253 message_loop_for_io_->PostTask(FROM_HERE, 253 message_loop_for_io_->PostTask(FROM_HERE,
254 base::Bind(&RawChannel::CallOnFatalError, 254 base::Bind(&RawChannel::CallOnFatalError,
255 weak_ptr_factory_.GetWeakPtr(), 255 weak_ptr_factory_.GetWeakPtr(),
256 Delegate::FATAL_ERROR_WRITE)); 256 Delegate::FATAL_ERROR_WRITE));
257 } 257 }
258 258
259 return result; 259 return result;
260 } 260 }
261 261
262 // Reminder: This must be thread-safe. 262 // Reminder: This must be thread-safe.
263 bool RawChannel::IsWriteBufferEmpty() { 263 bool RawChannel::IsWriteBufferEmpty() {
264 base::AutoLock locker(write_lock_); 264 base::AutoLock locker(write_lock_);
265 return write_buffer_->message_queue_.empty(); 265 return write_buffer_->message_queue_.empty();
266 } 266 }
267 267
268 void RawChannel::OnReadCompleted(bool result, size_t bytes_read) { 268 void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
269 CHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 269 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
270 270
271 if (read_stopped_) { 271 if (read_stopped_) {
272 NOTREACHED(); 272 NOTREACHED();
273 return; 273 return;
274 } 274 }
275 275
276 IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED; 276 IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED;
277 277
278 // Keep reading data in a loop, and dispatch messages if enough data is 278 // Keep reading data in a loop, and dispatch messages if enough data is
279 // received. Exit the loop if any of the following happens: 279 // received. Exit the loop if any of the following happens:
(...skipping 23 matching lines...) Expand all
303 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the 303 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
304 // next read). 304 // next read).
305 // TODO(vtl): Validate that |message_size| is sane. 305 // TODO(vtl): Validate that |message_size| is sane.
306 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize( 306 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize(
307 &read_buffer_->buffer_[read_buffer_start], 307 &read_buffer_->buffer_[read_buffer_start],
308 remaining_bytes, 308 remaining_bytes,
309 &message_size) && 309 &message_size) &&
310 remaining_bytes >= message_size) { 310 remaining_bytes >= message_size) {
311 MessageInTransit::View message_view( 311 MessageInTransit::View message_view(
312 message_size, &read_buffer_->buffer_[read_buffer_start]); 312 message_size, &read_buffer_->buffer_[read_buffer_start]);
313 CHECK_EQ(message_view.total_size(), message_size); 313 DCHECK_EQ(message_view.total_size(), message_size);
314 314
315 const char* error_message = NULL; 315 const char* error_message = NULL;
316 if (!message_view.IsValid(GetSerializedPlatformHandleSize(), 316 if (!message_view.IsValid(GetSerializedPlatformHandleSize(),
317 &error_message)) { 317 &error_message)) {
318 CHECK(error_message); 318 DCHECK(error_message);
319 LOG(WARNING) << "Received invalid message: " << error_message; 319 LOG(WARNING) << "Received invalid message: " << error_message;
320 read_stopped_ = true; 320 read_stopped_ = true;
321 CallOnFatalError(Delegate::FATAL_ERROR_READ); 321 CallOnFatalError(Delegate::FATAL_ERROR_READ);
322 return; 322 return;
323 } 323 }
324 324
325 if (message_view.type() == MessageInTransit::kTypeRawChannel) { 325 if (message_view.type() == MessageInTransit::kTypeRawChannel) {
326 if (!OnReadMessageForRawChannel(message_view)) { 326 if (!OnReadMessageForRawChannel(message_view)) {
327 read_stopped_ = true; 327 read_stopped_ = true;
328 CallOnFatalError(Delegate::FATAL_ERROR_READ); 328 CallOnFatalError(Delegate::FATAL_ERROR_READ);
(...skipping 19 matching lines...) Expand all
348 CallOnFatalError(Delegate::FATAL_ERROR_READ); 348 CallOnFatalError(Delegate::FATAL_ERROR_READ);
349 return; 349 return;
350 } 350 }
351 } 351 }
352 } 352 }
353 353
354 // TODO(vtl): In the case that we aren't expecting any platform handles, 354 // TODO(vtl): In the case that we aren't expecting any platform handles,
355 // for the POSIX implementation, we should confirm that none are stored. 355 // for the POSIX implementation, we should confirm that none are stored.
356 356
357 // Dispatch the message. 357 // Dispatch the message.
358 CHECK(delegate_); 358 DCHECK(delegate_);
359 delegate_->OnReadMessage(message_view, platform_handles.Pass()); 359 delegate_->OnReadMessage(message_view, platform_handles.Pass());
360 if (read_stopped_) { 360 if (read_stopped_) {
361 // |Shutdown()| was called in |OnReadMessage()|. 361 // |Shutdown()| was called in |OnReadMessage()|.
362 // TODO(vtl): Add test for this case. 362 // TODO(vtl): Add test for this case.
363 return; 363 return;
364 } 364 }
365 } 365 }
366 366
367 did_dispatch_message = true; 367 did_dispatch_message = true;
368 368
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
406 // (2) If we didn't max out |kReadSize|, stop reading for now. 406 // (2) If we didn't max out |kReadSize|, stop reading for now.
407 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize; 407 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
408 bytes_read = 0; 408 bytes_read = 0;
409 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read); 409 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
410 } while (io_result != IO_PENDING); 410 } while (io_result != IO_PENDING);
411 } 411 }
412 412
413 void RawChannel::OnWriteCompleted(bool result, 413 void RawChannel::OnWriteCompleted(bool result,
414 size_t platform_handles_written, 414 size_t platform_handles_written,
415 size_t bytes_written) { 415 size_t bytes_written) {
416 CHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 416 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
417 417
418 bool did_fail = false; 418 bool did_fail = false;
419 { 419 {
420 base::AutoLock locker(write_lock_); 420 base::AutoLock locker(write_lock_);
421 CHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty()); 421 DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty());
422 422
423 if (write_stopped_) { 423 if (write_stopped_) {
424 NOTREACHED(); 424 NOTREACHED();
425 return; 425 return;
426 } 426 }
427 427
428 did_fail = !OnWriteCompletedNoLock( 428 did_fail = !OnWriteCompletedNoLock(
429 result, platform_handles_written, bytes_written); 429 result, platform_handles_written, bytes_written);
430 } 430 }
431 431
432 if (did_fail) 432 if (did_fail)
433 CallOnFatalError(Delegate::FATAL_ERROR_WRITE); 433 CallOnFatalError(Delegate::FATAL_ERROR_WRITE);
434 } 434 }
435 435
436 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { 436 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) {
437 write_lock_.AssertAcquired(); 437 write_lock_.AssertAcquired();
438 write_buffer_->message_queue_.push_back(message.release()); 438 write_buffer_->message_queue_.push_back(message.release());
439 } 439 }
440 440
441 bool RawChannel::OnReadMessageForRawChannel( 441 bool RawChannel::OnReadMessageForRawChannel(
442 const MessageInTransit::View& message_view) { 442 const MessageInTransit::View& message_view) {
443 // No non-implementation specific |RawChannel| control messages. 443 // No non-implementation specific |RawChannel| control messages.
444 LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype() 444 LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype()
445 << ")"; 445 << ")";
446 return false; 446 return false;
447 } 447 }
448 448
449 void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) { 449 void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
450 CHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 450 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
451 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? 451 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
452 if (delegate_) 452 if (delegate_)
453 delegate_->OnFatalError(fatal_error); 453 delegate_->OnFatalError(fatal_error);
454 } 454 }
455 455
456 bool RawChannel::OnWriteCompletedNoLock(bool result, 456 bool RawChannel::OnWriteCompletedNoLock(bool result,
457 size_t platform_handles_written, 457 size_t platform_handles_written,
458 size_t bytes_written) { 458 size_t bytes_written) {
459 write_lock_.AssertAcquired(); 459 write_lock_.AssertAcquired();
460 460
461 CHECK(!write_stopped_); 461 DCHECK(!write_stopped_);
462 CHECK(!write_buffer_->message_queue_.empty()); 462 DCHECK(!write_buffer_->message_queue_.empty());
463 463
464 if (result) { 464 if (result) {
465 write_buffer_->platform_handles_offset_ += platform_handles_written; 465 write_buffer_->platform_handles_offset_ += platform_handles_written;
466 write_buffer_->data_offset_ += bytes_written; 466 write_buffer_->data_offset_ += bytes_written;
467 467
468 MessageInTransit* message = write_buffer_->message_queue_.front(); 468 MessageInTransit* message = write_buffer_->message_queue_.front();
469 if (write_buffer_->data_offset_ >= message->total_size()) { 469 if (write_buffer_->data_offset_ >= message->total_size()) {
470 // Complete write. 470 // Complete write.
471 CHECK_EQ(write_buffer_->data_offset_, message->total_size()); 471 DCHECK_EQ(write_buffer_->data_offset_, message->total_size());
472 write_buffer_->message_queue_.pop_front(); 472 write_buffer_->message_queue_.pop_front();
473 delete message; 473 delete message;
474 write_buffer_->platform_handles_offset_ = 0; 474 write_buffer_->platform_handles_offset_ = 0;
475 write_buffer_->data_offset_ = 0; 475 write_buffer_->data_offset_ = 0;
476 476
477 if (write_buffer_->message_queue_.empty()) 477 if (write_buffer_->message_queue_.empty())
478 return true; 478 return true;
479 } 479 }
480 480
481 // Schedule the next write. 481 // Schedule the next write.
482 IOResult io_result = ScheduleWriteNoLock(); 482 IOResult io_result = ScheduleWriteNoLock();
483 if (io_result == IO_PENDING) 483 if (io_result == IO_PENDING)
484 return true; 484 return true;
485 CHECK_EQ(io_result, IO_FAILED); 485 DCHECK_EQ(io_result, IO_FAILED);
486 } 486 }
487 487
488 write_stopped_ = true; 488 write_stopped_ = true;
489 STLDeleteElements(&write_buffer_->message_queue_); 489 STLDeleteElements(&write_buffer_->message_queue_);
490 write_buffer_->platform_handles_offset_ = 0; 490 write_buffer_->platform_handles_offset_ = 0;
491 write_buffer_->data_offset_ = 0; 491 write_buffer_->data_offset_ = 0;
492 return false; 492 return false;
493 } 493 }
494 494
495 } // namespace system 495 } // namespace system
496 } // namespace mojo 496 } // namespace mojo
OLDNEW
« no previous file with comments | « no previous file | trunk/src/mojo/system/raw_channel_win.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698