Index: athena/system/device_socket_listener.cc |
diff --git a/athena/system/device_socket_listener.cc b/athena/system/device_socket_listener.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..e041e44c5413d38bfdedcc53eb3781a8c7668e27 |
--- /dev/null |
+++ b/athena/system/device_socket_listener.cc |
@@ -0,0 +1,287 @@ |
+// Copyright 2014 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "athena/system/device_socket_listener.h" |
+ |
+#include <errno.h> |
+#include <map> |
+#include <sys/socket.h> |
+#include <sys/types.h> |
+#include <vector> |
+ |
+#include "base/bind.h" |
+#include "base/files/file_path.h" |
+#include "base/memory/singleton.h" |
+#include "base/message_loop/message_loop.h" |
+#include "base/observer_list.h" |
+#include "base/stl_util.h" |
+#include "ipc/unix_domain_socket_util.h" |
+ |
+namespace athena { |
+ |
+namespace { |
+ |
+typedef ObserverList<DeviceSocketListener> DeviceSocketListeners; |
+ |
+// Reads from a device socket blocks of a particular size. When that amount of |
+// data is read DeviceSocketManager::OnDataAvailable is called on the singleton |
+// instance which then informs all of the listeners on that socket. |
+class DeviceSocketReader : public base::MessagePumpLibevent::Watcher { |
+ public: |
+ DeviceSocketReader(const std::string& socket_path, |
+ size_t data_size) |
+ : socket_path_(socket_path), |
+ data_size_(data_size), |
+ data_(new char[data_size]) { |
+ } |
+ virtual ~DeviceSocketReader() {} |
+ |
+ private: |
+ // Overidden from base::MessagePumpLibevent::Watcher. |
+ virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; |
+ virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; |
+ |
+ std::string socket_path_; |
+ size_t data_size_; |
+ scoped_ptr<char[]> data_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(DeviceSocketReader); |
+}; |
+ |
+class DeviceSocketManager; |
+DeviceSocketManager* device_socket_manager_instance_ = NULL; |
+ |
+// A singleton instance for managing all connections to sockets. |
+class DeviceSocketManager { |
+ public: |
+ static void Create(scoped_refptr<base::TaskRunner> io_task_runner) { |
+ device_socket_manager_instance_ = |
+ new DeviceSocketManager(io_task_runner); |
+ } |
+ |
+ static void Shutdown() { |
+ CHECK(device_socket_manager_instance_); |
+ delete device_socket_manager_instance_; |
+ device_socket_manager_instance_ = NULL; |
+ } |
+ |
+ static DeviceSocketManager* GetInstance() { |
+ CHECK(device_socket_manager_instance_); |
+ return device_socket_manager_instance_; |
+ } |
+ |
+ // If there isn't an existing connection to |socket_path|, then opens a |
+ // connection to |socket_path| and starts listening for data. All listeners |
+ // for |socket_path| receives data when data is available on the socket. |
+ void StartListening(const std::string& socket_path, |
+ size_t data_size, |
+ DeviceSocketListener* listener); |
+ |
+ // Removes |listener| from the list of listeners that receive data from |
+ // |socket_path|. If this is the last listener, then this closes the |
+ // connection to the socket. |
+ void StopListening(const std::string& socket_path, |
+ DeviceSocketListener* listener); |
+ |
+ // Sends data to all the listeners registered to receive data from |
+ // |socket_path|. |
+ void OnDataAvailable(const std::string& socket_path, |
+ const void* data); |
+ |
+ // Notifies listeners of errors reading from the socket and closes it. |
+ void OnError(const std::string& socket_path, int err); |
+ void OnEOF(const std::string& socket_path); |
+ |
+ private: |
+ friend struct DefaultSingletonTraits<DeviceSocketManager>; |
+ |
+ struct SocketData { |
+ SocketData() |
+ : fd(-1) { |
+ } |
+ |
+ int fd; |
+ DeviceSocketListeners observers; |
+ scoped_ptr<base::MessagePumpLibevent::FileDescriptorWatcher> controller; |
+ scoped_ptr<DeviceSocketReader> watcher; |
+ }; |
+ |
+ DeviceSocketManager(scoped_refptr<base::TaskRunner> io_task_runner) |
+ : io_task_runner_(io_task_runner) { |
+ } |
+ |
+ ~DeviceSocketManager() { |
+ STLDeleteContainerPairSecondPointers(socket_data_.begin(), |
+ socket_data_.end()); |
+ } |
+ |
+ void StartListeningOnIO(const std::string& socket_path, |
+ size_t data_size, |
+ DeviceSocketListener* listener); |
+ |
+ void StopListeningOnIO(const std::string& socket_path, |
+ DeviceSocketListener* listener); |
+ |
+ void CloseSocket(const std::string& socket_path); |
+ |
+ std::map<std::string, SocketData*> socket_data_; |
+ scoped_refptr<base::TaskRunner> io_task_runner_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(DeviceSocketManager); |
+}; |
+ |
+//////////////////////////////////////////////////////////////////////////////// |
+// DeviceSocketReader |
+ |
+void DeviceSocketReader::OnFileCanReadWithoutBlocking(int fd) { |
+ ssize_t read_size = recv(fd, data_.get(), data_size_, 0); |
+ if (read_size < 0) { |
+ if (errno == EINTR) |
+ return; |
+ DeviceSocketManager::GetInstance()->OnError(socket_path_, errno); |
+ return; |
+ } |
+ if (read_size == 0) { |
+ DeviceSocketManager::GetInstance()->OnEOF(socket_path_); |
+ return; |
+ } |
+ if (read_size != static_cast<ssize_t>(data_size_)) |
+ return; |
+ DeviceSocketManager::GetInstance()->OnDataAvailable(socket_path_, |
+ data_.get()); |
+} |
+ |
+void DeviceSocketReader::OnFileCanWriteWithoutBlocking(int fd) { |
+ NOTREACHED(); |
+} |
+ |
+//////////////////////////////////////////////////////////////////////////////// |
+// DeviceSocketManager |
+ |
+void DeviceSocketManager::StartListening(const std::string& socket_path, |
+ size_t data_size, |
+ DeviceSocketListener* listener) { |
+ io_task_runner_->PostTask(FROM_HERE, |
+ base::Bind(&DeviceSocketManager::StartListeningOnIO, |
+ base::Unretained(this), socket_path, data_size, listener)); |
+} |
+ |
+void DeviceSocketManager::StopListening(const std::string& socket_path, |
+ DeviceSocketListener* listener) { |
+ io_task_runner_->PostTask(FROM_HERE, |
+ base::Bind(&DeviceSocketManager::StopListeningOnIO, |
+ base::Unretained(this), socket_path, listener)); |
+} |
+ |
+void DeviceSocketManager::OnDataAvailable(const std::string& socket_path, |
+ const void* data) { |
+ CHECK_GT(socket_data_.count(socket_path), 0UL); |
+ DeviceSocketListeners& listeners = socket_data_[socket_path]->observers; |
+ FOR_EACH_OBSERVER(DeviceSocketListener, listeners, OnDataAvailableOnIO(data)); |
+} |
+ |
+void DeviceSocketManager::CloseSocket(const std::string& socket_path) { |
+ if (!socket_data_.count(socket_path)) |
+ return; |
+ SocketData* socket_data = socket_data_[socket_path]; |
+ close(socket_data->fd); |
+ delete socket_data; |
+ socket_data_.erase(socket_path); |
+} |
+ |
+void DeviceSocketManager::OnError(const std::string& socket_path, int err) { |
+ LOG(ERROR) << "Error reading from socket: " << socket_path << ": " |
+ << strerror(err); |
+ CloseSocket(socket_path); |
+ // TODO(flackr): Notify listeners that the socket was closed unexpectedly. |
+} |
+ |
+void DeviceSocketManager::OnEOF(const std::string& socket_path) { |
+ LOG(ERROR) << "EOF reading from socket: " << socket_path; |
+ CloseSocket(socket_path); |
+} |
+ |
+void DeviceSocketManager::StartListeningOnIO(const std::string& socket_path, |
+ size_t data_size, |
+ DeviceSocketListener* listener) { |
+ CHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
+ SocketData* socket_data = NULL; |
+ if (!socket_data_.count(socket_path)) { |
+ int socket_fd = -1; |
+ if (!IPC::CreateClientUnixDomainSocket(base::FilePath(socket_path), |
+ &socket_fd)) { |
+ LOG(ERROR) << "Error connecting to socket: " << socket_path; |
+ return; |
+ } |
+ |
+ socket_data = new SocketData; |
+ socket_data_[socket_path] = socket_data; |
+ |
+ socket_data->fd = socket_fd; |
+ |
+ socket_data->controller.reset( |
+ new base::MessagePumpLibevent::FileDescriptorWatcher()); |
+ socket_data->watcher.reset( |
+ new DeviceSocketReader(socket_path, data_size)); |
+ |
+ base::MessageLoopForIO::current()->WatchFileDescriptor( |
+ socket_fd, |
+ true, |
+ base::MessageLoopForIO::WATCH_READ, |
+ socket_data->controller.get(), |
+ socket_data->watcher.get()); |
+ } else { |
+ socket_data = socket_data_[socket_path]; |
+ } |
+ socket_data->observers.AddObserver(listener); |
+} |
+ |
+void DeviceSocketManager::StopListeningOnIO(const std::string& socket_path, |
+ DeviceSocketListener* listener) { |
+ if (!socket_data_.count(socket_path)) |
+ return; // Happens if unable to create a socket. |
+ |
+ CHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
+ DeviceSocketListeners& listeners = socket_data_[socket_path]->observers; |
+ listeners.RemoveObserver(listener); |
+ if (!listeners.might_have_observers()) { |
+ // All listeners for this socket has been removed. Close the socket. |
+ CloseSocket(socket_path); |
+ } |
+} |
+ |
+} // namespace |
+ |
+DeviceSocketListener::DeviceSocketListener(const std::string& socket_path, |
+ size_t data_size) |
+ : socket_path_(socket_path), |
+ data_size_(data_size) { |
+} |
+ |
+DeviceSocketListener::~DeviceSocketListener() { |
+ StopListening(); |
+} |
+ |
+// static |
+void DeviceSocketListener::CreateSocketManager( |
+ scoped_refptr<base::TaskRunner> io_task_runner) { |
+ DeviceSocketManager::Create(io_task_runner); |
+} |
+ |
+// static |
+void DeviceSocketListener::ShutdownSocketManager() { |
+ DeviceSocketManager::Shutdown(); |
+} |
+ |
+void DeviceSocketListener::StartListening() { |
+ DeviceSocketManager::GetInstance()->StartListening(socket_path_, |
+ data_size_, |
+ this); |
+} |
+ |
+void DeviceSocketListener::StopListening() { |
+ DeviceSocketManager::GetInstance()->StopListening(socket_path_, this); |
+} |
+ |
+} // namespace athena |