Index: athena/system/device_socket_listener.cc |
diff --git a/athena/system/device_socket_listener.cc b/athena/system/device_socket_listener.cc |
deleted file mode 100644 |
index e041e44c5413d38bfdedcc53eb3781a8c7668e27..0000000000000000000000000000000000000000 |
--- a/athena/system/device_socket_listener.cc |
+++ /dev/null |
@@ -1,287 +0,0 @@ |
-// Copyright 2014 The Chromium Authors. All rights reserved. |
-// Use of this source code is governed by a BSD-style license that can be |
-// found in the LICENSE file. |
- |
-#include "athena/system/device_socket_listener.h" |
- |
-#include <errno.h> |
-#include <map> |
-#include <sys/socket.h> |
-#include <sys/types.h> |
-#include <vector> |
- |
-#include "base/bind.h" |
-#include "base/files/file_path.h" |
-#include "base/memory/singleton.h" |
-#include "base/message_loop/message_loop.h" |
-#include "base/observer_list.h" |
-#include "base/stl_util.h" |
-#include "ipc/unix_domain_socket_util.h" |
- |
-namespace athena { |
- |
-namespace { |
- |
-typedef ObserverList<DeviceSocketListener> DeviceSocketListeners; |
- |
-// Reads from a device socket blocks of a particular size. When that amount of |
-// data is read DeviceSocketManager::OnDataAvailable is called on the singleton |
-// instance which then informs all of the listeners on that socket. |
-class DeviceSocketReader : public base::MessagePumpLibevent::Watcher { |
- public: |
- DeviceSocketReader(const std::string& socket_path, |
- size_t data_size) |
- : socket_path_(socket_path), |
- data_size_(data_size), |
- data_(new char[data_size]) { |
- } |
- virtual ~DeviceSocketReader() {} |
- |
- private: |
- // Overidden from base::MessagePumpLibevent::Watcher. |
- virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; |
- virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; |
- |
- std::string socket_path_; |
- size_t data_size_; |
- scoped_ptr<char[]> data_; |
- |
- DISALLOW_COPY_AND_ASSIGN(DeviceSocketReader); |
-}; |
- |
-class DeviceSocketManager; |
-DeviceSocketManager* device_socket_manager_instance_ = NULL; |
- |
-// A singleton instance for managing all connections to sockets. |
-class DeviceSocketManager { |
- public: |
- static void Create(scoped_refptr<base::TaskRunner> io_task_runner) { |
- device_socket_manager_instance_ = |
- new DeviceSocketManager(io_task_runner); |
- } |
- |
- static void Shutdown() { |
- CHECK(device_socket_manager_instance_); |
- delete device_socket_manager_instance_; |
- device_socket_manager_instance_ = NULL; |
- } |
- |
- static DeviceSocketManager* GetInstance() { |
- CHECK(device_socket_manager_instance_); |
- return device_socket_manager_instance_; |
- } |
- |
- // If there isn't an existing connection to |socket_path|, then opens a |
- // connection to |socket_path| and starts listening for data. All listeners |
- // for |socket_path| receives data when data is available on the socket. |
- void StartListening(const std::string& socket_path, |
- size_t data_size, |
- DeviceSocketListener* listener); |
- |
- // Removes |listener| from the list of listeners that receive data from |
- // |socket_path|. If this is the last listener, then this closes the |
- // connection to the socket. |
- void StopListening(const std::string& socket_path, |
- DeviceSocketListener* listener); |
- |
- // Sends data to all the listeners registered to receive data from |
- // |socket_path|. |
- void OnDataAvailable(const std::string& socket_path, |
- const void* data); |
- |
- // Notifies listeners of errors reading from the socket and closes it. |
- void OnError(const std::string& socket_path, int err); |
- void OnEOF(const std::string& socket_path); |
- |
- private: |
- friend struct DefaultSingletonTraits<DeviceSocketManager>; |
- |
- struct SocketData { |
- SocketData() |
- : fd(-1) { |
- } |
- |
- int fd; |
- DeviceSocketListeners observers; |
- scoped_ptr<base::MessagePumpLibevent::FileDescriptorWatcher> controller; |
- scoped_ptr<DeviceSocketReader> watcher; |
- }; |
- |
- DeviceSocketManager(scoped_refptr<base::TaskRunner> io_task_runner) |
- : io_task_runner_(io_task_runner) { |
- } |
- |
- ~DeviceSocketManager() { |
- STLDeleteContainerPairSecondPointers(socket_data_.begin(), |
- socket_data_.end()); |
- } |
- |
- void StartListeningOnIO(const std::string& socket_path, |
- size_t data_size, |
- DeviceSocketListener* listener); |
- |
- void StopListeningOnIO(const std::string& socket_path, |
- DeviceSocketListener* listener); |
- |
- void CloseSocket(const std::string& socket_path); |
- |
- std::map<std::string, SocketData*> socket_data_; |
- scoped_refptr<base::TaskRunner> io_task_runner_; |
- |
- DISALLOW_COPY_AND_ASSIGN(DeviceSocketManager); |
-}; |
- |
-//////////////////////////////////////////////////////////////////////////////// |
-// DeviceSocketReader |
- |
-void DeviceSocketReader::OnFileCanReadWithoutBlocking(int fd) { |
- ssize_t read_size = recv(fd, data_.get(), data_size_, 0); |
- if (read_size < 0) { |
- if (errno == EINTR) |
- return; |
- DeviceSocketManager::GetInstance()->OnError(socket_path_, errno); |
- return; |
- } |
- if (read_size == 0) { |
- DeviceSocketManager::GetInstance()->OnEOF(socket_path_); |
- return; |
- } |
- if (read_size != static_cast<ssize_t>(data_size_)) |
- return; |
- DeviceSocketManager::GetInstance()->OnDataAvailable(socket_path_, |
- data_.get()); |
-} |
- |
-void DeviceSocketReader::OnFileCanWriteWithoutBlocking(int fd) { |
- NOTREACHED(); |
-} |
- |
-//////////////////////////////////////////////////////////////////////////////// |
-// DeviceSocketManager |
- |
-void DeviceSocketManager::StartListening(const std::string& socket_path, |
- size_t data_size, |
- DeviceSocketListener* listener) { |
- io_task_runner_->PostTask(FROM_HERE, |
- base::Bind(&DeviceSocketManager::StartListeningOnIO, |
- base::Unretained(this), socket_path, data_size, listener)); |
-} |
- |
-void DeviceSocketManager::StopListening(const std::string& socket_path, |
- DeviceSocketListener* listener) { |
- io_task_runner_->PostTask(FROM_HERE, |
- base::Bind(&DeviceSocketManager::StopListeningOnIO, |
- base::Unretained(this), socket_path, listener)); |
-} |
- |
-void DeviceSocketManager::OnDataAvailable(const std::string& socket_path, |
- const void* data) { |
- CHECK_GT(socket_data_.count(socket_path), 0UL); |
- DeviceSocketListeners& listeners = socket_data_[socket_path]->observers; |
- FOR_EACH_OBSERVER(DeviceSocketListener, listeners, OnDataAvailableOnIO(data)); |
-} |
- |
-void DeviceSocketManager::CloseSocket(const std::string& socket_path) { |
- if (!socket_data_.count(socket_path)) |
- return; |
- SocketData* socket_data = socket_data_[socket_path]; |
- close(socket_data->fd); |
- delete socket_data; |
- socket_data_.erase(socket_path); |
-} |
- |
-void DeviceSocketManager::OnError(const std::string& socket_path, int err) { |
- LOG(ERROR) << "Error reading from socket: " << socket_path << ": " |
- << strerror(err); |
- CloseSocket(socket_path); |
- // TODO(flackr): Notify listeners that the socket was closed unexpectedly. |
-} |
- |
-void DeviceSocketManager::OnEOF(const std::string& socket_path) { |
- LOG(ERROR) << "EOF reading from socket: " << socket_path; |
- CloseSocket(socket_path); |
-} |
- |
-void DeviceSocketManager::StartListeningOnIO(const std::string& socket_path, |
- size_t data_size, |
- DeviceSocketListener* listener) { |
- CHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
- SocketData* socket_data = NULL; |
- if (!socket_data_.count(socket_path)) { |
- int socket_fd = -1; |
- if (!IPC::CreateClientUnixDomainSocket(base::FilePath(socket_path), |
- &socket_fd)) { |
- LOG(ERROR) << "Error connecting to socket: " << socket_path; |
- return; |
- } |
- |
- socket_data = new SocketData; |
- socket_data_[socket_path] = socket_data; |
- |
- socket_data->fd = socket_fd; |
- |
- socket_data->controller.reset( |
- new base::MessagePumpLibevent::FileDescriptorWatcher()); |
- socket_data->watcher.reset( |
- new DeviceSocketReader(socket_path, data_size)); |
- |
- base::MessageLoopForIO::current()->WatchFileDescriptor( |
- socket_fd, |
- true, |
- base::MessageLoopForIO::WATCH_READ, |
- socket_data->controller.get(), |
- socket_data->watcher.get()); |
- } else { |
- socket_data = socket_data_[socket_path]; |
- } |
- socket_data->observers.AddObserver(listener); |
-} |
- |
-void DeviceSocketManager::StopListeningOnIO(const std::string& socket_path, |
- DeviceSocketListener* listener) { |
- if (!socket_data_.count(socket_path)) |
- return; // Happens if unable to create a socket. |
- |
- CHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
- DeviceSocketListeners& listeners = socket_data_[socket_path]->observers; |
- listeners.RemoveObserver(listener); |
- if (!listeners.might_have_observers()) { |
- // All listeners for this socket has been removed. Close the socket. |
- CloseSocket(socket_path); |
- } |
-} |
- |
-} // namespace |
- |
-DeviceSocketListener::DeviceSocketListener(const std::string& socket_path, |
- size_t data_size) |
- : socket_path_(socket_path), |
- data_size_(data_size) { |
-} |
- |
-DeviceSocketListener::~DeviceSocketListener() { |
- StopListening(); |
-} |
- |
-// static |
-void DeviceSocketListener::CreateSocketManager( |
- scoped_refptr<base::TaskRunner> io_task_runner) { |
- DeviceSocketManager::Create(io_task_runner); |
-} |
- |
-// static |
-void DeviceSocketListener::ShutdownSocketManager() { |
- DeviceSocketManager::Shutdown(); |
-} |
- |
-void DeviceSocketListener::StartListening() { |
- DeviceSocketManager::GetInstance()->StartListening(socket_path_, |
- data_size_, |
- this); |
-} |
- |
-void DeviceSocketListener::StopListening() { |
- DeviceSocketManager::GetInstance()->StopListening(socket_path_, this); |
-} |
- |
-} // namespace athena |