Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(510)

Side by Side Diff: chrome/common/ipc_channel_posix.cc

Issue 12927: First cut at POSIX Implementation of IPC Channel using FIFOs. (Closed)
Patch Set: fix for agl's comments Created 12 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « chrome/common/ipc_channel.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2008 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2008 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "chrome/common/ipc_channel.h" 5 #include "chrome/common/ipc_channel.h"
6 6
7 #include <fcntl.h>
8 #include <sys/types.h>
9 #include <sys/socket.h>
10 #include <sys/stat.h>
11 #include <stddef.h>
12 #include <sys/un.h>
13
14 #include "base/logging.h"
15 #include "base/process_util.h"
16 #include "base/scoped_ptr.h"
17 #include "base/string_util.h"
18 #include "chrome/common/chrome_counters.h"
19 #include "chrome/common/ipc_message_utils.h"
20 #include "third_party/libevent/event.h"
7 21
8 namespace IPC { 22 namespace IPC {
9 23
10 // TODO(playmobil): implement.
11
12 //------------------------------------------------------------------------------ 24 //------------------------------------------------------------------------------
13 25 // TODO(playmobil): Only use FIFOs for debugging, for real work, use a
14 Channel::Channel(const std::wstring& channel_id, Mode mode, Listener* listener) 26 // socketpair.
15 : factory_(this) { 27 namespace {
16 NOTREACHED(); 28
29 // The -1 is to take the NULL terminator into account.
30 #if defined(OS_LINUX)
31 const size_t kMaxPipeNameLength = UNIX_PATH_MAX - 1;
32 #elif defined(OS_MACOSX)
33 // OS X doesn't define UNIX_PATH_MAX
34 // Per the size specified for the sun_path structure of sockaddr_un in sys/un.h.
35 const size_t kMaxPipeNameLength = 104 - 1;
36 #endif
37
38 // Creates a Fifo with the specified name ready to listen on.
39 bool CreateServerFifo(const std::string &pipe_name, int* server_listen_fd) {
40 DCHECK(server_listen_fd);
41 DCHECK(pipe_name.length() > 0);
42 DCHECK(pipe_name.length() < kMaxPipeNameLength);
43
44 if (pipe_name.length() == 0 || pipe_name.length() > kMaxPipeNameLength) {
45 return false;
46 }
47
48 // Create socket.
49 int fd = socket(AF_UNIX, SOCK_STREAM, 0);
50 if (fd < 0) {
51 return false;
52 }
53
54 // Make socket non-blocking
55 if (fcntl(fd, F_SETFL, O_NONBLOCK) == -1) {
56 close(fd);
57 return false;
58 }
59
60 // Delete any old FS instances.
61 unlink(pipe_name.c_str());
62
63 // Create unix_addr structure
64 struct sockaddr_un unix_addr;
65 memset(&unix_addr, 0, sizeof(unix_addr));
66 unix_addr.sun_family = AF_UNIX;
67 snprintf(unix_addr.sun_path, kMaxPipeNameLength + 1, "%s", pipe_name.c_str());
68 size_t unix_addr_len = offsetof(struct sockaddr_un, sun_path) +
69 strlen(unix_addr.sun_path) + 1;
70
71 // Bind the socket.
72 if (bind(fd, reinterpret_cast<const sockaddr*>(&unix_addr),
73 unix_addr_len) != 0) {
74 close(fd);
75 return false;
76 }
77
78 // Start listening on the socket.
79 const int listen_queue_length = 1;
80 if (listen(fd, listen_queue_length) != 0) {
81 close(fd);
82 return false;
83 }
84
85 *server_listen_fd = fd;
86 return true;
87 }
88
89 // Accept a connection on a fifo.
90 bool ServerAcceptFifoConnection(int server_listen_fd, int* server_socket) {
91 DCHECK(server_socket);
92
93 int accept_fd = accept(server_listen_fd, NULL, 0);
94 if (accept_fd < 0)
95 return false;
96
97 *server_socket = accept_fd;
98 return true;
99 }
100
101 bool ClientConnectToFifo(const std::string &pipe_name, int* client_socket) {
102 DCHECK(client_socket);
103 DCHECK(pipe_name.length() < kMaxPipeNameLength);
104
105 // Create socket.
106 int fd = socket(AF_UNIX, SOCK_STREAM, 0);
107 if (fd < 0) {
108 LOG(ERROR) << "fd is invalid";
109 return false;
110 }
111
112 // Make socket non-blocking
113 if (fcntl(fd, F_SETFL, O_NONBLOCK) == -1) {
114 LOG(ERROR) << "fcnt failed";
115 close(fd);
116 return false;
117 }
118
119 // Create server side of socket.
120 struct sockaddr_un server_unix_addr;
121 memset(&server_unix_addr, 0, sizeof(server_unix_addr));
122 server_unix_addr.sun_family = AF_UNIX;
123 snprintf(server_unix_addr.sun_path, kMaxPipeNameLength + 1, "%s",
124 pipe_name.c_str());
125 size_t server_unix_addr_len = offsetof(struct sockaddr_un, sun_path) +
126 strlen(server_unix_addr.sun_path) + 1;
127
128 int ret_val = -1;
129 do {
130 ret_val = connect(fd, reinterpret_cast<sockaddr*>(&server_unix_addr),
131 server_unix_addr_len);
132 } while (ret_val == -1 && errno == EINTR);
133 if (ret_val != 0) {
134 close(fd);
135 return false;
136 }
137
138 *client_socket = fd;
139 return true;
140 }
141
142 } // namespace
143
144 //------------------------------------------------------------------------------
145
146 // PIMPL wrapper for libevent event.
147 // TODO(playmobil): MessageLoopForIO needs to better encapsulate libevent.
148 struct Channel::EventHolder {
149 EventHolder() : is_active(false) {}
150 ~EventHolder() {}
151
152 bool is_active;
153
154 // libevent's set functions set all the needed members of this struct, so no
155 // need to initialize before use.
156 struct event event;
157 };
158
159 //------------------------------------------------------------------------------
160
161 Channel::Channel(const std::wstring& channel_id, Mode mode, Listener* listener)
162 : mode_(mode),
163 server_listen_connection_event_(new EventHolder()),
164 read_event_(new EventHolder()),
165 write_event_(new EventHolder()),
166 message_send_bytes_written_(0),
167 server_listen_pipe_(-1),
168 pipe_(-1),
169 listener_(listener),
170 waiting_connect_(true),
171 processing_incoming_(false),
172 factory_(this) {
173 if (!CreatePipe(channel_id, mode)) {
174 // The pipe may have been closed already.
175 LOG(WARNING) << "Unable to create pipe named \"" << channel_id <<
176 "\" in " << (mode == MODE_SERVER ? "server" : "client") <<
177 " mode error(" << strerror(errno) << ").";
178 }
179 }
180
181 const std::wstring Channel::PipeName(const std::wstring& channel_id) const {
182 std::wostringstream ss;
183 // TODO(playmobil): This should live in the Chrome user data directory.
184 // TODO(playmobil): Cleanup any stale fifos.
185 ss << L"/var/tmp/chrome_" << channel_id;
186 return ss.str();
187 }
188
189 bool Channel::CreatePipe(const std::wstring& channel_id, Mode mode) {
190 DCHECK(server_listen_pipe_ == -1 && pipe_ == -1);
191
192 // TODO(playmobil): Should we just change pipe_name to be a normal string
193 // everywhere?
194 pipe_name_ = WideToUTF8(PipeName(channel_id));
195
196 if (mode == MODE_SERVER) {
197 if (!CreateServerFifo(pipe_name_, &server_listen_pipe_)) {
198 return false;
199 }
200 } else {
201 if (!ClientConnectToFifo(pipe_name_, &pipe_)) {
202 return false;
203 }
204 waiting_connect_ = false;
205 }
206
207 // Create the Hello message to be sent when Connect is called
208 scoped_ptr<Message> msg(new Message(MSG_ROUTING_NONE,
209 HELLO_MESSAGE_TYPE,
210 IPC::Message::PRIORITY_NORMAL));
211 if (!msg->WriteInt(base::GetCurrentProcId())) {
212 Close();
213 return false;
214 }
215
216 output_queue_.push(msg.release());
217 return true;
218 }
219
220 bool Channel::Connect() {
221 if (mode_ == MODE_SERVER) {
222 if (server_listen_pipe_ == -1) {
223 return false;
224 }
225 event *ev = &(server_listen_connection_event_->event);
226 MessageLoopForIO::current()->WatchFileHandle(server_listen_pipe_,
227 EV_READ | EV_PERSIST,
228 ev,
229 this);
230 server_listen_connection_event_->is_active = true;
231 } else {
232 if (pipe_ == -1) {
233 return false;
234 }
235 MessageLoopForIO::current()->WatchFileHandle(pipe_,
236 EV_READ | EV_PERSIST,
237 &(read_event_->event),
238 this);
239 read_event_->is_active = true;
240 waiting_connect_ = false;
241 }
242
243 if (!waiting_connect_)
244 return ProcessOutgoingMessages();
245 return true;
246 }
247
248 bool Channel::ProcessIncomingMessages() {
249 ssize_t bytes_read = 0;
250
251 for (;;) {
252 if (bytes_read == 0) {
253 if (pipe_ == -1)
254 return false;
255
256 // Read from pipe.
257 // recv() returns 0 if the connection has closed or EAGAIN if no data is
258 // waiting on the pipe.
259 do {
260 bytes_read = read(pipe_, input_buf_, BUF_SIZE);
261 } while (bytes_read == -1 && errno == EINTR);
262 if (bytes_read < 0) {
263 if (errno == EAGAIN) {
264 return true;
265 } else {
266 LOG(ERROR) << "pipe error: " << strerror(errno);
267 return false;
268 }
269 } else if (bytes_read == 0) {
270 // The pipe has closed...
271 Close();
272 return true;
273 }
274 }
275 DCHECK(bytes_read);
276
277 // Process messages from input buffer.
278 const char *p;
279 const char *end;
280 if (input_overflow_buf_.empty()) {
281 p = input_buf_;
282 end = p + bytes_read;
283 } else {
284 if (input_overflow_buf_.size() >
285 static_cast<size_t>(kMaximumMessageSize - bytes_read)) {
286 input_overflow_buf_.clear();
287 LOG(ERROR) << "IPC message is too big";
288 return false;
289 }
290 input_overflow_buf_.append(input_buf_, bytes_read);
291 p = input_overflow_buf_.data();
292 end = p + input_overflow_buf_.size();
293 }
294
295 while (p < end) {
296 const char* message_tail = Message::FindNext(p, end);
297 if (message_tail) {
298 int len = static_cast<int>(message_tail - p);
299 const Message m(p, len);
300 #ifdef IPC_MESSAGE_DEBUG_EXTRA
301 DLOG(INFO) << "received message on channel @" << this <<
302 " with type " << m.type();
303 #endif
304 if (m.routing_id() == MSG_ROUTING_NONE &&
305 m.type() == HELLO_MESSAGE_TYPE) {
306 // The Hello message contains only the process id.
307 listener_->OnChannelConnected(MessageIterator(m).NextInt());
308 } else {
309 listener_->OnMessageReceived(m);
310 }
311 p = message_tail;
312 } else {
313 // Last message is partial.
314 break;
315 }
316 }
317 input_overflow_buf_.assign(p, end - p);
318
319 bytes_read = 0; // Get more data.
320 }
321
322 return true;
323 }
324
325 bool Channel::ProcessOutgoingMessages() {
326 DCHECK(!waiting_connect_); // Why are we trying to send messages if there's
327 // no connection?
328
329 if (output_queue_.empty())
330 return true;
331
332 if (pipe_ == -1)
333 return false;
334
335 // If libevent was monitoring the socket for us (we blocked when trying to
336 // write a message last time), then delete the underlying libevent structure.
337 if (write_event_->is_active) {
338 // TODO(playmobil): This calls event_del(), but we can probably
339 // do with just calling event_add here.
340 MessageLoopForIO::current()->UnwatchFileHandle(&(write_event_->event));
341 write_event_->is_active = false;
342 }
343
344 // Write out all the messages we can till the write blocks or there are no
345 // more outgoing messages.
346 while (!output_queue_.empty()) {
347 Message* msg = output_queue_.front();
348
349 size_t amt_to_write = msg->size() - message_send_bytes_written_;
350 const char *out_bytes = reinterpret_cast<const char*>(msg->data()) +
351 message_send_bytes_written_;
352 ssize_t bytes_written = -1;
353 do {
354 bytes_written = write(pipe_, out_bytes, amt_to_write);
355 } while (bytes_written == -1 && errno == EINTR);
356
357 if (bytes_written < 0) {
358 LOG(ERROR) << "pipe error: " << strerror(errno);
359 return false;
360 }
361
362 if (static_cast<size_t>(bytes_written) != amt_to_write) {
363 message_send_bytes_written_ += bytes_written;
364
365 // Tell libevent to call us back once things are unblocked.
366 MessageLoopForIO::current()->WatchFileHandle(server_listen_pipe_,
367 EV_WRITE,
368 &(write_event_->event),
369 this);
370 write_event_->is_active = true;
371
372 } else {
373 message_send_bytes_written_ = 0;
374
375 // Message sent OK!
376 #ifdef IPC_MESSAGE_DEBUG_EXTRA
377 DLOG(INFO) << "sent message @" << msg << " on channel @" << this <<
378 " with type " << msg->type();
379 #endif
380 output_queue_.pop();
381 delete msg;
382 }
383 }
384 return true;
385 }
386
387 bool Channel::Send(Message* message) {
388 chrome::Counters::ipc_send_counter().Increment();
389 #ifdef IPC_MESSAGE_DEBUG_EXTRA
390 DLOG(INFO) << "sending message @" << message << " on channel @" << this
391 << " with type " << message->type()
392 << " (" << output_queue_.size() << " in queue)";
393 #endif
394
395 // TODO(playmobil): implement
396 // #ifdef IPC_MESSAGE_LOG_ENABLED
397 // Logging::current()->OnSendMessage(message, L"");
398 // #endif
399
400 output_queue_.push(message);
401 if (!waiting_connect_) {
402 if (!write_event_->is_active) {
403 if (!ProcessOutgoingMessages())
404 return false;
405 }
406 }
407
408 return true;
409 }
410
411 // Called by libevent when we can read from th pipe without blocking.
412 void Channel::OnFileReadReady(int fd) {
413 bool send_server_hello_msg = false;
414 if (waiting_connect_ && mode_ == MODE_SERVER) {
415 if (!ServerAcceptFifoConnection(server_listen_pipe_, &pipe_)) {
416 Close();
417 }
418
419 // No need to watch the listening socket any longer since only one client
420 // can connect. So unregister with libevent.
421 event *ev = &(server_listen_connection_event_->event);
422 MessageLoopForIO::current()->UnwatchFileHandle(ev);
423 server_listen_connection_event_->is_active = false;
424
425 // Start watching our end of the socket.
426 MessageLoopForIO::current()->WatchFileHandle(pipe_,
427 EV_READ | EV_PERSIST,
428 &(read_event_->event),
429 this);
430 read_event_->is_active = true;
431 waiting_connect_ = false;
432 send_server_hello_msg = true;
433 }
434
435 if (!waiting_connect_ && fd == pipe_) {
436 if (!ProcessIncomingMessages()) {
437 Close();
438 listener_->OnChannelError();
439 }
440 }
441
442 // If we're a server and handshaking, then we want to make sure that we
443 // only send our handshake message after we've processed the client's.
444 // This gives us a chance to kill the client if the incoming handshake
445 // is invalid.
446 if (send_server_hello_msg) {
447 ProcessOutgoingMessages();
448 }
449 }
450
451 // Called by libevent when we can write to the pipe without blocking.
452 void Channel::OnFileWriteReady(int fd) {
453 if (!ProcessOutgoingMessages()) {
454 Close();
455 listener_->OnChannelError();
456 }
17 } 457 }
18 458
19 void Channel::Close() { 459 void Channel::Close() {
20 NOTREACHED(); 460 // Close can be called multiple time, so we need to make sure we're
21 } 461 // idempotent.
22 462
23 bool Channel::Send(Message* message) { 463 // Unregister libevent for the listening socket and close it.
24 NOTREACHED(); 464 if (server_listen_connection_event_ &&
25 return false; 465 server_listen_connection_event_->is_active) {
26 } 466 MessageLoopForIO::current()->UnwatchFileHandle(
27 467 &(server_listen_connection_event_->event));
28 bool Channel::Connect() { 468 }
29 NOTREACHED(); 469
30 return false; 470 if (server_listen_pipe_ != -1) {
31 } 471 close(server_listen_pipe_);
472 server_listen_pipe_ = -1;
473 }
474
475 // Unregister libevent for the FIFO and close it.
476 if (read_event_ && read_event_->is_active) {
477 MessageLoopForIO::current()->UnwatchFileHandle(&(read_event_->event));
478 }
479 if (write_event_ && write_event_->is_active) {
480 MessageLoopForIO::current()->UnwatchFileHandle(&(write_event_->event));
481 }
482 if (pipe_ != -1) {
483 close(pipe_);
484 pipe_ = -1;
485 }
486
487 delete server_listen_connection_event_;
488 server_listen_connection_event_ = NULL;
489 delete read_event_;
490 read_event_ = NULL;
491 delete write_event_;
492 write_event_ = NULL;
493
494 // Unlink the FIFO
495 unlink(pipe_name_.c_str());
496
497 while (!output_queue_.empty()) {
498 Message* m = output_queue_.front();
499 output_queue_.pop();
500 delete m;
501 }
502 }
503
32 } // namespace IPC 504 } // namespace IPC
OLDNEW
« no previous file with comments | « chrome/common/ipc_channel.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698