Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(528)

Side by Side Diff: chrome/common/ipc_channel_posix.cc

Issue 13757: message_pump_libevent refactor: (Closed)
Patch Set: Another small fix. Created 12 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2008 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2008 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "chrome/common/ipc_channel_posix.h" 5 #include "chrome/common/ipc_channel_posix.h"
6 6
7 #include <errno.h> 7 #include <errno.h>
8 #include <fcntl.h> 8 #include <fcntl.h>
9 #include <stddef.h>
9 #include <sys/types.h> 10 #include <sys/types.h>
10 #include <sys/socket.h> 11 #include <sys/socket.h>
11 #include <sys/stat.h> 12 #include <sys/stat.h>
12 #include <stddef.h> 13 #if defined(OS_LINUX)
14 #include <linux/un.h>
15 #elif defined(OS_MACOSX)
16 #include <sys/un.h>
Mark Mentovai 2008/12/15 21:43:40 Did you try to use <sys/un.h> on Linux here?
17 #endif
18
13 19
14 #include "base/logging.h" 20 #include "base/logging.h"
15 #include "base/process_util.h" 21 #include "base/process_util.h"
16 #include "base/scoped_ptr.h" 22 #include "base/scoped_ptr.h"
17 #include "base/string_util.h" 23 #include "base/string_util.h"
18 #include "chrome/common/chrome_counters.h" 24 #include "chrome/common/chrome_counters.h"
19 #include "chrome/common/ipc_message_utils.h" 25 #include "chrome/common/ipc_message_utils.h"
20 26
21 #if defined(OS_LINUX)
22 #include <linux/un.h>
23 #elif defined(OS_MACOSX)
24 #include <sys/un.h>
25 #endif
26
27 namespace IPC { 27 namespace IPC {
28 28
29 //------------------------------------------------------------------------------ 29 //------------------------------------------------------------------------------
30 // TODO(playmobil): Only use FIFOs for debugging, for real work, use a 30 // TODO(playmobil): Only use FIFOs for debugging, for real work, use a
31 // socketpair. 31 // socketpair.
32 namespace { 32 namespace {
33 33
34 // The -1 is to take the NULL terminator into account. 34 // The -1 is to take the NULL terminator into account.
35 #if defined(OS_LINUX) 35 #if defined(OS_LINUX)
36 const size_t kMaxPipeNameLength = UNIX_PATH_MAX - 1; 36 const size_t kMaxPipeNameLength = UNIX_PATH_MAX - 1;
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after
143 *client_socket = fd; 143 *client_socket = fd;
144 return true; 144 return true;
145 } 145 }
146 146
147 } // namespace 147 } // namespace
148 //------------------------------------------------------------------------------ 148 //------------------------------------------------------------------------------
149 149
150 Channel::ChannelImpl::ChannelImpl(const std::wstring& channel_id, Mode mode, 150 Channel::ChannelImpl::ChannelImpl(const std::wstring& channel_id, Mode mode,
151 Listener* listener) 151 Listener* listener)
152 : mode_(mode), 152 : mode_(mode),
153 server_listen_connection_event_(new EventHolder()), 153 is_blocked_on_write_(false),
Mark Mentovai 2008/12/15 21:43:40 Indentation messed up through here.
154 read_event_(new EventHolder()),
155 write_event_(new EventHolder()),
156 message_send_bytes_written_(0), 154 message_send_bytes_written_(0),
157 server_listen_pipe_(-1), 155 server_listen_pipe_(-1),
158 pipe_(-1), 156 pipe_(-1),
159 listener_(listener), 157 listener_(listener),
160 waiting_connect_(true), 158 waiting_connect_(true),
161 processing_incoming_(false), 159 processing_incoming_(false),
162 factory_(this) { 160 factory_(this) {
163 if (!CreatePipe(channel_id, mode)) { 161 if (!CreatePipe(channel_id, mode)) {
164 // The pipe may have been closed already. 162 // The pipe may have been closed already.
165 LOG(WARNING) << "Unable to create pipe named \"" << channel_id << 163 LOG(WARNING) << "Unable to create pipe named \"" << channel_id <<
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
207 205
208 output_queue_.push(msg.release()); 206 output_queue_.push(msg.release());
209 return true; 207 return true;
210 } 208 }
211 209
212 bool Channel::ChannelImpl::Connect() { 210 bool Channel::ChannelImpl::Connect() {
213 if (mode_ == MODE_SERVER) { 211 if (mode_ == MODE_SERVER) {
214 if (server_listen_pipe_ == -1) { 212 if (server_listen_pipe_ == -1) {
215 return false; 213 return false;
216 } 214 }
217 event *ev = &(server_listen_connection_event_->event); 215 MessageLoopForIO::current()->WatchFileDescriptor(
218 MessageLoopForIO::current()->WatchFileHandle(server_listen_pipe_, 216 server_listen_pipe_,
219 EV_READ | EV_PERSIST, 217 true,
220 ev, 218 MessageLoopForIO::WATCH_READ,
221 this); 219 &server_listen_connection_watcher_,
222 server_listen_connection_event_->is_active = true; 220 this);
223 } else { 221 } else {
224 if (pipe_ == -1) { 222 if (pipe_ == -1) {
225 return false; 223 return false;
226 } 224 }
227 MessageLoopForIO::current()->WatchFileHandle(pipe_, 225 MessageLoopForIO::current()->WatchFileDescriptor(
228 EV_READ | EV_PERSIST, 226 pipe_,
229 &(read_event_->event), 227 true,
230 this); 228 MessageLoopForIO::WATCH_READ,
231 read_event_->is_active = true; 229 &read_watcher_,
230 this);
232 waiting_connect_ = false; 231 waiting_connect_ = false;
233 } 232 }
234 233
235 if (!waiting_connect_) 234 if (!waiting_connect_)
236 return ProcessOutgoingMessages(); 235 return ProcessOutgoingMessages();
237 return true; 236 return true;
238 } 237 }
239 238
240 bool Channel::ChannelImpl::ProcessIncomingMessages() { 239 bool Channel::ChannelImpl::ProcessIncomingMessages() {
241 ssize_t bytes_read = 0; 240 ssize_t bytes_read = 0;
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
310 309
311 bytes_read = 0; // Get more data. 310 bytes_read = 0; // Get more data.
312 } 311 }
313 312
314 return true; 313 return true;
315 } 314 }
316 315
317 bool Channel::ChannelImpl::ProcessOutgoingMessages() { 316 bool Channel::ChannelImpl::ProcessOutgoingMessages() {
318 DCHECK(!waiting_connect_); // Why are we trying to send messages if there's 317 DCHECK(!waiting_connect_); // Why are we trying to send messages if there's
319 // no connection? 318 // no connection?
319 is_blocked_on_write_ = false;
320 320
321 if (output_queue_.empty()) 321 if (output_queue_.empty())
322 return true; 322 return true;
323 323
324 if (pipe_ == -1) 324 if (pipe_ == -1)
325 return false; 325 return false;
326 326
327 // If libevent was monitoring the socket for us (we blocked when trying to
328 // write a message last time), then delete the underlying libevent structure.
329 if (write_event_->is_active) {
330 // TODO(playmobil): This calls event_del(), but we can probably
331 // do with just calling event_add here.
332 MessageLoopForIO::current()->UnwatchFileHandle(&(write_event_->event));
333 write_event_->is_active = false;
334 }
335
336 // Write out all the messages we can till the write blocks or there are no 327 // Write out all the messages we can till the write blocks or there are no
337 // more outgoing messages. 328 // more outgoing messages.
338 while (!output_queue_.empty()) { 329 while (!output_queue_.empty()) {
339 Message* msg = output_queue_.front(); 330 Message* msg = output_queue_.front();
340 331
341 size_t amt_to_write = msg->size() - message_send_bytes_written_; 332 size_t amt_to_write = msg->size() - message_send_bytes_written_;
342 const char *out_bytes = reinterpret_cast<const char*>(msg->data()) + 333 const char *out_bytes = reinterpret_cast<const char*>(msg->data()) +
343 message_send_bytes_written_; 334 message_send_bytes_written_;
344 ssize_t bytes_written = -1; 335 ssize_t bytes_written = -1;
345 do { 336 do {
346 bytes_written = write(pipe_, out_bytes, amt_to_write); 337 bytes_written = write(pipe_, out_bytes, amt_to_write);
347 } while (bytes_written == -1 && errno == EINTR); 338 } while (bytes_written == -1 && errno == EINTR);
348 339
349 if (bytes_written < 0) { 340 if (bytes_written < 0) {
350 LOG(ERROR) << "pipe error: " << strerror(errno); 341 LOG(ERROR) << "pipe error: " << strerror(errno);
351 return false; 342 return false;
352 } 343 }
353 344
354 if (static_cast<size_t>(bytes_written) != amt_to_write) { 345 if (static_cast<size_t>(bytes_written) != amt_to_write) {
355 message_send_bytes_written_ += bytes_written; 346 message_send_bytes_written_ += bytes_written;
356 347
357 // Tell libevent to call us back once things are unblocked. 348 // Tell libevent to call us back once things are unblocked.
358 MessageLoopForIO::current()->WatchFileHandle(server_listen_pipe_, 349 is_blocked_on_write_ = true;
359 EV_WRITE, 350 MessageLoopForIO::current()->WatchFileDescriptor(
360 &(write_event_->event), 351 pipe_,
361 this); 352 false, // One shot
362 write_event_->is_active = true; 353 MessageLoopForIO::WATCH_WRITE,
363 354 &write_watcher_,
355 this);
364 } else { 356 } else {
365 message_send_bytes_written_ = 0; 357 message_send_bytes_written_ = 0;
366 358
367 // Message sent OK! 359 // Message sent OK!
368 #ifdef IPC_MESSAGE_DEBUG_EXTRA 360 #ifdef IPC_MESSAGE_DEBUG_EXTRA
369 DLOG(INFO) << "sent message @" << msg << " on channel @" << this << 361 DLOG(INFO) << "sent message @" << msg << " on channel @" << this <<
370 " with type " << msg->type(); 362 " with type " << msg->type();
371 #endif 363 #endif
372 output_queue_.pop(); 364 output_queue_.pop();
373 delete msg; 365 delete msg;
(...skipping 10 matching lines...) Expand all
384 << " (" << output_queue_.size() << " in queue)"; 376 << " (" << output_queue_.size() << " in queue)";
385 #endif 377 #endif
386 378
387 // TODO(playmobil): implement 379 // TODO(playmobil): implement
388 // #ifdef IPC_MESSAGE_LOG_ENABLED 380 // #ifdef IPC_MESSAGE_LOG_ENABLED
389 // Logging::current()->OnSendMessage(message, L""); 381 // Logging::current()->OnSendMessage(message, L"");
390 // #endif 382 // #endif
391 383
392 output_queue_.push(message); 384 output_queue_.push(message);
393 if (!waiting_connect_) { 385 if (!waiting_connect_) {
394 if (!write_event_->is_active) { 386 if (!is_blocked_on_write_) {
395 if (!ProcessOutgoingMessages()) 387 if (!ProcessOutgoingMessages())
396 return false; 388 return false;
397 } 389 }
398 } 390 }
399 391
400 return true; 392 return true;
401 } 393 }
402 394
403 // Called by libevent when we can read from th pipe without blocking. 395 // Called by libevent when we can read from th pipe without blocking.
404 void Channel::ChannelImpl::OnFileReadReady(int fd) { 396 void Channel::ChannelImpl::OnFileCanReadWithoutBlocking(int fd) {
405 bool send_server_hello_msg = false; 397 bool send_server_hello_msg = false;
406 if (waiting_connect_ && mode_ == MODE_SERVER) { 398 if (waiting_connect_ && mode_ == MODE_SERVER) {
407 if (!ServerAcceptFifoConnection(server_listen_pipe_, &pipe_)) { 399 if (!ServerAcceptFifoConnection(server_listen_pipe_, &pipe_)) {
408 Close(); 400 Close();
409 } 401 }
410 402
411 // No need to watch the listening socket any longer since only one client 403 // No need to watch the listening socket any longer since only one client
412 // can connect. So unregister with libevent. 404 // can connect. So unregister with libevent.
413 event *ev = &(server_listen_connection_event_->event); 405 server_listen_connection_watcher_.StopWatchingFileDescriptor();
414 MessageLoopForIO::current()->UnwatchFileHandle(ev);
415 server_listen_connection_event_->is_active = false;
416 406
417 // Start watching our end of the socket. 407 // Start watching our end of the socket.
418 MessageLoopForIO::current()->WatchFileHandle(pipe_, 408 MessageLoopForIO::current()->WatchFileDescriptor(
419 EV_READ | EV_PERSIST, 409 pipe_,
420 &(read_event_->event), 410 true,
421 this); 411 MessageLoopForIO::WATCH_READ,
422 read_event_->is_active = true; 412 &read_watcher_,
413 this);
414
423 waiting_connect_ = false; 415 waiting_connect_ = false;
424 send_server_hello_msg = true; 416 send_server_hello_msg = true;
425 } 417 }
426 418
427 if (!waiting_connect_ && fd == pipe_) { 419 if (!waiting_connect_ && fd == pipe_) {
428 if (!ProcessIncomingMessages()) { 420 if (!ProcessIncomingMessages()) {
429 Close(); 421 Close();
430 listener_->OnChannelError(); 422 listener_->OnChannelError();
431 } 423 }
432 } 424 }
433 425
434 // If we're a server and handshaking, then we want to make sure that we 426 // If we're a server and handshaking, then we want to make sure that we
435 // only send our handshake message after we've processed the client's. 427 // only send our handshake message after we've processed the client's.
436 // This gives us a chance to kill the client if the incoming handshake 428 // This gives us a chance to kill the client if the incoming handshake
437 // is invalid. 429 // is invalid.
438 if (send_server_hello_msg) { 430 if (send_server_hello_msg) {
431 // This should be our first write so there' sno chance we can block here...
432 DCHECK(is_blocked_on_write_ == false);
439 ProcessOutgoingMessages(); 433 ProcessOutgoingMessages();
440 } 434 }
441 } 435 }
442 436
443 // Called by libevent when we can write to the pipe without blocking. 437 // Called by libevent when we can write to the pipe without blocking.
444 void Channel::ChannelImpl::OnFileWriteReady(int fd) { 438 void Channel::ChannelImpl::OnFileCanWriteWithoutBlocking(int fd) {
445 if (!ProcessOutgoingMessages()) { 439 if (!ProcessOutgoingMessages()) {
446 Close(); 440 Close();
447 listener_->OnChannelError(); 441 listener_->OnChannelError();
448 } 442 }
449 } 443 }
450 444
451 void Channel::ChannelImpl::Close() { 445 void Channel::ChannelImpl::Close() {
452 // Close can be called multiple time, so we need to make sure we're 446 // Close can be called multiple time, so we need to make sure we're
453 // idempotent. 447 // idempotent.
454 448
455 // Unregister libevent for the listening socket and close it. 449 // Unregister libevent for the listening socket and close it.
456 if (server_listen_connection_event_ && 450 server_listen_connection_watcher_.StopWatchingFileDescriptor();
457 server_listen_connection_event_->is_active) {
458 MessageLoopForIO::current()->UnwatchFileHandle(
459 &(server_listen_connection_event_->event));
460 }
461 451
462 if (server_listen_pipe_ != -1) { 452 if (server_listen_pipe_ != -1) {
463 close(server_listen_pipe_); 453 close(server_listen_pipe_);
464 server_listen_pipe_ = -1; 454 server_listen_pipe_ = -1;
465 } 455 }
466 456
467 // Unregister libevent for the FIFO and close it. 457 // Unregister libevent for the FIFO and close it.
468 if (read_event_ && read_event_->is_active) { 458 read_watcher_.StopWatchingFileDescriptor();
469 MessageLoopForIO::current()->UnwatchFileHandle(&(read_event_->event)); 459 write_watcher_.StopWatchingFileDescriptor();
470 }
471 if (write_event_ && write_event_->is_active) {
472 MessageLoopForIO::current()->UnwatchFileHandle(&(write_event_->event));
473 }
474 if (pipe_ != -1) { 460 if (pipe_ != -1) {
475 close(pipe_); 461 close(pipe_);
476 pipe_ = -1; 462 pipe_ = -1;
477 } 463 }
478 464
479 delete server_listen_connection_event_;
480 server_listen_connection_event_ = NULL;
481 delete read_event_;
482 read_event_ = NULL;
483 delete write_event_;
484 write_event_ = NULL;
485
486 // Unlink the FIFO 465 // Unlink the FIFO
487 unlink(pipe_name_.c_str()); 466 unlink(pipe_name_.c_str());
488 467
489 while (!output_queue_.empty()) { 468 while (!output_queue_.empty()) {
490 Message* m = output_queue_.front(); 469 Message* m = output_queue_.front();
491 output_queue_.pop(); 470 output_queue_.pop();
492 delete m; 471 delete m;
493 } 472 }
494 } 473 }
495 474
(...skipping 18 matching lines...) Expand all
514 493
515 void Channel::set_listener(Listener* listener) { 494 void Channel::set_listener(Listener* listener) {
516 channel_impl_->set_listener(listener); 495 channel_impl_->set_listener(listener);
517 } 496 }
518 497
519 bool Channel::Send(Message* message) { 498 bool Channel::Send(Message* message) {
520 return channel_impl_->Send(message); 499 return channel_impl_->Send(message);
521 } 500 }
522 501
523 } // namespace IPC 502 } // namespace IPC
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698