OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/system/raw_channel.h" | 5 #include "mojo/system/raw_channel.h" |
6 | 6 |
7 #include <string.h> | 7 #include <string.h> |
8 | 8 |
9 #include <algorithm> | 9 #include <algorithm> |
10 | 10 |
(...skipping 12 matching lines...) Expand all Loading... |
23 | 23 |
24 // RawChannel::ReadBuffer ------------------------------------------------------ | 24 // RawChannel::ReadBuffer ------------------------------------------------------ |
25 | 25 |
26 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) { | 26 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) { |
27 } | 27 } |
28 | 28 |
29 RawChannel::ReadBuffer::~ReadBuffer() { | 29 RawChannel::ReadBuffer::~ReadBuffer() { |
30 } | 30 } |
31 | 31 |
32 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) { | 32 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) { |
33 CHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); | 33 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); |
34 *addr = &buffer_[0] + num_valid_bytes_; | 34 *addr = &buffer_[0] + num_valid_bytes_; |
35 *size = kReadSize; | 35 *size = kReadSize; |
36 } | 36 } |
37 | 37 |
38 // RawChannel::WriteBuffer ----------------------------------------------------- | 38 // RawChannel::WriteBuffer ----------------------------------------------------- |
39 | 39 |
40 RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size) | 40 RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size) |
41 : serialized_platform_handle_size_(serialized_platform_handle_size), | 41 : serialized_platform_handle_size_(serialized_platform_handle_size), |
42 platform_handles_offset_(0), | 42 platform_handles_offset_(0), |
43 data_offset_(0) { | 43 data_offset_(0) { |
44 } | 44 } |
45 | 45 |
46 RawChannel::WriteBuffer::~WriteBuffer() { | 46 RawChannel::WriteBuffer::~WriteBuffer() { |
47 STLDeleteElements(&message_queue_); | 47 STLDeleteElements(&message_queue_); |
48 } | 48 } |
49 | 49 |
50 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const { | 50 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const { |
51 if (message_queue_.empty()) | 51 if (message_queue_.empty()) |
52 return false; | 52 return false; |
53 | 53 |
54 const TransportData* transport_data = | 54 const TransportData* transport_data = |
55 message_queue_.front()->transport_data(); | 55 message_queue_.front()->transport_data(); |
56 if (!transport_data) | 56 if (!transport_data) |
57 return false; | 57 return false; |
58 | 58 |
59 const embedder::PlatformHandleVector* all_platform_handles = | 59 const embedder::PlatformHandleVector* all_platform_handles = |
60 transport_data->platform_handles(); | 60 transport_data->platform_handles(); |
61 if (!all_platform_handles) { | 61 if (!all_platform_handles) { |
62 CHECK_EQ(platform_handles_offset_, 0u); | 62 DCHECK_EQ(platform_handles_offset_, 0u); |
63 return false; | 63 return false; |
64 } | 64 } |
65 if (platform_handles_offset_ >= all_platform_handles->size()) { | 65 if (platform_handles_offset_ >= all_platform_handles->size()) { |
66 CHECK_EQ(platform_handles_offset_, all_platform_handles->size()); | 66 DCHECK_EQ(platform_handles_offset_, all_platform_handles->size()); |
67 return false; | 67 return false; |
68 } | 68 } |
69 | 69 |
70 return true; | 70 return true; |
71 } | 71 } |
72 | 72 |
73 void RawChannel::WriteBuffer::GetPlatformHandlesToSend( | 73 void RawChannel::WriteBuffer::GetPlatformHandlesToSend( |
74 size_t* num_platform_handles, | 74 size_t* num_platform_handles, |
75 embedder::PlatformHandle** platform_handles, | 75 embedder::PlatformHandle** platform_handles, |
76 void** serialization_data) { | 76 void** serialization_data) { |
77 CHECK(HavePlatformHandlesToSend()); | 77 DCHECK(HavePlatformHandlesToSend()); |
78 | 78 |
79 TransportData* transport_data = message_queue_.front()->transport_data(); | 79 TransportData* transport_data = message_queue_.front()->transport_data(); |
80 embedder::PlatformHandleVector* all_platform_handles = | 80 embedder::PlatformHandleVector* all_platform_handles = |
81 transport_data->platform_handles(); | 81 transport_data->platform_handles(); |
82 *num_platform_handles = | 82 *num_platform_handles = |
83 all_platform_handles->size() - platform_handles_offset_; | 83 all_platform_handles->size() - platform_handles_offset_; |
84 *platform_handles = &(*all_platform_handles)[platform_handles_offset_]; | 84 *platform_handles = &(*all_platform_handles)[platform_handles_offset_]; |
85 size_t serialization_data_offset = | 85 size_t serialization_data_offset = |
86 transport_data->platform_handle_table_offset(); | 86 transport_data->platform_handle_table_offset(); |
87 CHECK_GT(serialization_data_offset, 0u); | 87 DCHECK_GT(serialization_data_offset, 0u); |
88 serialization_data_offset += | 88 serialization_data_offset += |
89 platform_handles_offset_ * serialized_platform_handle_size_; | 89 platform_handles_offset_ * serialized_platform_handle_size_; |
90 *serialization_data = | 90 *serialization_data = |
91 static_cast<char*>(transport_data->buffer()) + serialization_data_offset; | 91 static_cast<char*>(transport_data->buffer()) + serialization_data_offset; |
92 } | 92 } |
93 | 93 |
94 void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const { | 94 void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const { |
95 buffers->clear(); | 95 buffers->clear(); |
96 | 96 |
97 if (message_queue_.empty()) | 97 if (message_queue_.empty()) |
98 return; | 98 return; |
99 | 99 |
100 MessageInTransit* message = message_queue_.front(); | 100 MessageInTransit* message = message_queue_.front(); |
101 CHECK_LT(data_offset_, message->total_size()); | 101 DCHECK_LT(data_offset_, message->total_size()); |
102 size_t bytes_to_write = message->total_size() - data_offset_; | 102 size_t bytes_to_write = message->total_size() - data_offset_; |
103 | 103 |
104 size_t transport_data_buffer_size = | 104 size_t transport_data_buffer_size = |
105 message->transport_data() ? message->transport_data()->buffer_size() : 0; | 105 message->transport_data() ? message->transport_data()->buffer_size() : 0; |
106 | 106 |
107 if (!transport_data_buffer_size) { | 107 if (!transport_data_buffer_size) { |
108 // Only write from the main buffer. | 108 // Only write from the main buffer. |
109 CHECK_LT(data_offset_, message->main_buffer_size()); | 109 DCHECK_LT(data_offset_, message->main_buffer_size()); |
110 CHECK_LE(bytes_to_write, message->main_buffer_size()); | 110 DCHECK_LE(bytes_to_write, message->main_buffer_size()); |
111 Buffer buffer = { | 111 Buffer buffer = { |
112 static_cast<const char*>(message->main_buffer()) + data_offset_, | 112 static_cast<const char*>(message->main_buffer()) + data_offset_, |
113 bytes_to_write}; | 113 bytes_to_write}; |
114 buffers->push_back(buffer); | 114 buffers->push_back(buffer); |
115 return; | 115 return; |
116 } | 116 } |
117 | 117 |
118 if (data_offset_ >= message->main_buffer_size()) { | 118 if (data_offset_ >= message->main_buffer_size()) { |
119 // Only write from the transport data buffer. | 119 // Only write from the transport data buffer. |
120 CHECK_LT(data_offset_ - message->main_buffer_size(), | 120 DCHECK_LT(data_offset_ - message->main_buffer_size(), |
121 transport_data_buffer_size); | 121 transport_data_buffer_size); |
122 CHECK_LE(bytes_to_write, transport_data_buffer_size); | 122 DCHECK_LE(bytes_to_write, transport_data_buffer_size); |
123 Buffer buffer = { | 123 Buffer buffer = { |
124 static_cast<const char*>(message->transport_data()->buffer()) + | 124 static_cast<const char*>(message->transport_data()->buffer()) + |
125 (data_offset_ - message->main_buffer_size()), | 125 (data_offset_ - message->main_buffer_size()), |
126 bytes_to_write}; | 126 bytes_to_write}; |
127 buffers->push_back(buffer); | 127 buffers->push_back(buffer); |
128 return; | 128 return; |
129 } | 129 } |
130 | 130 |
131 // TODO(vtl): We could actually send out buffers from multiple messages, with | 131 // TODO(vtl): We could actually send out buffers from multiple messages, with |
132 // the "stopping" condition being reaching a message with platform handles | 132 // the "stopping" condition being reaching a message with platform handles |
133 // attached. | 133 // attached. |
134 | 134 |
135 // Write from both buffers. | 135 // Write from both buffers. |
136 CHECK_EQ( | 136 DCHECK_EQ( |
137 bytes_to_write, | 137 bytes_to_write, |
138 message->main_buffer_size() - data_offset_ + transport_data_buffer_size); | 138 message->main_buffer_size() - data_offset_ + transport_data_buffer_size); |
139 Buffer buffer1 = { | 139 Buffer buffer1 = { |
140 static_cast<const char*>(message->main_buffer()) + data_offset_, | 140 static_cast<const char*>(message->main_buffer()) + data_offset_, |
141 message->main_buffer_size() - data_offset_}; | 141 message->main_buffer_size() - data_offset_}; |
142 buffers->push_back(buffer1); | 142 buffers->push_back(buffer1); |
143 Buffer buffer2 = { | 143 Buffer buffer2 = { |
144 static_cast<const char*>(message->transport_data()->buffer()), | 144 static_cast<const char*>(message->transport_data()->buffer()), |
145 transport_data_buffer_size}; | 145 transport_data_buffer_size}; |
146 buffers->push_back(buffer2); | 146 buffers->push_back(buffer2); |
147 } | 147 } |
148 | 148 |
149 // RawChannel ------------------------------------------------------------------ | 149 // RawChannel ------------------------------------------------------------------ |
150 | 150 |
151 RawChannel::RawChannel() | 151 RawChannel::RawChannel() |
152 : message_loop_for_io_(NULL), | 152 : message_loop_for_io_(NULL), |
153 delegate_(NULL), | 153 delegate_(NULL), |
154 read_stopped_(false), | 154 read_stopped_(false), |
155 write_stopped_(false), | 155 write_stopped_(false), |
156 weak_ptr_factory_(this) { | 156 weak_ptr_factory_(this) { |
157 } | 157 } |
158 | 158 |
159 RawChannel::~RawChannel() { | 159 RawChannel::~RawChannel() { |
160 CHECK(!read_buffer_); | 160 DCHECK(!read_buffer_); |
161 CHECK(!write_buffer_); | 161 DCHECK(!write_buffer_); |
162 | 162 |
163 // No need to take the |write_lock_| here -- if there are still weak pointers | 163 // No need to take the |write_lock_| here -- if there are still weak pointers |
164 // outstanding, then we're hosed anyway (since we wouldn't be able to | 164 // outstanding, then we're hosed anyway (since we wouldn't be able to |
165 // invalidate them cleanly, since we might not be on the I/O thread). | 165 // invalidate them cleanly, since we might not be on the I/O thread). |
166 CHECK(!weak_ptr_factory_.HasWeakPtrs()); | 166 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); |
167 } | 167 } |
168 | 168 |
169 bool RawChannel::Init(Delegate* delegate) { | 169 bool RawChannel::Init(Delegate* delegate) { |
170 CHECK(delegate); | 170 DCHECK(delegate); |
171 | 171 |
172 CHECK(!delegate_); | 172 DCHECK(!delegate_); |
173 delegate_ = delegate; | 173 delegate_ = delegate; |
174 | 174 |
175 CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO); | 175 CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO); |
176 CHECK(!message_loop_for_io_); | 176 DCHECK(!message_loop_for_io_); |
177 message_loop_for_io_ = | 177 message_loop_for_io_ = |
178 static_cast<base::MessageLoopForIO*>(base::MessageLoop::current()); | 178 static_cast<base::MessageLoopForIO*>(base::MessageLoop::current()); |
179 | 179 |
180 // No need to take the lock. No one should be using us yet. | 180 // No need to take the lock. No one should be using us yet. |
181 CHECK(!read_buffer_); | 181 DCHECK(!read_buffer_); |
182 read_buffer_.reset(new ReadBuffer); | 182 read_buffer_.reset(new ReadBuffer); |
183 CHECK(!write_buffer_); | 183 DCHECK(!write_buffer_); |
184 write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize())); | 184 write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize())); |
185 | 185 |
186 if (!OnInit()) { | 186 if (!OnInit()) { |
187 delegate_ = NULL; | 187 delegate_ = NULL; |
188 message_loop_for_io_ = NULL; | 188 message_loop_for_io_ = NULL; |
189 read_buffer_.reset(); | 189 read_buffer_.reset(); |
190 write_buffer_.reset(); | 190 write_buffer_.reset(); |
191 return false; | 191 return false; |
192 } | 192 } |
193 | 193 |
194 if (ScheduleRead() != IO_PENDING) { | 194 if (ScheduleRead() != IO_PENDING) { |
195 // This will notify the delegate about the read failure. Although we're on | 195 // This will notify the delegate about the read failure. Although we're on |
196 // the I/O thread, don't call it in the nested context. | 196 // the I/O thread, don't call it in the nested context. |
197 message_loop_for_io_->PostTask(FROM_HERE, | 197 message_loop_for_io_->PostTask(FROM_HERE, |
198 base::Bind(&RawChannel::OnReadCompleted, | 198 base::Bind(&RawChannel::OnReadCompleted, |
199 weak_ptr_factory_.GetWeakPtr(), | 199 weak_ptr_factory_.GetWeakPtr(), |
200 false, | 200 false, |
201 0)); | 201 0)); |
202 } | 202 } |
203 | 203 |
204 // ScheduleRead() failure is treated as a read failure (by notifying the | 204 // ScheduleRead() failure is treated as a read failure (by notifying the |
205 // delegate), not as an init failure. | 205 // delegate), not as an init failure. |
206 return true; | 206 return true; |
207 } | 207 } |
208 | 208 |
209 void RawChannel::Shutdown() { | 209 void RawChannel::Shutdown() { |
210 CHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 210 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
211 | 211 |
212 base::AutoLock locker(write_lock_); | 212 base::AutoLock locker(write_lock_); |
213 | 213 |
214 LOG_IF(WARNING, !write_buffer_->message_queue_.empty()) | 214 LOG_IF(WARNING, !write_buffer_->message_queue_.empty()) |
215 << "Shutting down RawChannel with write buffer nonempty"; | 215 << "Shutting down RawChannel with write buffer nonempty"; |
216 | 216 |
217 // Reset the delegate so that it won't receive further calls. | 217 // Reset the delegate so that it won't receive further calls. |
218 delegate_ = NULL; | 218 delegate_ = NULL; |
219 read_stopped_ = true; | 219 read_stopped_ = true; |
220 write_stopped_ = true; | 220 write_stopped_ = true; |
221 weak_ptr_factory_.InvalidateWeakPtrs(); | 221 weak_ptr_factory_.InvalidateWeakPtrs(); |
222 | 222 |
223 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); | 223 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); |
224 } | 224 } |
225 | 225 |
226 // Reminder: This must be thread-safe. | 226 // Reminder: This must be thread-safe. |
227 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { | 227 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { |
228 CHECK(message); | 228 DCHECK(message); |
229 | 229 |
230 base::AutoLock locker(write_lock_); | 230 base::AutoLock locker(write_lock_); |
231 if (write_stopped_) | 231 if (write_stopped_) |
232 return false; | 232 return false; |
233 | 233 |
234 if (!write_buffer_->message_queue_.empty()) { | 234 if (!write_buffer_->message_queue_.empty()) { |
235 EnqueueMessageNoLock(message.Pass()); | 235 EnqueueMessageNoLock(message.Pass()); |
236 return true; | 236 return true; |
237 } | 237 } |
238 | 238 |
239 EnqueueMessageNoLock(message.Pass()); | 239 EnqueueMessageNoLock(message.Pass()); |
240 CHECK_EQ(write_buffer_->data_offset_, 0u); | 240 DCHECK_EQ(write_buffer_->data_offset_, 0u); |
241 | 241 |
242 size_t platform_handles_written = 0; | 242 size_t platform_handles_written = 0; |
243 size_t bytes_written = 0; | 243 size_t bytes_written = 0; |
244 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); | 244 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); |
245 if (io_result == IO_PENDING) | 245 if (io_result == IO_PENDING) |
246 return true; | 246 return true; |
247 | 247 |
248 bool result = OnWriteCompletedNoLock( | 248 bool result = OnWriteCompletedNoLock( |
249 io_result == IO_SUCCEEDED, platform_handles_written, bytes_written); | 249 io_result == IO_SUCCEEDED, platform_handles_written, bytes_written); |
250 if (!result) { | 250 if (!result) { |
251 // Even if we're on the I/O thread, don't call |OnFatalError()| in the | 251 // Even if we're on the I/O thread, don't call |OnFatalError()| in the |
252 // nested context. | 252 // nested context. |
253 message_loop_for_io_->PostTask(FROM_HERE, | 253 message_loop_for_io_->PostTask(FROM_HERE, |
254 base::Bind(&RawChannel::CallOnFatalError, | 254 base::Bind(&RawChannel::CallOnFatalError, |
255 weak_ptr_factory_.GetWeakPtr(), | 255 weak_ptr_factory_.GetWeakPtr(), |
256 Delegate::FATAL_ERROR_WRITE)); | 256 Delegate::FATAL_ERROR_WRITE)); |
257 } | 257 } |
258 | 258 |
259 return result; | 259 return result; |
260 } | 260 } |
261 | 261 |
262 // Reminder: This must be thread-safe. | 262 // Reminder: This must be thread-safe. |
263 bool RawChannel::IsWriteBufferEmpty() { | 263 bool RawChannel::IsWriteBufferEmpty() { |
264 base::AutoLock locker(write_lock_); | 264 base::AutoLock locker(write_lock_); |
265 return write_buffer_->message_queue_.empty(); | 265 return write_buffer_->message_queue_.empty(); |
266 } | 266 } |
267 | 267 |
268 void RawChannel::OnReadCompleted(bool result, size_t bytes_read) { | 268 void RawChannel::OnReadCompleted(bool result, size_t bytes_read) { |
269 CHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 269 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
270 | 270 |
271 if (read_stopped_) { | 271 if (read_stopped_) { |
272 NOTREACHED(); | 272 NOTREACHED(); |
273 return; | 273 return; |
274 } | 274 } |
275 | 275 |
276 IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED; | 276 IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED; |
277 | 277 |
278 // Keep reading data in a loop, and dispatch messages if enough data is | 278 // Keep reading data in a loop, and dispatch messages if enough data is |
279 // received. Exit the loop if any of the following happens: | 279 // received. Exit the loop if any of the following happens: |
(...skipping 23 matching lines...) Expand all Loading... |
303 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the | 303 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the |
304 // next read). | 304 // next read). |
305 // TODO(vtl): Validate that |message_size| is sane. | 305 // TODO(vtl): Validate that |message_size| is sane. |
306 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize( | 306 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize( |
307 &read_buffer_->buffer_[read_buffer_start], | 307 &read_buffer_->buffer_[read_buffer_start], |
308 remaining_bytes, | 308 remaining_bytes, |
309 &message_size) && | 309 &message_size) && |
310 remaining_bytes >= message_size) { | 310 remaining_bytes >= message_size) { |
311 MessageInTransit::View message_view( | 311 MessageInTransit::View message_view( |
312 message_size, &read_buffer_->buffer_[read_buffer_start]); | 312 message_size, &read_buffer_->buffer_[read_buffer_start]); |
313 CHECK_EQ(message_view.total_size(), message_size); | 313 DCHECK_EQ(message_view.total_size(), message_size); |
314 | 314 |
315 const char* error_message = NULL; | 315 const char* error_message = NULL; |
316 if (!message_view.IsValid(GetSerializedPlatformHandleSize(), | 316 if (!message_view.IsValid(GetSerializedPlatformHandleSize(), |
317 &error_message)) { | 317 &error_message)) { |
318 CHECK(error_message); | 318 DCHECK(error_message); |
319 LOG(WARNING) << "Received invalid message: " << error_message; | 319 LOG(WARNING) << "Received invalid message: " << error_message; |
320 read_stopped_ = true; | 320 read_stopped_ = true; |
321 CallOnFatalError(Delegate::FATAL_ERROR_READ); | 321 CallOnFatalError(Delegate::FATAL_ERROR_READ); |
322 return; | 322 return; |
323 } | 323 } |
324 | 324 |
325 if (message_view.type() == MessageInTransit::kTypeRawChannel) { | 325 if (message_view.type() == MessageInTransit::kTypeRawChannel) { |
326 if (!OnReadMessageForRawChannel(message_view)) { | 326 if (!OnReadMessageForRawChannel(message_view)) { |
327 read_stopped_ = true; | 327 read_stopped_ = true; |
328 CallOnFatalError(Delegate::FATAL_ERROR_READ); | 328 CallOnFatalError(Delegate::FATAL_ERROR_READ); |
(...skipping 19 matching lines...) Expand all Loading... |
348 CallOnFatalError(Delegate::FATAL_ERROR_READ); | 348 CallOnFatalError(Delegate::FATAL_ERROR_READ); |
349 return; | 349 return; |
350 } | 350 } |
351 } | 351 } |
352 } | 352 } |
353 | 353 |
354 // TODO(vtl): In the case that we aren't expecting any platform handles, | 354 // TODO(vtl): In the case that we aren't expecting any platform handles, |
355 // for the POSIX implementation, we should confirm that none are stored. | 355 // for the POSIX implementation, we should confirm that none are stored. |
356 | 356 |
357 // Dispatch the message. | 357 // Dispatch the message. |
358 CHECK(delegate_); | 358 DCHECK(delegate_); |
359 delegate_->OnReadMessage(message_view, platform_handles.Pass()); | 359 delegate_->OnReadMessage(message_view, platform_handles.Pass()); |
360 if (read_stopped_) { | 360 if (read_stopped_) { |
361 // |Shutdown()| was called in |OnReadMessage()|. | 361 // |Shutdown()| was called in |OnReadMessage()|. |
362 // TODO(vtl): Add test for this case. | 362 // TODO(vtl): Add test for this case. |
363 return; | 363 return; |
364 } | 364 } |
365 } | 365 } |
366 | 366 |
367 did_dispatch_message = true; | 367 did_dispatch_message = true; |
368 | 368 |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
406 // (2) If we didn't max out |kReadSize|, stop reading for now. | 406 // (2) If we didn't max out |kReadSize|, stop reading for now. |
407 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize; | 407 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize; |
408 bytes_read = 0; | 408 bytes_read = 0; |
409 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read); | 409 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read); |
410 } while (io_result != IO_PENDING); | 410 } while (io_result != IO_PENDING); |
411 } | 411 } |
412 | 412 |
413 void RawChannel::OnWriteCompleted(bool result, | 413 void RawChannel::OnWriteCompleted(bool result, |
414 size_t platform_handles_written, | 414 size_t platform_handles_written, |
415 size_t bytes_written) { | 415 size_t bytes_written) { |
416 CHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 416 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
417 | 417 |
418 bool did_fail = false; | 418 bool did_fail = false; |
419 { | 419 { |
420 base::AutoLock locker(write_lock_); | 420 base::AutoLock locker(write_lock_); |
421 CHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty()); | 421 DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty()); |
422 | 422 |
423 if (write_stopped_) { | 423 if (write_stopped_) { |
424 NOTREACHED(); | 424 NOTREACHED(); |
425 return; | 425 return; |
426 } | 426 } |
427 | 427 |
428 did_fail = !OnWriteCompletedNoLock( | 428 did_fail = !OnWriteCompletedNoLock( |
429 result, platform_handles_written, bytes_written); | 429 result, platform_handles_written, bytes_written); |
430 } | 430 } |
431 | 431 |
432 if (did_fail) | 432 if (did_fail) |
433 CallOnFatalError(Delegate::FATAL_ERROR_WRITE); | 433 CallOnFatalError(Delegate::FATAL_ERROR_WRITE); |
434 } | 434 } |
435 | 435 |
436 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { | 436 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { |
437 write_lock_.AssertAcquired(); | 437 write_lock_.AssertAcquired(); |
438 write_buffer_->message_queue_.push_back(message.release()); | 438 write_buffer_->message_queue_.push_back(message.release()); |
439 } | 439 } |
440 | 440 |
441 bool RawChannel::OnReadMessageForRawChannel( | 441 bool RawChannel::OnReadMessageForRawChannel( |
442 const MessageInTransit::View& message_view) { | 442 const MessageInTransit::View& message_view) { |
443 // No non-implementation specific |RawChannel| control messages. | 443 // No non-implementation specific |RawChannel| control messages. |
444 LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype() | 444 LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype() |
445 << ")"; | 445 << ")"; |
446 return false; | 446 return false; |
447 } | 447 } |
448 | 448 |
449 void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) { | 449 void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) { |
450 CHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 450 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
451 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? | 451 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? |
452 if (delegate_) | 452 if (delegate_) |
453 delegate_->OnFatalError(fatal_error); | 453 delegate_->OnFatalError(fatal_error); |
454 } | 454 } |
455 | 455 |
456 bool RawChannel::OnWriteCompletedNoLock(bool result, | 456 bool RawChannel::OnWriteCompletedNoLock(bool result, |
457 size_t platform_handles_written, | 457 size_t platform_handles_written, |
458 size_t bytes_written) { | 458 size_t bytes_written) { |
459 write_lock_.AssertAcquired(); | 459 write_lock_.AssertAcquired(); |
460 | 460 |
461 CHECK(!write_stopped_); | 461 DCHECK(!write_stopped_); |
462 CHECK(!write_buffer_->message_queue_.empty()); | 462 DCHECK(!write_buffer_->message_queue_.empty()); |
463 | 463 |
464 if (result) { | 464 if (result) { |
465 write_buffer_->platform_handles_offset_ += platform_handles_written; | 465 write_buffer_->platform_handles_offset_ += platform_handles_written; |
466 write_buffer_->data_offset_ += bytes_written; | 466 write_buffer_->data_offset_ += bytes_written; |
467 | 467 |
468 MessageInTransit* message = write_buffer_->message_queue_.front(); | 468 MessageInTransit* message = write_buffer_->message_queue_.front(); |
469 if (write_buffer_->data_offset_ >= message->total_size()) { | 469 if (write_buffer_->data_offset_ >= message->total_size()) { |
470 // Complete write. | 470 // Complete write. |
471 CHECK_EQ(write_buffer_->data_offset_, message->total_size()); | 471 DCHECK_EQ(write_buffer_->data_offset_, message->total_size()); |
472 write_buffer_->message_queue_.pop_front(); | 472 write_buffer_->message_queue_.pop_front(); |
473 delete message; | 473 delete message; |
474 write_buffer_->platform_handles_offset_ = 0; | 474 write_buffer_->platform_handles_offset_ = 0; |
475 write_buffer_->data_offset_ = 0; | 475 write_buffer_->data_offset_ = 0; |
476 | 476 |
477 if (write_buffer_->message_queue_.empty()) | 477 if (write_buffer_->message_queue_.empty()) |
478 return true; | 478 return true; |
479 } | 479 } |
480 | 480 |
481 // Schedule the next write. | 481 // Schedule the next write. |
482 IOResult io_result = ScheduleWriteNoLock(); | 482 IOResult io_result = ScheduleWriteNoLock(); |
483 if (io_result == IO_PENDING) | 483 if (io_result == IO_PENDING) |
484 return true; | 484 return true; |
485 CHECK_EQ(io_result, IO_FAILED); | 485 DCHECK_EQ(io_result, IO_FAILED); |
486 } | 486 } |
487 | 487 |
488 write_stopped_ = true; | 488 write_stopped_ = true; |
489 STLDeleteElements(&write_buffer_->message_queue_); | 489 STLDeleteElements(&write_buffer_->message_queue_); |
490 write_buffer_->platform_handles_offset_ = 0; | 490 write_buffer_->platform_handles_offset_ = 0; |
491 write_buffer_->data_offset_ = 0; | 491 write_buffer_->data_offset_ = 0; |
492 return false; | 492 return false; |
493 } | 493 } |
494 | 494 |
495 } // namespace system | 495 } // namespace system |
496 } // namespace mojo | 496 } // namespace mojo |
OLD | NEW |