| Index: base/message_pump_libevent.cc
|
| ===================================================================
|
| --- base/message_pump_libevent.cc (revision 0)
|
| +++ base/message_pump_libevent.cc (revision 0)
|
| @@ -0,0 +1,179 @@
|
| +// Copyright (c) 2008 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "base/message_pump_libevent.h"
|
| +
|
| +#include "base/logging.h"
|
| +#include "base/time.h"
|
| +#include "third_party/libevent/event.h"
|
| +
|
| +#include <fcntl.h>
|
| +
|
| +namespace base {
|
| +
|
| +// Return 0 on success
|
| +// Too small a function to bother putting in a library?
|
| +static int SetNonBlocking(int fd)
|
| +{
|
| + int flags = fcntl(fd, F_GETFL, 0);
|
| + if (-1 == flags)
|
| + flags = 0;
|
| + return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
|
| +}
|
| +
|
| +// Called if a byte is received on the wakeup pipe.
|
| +void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
|
| +
|
| + base::MessagePumpLibevent* that =
|
| + static_cast<base::MessagePumpLibevent*>(context);
|
| + DCHECK(that->wakeup_pipe_out_ == socket);
|
| +
|
| + // Remove and discard the wakeup byte.
|
| + char buf;
|
| + int nread = read(socket, &buf, 1);
|
| + DCHECK(nread == 1);
|
| + // Tell libevent to break out of inner loop.
|
| + event_base_loopbreak(that->event_base_);
|
| +}
|
| +
|
| +MessagePumpLibevent::MessagePumpLibevent()
|
| + : keep_running_(true),
|
| + in_run_(false),
|
| + wakeup_pipe_in_(-1),
|
| + wakeup_pipe_out_(-1),
|
| + event_base_(event_base_new()) {
|
| + if (!Init())
|
| + NOTREACHED();
|
| +}
|
| +
|
| +bool MessagePumpLibevent::Init() {
|
| + int fds[2];
|
| + if (pipe(fds))
|
| + return false;
|
| + if (SetNonBlocking(fds[0]))
|
| + return false;
|
| + if (SetNonBlocking(fds[1]))
|
| + return false;
|
| + wakeup_pipe_out_ = fds[0];
|
| + wakeup_pipe_in_ = fds[1];
|
| +
|
| + wakeup_event_ = new event;
|
| + event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
|
| + OnWakeup, this);
|
| + event_base_set(event_base_, wakeup_event_);
|
| +
|
| + if (event_add(wakeup_event_, 0))
|
| + return false;
|
| + return true;
|
| +}
|
| +
|
| +MessagePumpLibevent::~MessagePumpLibevent() {
|
| + DCHECK(wakeup_event_);
|
| + DCHECK(event_base_);
|
| + event_del(wakeup_event_);
|
| + delete wakeup_event_;
|
| + event_base_free(event_base_);
|
| +}
|
| +
|
| +void MessagePumpLibevent::WatchSocket(int socket, short interest_mask,
|
| + event* e, Watcher* watcher) {
|
| +
|
| + // Set current interest mask and message pump for this event
|
| + event_set(e, socket, interest_mask, OnReadinessNotification, watcher);
|
| +
|
| + // Tell libevent which message pump this socket will belong to when we add it.
|
| + event_base_set(event_base_, e);
|
| +
|
| + // Add this socket to the list of monitored sockets.
|
| + if (event_add(e, NULL))
|
| + NOTREACHED();
|
| +}
|
| +
|
| +void MessagePumpLibevent::UnwatchSocket(event* e) {
|
| + // Remove this socket from the list of monitored sockets.
|
| + if (event_del(e))
|
| + NOTREACHED();
|
| +}
|
| +
|
| +void MessagePumpLibevent::OnReadinessNotification(int socket, short flags,
|
| + void* context) {
|
| + // The given socket is ready for I/O.
|
| + // Tell the owner what kind of I/O the socket is ready for.
|
| + Watcher* watcher = static_cast<Watcher*>(context);
|
| + watcher->OnSocketReady(flags);
|
| +}
|
| +
|
| +// Reentrant!
|
| +void MessagePumpLibevent::Run(Delegate* delegate) {
|
| + DCHECK(keep_running_) << "Quit must have been called outside of Run!";
|
| +
|
| + bool old_in_run = in_run_;
|
| + in_run_ = true;
|
| +
|
| + for (;;) {
|
| + bool did_work = delegate->DoWork();
|
| + if (!keep_running_)
|
| + break;
|
| +
|
| + did_work |= delegate->DoDelayedWork(&delayed_work_time_);
|
| + if (!keep_running_)
|
| + break;
|
| +
|
| + if (did_work)
|
| + continue;
|
| +
|
| + did_work = delegate->DoIdleWork();
|
| + if (!keep_running_)
|
| + break;
|
| +
|
| + if (did_work)
|
| + continue;
|
| +
|
| + // EVLOOP_ONCE tells libevent to only block once,
|
| + // but to service all pending events when it wakes up.
|
| + if (delayed_work_time_.is_null()) {
|
| + event_base_loop(event_base_, EVLOOP_ONCE);
|
| + } else {
|
| + TimeDelta delay = delayed_work_time_ - Time::Now();
|
| + if (delay > TimeDelta()) {
|
| + struct timeval poll_tv;
|
| + poll_tv.tv_sec = delay.InSeconds();
|
| + poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
|
| + event_base_loopexit(event_base_, &poll_tv);
|
| + event_base_loop(event_base_, EVLOOP_ONCE);
|
| + } else {
|
| + // It looks like delayed_work_time_ indicates a time in the past, so we
|
| + // need to call DoDelayedWork now.
|
| + delayed_work_time_ = Time();
|
| + }
|
| + }
|
| + }
|
| +
|
| + keep_running_ = true;
|
| + in_run_ = old_in_run;
|
| +}
|
| +
|
| +void MessagePumpLibevent::Quit() {
|
| + DCHECK(in_run_);
|
| + // Tell both libevent and Run that they should break out of their loops.
|
| + keep_running_ = false;
|
| + ScheduleWork();
|
| +}
|
| +
|
| +void MessagePumpLibevent::ScheduleWork() {
|
| + // Tell libevent (in a threadsafe way) that it should break out of its loop.
|
| + char buf = 0;
|
| + int nwrite = write(wakeup_pipe_in_, &buf, 1);
|
| + DCHECK(nwrite == 1);
|
| +}
|
| +
|
| +void MessagePumpLibevent::ScheduleDelayedWork(const Time& delayed_work_time) {
|
| + // We know that we can't be blocked on Wait right now since this method can
|
| + // only be called on the same thread as Run, so we only need to update our
|
| + // record of how long to sleep when we do sleep.
|
| + delayed_work_time_ = delayed_work_time;
|
| +}
|
| +
|
| +} // namespace base
|
| +
|
|
|