OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "ipc/ipc_channel_win.h" |
| 6 |
| 7 #include <windows.h> |
| 8 #include <sstream> |
| 9 |
| 10 #include "base/compiler_specific.h" |
| 11 #include "base/logging.h" |
| 12 #include "base/non_thread_safe.h" |
| 13 #include "base/stats_counters.h" |
| 14 #include "base/win_util.h" |
| 15 #include "ipc/ipc_logging.h" |
| 16 #include "ipc/ipc_message_utils.h" |
| 17 |
| 18 namespace IPC { |
| 19 //------------------------------------------------------------------------------ |
| 20 |
| 21 Channel::ChannelImpl::State::State(ChannelImpl* channel) : is_pending(false) { |
| 22 memset(&context.overlapped, 0, sizeof(context.overlapped)); |
| 23 context.handler = channel; |
| 24 } |
| 25 |
| 26 Channel::ChannelImpl::State::~State() { |
| 27 COMPILE_ASSERT(!offsetof(Channel::ChannelImpl::State, context), |
| 28 starts_with_io_context); |
| 29 } |
| 30 |
| 31 //------------------------------------------------------------------------------ |
| 32 |
| 33 Channel::ChannelImpl::ChannelImpl(const std::string& channel_id, Mode mode, |
| 34 Listener* listener) |
| 35 : ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)), |
| 36 ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)), |
| 37 pipe_(INVALID_HANDLE_VALUE), |
| 38 listener_(listener), |
| 39 waiting_connect_(mode == MODE_SERVER), |
| 40 processing_incoming_(false), |
| 41 ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) { |
| 42 if (!CreatePipe(channel_id, mode)) { |
| 43 // The pipe may have been closed already. |
| 44 LOG(WARNING) << "Unable to create pipe named \"" << channel_id << |
| 45 "\" in " << (mode == 0 ? "server" : "client") << " mode."; |
| 46 } |
| 47 } |
| 48 |
| 49 void Channel::ChannelImpl::Close() { |
| 50 if (thread_check_.get()) { |
| 51 DCHECK(thread_check_->CalledOnValidThread()); |
| 52 } |
| 53 |
| 54 bool waited = false; |
| 55 if (input_state_.is_pending || output_state_.is_pending) { |
| 56 CancelIo(pipe_); |
| 57 waited = true; |
| 58 } |
| 59 |
| 60 // Closing the handle at this point prevents us from issuing more requests |
| 61 // form OnIOCompleted(). |
| 62 if (pipe_ != INVALID_HANDLE_VALUE) { |
| 63 CloseHandle(pipe_); |
| 64 pipe_ = INVALID_HANDLE_VALUE; |
| 65 } |
| 66 |
| 67 // Make sure all IO has completed. |
| 68 base::Time start = base::Time::Now(); |
| 69 while (input_state_.is_pending || output_state_.is_pending) { |
| 70 MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this); |
| 71 } |
| 72 if (waited) { |
| 73 // We want to see if we block the message loop for too long. |
| 74 UMA_HISTOGRAM_TIMES("AsyncIO.IPCChannelClose", base::Time::Now() - start); |
| 75 } |
| 76 |
| 77 while (!output_queue_.empty()) { |
| 78 Message* m = output_queue_.front(); |
| 79 output_queue_.pop(); |
| 80 delete m; |
| 81 } |
| 82 } |
| 83 |
| 84 bool Channel::ChannelImpl::Send(Message* message) { |
| 85 DCHECK(thread_check_->CalledOnValidThread()); |
| 86 #ifdef IPC_MESSAGE_DEBUG_EXTRA |
| 87 DLOG(INFO) << "sending message @" << message << " on channel @" << this |
| 88 << " with type " << message->type() |
| 89 << " (" << output_queue_.size() << " in queue)"; |
| 90 #endif |
| 91 |
| 92 #ifdef IPC_MESSAGE_LOG_ENABLED |
| 93 Logging::current()->OnSendMessage(message, ""); |
| 94 #endif |
| 95 |
| 96 output_queue_.push(message); |
| 97 // ensure waiting to write |
| 98 if (!waiting_connect_) { |
| 99 if (!output_state_.is_pending) { |
| 100 if (!ProcessOutgoingMessages(NULL, 0)) |
| 101 return false; |
| 102 } |
| 103 } |
| 104 |
| 105 return true; |
| 106 } |
| 107 |
| 108 const std::wstring Channel::ChannelImpl::PipeName( |
| 109 const std::string& channel_id) const { |
| 110 std::wostringstream ss; |
| 111 // XXX(darin): get application name from somewhere else |
| 112 ss << L"\\\\.\\pipe\\chrome." << ASCIIToWide(channel_id); |
| 113 return ss.str(); |
| 114 } |
| 115 |
| 116 bool Channel::ChannelImpl::CreatePipe(const std::string& channel_id, |
| 117 Mode mode) { |
| 118 DCHECK(pipe_ == INVALID_HANDLE_VALUE); |
| 119 const std::wstring pipe_name = PipeName(channel_id); |
| 120 if (mode == MODE_SERVER) { |
| 121 SECURITY_ATTRIBUTES security_attributes = {0}; |
| 122 security_attributes.bInheritHandle = FALSE; |
| 123 security_attributes.nLength = sizeof(SECURITY_ATTRIBUTES); |
| 124 if (!win_util::GetLogonSessionOnlyDACL( |
| 125 reinterpret_cast<SECURITY_DESCRIPTOR**>( |
| 126 &security_attributes.lpSecurityDescriptor))) { |
| 127 NOTREACHED(); |
| 128 } |
| 129 |
| 130 pipe_ = CreateNamedPipeW(pipe_name.c_str(), |
| 131 PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | |
| 132 FILE_FLAG_FIRST_PIPE_INSTANCE, |
| 133 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE, |
| 134 1, // number of pipe instances |
| 135 // output buffer size (XXX tune) |
| 136 Channel::kReadBufferSize, |
| 137 // input buffer size (XXX tune) |
| 138 Channel::kReadBufferSize, |
| 139 5000, // timeout in milliseconds (XXX tune) |
| 140 &security_attributes); |
| 141 LocalFree(security_attributes.lpSecurityDescriptor); |
| 142 } else { |
| 143 pipe_ = CreateFileW(pipe_name.c_str(), |
| 144 GENERIC_READ | GENERIC_WRITE, |
| 145 0, |
| 146 NULL, |
| 147 OPEN_EXISTING, |
| 148 SECURITY_SQOS_PRESENT | SECURITY_IDENTIFICATION | |
| 149 FILE_FLAG_OVERLAPPED, |
| 150 NULL); |
| 151 } |
| 152 if (pipe_ == INVALID_HANDLE_VALUE) { |
| 153 // If this process is being closed, the pipe may be gone already. |
| 154 LOG(WARNING) << "failed to create pipe: " << GetLastError(); |
| 155 return false; |
| 156 } |
| 157 |
| 158 // Create the Hello message to be sent when Connect is called |
| 159 scoped_ptr<Message> m(new Message(MSG_ROUTING_NONE, |
| 160 HELLO_MESSAGE_TYPE, |
| 161 IPC::Message::PRIORITY_NORMAL)); |
| 162 if (!m->WriteInt(GetCurrentProcessId())) { |
| 163 CloseHandle(pipe_); |
| 164 pipe_ = INVALID_HANDLE_VALUE; |
| 165 return false; |
| 166 } |
| 167 |
| 168 output_queue_.push(m.release()); |
| 169 return true; |
| 170 } |
| 171 |
| 172 bool Channel::ChannelImpl::Connect() { |
| 173 DLOG(WARNING) << "Connect called twice"; |
| 174 |
| 175 if (!thread_check_.get()) |
| 176 thread_check_.reset(new NonThreadSafe()); |
| 177 |
| 178 if (pipe_ == INVALID_HANDLE_VALUE) |
| 179 return false; |
| 180 |
| 181 MessageLoopForIO::current()->RegisterIOHandler(pipe_, this); |
| 182 |
| 183 // Check to see if there is a client connected to our pipe... |
| 184 if (waiting_connect_) |
| 185 ProcessConnection(); |
| 186 |
| 187 if (!input_state_.is_pending) { |
| 188 // Complete setup asynchronously. By not setting input_state_.is_pending |
| 189 // to true, we indicate to OnIOCompleted that this is the special |
| 190 // initialization signal. |
| 191 MessageLoopForIO::current()->PostTask(FROM_HERE, factory_.NewRunnableMethod( |
| 192 &Channel::ChannelImpl::OnIOCompleted, &input_state_.context, 0, 0)); |
| 193 } |
| 194 |
| 195 if (!waiting_connect_) |
| 196 ProcessOutgoingMessages(NULL, 0); |
| 197 return true; |
| 198 } |
| 199 |
| 200 bool Channel::ChannelImpl::ProcessConnection() { |
| 201 DCHECK(thread_check_->CalledOnValidThread()); |
| 202 if (input_state_.is_pending) |
| 203 input_state_.is_pending = false; |
| 204 |
| 205 // Do we have a client connected to our pipe? |
| 206 if (INVALID_HANDLE_VALUE == pipe_) |
| 207 return false; |
| 208 |
| 209 BOOL ok = ConnectNamedPipe(pipe_, &input_state_.context.overlapped); |
| 210 |
| 211 DWORD err = GetLastError(); |
| 212 if (ok) { |
| 213 // Uhm, the API documentation says that this function should never |
| 214 // return success when used in overlapped mode. |
| 215 NOTREACHED(); |
| 216 return false; |
| 217 } |
| 218 |
| 219 switch (err) { |
| 220 case ERROR_IO_PENDING: |
| 221 input_state_.is_pending = true; |
| 222 break; |
| 223 case ERROR_PIPE_CONNECTED: |
| 224 waiting_connect_ = false; |
| 225 break; |
| 226 case ERROR_NO_DATA: |
| 227 // The pipe is being closed. |
| 228 return false; |
| 229 default: |
| 230 NOTREACHED(); |
| 231 return false; |
| 232 } |
| 233 |
| 234 return true; |
| 235 } |
| 236 |
| 237 bool Channel::ChannelImpl::ProcessIncomingMessages( |
| 238 MessageLoopForIO::IOContext* context, |
| 239 DWORD bytes_read) { |
| 240 DCHECK(thread_check_->CalledOnValidThread()); |
| 241 if (input_state_.is_pending) { |
| 242 input_state_.is_pending = false; |
| 243 DCHECK(context); |
| 244 |
| 245 if (!context || !bytes_read) |
| 246 return false; |
| 247 } else { |
| 248 // This happens at channel initialization. |
| 249 DCHECK(!bytes_read && context == &input_state_.context); |
| 250 } |
| 251 |
| 252 for (;;) { |
| 253 if (bytes_read == 0) { |
| 254 if (INVALID_HANDLE_VALUE == pipe_) |
| 255 return false; |
| 256 |
| 257 // Read from pipe... |
| 258 BOOL ok = ReadFile(pipe_, |
| 259 input_buf_, |
| 260 Channel::kReadBufferSize, |
| 261 &bytes_read, |
| 262 &input_state_.context.overlapped); |
| 263 if (!ok) { |
| 264 DWORD err = GetLastError(); |
| 265 if (err == ERROR_IO_PENDING) { |
| 266 input_state_.is_pending = true; |
| 267 return true; |
| 268 } |
| 269 LOG(ERROR) << "pipe error: " << err; |
| 270 return false; |
| 271 } |
| 272 input_state_.is_pending = true; |
| 273 return true; |
| 274 } |
| 275 DCHECK(bytes_read); |
| 276 |
| 277 // Process messages from input buffer. |
| 278 |
| 279 const char* p, *end; |
| 280 if (input_overflow_buf_.empty()) { |
| 281 p = input_buf_; |
| 282 end = p + bytes_read; |
| 283 } else { |
| 284 if (input_overflow_buf_.size() > (kMaximumMessageSize - bytes_read)) { |
| 285 input_overflow_buf_.clear(); |
| 286 LOG(ERROR) << "IPC message is too big"; |
| 287 return false; |
| 288 } |
| 289 input_overflow_buf_.append(input_buf_, bytes_read); |
| 290 p = input_overflow_buf_.data(); |
| 291 end = p + input_overflow_buf_.size(); |
| 292 } |
| 293 |
| 294 while (p < end) { |
| 295 const char* message_tail = Message::FindNext(p, end); |
| 296 if (message_tail) { |
| 297 int len = static_cast<int>(message_tail - p); |
| 298 const Message m(p, len); |
| 299 #ifdef IPC_MESSAGE_DEBUG_EXTRA |
| 300 DLOG(INFO) << "received message on channel @" << this << |
| 301 " with type " << m.type(); |
| 302 #endif |
| 303 if (m.routing_id() == MSG_ROUTING_NONE && |
| 304 m.type() == HELLO_MESSAGE_TYPE) { |
| 305 // The Hello message contains only the process id. |
| 306 listener_->OnChannelConnected(MessageIterator(m).NextInt()); |
| 307 } else { |
| 308 listener_->OnMessageReceived(m); |
| 309 } |
| 310 p = message_tail; |
| 311 } else { |
| 312 // Last message is partial. |
| 313 break; |
| 314 } |
| 315 } |
| 316 input_overflow_buf_.assign(p, end - p); |
| 317 |
| 318 bytes_read = 0; // Get more data. |
| 319 } |
| 320 |
| 321 return true; |
| 322 } |
| 323 |
| 324 bool Channel::ChannelImpl::ProcessOutgoingMessages( |
| 325 MessageLoopForIO::IOContext* context, |
| 326 DWORD bytes_written) { |
| 327 DCHECK(!waiting_connect_); // Why are we trying to send messages if there's |
| 328 // no connection? |
| 329 DCHECK(thread_check_->CalledOnValidThread()); |
| 330 |
| 331 if (output_state_.is_pending) { |
| 332 DCHECK(context); |
| 333 output_state_.is_pending = false; |
| 334 if (!context || bytes_written == 0) { |
| 335 DWORD err = GetLastError(); |
| 336 LOG(ERROR) << "pipe error: " << err; |
| 337 return false; |
| 338 } |
| 339 // Message was sent. |
| 340 DCHECK(!output_queue_.empty()); |
| 341 Message* m = output_queue_.front(); |
| 342 output_queue_.pop(); |
| 343 delete m; |
| 344 } |
| 345 |
| 346 if (output_queue_.empty()) |
| 347 return true; |
| 348 |
| 349 if (INVALID_HANDLE_VALUE == pipe_) |
| 350 return false; |
| 351 |
| 352 // Write to pipe... |
| 353 Message* m = output_queue_.front(); |
| 354 BOOL ok = WriteFile(pipe_, |
| 355 m->data(), |
| 356 m->size(), |
| 357 &bytes_written, |
| 358 &output_state_.context.overlapped); |
| 359 if (!ok) { |
| 360 DWORD err = GetLastError(); |
| 361 if (err == ERROR_IO_PENDING) { |
| 362 output_state_.is_pending = true; |
| 363 |
| 364 #ifdef IPC_MESSAGE_DEBUG_EXTRA |
| 365 DLOG(INFO) << "sent pending message @" << m << " on channel @" << |
| 366 this << " with type " << m->type(); |
| 367 #endif |
| 368 |
| 369 return true; |
| 370 } |
| 371 LOG(ERROR) << "pipe error: " << err; |
| 372 return false; |
| 373 } |
| 374 |
| 375 #ifdef IPC_MESSAGE_DEBUG_EXTRA |
| 376 DLOG(INFO) << "sent message @" << m << " on channel @" << this << |
| 377 " with type " << m->type(); |
| 378 #endif |
| 379 |
| 380 output_state_.is_pending = true; |
| 381 return true; |
| 382 } |
| 383 |
| 384 void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context, |
| 385 DWORD bytes_transfered, DWORD error) { |
| 386 bool ok; |
| 387 DCHECK(thread_check_->CalledOnValidThread()); |
| 388 if (context == &input_state_.context) { |
| 389 if (waiting_connect_) { |
| 390 if (!ProcessConnection()) |
| 391 return; |
| 392 // We may have some messages queued up to send... |
| 393 if (!output_queue_.empty() && !output_state_.is_pending) |
| 394 ProcessOutgoingMessages(NULL, 0); |
| 395 if (input_state_.is_pending) |
| 396 return; |
| 397 // else, fall-through and look for incoming messages... |
| 398 } |
| 399 // we don't support recursion through OnMessageReceived yet! |
| 400 DCHECK(!processing_incoming_); |
| 401 processing_incoming_ = true; |
| 402 ok = ProcessIncomingMessages(context, bytes_transfered); |
| 403 processing_incoming_ = false; |
| 404 } else { |
| 405 DCHECK(context == &output_state_.context); |
| 406 ok = ProcessOutgoingMessages(context, bytes_transfered); |
| 407 } |
| 408 if (!ok && INVALID_HANDLE_VALUE != pipe_) { |
| 409 // We don't want to re-enter Close(). |
| 410 Close(); |
| 411 listener_->OnChannelError(); |
| 412 } |
| 413 } |
| 414 |
| 415 //------------------------------------------------------------------------------ |
| 416 // Channel's methods simply call through to ChannelImpl. |
| 417 Channel::Channel(const std::string& channel_id, Mode mode, |
| 418 Listener* listener) |
| 419 : channel_impl_(new ChannelImpl(channel_id, mode, listener)) { |
| 420 } |
| 421 |
| 422 Channel::~Channel() { |
| 423 delete channel_impl_; |
| 424 } |
| 425 |
| 426 bool Channel::Connect() { |
| 427 return channel_impl_->Connect(); |
| 428 } |
| 429 |
| 430 void Channel::Close() { |
| 431 channel_impl_->Close(); |
| 432 } |
| 433 |
| 434 void Channel::set_listener(Listener* listener) { |
| 435 channel_impl_->set_listener(listener); |
| 436 } |
| 437 |
| 438 bool Channel::Send(Message* message) { |
| 439 return channel_impl_->Send(message); |
| 440 } |
| 441 |
| 442 } // namespace IPC |
OLD | NEW |