OLD | NEW |
1 // Copyright (c) 2008 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2008 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "chrome/common/ipc_channel.h" | 5 #include "chrome/common/ipc_channel.h" |
6 | 6 |
| 7 #include <fcntl.h> |
| 8 #include <sys/types.h> |
| 9 #include <sys/socket.h> |
| 10 #include <sys/stat.h> |
| 11 #include <stddef.h> |
| 12 #include <sys/un.h> |
| 13 |
| 14 #include "base/logging.h" |
| 15 #include "base/process_util.h" |
| 16 #include "base/scoped_ptr.h" |
| 17 #include "base/string_util.h" |
| 18 #include "chrome/common/chrome_counters.h" |
| 19 #include "chrome/common/ipc_message_utils.h" |
| 20 #include "third_party/libevent/event.h" |
7 | 21 |
8 namespace IPC { | 22 namespace IPC { |
9 | 23 |
10 // TODO(playmobil): implement. | |
11 | |
12 //------------------------------------------------------------------------------ | 24 //------------------------------------------------------------------------------ |
13 | 25 // TODO(playmobil): Only use FIFOs for debugging, for real work, use a |
14 Channel::Channel(const std::wstring& channel_id, Mode mode, Listener* listener) | 26 // socketpair. |
15 : factory_(this) { | 27 namespace { |
16 NOTREACHED(); | 28 |
| 29 // The -1 is to take the NULL terminator into account. |
| 30 #if defined(OS_LINUX) |
| 31 const size_t kMaxPipeNameLength = UNIX_PATH_MAX - 1; |
| 32 #elif defined(OS_MACOSX) |
| 33 // OS X doesn't define UNIX_PATH_MAX |
| 34 // Per the size specified for the sun_path structure of sockaddr_un in sys/un.h. |
| 35 const size_t kMaxPipeNameLength = 104 - 1; |
| 36 #endif |
| 37 |
| 38 // Creates a Fifo with the specified name ready to listen on. |
| 39 bool CreateServerFifo(const std::string &pipe_name, int* server_listen_fd) { |
| 40 DCHECK(server_listen_fd); |
| 41 DCHECK(pipe_name.length() > 0); |
| 42 DCHECK(pipe_name.length() < kMaxPipeNameLength); |
| 43 |
| 44 if (pipe_name.length() == 0 || pipe_name.length() > kMaxPipeNameLength) { |
| 45 return false; |
| 46 } |
| 47 |
| 48 // Create socket. |
| 49 int fd = socket(AF_UNIX, SOCK_STREAM, 0); |
| 50 if (fd < 0) { |
| 51 return false; |
| 52 } |
| 53 |
| 54 // Make socket non-blocking |
| 55 if (fcntl(fd, F_SETFL, O_NONBLOCK) == -1) { |
| 56 close(fd); |
| 57 return false; |
| 58 } |
| 59 |
| 60 // Delete any old FS instances. |
| 61 unlink(pipe_name.c_str()); |
| 62 |
| 63 // Create unix_addr structure |
| 64 struct sockaddr_un unix_addr; |
| 65 memset(&unix_addr, 0, sizeof(unix_addr)); |
| 66 unix_addr.sun_family = AF_UNIX; |
| 67 snprintf(unix_addr.sun_path, kMaxPipeNameLength + 1, "%s", pipe_name.c_str()); |
| 68 size_t unix_addr_len = offsetof(struct sockaddr_un, sun_path) + |
| 69 strlen(unix_addr.sun_path) + 1; |
| 70 |
| 71 // Bind the socket. |
| 72 if (bind(fd, reinterpret_cast<const sockaddr*>(&unix_addr), |
| 73 unix_addr_len) != 0) { |
| 74 close(fd); |
| 75 return false; |
| 76 } |
| 77 |
| 78 // Start listening on the socket. |
| 79 const int listen_queue_length = 1; |
| 80 if (listen(fd, listen_queue_length) != 0) { |
| 81 close(fd); |
| 82 return false; |
| 83 } |
| 84 |
| 85 *server_listen_fd = fd; |
| 86 return true; |
| 87 } |
| 88 |
| 89 // Accept a connection on a fifo. |
| 90 bool ServerAcceptFifoConnection(int server_listen_fd, int* server_socket) { |
| 91 DCHECK(server_socket); |
| 92 |
| 93 int accept_fd = accept(server_listen_fd, NULL, 0); |
| 94 if (accept_fd < 0) |
| 95 return false; |
| 96 |
| 97 *server_socket = accept_fd; |
| 98 return true; |
| 99 } |
| 100 |
| 101 bool ClientConnectToFifo(const std::string &pipe_name, int* client_socket) { |
| 102 DCHECK(client_socket); |
| 103 DCHECK(pipe_name.length() < kMaxPipeNameLength); |
| 104 |
| 105 // Create socket. |
| 106 int fd = socket(AF_UNIX, SOCK_STREAM, 0); |
| 107 if (fd < 0) { |
| 108 LOG(ERROR) << "fd is invalid"; |
| 109 return false; |
| 110 } |
| 111 |
| 112 // Make socket non-blocking |
| 113 if (fcntl(fd, F_SETFL, O_NONBLOCK) == -1) { |
| 114 LOG(ERROR) << "fcnt failed"; |
| 115 close(fd); |
| 116 return false; |
| 117 } |
| 118 |
| 119 // Create server side of socket. |
| 120 struct sockaddr_un server_unix_addr; |
| 121 memset(&server_unix_addr, 0, sizeof(server_unix_addr)); |
| 122 server_unix_addr.sun_family = AF_UNIX; |
| 123 snprintf(server_unix_addr.sun_path, kMaxPipeNameLength + 1, "%s", |
| 124 pipe_name.c_str()); |
| 125 size_t server_unix_addr_len = offsetof(struct sockaddr_un, sun_path) + |
| 126 strlen(server_unix_addr.sun_path) + 1; |
| 127 |
| 128 int ret_val = -1; |
| 129 do { |
| 130 ret_val = connect(fd, reinterpret_cast<sockaddr*>(&server_unix_addr), |
| 131 server_unix_addr_len); |
| 132 } while (ret_val == -1 && errno == EINTR); |
| 133 if (ret_val != 0) { |
| 134 close(fd); |
| 135 return false; |
| 136 } |
| 137 |
| 138 *client_socket = fd; |
| 139 return true; |
| 140 } |
| 141 |
| 142 } // namespace |
| 143 |
| 144 //------------------------------------------------------------------------------ |
| 145 |
| 146 // PIMPL wrapper for libevent event. |
| 147 // TODO(playmobil): MessageLoopForIO needs to better encapsulate libevent. |
| 148 struct Channel::EventHolder { |
| 149 EventHolder() : is_active(false) {} |
| 150 ~EventHolder() {} |
| 151 |
| 152 bool is_active; |
| 153 |
| 154 // libevent's set functions set all the needed members of this struct, so no |
| 155 // need to initialize before use. |
| 156 struct event event; |
| 157 }; |
| 158 |
| 159 //------------------------------------------------------------------------------ |
| 160 |
| 161 Channel::Channel(const std::wstring& channel_id, Mode mode, Listener* listener) |
| 162 : mode_(mode), |
| 163 server_listen_connection_event_(new EventHolder()), |
| 164 read_event_(new EventHolder()), |
| 165 write_event_(new EventHolder()), |
| 166 message_send_bytes_written_(0), |
| 167 server_listen_pipe_(-1), |
| 168 pipe_(-1), |
| 169 listener_(listener), |
| 170 waiting_connect_(true), |
| 171 processing_incoming_(false), |
| 172 factory_(this) { |
| 173 if (!CreatePipe(channel_id, mode)) { |
| 174 // The pipe may have been closed already. |
| 175 LOG(WARNING) << "Unable to create pipe named \"" << channel_id << |
| 176 "\" in " << (mode == MODE_SERVER ? "server" : "client") << |
| 177 " mode error(" << strerror(errno) << ")."; |
| 178 } |
| 179 } |
| 180 |
| 181 const std::wstring Channel::PipeName(const std::wstring& channel_id) const { |
| 182 std::wostringstream ss; |
| 183 // TODO(playmobil): This should live in the Chrome user data directory. |
| 184 // TODO(playmobil): Cleanup any stale fifos. |
| 185 ss << L"/var/tmp/chrome_" << channel_id; |
| 186 return ss.str(); |
| 187 } |
| 188 |
| 189 bool Channel::CreatePipe(const std::wstring& channel_id, Mode mode) { |
| 190 DCHECK(server_listen_pipe_ == -1 && pipe_ == -1); |
| 191 |
| 192 // TODO(playmobil): Should we just change pipe_name to be a normal string |
| 193 // everywhere? |
| 194 pipe_name_ = WideToUTF8(PipeName(channel_id)); |
| 195 |
| 196 if (mode == MODE_SERVER) { |
| 197 if (!CreateServerFifo(pipe_name_, &server_listen_pipe_)) { |
| 198 return false; |
| 199 } |
| 200 } else { |
| 201 if (!ClientConnectToFifo(pipe_name_, &pipe_)) { |
| 202 return false; |
| 203 } |
| 204 waiting_connect_ = false; |
| 205 } |
| 206 |
| 207 // Create the Hello message to be sent when Connect is called |
| 208 scoped_ptr<Message> msg(new Message(MSG_ROUTING_NONE, |
| 209 HELLO_MESSAGE_TYPE, |
| 210 IPC::Message::PRIORITY_NORMAL)); |
| 211 if (!msg->WriteInt(base::GetCurrentProcId())) { |
| 212 Close(); |
| 213 return false; |
| 214 } |
| 215 |
| 216 output_queue_.push(msg.release()); |
| 217 return true; |
| 218 } |
| 219 |
| 220 bool Channel::Connect() { |
| 221 if (mode_ == MODE_SERVER) { |
| 222 if (server_listen_pipe_ == -1) { |
| 223 return false; |
| 224 } |
| 225 event *ev = &(server_listen_connection_event_->event); |
| 226 MessageLoopForIO::current()->WatchFileHandle(server_listen_pipe_, |
| 227 EV_READ | EV_PERSIST, |
| 228 ev, |
| 229 this); |
| 230 server_listen_connection_event_->is_active = true; |
| 231 } else { |
| 232 if (pipe_ == -1) { |
| 233 return false; |
| 234 } |
| 235 MessageLoopForIO::current()->WatchFileHandle(pipe_, |
| 236 EV_READ | EV_PERSIST, |
| 237 &(read_event_->event), |
| 238 this); |
| 239 read_event_->is_active = true; |
| 240 waiting_connect_ = false; |
| 241 } |
| 242 |
| 243 if (!waiting_connect_) |
| 244 return ProcessOutgoingMessages(); |
| 245 return true; |
| 246 } |
| 247 |
| 248 bool Channel::ProcessIncomingMessages() { |
| 249 ssize_t bytes_read = 0; |
| 250 |
| 251 for (;;) { |
| 252 if (bytes_read == 0) { |
| 253 if (pipe_ == -1) |
| 254 return false; |
| 255 |
| 256 // Read from pipe. |
| 257 // recv() returns 0 if the connection has closed or EAGAIN if no data is |
| 258 // waiting on the pipe. |
| 259 do { |
| 260 bytes_read = read(pipe_, input_buf_, BUF_SIZE); |
| 261 } while (bytes_read == -1 && errno == EINTR); |
| 262 if (bytes_read < 0) { |
| 263 if (errno == EAGAIN) { |
| 264 return true; |
| 265 } else { |
| 266 LOG(ERROR) << "pipe error: " << strerror(errno); |
| 267 return false; |
| 268 } |
| 269 } else if (bytes_read == 0) { |
| 270 // The pipe has closed... |
| 271 Close(); |
| 272 return true; |
| 273 } |
| 274 } |
| 275 DCHECK(bytes_read); |
| 276 |
| 277 // Process messages from input buffer. |
| 278 const char *p; |
| 279 const char *end; |
| 280 if (input_overflow_buf_.empty()) { |
| 281 p = input_buf_; |
| 282 end = p + bytes_read; |
| 283 } else { |
| 284 if (input_overflow_buf_.size() > |
| 285 static_cast<size_t>(kMaximumMessageSize - bytes_read)) { |
| 286 input_overflow_buf_.clear(); |
| 287 LOG(ERROR) << "IPC message is too big"; |
| 288 return false; |
| 289 } |
| 290 input_overflow_buf_.append(input_buf_, bytes_read); |
| 291 p = input_overflow_buf_.data(); |
| 292 end = p + input_overflow_buf_.size(); |
| 293 } |
| 294 |
| 295 while (p < end) { |
| 296 const char* message_tail = Message::FindNext(p, end); |
| 297 if (message_tail) { |
| 298 int len = static_cast<int>(message_tail - p); |
| 299 const Message m(p, len); |
| 300 #ifdef IPC_MESSAGE_DEBUG_EXTRA |
| 301 DLOG(INFO) << "received message on channel @" << this << |
| 302 " with type " << m.type(); |
| 303 #endif |
| 304 if (m.routing_id() == MSG_ROUTING_NONE && |
| 305 m.type() == HELLO_MESSAGE_TYPE) { |
| 306 // The Hello message contains only the process id. |
| 307 listener_->OnChannelConnected(MessageIterator(m).NextInt()); |
| 308 } else { |
| 309 listener_->OnMessageReceived(m); |
| 310 } |
| 311 p = message_tail; |
| 312 } else { |
| 313 // Last message is partial. |
| 314 break; |
| 315 } |
| 316 } |
| 317 input_overflow_buf_.assign(p, end - p); |
| 318 |
| 319 bytes_read = 0; // Get more data. |
| 320 } |
| 321 |
| 322 return true; |
| 323 } |
| 324 |
| 325 bool Channel::ProcessOutgoingMessages() { |
| 326 DCHECK(!waiting_connect_); // Why are we trying to send messages if there's |
| 327 // no connection? |
| 328 |
| 329 if (output_queue_.empty()) |
| 330 return true; |
| 331 |
| 332 if (pipe_ == -1) |
| 333 return false; |
| 334 |
| 335 // If libevent was monitoring the socket for us (we blocked when trying to |
| 336 // write a message last time), then delete the underlying libevent structure. |
| 337 if (write_event_->is_active) { |
| 338 // TODO(playmobil): This calls event_del(), but we can probably |
| 339 // do with just calling event_add here. |
| 340 MessageLoopForIO::current()->UnwatchFileHandle(&(write_event_->event)); |
| 341 write_event_->is_active = false; |
| 342 } |
| 343 |
| 344 // Write out all the messages we can till the write blocks or there are no |
| 345 // more outgoing messages. |
| 346 while (!output_queue_.empty()) { |
| 347 Message* msg = output_queue_.front(); |
| 348 |
| 349 size_t amt_to_write = msg->size() - message_send_bytes_written_; |
| 350 const char *out_bytes = reinterpret_cast<const char*>(msg->data()) + |
| 351 message_send_bytes_written_; |
| 352 ssize_t bytes_written = -1; |
| 353 do { |
| 354 bytes_written = write(pipe_, out_bytes, amt_to_write); |
| 355 } while (bytes_written == -1 && errno == EINTR); |
| 356 |
| 357 if (bytes_written < 0) { |
| 358 LOG(ERROR) << "pipe error: " << strerror(errno); |
| 359 return false; |
| 360 } |
| 361 |
| 362 if (static_cast<size_t>(bytes_written) != amt_to_write) { |
| 363 message_send_bytes_written_ += bytes_written; |
| 364 |
| 365 // Tell libevent to call us back once things are unblocked. |
| 366 MessageLoopForIO::current()->WatchFileHandle(server_listen_pipe_, |
| 367 EV_WRITE, |
| 368 &(write_event_->event), |
| 369 this); |
| 370 write_event_->is_active = true; |
| 371 |
| 372 } else { |
| 373 message_send_bytes_written_ = 0; |
| 374 |
| 375 // Message sent OK! |
| 376 #ifdef IPC_MESSAGE_DEBUG_EXTRA |
| 377 DLOG(INFO) << "sent message @" << msg << " on channel @" << this << |
| 378 " with type " << msg->type(); |
| 379 #endif |
| 380 output_queue_.pop(); |
| 381 delete msg; |
| 382 } |
| 383 } |
| 384 return true; |
| 385 } |
| 386 |
| 387 bool Channel::Send(Message* message) { |
| 388 chrome::Counters::ipc_send_counter().Increment(); |
| 389 #ifdef IPC_MESSAGE_DEBUG_EXTRA |
| 390 DLOG(INFO) << "sending message @" << message << " on channel @" << this |
| 391 << " with type " << message->type() |
| 392 << " (" << output_queue_.size() << " in queue)"; |
| 393 #endif |
| 394 |
| 395 // TODO(playmobil): implement |
| 396 // #ifdef IPC_MESSAGE_LOG_ENABLED |
| 397 // Logging::current()->OnSendMessage(message, L""); |
| 398 // #endif |
| 399 |
| 400 output_queue_.push(message); |
| 401 if (!waiting_connect_) { |
| 402 if (!write_event_->is_active) { |
| 403 if (!ProcessOutgoingMessages()) |
| 404 return false; |
| 405 } |
| 406 } |
| 407 |
| 408 return true; |
| 409 } |
| 410 |
| 411 // Called by libevent when we can read from th pipe without blocking. |
| 412 void Channel::OnFileReadReady(int fd) { |
| 413 bool send_server_hello_msg = false; |
| 414 if (waiting_connect_ && mode_ == MODE_SERVER) { |
| 415 if (!ServerAcceptFifoConnection(server_listen_pipe_, &pipe_)) { |
| 416 Close(); |
| 417 } |
| 418 |
| 419 // No need to watch the listening socket any longer since only one client |
| 420 // can connect. So unregister with libevent. |
| 421 event *ev = &(server_listen_connection_event_->event); |
| 422 MessageLoopForIO::current()->UnwatchFileHandle(ev); |
| 423 server_listen_connection_event_->is_active = false; |
| 424 |
| 425 // Start watching our end of the socket. |
| 426 MessageLoopForIO::current()->WatchFileHandle(pipe_, |
| 427 EV_READ | EV_PERSIST, |
| 428 &(read_event_->event), |
| 429 this); |
| 430 read_event_->is_active = true; |
| 431 waiting_connect_ = false; |
| 432 send_server_hello_msg = true; |
| 433 } |
| 434 |
| 435 if (!waiting_connect_ && fd == pipe_) { |
| 436 if (!ProcessIncomingMessages()) { |
| 437 Close(); |
| 438 listener_->OnChannelError(); |
| 439 } |
| 440 } |
| 441 |
| 442 // If we're a server and handshaking, then we want to make sure that we |
| 443 // only send our handshake message after we've processed the client's. |
| 444 // This gives us a chance to kill the client if the incoming handshake |
| 445 // is invalid. |
| 446 if (send_server_hello_msg) { |
| 447 ProcessOutgoingMessages(); |
| 448 } |
| 449 } |
| 450 |
| 451 // Called by libevent when we can write to the pipe without blocking. |
| 452 void Channel::OnFileWriteReady(int fd) { |
| 453 if (!ProcessOutgoingMessages()) { |
| 454 Close(); |
| 455 listener_->OnChannelError(); |
| 456 } |
17 } | 457 } |
18 | 458 |
19 void Channel::Close() { | 459 void Channel::Close() { |
20 NOTREACHED(); | 460 // Close can be called multiple time, so we need to make sure we're |
21 } | 461 // idempotent. |
22 | 462 |
23 bool Channel::Send(Message* message) { | 463 // Unregister libevent for the listening socket and close it. |
24 NOTREACHED(); | 464 if (server_listen_connection_event_ && |
25 return false; | 465 server_listen_connection_event_->is_active) { |
26 } | 466 MessageLoopForIO::current()->UnwatchFileHandle( |
27 | 467 &(server_listen_connection_event_->event)); |
28 bool Channel::Connect() { | 468 } |
29 NOTREACHED(); | 469 |
30 return false; | 470 if (server_listen_pipe_ != -1) { |
31 } | 471 close(server_listen_pipe_); |
| 472 server_listen_pipe_ = -1; |
| 473 } |
| 474 |
| 475 // Unregister libevent for the FIFO and close it. |
| 476 if (read_event_ && read_event_->is_active) { |
| 477 MessageLoopForIO::current()->UnwatchFileHandle(&(read_event_->event)); |
| 478 } |
| 479 if (write_event_ && write_event_->is_active) { |
| 480 MessageLoopForIO::current()->UnwatchFileHandle(&(write_event_->event)); |
| 481 } |
| 482 if (pipe_ != -1) { |
| 483 close(pipe_); |
| 484 pipe_ = -1; |
| 485 } |
| 486 |
| 487 delete server_listen_connection_event_; |
| 488 server_listen_connection_event_ = NULL; |
| 489 delete read_event_; |
| 490 read_event_ = NULL; |
| 491 delete write_event_; |
| 492 write_event_ = NULL; |
| 493 |
| 494 // Unlink the FIFO |
| 495 unlink(pipe_name_.c_str()); |
| 496 |
| 497 while (!output_queue_.empty()) { |
| 498 Message* m = output_queue_.front(); |
| 499 output_queue_.pop(); |
| 500 delete m; |
| 501 } |
| 502 } |
| 503 |
32 } // namespace IPC | 504 } // namespace IPC |
OLD | NEW |