Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(580)

Side by Side Diff: ipc/ipc_channel_win.cc

Issue 155905: Separates ipc code from common (http://crbug.com/16829) (Closed)
Patch Set: Fixes reference to 'common_message_traits' it's actually 'common_param_traits' Created 11 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « ipc/ipc_channel_win.h ('k') | ipc/ipc_descriptors.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "ipc/ipc_channel_win.h"
6
7 #include <windows.h>
8 #include <sstream>
9
10 #include "base/compiler_specific.h"
11 #include "base/logging.h"
12 #include "base/non_thread_safe.h"
13 #include "base/stats_counters.h"
14 #include "base/win_util.h"
15 #include "ipc/ipc_logging.h"
16 #include "ipc/ipc_message_utils.h"
17
18 namespace IPC {
19 //------------------------------------------------------------------------------
20
21 Channel::ChannelImpl::State::State(ChannelImpl* channel) : is_pending(false) {
22 memset(&context.overlapped, 0, sizeof(context.overlapped));
23 context.handler = channel;
24 }
25
26 Channel::ChannelImpl::State::~State() {
27 COMPILE_ASSERT(!offsetof(Channel::ChannelImpl::State, context),
28 starts_with_io_context);
29 }
30
31 //------------------------------------------------------------------------------
32
33 Channel::ChannelImpl::ChannelImpl(const std::string& channel_id, Mode mode,
34 Listener* listener)
35 : ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)),
36 ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)),
37 pipe_(INVALID_HANDLE_VALUE),
38 listener_(listener),
39 waiting_connect_(mode == MODE_SERVER),
40 processing_incoming_(false),
41 ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) {
42 if (!CreatePipe(channel_id, mode)) {
43 // The pipe may have been closed already.
44 LOG(WARNING) << "Unable to create pipe named \"" << channel_id <<
45 "\" in " << (mode == 0 ? "server" : "client") << " mode.";
46 }
47 }
48
49 void Channel::ChannelImpl::Close() {
50 if (thread_check_.get()) {
51 DCHECK(thread_check_->CalledOnValidThread());
52 }
53
54 bool waited = false;
55 if (input_state_.is_pending || output_state_.is_pending) {
56 CancelIo(pipe_);
57 waited = true;
58 }
59
60 // Closing the handle at this point prevents us from issuing more requests
61 // form OnIOCompleted().
62 if (pipe_ != INVALID_HANDLE_VALUE) {
63 CloseHandle(pipe_);
64 pipe_ = INVALID_HANDLE_VALUE;
65 }
66
67 // Make sure all IO has completed.
68 base::Time start = base::Time::Now();
69 while (input_state_.is_pending || output_state_.is_pending) {
70 MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this);
71 }
72 if (waited) {
73 // We want to see if we block the message loop for too long.
74 UMA_HISTOGRAM_TIMES("AsyncIO.IPCChannelClose", base::Time::Now() - start);
75 }
76
77 while (!output_queue_.empty()) {
78 Message* m = output_queue_.front();
79 output_queue_.pop();
80 delete m;
81 }
82 }
83
84 bool Channel::ChannelImpl::Send(Message* message) {
85 DCHECK(thread_check_->CalledOnValidThread());
86 #ifdef IPC_MESSAGE_DEBUG_EXTRA
87 DLOG(INFO) << "sending message @" << message << " on channel @" << this
88 << " with type " << message->type()
89 << " (" << output_queue_.size() << " in queue)";
90 #endif
91
92 #ifdef IPC_MESSAGE_LOG_ENABLED
93 Logging::current()->OnSendMessage(message, "");
94 #endif
95
96 output_queue_.push(message);
97 // ensure waiting to write
98 if (!waiting_connect_) {
99 if (!output_state_.is_pending) {
100 if (!ProcessOutgoingMessages(NULL, 0))
101 return false;
102 }
103 }
104
105 return true;
106 }
107
108 const std::wstring Channel::ChannelImpl::PipeName(
109 const std::string& channel_id) const {
110 std::wostringstream ss;
111 // XXX(darin): get application name from somewhere else
112 ss << L"\\\\.\\pipe\\chrome." << ASCIIToWide(channel_id);
113 return ss.str();
114 }
115
116 bool Channel::ChannelImpl::CreatePipe(const std::string& channel_id,
117 Mode mode) {
118 DCHECK(pipe_ == INVALID_HANDLE_VALUE);
119 const std::wstring pipe_name = PipeName(channel_id);
120 if (mode == MODE_SERVER) {
121 SECURITY_ATTRIBUTES security_attributes = {0};
122 security_attributes.bInheritHandle = FALSE;
123 security_attributes.nLength = sizeof(SECURITY_ATTRIBUTES);
124 if (!win_util::GetLogonSessionOnlyDACL(
125 reinterpret_cast<SECURITY_DESCRIPTOR**>(
126 &security_attributes.lpSecurityDescriptor))) {
127 NOTREACHED();
128 }
129
130 pipe_ = CreateNamedPipeW(pipe_name.c_str(),
131 PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
132 FILE_FLAG_FIRST_PIPE_INSTANCE,
133 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE,
134 1, // number of pipe instances
135 // output buffer size (XXX tune)
136 Channel::kReadBufferSize,
137 // input buffer size (XXX tune)
138 Channel::kReadBufferSize,
139 5000, // timeout in milliseconds (XXX tune)
140 &security_attributes);
141 LocalFree(security_attributes.lpSecurityDescriptor);
142 } else {
143 pipe_ = CreateFileW(pipe_name.c_str(),
144 GENERIC_READ | GENERIC_WRITE,
145 0,
146 NULL,
147 OPEN_EXISTING,
148 SECURITY_SQOS_PRESENT | SECURITY_IDENTIFICATION |
149 FILE_FLAG_OVERLAPPED,
150 NULL);
151 }
152 if (pipe_ == INVALID_HANDLE_VALUE) {
153 // If this process is being closed, the pipe may be gone already.
154 LOG(WARNING) << "failed to create pipe: " << GetLastError();
155 return false;
156 }
157
158 // Create the Hello message to be sent when Connect is called
159 scoped_ptr<Message> m(new Message(MSG_ROUTING_NONE,
160 HELLO_MESSAGE_TYPE,
161 IPC::Message::PRIORITY_NORMAL));
162 if (!m->WriteInt(GetCurrentProcessId())) {
163 CloseHandle(pipe_);
164 pipe_ = INVALID_HANDLE_VALUE;
165 return false;
166 }
167
168 output_queue_.push(m.release());
169 return true;
170 }
171
172 bool Channel::ChannelImpl::Connect() {
173 DLOG(WARNING) << "Connect called twice";
174
175 if (!thread_check_.get())
176 thread_check_.reset(new NonThreadSafe());
177
178 if (pipe_ == INVALID_HANDLE_VALUE)
179 return false;
180
181 MessageLoopForIO::current()->RegisterIOHandler(pipe_, this);
182
183 // Check to see if there is a client connected to our pipe...
184 if (waiting_connect_)
185 ProcessConnection();
186
187 if (!input_state_.is_pending) {
188 // Complete setup asynchronously. By not setting input_state_.is_pending
189 // to true, we indicate to OnIOCompleted that this is the special
190 // initialization signal.
191 MessageLoopForIO::current()->PostTask(FROM_HERE, factory_.NewRunnableMethod(
192 &Channel::ChannelImpl::OnIOCompleted, &input_state_.context, 0, 0));
193 }
194
195 if (!waiting_connect_)
196 ProcessOutgoingMessages(NULL, 0);
197 return true;
198 }
199
200 bool Channel::ChannelImpl::ProcessConnection() {
201 DCHECK(thread_check_->CalledOnValidThread());
202 if (input_state_.is_pending)
203 input_state_.is_pending = false;
204
205 // Do we have a client connected to our pipe?
206 if (INVALID_HANDLE_VALUE == pipe_)
207 return false;
208
209 BOOL ok = ConnectNamedPipe(pipe_, &input_state_.context.overlapped);
210
211 DWORD err = GetLastError();
212 if (ok) {
213 // Uhm, the API documentation says that this function should never
214 // return success when used in overlapped mode.
215 NOTREACHED();
216 return false;
217 }
218
219 switch (err) {
220 case ERROR_IO_PENDING:
221 input_state_.is_pending = true;
222 break;
223 case ERROR_PIPE_CONNECTED:
224 waiting_connect_ = false;
225 break;
226 case ERROR_NO_DATA:
227 // The pipe is being closed.
228 return false;
229 default:
230 NOTREACHED();
231 return false;
232 }
233
234 return true;
235 }
236
237 bool Channel::ChannelImpl::ProcessIncomingMessages(
238 MessageLoopForIO::IOContext* context,
239 DWORD bytes_read) {
240 DCHECK(thread_check_->CalledOnValidThread());
241 if (input_state_.is_pending) {
242 input_state_.is_pending = false;
243 DCHECK(context);
244
245 if (!context || !bytes_read)
246 return false;
247 } else {
248 // This happens at channel initialization.
249 DCHECK(!bytes_read && context == &input_state_.context);
250 }
251
252 for (;;) {
253 if (bytes_read == 0) {
254 if (INVALID_HANDLE_VALUE == pipe_)
255 return false;
256
257 // Read from pipe...
258 BOOL ok = ReadFile(pipe_,
259 input_buf_,
260 Channel::kReadBufferSize,
261 &bytes_read,
262 &input_state_.context.overlapped);
263 if (!ok) {
264 DWORD err = GetLastError();
265 if (err == ERROR_IO_PENDING) {
266 input_state_.is_pending = true;
267 return true;
268 }
269 LOG(ERROR) << "pipe error: " << err;
270 return false;
271 }
272 input_state_.is_pending = true;
273 return true;
274 }
275 DCHECK(bytes_read);
276
277 // Process messages from input buffer.
278
279 const char* p, *end;
280 if (input_overflow_buf_.empty()) {
281 p = input_buf_;
282 end = p + bytes_read;
283 } else {
284 if (input_overflow_buf_.size() > (kMaximumMessageSize - bytes_read)) {
285 input_overflow_buf_.clear();
286 LOG(ERROR) << "IPC message is too big";
287 return false;
288 }
289 input_overflow_buf_.append(input_buf_, bytes_read);
290 p = input_overflow_buf_.data();
291 end = p + input_overflow_buf_.size();
292 }
293
294 while (p < end) {
295 const char* message_tail = Message::FindNext(p, end);
296 if (message_tail) {
297 int len = static_cast<int>(message_tail - p);
298 const Message m(p, len);
299 #ifdef IPC_MESSAGE_DEBUG_EXTRA
300 DLOG(INFO) << "received message on channel @" << this <<
301 " with type " << m.type();
302 #endif
303 if (m.routing_id() == MSG_ROUTING_NONE &&
304 m.type() == HELLO_MESSAGE_TYPE) {
305 // The Hello message contains only the process id.
306 listener_->OnChannelConnected(MessageIterator(m).NextInt());
307 } else {
308 listener_->OnMessageReceived(m);
309 }
310 p = message_tail;
311 } else {
312 // Last message is partial.
313 break;
314 }
315 }
316 input_overflow_buf_.assign(p, end - p);
317
318 bytes_read = 0; // Get more data.
319 }
320
321 return true;
322 }
323
324 bool Channel::ChannelImpl::ProcessOutgoingMessages(
325 MessageLoopForIO::IOContext* context,
326 DWORD bytes_written) {
327 DCHECK(!waiting_connect_); // Why are we trying to send messages if there's
328 // no connection?
329 DCHECK(thread_check_->CalledOnValidThread());
330
331 if (output_state_.is_pending) {
332 DCHECK(context);
333 output_state_.is_pending = false;
334 if (!context || bytes_written == 0) {
335 DWORD err = GetLastError();
336 LOG(ERROR) << "pipe error: " << err;
337 return false;
338 }
339 // Message was sent.
340 DCHECK(!output_queue_.empty());
341 Message* m = output_queue_.front();
342 output_queue_.pop();
343 delete m;
344 }
345
346 if (output_queue_.empty())
347 return true;
348
349 if (INVALID_HANDLE_VALUE == pipe_)
350 return false;
351
352 // Write to pipe...
353 Message* m = output_queue_.front();
354 BOOL ok = WriteFile(pipe_,
355 m->data(),
356 m->size(),
357 &bytes_written,
358 &output_state_.context.overlapped);
359 if (!ok) {
360 DWORD err = GetLastError();
361 if (err == ERROR_IO_PENDING) {
362 output_state_.is_pending = true;
363
364 #ifdef IPC_MESSAGE_DEBUG_EXTRA
365 DLOG(INFO) << "sent pending message @" << m << " on channel @" <<
366 this << " with type " << m->type();
367 #endif
368
369 return true;
370 }
371 LOG(ERROR) << "pipe error: " << err;
372 return false;
373 }
374
375 #ifdef IPC_MESSAGE_DEBUG_EXTRA
376 DLOG(INFO) << "sent message @" << m << " on channel @" << this <<
377 " with type " << m->type();
378 #endif
379
380 output_state_.is_pending = true;
381 return true;
382 }
383
384 void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context,
385 DWORD bytes_transfered, DWORD error) {
386 bool ok;
387 DCHECK(thread_check_->CalledOnValidThread());
388 if (context == &input_state_.context) {
389 if (waiting_connect_) {
390 if (!ProcessConnection())
391 return;
392 // We may have some messages queued up to send...
393 if (!output_queue_.empty() && !output_state_.is_pending)
394 ProcessOutgoingMessages(NULL, 0);
395 if (input_state_.is_pending)
396 return;
397 // else, fall-through and look for incoming messages...
398 }
399 // we don't support recursion through OnMessageReceived yet!
400 DCHECK(!processing_incoming_);
401 processing_incoming_ = true;
402 ok = ProcessIncomingMessages(context, bytes_transfered);
403 processing_incoming_ = false;
404 } else {
405 DCHECK(context == &output_state_.context);
406 ok = ProcessOutgoingMessages(context, bytes_transfered);
407 }
408 if (!ok && INVALID_HANDLE_VALUE != pipe_) {
409 // We don't want to re-enter Close().
410 Close();
411 listener_->OnChannelError();
412 }
413 }
414
415 //------------------------------------------------------------------------------
416 // Channel's methods simply call through to ChannelImpl.
417 Channel::Channel(const std::string& channel_id, Mode mode,
418 Listener* listener)
419 : channel_impl_(new ChannelImpl(channel_id, mode, listener)) {
420 }
421
422 Channel::~Channel() {
423 delete channel_impl_;
424 }
425
426 bool Channel::Connect() {
427 return channel_impl_->Connect();
428 }
429
430 void Channel::Close() {
431 channel_impl_->Close();
432 }
433
434 void Channel::set_listener(Listener* listener) {
435 channel_impl_->set_listener(listener);
436 }
437
438 bool Channel::Send(Message* message) {
439 return channel_impl_->Send(message);
440 }
441
442 } // namespace IPC
OLDNEW
« no previous file with comments | « ipc/ipc_channel_win.h ('k') | ipc/ipc_descriptors.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698