Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3092)

Unified Diff: athena/system/device_socket_listener.cc

Issue 431183003: Rotate screen in response to accelerator or device orientation sensors. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Pass in task runner to avoid new dependency on content/public/browser/browser_thread.h. Created 6 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: athena/system/device_socket_listener.cc
diff --git a/athena/system/device_socket_listener.cc b/athena/system/device_socket_listener.cc
new file mode 100644
index 0000000000000000000000000000000000000000..e041e44c5413d38bfdedcc53eb3781a8c7668e27
--- /dev/null
+++ b/athena/system/device_socket_listener.cc
@@ -0,0 +1,287 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "athena/system/device_socket_listener.h"
+
+#include <errno.h>
+#include <map>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <vector>
+
+#include "base/bind.h"
+#include "base/files/file_path.h"
+#include "base/memory/singleton.h"
+#include "base/message_loop/message_loop.h"
+#include "base/observer_list.h"
+#include "base/stl_util.h"
+#include "ipc/unix_domain_socket_util.h"
+
+namespace athena {
+
+namespace {
+
+typedef ObserverList<DeviceSocketListener> DeviceSocketListeners;
+
+// Reads from a device socket blocks of a particular size. When that amount of
+// data is read DeviceSocketManager::OnDataAvailable is called on the singleton
+// instance which then informs all of the listeners on that socket.
+class DeviceSocketReader : public base::MessagePumpLibevent::Watcher {
+ public:
+ DeviceSocketReader(const std::string& socket_path,
+ size_t data_size)
+ : socket_path_(socket_path),
+ data_size_(data_size),
+ data_(new char[data_size]) {
+ }
+ virtual ~DeviceSocketReader() {}
+
+ private:
+ // Overidden from base::MessagePumpLibevent::Watcher.
+ virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE;
+ virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE;
+
+ std::string socket_path_;
+ size_t data_size_;
+ scoped_ptr<char[]> data_;
+
+ DISALLOW_COPY_AND_ASSIGN(DeviceSocketReader);
+};
+
+class DeviceSocketManager;
+DeviceSocketManager* device_socket_manager_instance_ = NULL;
+
+// A singleton instance for managing all connections to sockets.
+class DeviceSocketManager {
+ public:
+ static void Create(scoped_refptr<base::TaskRunner> io_task_runner) {
+ device_socket_manager_instance_ =
+ new DeviceSocketManager(io_task_runner);
+ }
+
+ static void Shutdown() {
+ CHECK(device_socket_manager_instance_);
+ delete device_socket_manager_instance_;
+ device_socket_manager_instance_ = NULL;
+ }
+
+ static DeviceSocketManager* GetInstance() {
+ CHECK(device_socket_manager_instance_);
+ return device_socket_manager_instance_;
+ }
+
+ // If there isn't an existing connection to |socket_path|, then opens a
+ // connection to |socket_path| and starts listening for data. All listeners
+ // for |socket_path| receives data when data is available on the socket.
+ void StartListening(const std::string& socket_path,
+ size_t data_size,
+ DeviceSocketListener* listener);
+
+ // Removes |listener| from the list of listeners that receive data from
+ // |socket_path|. If this is the last listener, then this closes the
+ // connection to the socket.
+ void StopListening(const std::string& socket_path,
+ DeviceSocketListener* listener);
+
+ // Sends data to all the listeners registered to receive data from
+ // |socket_path|.
+ void OnDataAvailable(const std::string& socket_path,
+ const void* data);
+
+ // Notifies listeners of errors reading from the socket and closes it.
+ void OnError(const std::string& socket_path, int err);
+ void OnEOF(const std::string& socket_path);
+
+ private:
+ friend struct DefaultSingletonTraits<DeviceSocketManager>;
+
+ struct SocketData {
+ SocketData()
+ : fd(-1) {
+ }
+
+ int fd;
+ DeviceSocketListeners observers;
+ scoped_ptr<base::MessagePumpLibevent::FileDescriptorWatcher> controller;
+ scoped_ptr<DeviceSocketReader> watcher;
+ };
+
+ DeviceSocketManager(scoped_refptr<base::TaskRunner> io_task_runner)
+ : io_task_runner_(io_task_runner) {
+ }
+
+ ~DeviceSocketManager() {
+ STLDeleteContainerPairSecondPointers(socket_data_.begin(),
+ socket_data_.end());
+ }
+
+ void StartListeningOnIO(const std::string& socket_path,
+ size_t data_size,
+ DeviceSocketListener* listener);
+
+ void StopListeningOnIO(const std::string& socket_path,
+ DeviceSocketListener* listener);
+
+ void CloseSocket(const std::string& socket_path);
+
+ std::map<std::string, SocketData*> socket_data_;
+ scoped_refptr<base::TaskRunner> io_task_runner_;
+
+ DISALLOW_COPY_AND_ASSIGN(DeviceSocketManager);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+// DeviceSocketReader
+
+void DeviceSocketReader::OnFileCanReadWithoutBlocking(int fd) {
+ ssize_t read_size = recv(fd, data_.get(), data_size_, 0);
+ if (read_size < 0) {
+ if (errno == EINTR)
+ return;
+ DeviceSocketManager::GetInstance()->OnError(socket_path_, errno);
+ return;
+ }
+ if (read_size == 0) {
+ DeviceSocketManager::GetInstance()->OnEOF(socket_path_);
+ return;
+ }
+ if (read_size != static_cast<ssize_t>(data_size_))
+ return;
+ DeviceSocketManager::GetInstance()->OnDataAvailable(socket_path_,
+ data_.get());
+}
+
+void DeviceSocketReader::OnFileCanWriteWithoutBlocking(int fd) {
+ NOTREACHED();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// DeviceSocketManager
+
+void DeviceSocketManager::StartListening(const std::string& socket_path,
+ size_t data_size,
+ DeviceSocketListener* listener) {
+ io_task_runner_->PostTask(FROM_HERE,
+ base::Bind(&DeviceSocketManager::StartListeningOnIO,
+ base::Unretained(this), socket_path, data_size, listener));
+}
+
+void DeviceSocketManager::StopListening(const std::string& socket_path,
+ DeviceSocketListener* listener) {
+ io_task_runner_->PostTask(FROM_HERE,
+ base::Bind(&DeviceSocketManager::StopListeningOnIO,
+ base::Unretained(this), socket_path, listener));
+}
+
+void DeviceSocketManager::OnDataAvailable(const std::string& socket_path,
+ const void* data) {
+ CHECK_GT(socket_data_.count(socket_path), 0UL);
+ DeviceSocketListeners& listeners = socket_data_[socket_path]->observers;
+ FOR_EACH_OBSERVER(DeviceSocketListener, listeners, OnDataAvailableOnIO(data));
+}
+
+void DeviceSocketManager::CloseSocket(const std::string& socket_path) {
+ if (!socket_data_.count(socket_path))
+ return;
+ SocketData* socket_data = socket_data_[socket_path];
+ close(socket_data->fd);
+ delete socket_data;
+ socket_data_.erase(socket_path);
+}
+
+void DeviceSocketManager::OnError(const std::string& socket_path, int err) {
+ LOG(ERROR) << "Error reading from socket: " << socket_path << ": "
+ << strerror(err);
+ CloseSocket(socket_path);
+ // TODO(flackr): Notify listeners that the socket was closed unexpectedly.
+}
+
+void DeviceSocketManager::OnEOF(const std::string& socket_path) {
+ LOG(ERROR) << "EOF reading from socket: " << socket_path;
+ CloseSocket(socket_path);
+}
+
+void DeviceSocketManager::StartListeningOnIO(const std::string& socket_path,
+ size_t data_size,
+ DeviceSocketListener* listener) {
+ CHECK(io_task_runner_->RunsTasksOnCurrentThread());
+ SocketData* socket_data = NULL;
+ if (!socket_data_.count(socket_path)) {
+ int socket_fd = -1;
+ if (!IPC::CreateClientUnixDomainSocket(base::FilePath(socket_path),
+ &socket_fd)) {
+ LOG(ERROR) << "Error connecting to socket: " << socket_path;
+ return;
+ }
+
+ socket_data = new SocketData;
+ socket_data_[socket_path] = socket_data;
+
+ socket_data->fd = socket_fd;
+
+ socket_data->controller.reset(
+ new base::MessagePumpLibevent::FileDescriptorWatcher());
+ socket_data->watcher.reset(
+ new DeviceSocketReader(socket_path, data_size));
+
+ base::MessageLoopForIO::current()->WatchFileDescriptor(
+ socket_fd,
+ true,
+ base::MessageLoopForIO::WATCH_READ,
+ socket_data->controller.get(),
+ socket_data->watcher.get());
+ } else {
+ socket_data = socket_data_[socket_path];
+ }
+ socket_data->observers.AddObserver(listener);
+}
+
+void DeviceSocketManager::StopListeningOnIO(const std::string& socket_path,
+ DeviceSocketListener* listener) {
+ if (!socket_data_.count(socket_path))
+ return; // Happens if unable to create a socket.
+
+ CHECK(io_task_runner_->RunsTasksOnCurrentThread());
+ DeviceSocketListeners& listeners = socket_data_[socket_path]->observers;
+ listeners.RemoveObserver(listener);
+ if (!listeners.might_have_observers()) {
+ // All listeners for this socket has been removed. Close the socket.
+ CloseSocket(socket_path);
+ }
+}
+
+} // namespace
+
+DeviceSocketListener::DeviceSocketListener(const std::string& socket_path,
+ size_t data_size)
+ : socket_path_(socket_path),
+ data_size_(data_size) {
+}
+
+DeviceSocketListener::~DeviceSocketListener() {
+ StopListening();
+}
+
+// static
+void DeviceSocketListener::CreateSocketManager(
+ scoped_refptr<base::TaskRunner> io_task_runner) {
+ DeviceSocketManager::Create(io_task_runner);
+}
+
+// static
+void DeviceSocketListener::ShutdownSocketManager() {
+ DeviceSocketManager::Shutdown();
+}
+
+void DeviceSocketListener::StartListening() {
+ DeviceSocketManager::GetInstance()->StartListening(socket_path_,
+ data_size_,
+ this);
+}
+
+void DeviceSocketListener::StopListening() {
+ DeviceSocketManager::GetInstance()->StopListening(socket_path_, this);
+}
+
+} // namespace athena

Powered by Google App Engine
This is Rietveld 408576698