Chromium Code Reviews| Index: athena/system/device_socket_listener.cc |
| diff --git a/athena/system/device_socket_listener.cc b/athena/system/device_socket_listener.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..8bb9970b405c5b21015b99d0c3bea5e8d190e214 |
| --- /dev/null |
| +++ b/athena/system/device_socket_listener.cc |
| @@ -0,0 +1,264 @@ |
| +// Copyright (c) 2013 The Chromium Authors. All rights reserved. |
|
oshima
2014/08/01 17:15:11
2014
flackr
2014/08/06 14:13:51
Done.
|
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "athena/system/device_socket_listener.h" |
| + |
| +#include <errno.h> |
| +#include <map> |
| +#include <sys/socket.h> |
| +#include <sys/types.h> |
| +#include <vector> |
| + |
| +#include "base/bind.h" |
| +#include "base/files/file_path.h" |
| +#include "base/memory/singleton.h" |
| +#include "base/message_loop/message_loop.h" |
| +#include "base/stl_util.h" |
| +#include "content/public/browser/browser_thread.h" |
| +#include "ipc/unix_domain_socket_util.h" |
| + |
| +namespace athena { |
| + |
| +namespace { |
| + |
| +typedef std::vector<DeviceSocketListener*> DeviceSocketListeners; |
|
oshima
2014/08/01 17:15:11
ObserverList<DeviceSocketListener> may help?
flackr
2014/08/06 14:13:51
Done.
|
| + |
| +class DeviceSocketWatcher : public base::MessagePumpLibevent::Watcher { |
|
oshima
2014/08/01 17:15:11
document this class, and should this be called Rea
flackr
2014/08/06 14:13:51
Done.
|
| + public: |
| + DeviceSocketWatcher(const std::string& socket_path, |
| + size_t data_size) |
| + : socket_path_(socket_path), |
| + data_size_(data_size), |
| + data_(new char[data_size]) { |
| + } |
| + virtual ~DeviceSocketWatcher() {} |
| + |
| + private: |
| + // Overidden from base::MessagePumpLibevent::Watcher. |
| + virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; |
| + virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; |
| + |
| + std::string socket_path_; |
| + size_t data_size_; |
| + scoped_ptr<char[]> data_; |
|
oshima
2014/08/01 17:15:11
there is a StringPiece class. since it's small, I'
|
| + |
| + DISALLOW_COPY_AND_ASSIGN(DeviceSocketWatcher); |
| +}; |
| + |
| +// A singleton instance for managing all connections to sockets. |
| +class DeviceSocketManager { |
| + public: |
| + static DeviceSocketManager* GetInstance() { |
| + return Singleton<DeviceSocketManager>::get(); |
| + } |
| + |
| + // If there isn't an existing connection to |socket_path|, then opens a |
| + // connection to |socket_path| and starts listening for data. All listeners |
| + // for |socket_path| receives data when data is available on the socket. |
| + void StartListening(const std::string& socket_path, |
| + size_t data_size, |
| + DeviceSocketListener* listener); |
| + |
| + // Removes |listener| from the list of listeners that receive data from |
| + // |socket_path|. If this is the last listener, then this closes the |
| + // connection to the socket. |
| + void StopListening(const std::string& socket_path, |
| + DeviceSocketListener* listener); |
| + |
| + // Sends data to all the listeners registered to receive data from |
| + // |socket_path|. |
| + void OnDataAvailable(const std::string& socket_path, |
| + const void* data); |
| + |
| + // Notifies listeners of errors reading from the socket and closes it. |
| + void OnError(const std::string& socket_path, int err); |
| + void OnEOF(const std::string& socket_path); |
| + |
| + private: |
| + friend struct DefaultSingletonTraits<DeviceSocketManager>; |
| + |
| + struct SocketData { |
| + SocketData() |
| + : fd(-1) { |
| + } |
| + |
| + int fd; |
| + DeviceSocketListeners listeners; |
| + scoped_ptr<base::MessagePumpLibevent::FileDescriptorWatcher> controller; |
| + scoped_ptr<DeviceSocketWatcher> watcher; |
| + }; |
| + |
| + DeviceSocketManager() { |
| + } |
| + |
| + ~DeviceSocketManager() { |
| + STLDeleteContainerPairSecondPointers(socket_data_.begin(), |
| + socket_data_.end()); |
| + } |
| + |
| + void StartListeningOnIO(const std::string& socket_path, |
| + size_t data_size, |
| + DeviceSocketListener* listener); |
| + |
| + void StopListeningOnIO(const std::string& socket_path, |
| + DeviceSocketListener* listener); |
| + |
| + void CloseSocket(const std::string& socket_path); |
| + |
| + std::map<std::string, SocketData*> socket_data_; |
| + |
| + DISALLOW_COPY_AND_ASSIGN(DeviceSocketManager); |
| +}; |
| + |
| +//////////////////////////////////////////////////////////////////////////////// |
| +// DeviceSocketWatcher |
| + |
| +void DeviceSocketWatcher::OnFileCanReadWithoutBlocking(int fd) { |
| + ssize_t read_size = recv(fd, data_.get(), data_size_, 0); |
| + if (read_size < 0) { |
| + if (errno == EINTR) |
| + return; |
| + DeviceSocketManager::GetInstance()->OnError(socket_path_, errno); |
| + return; |
| + } |
| + if (read_size == 0) { |
| + DeviceSocketManager::GetInstance()->OnEOF(socket_path_); |
| + return; |
| + } |
| + if (read_size != static_cast<ssize_t>(data_size_)) |
| + return; |
| + DeviceSocketManager::GetInstance()->OnDataAvailable(socket_path_, |
| + data_.get()); |
| +} |
| + |
| +void DeviceSocketWatcher::OnFileCanWriteWithoutBlocking(int fd) { |
| + NOTREACHED(); |
| +} |
| + |
| +//////////////////////////////////////////////////////////////////////////////// |
| +// DeviceSocketManager |
| + |
| +void DeviceSocketManager::StartListening(const std::string& socket_path, |
| + size_t data_size, |
| + DeviceSocketListener* listener) { |
| + content::BrowserThread::PostTask(content::BrowserThread::IO, FROM_HERE, |
| + base::Bind(&DeviceSocketManager::StartListeningOnIO, |
| + base::Unretained(this), socket_path, data_size, listener)); |
| +} |
| + |
| +void DeviceSocketManager::StopListening(const std::string& socket_path, |
| + DeviceSocketListener* listener) { |
| + content::BrowserThread::PostTask(content::BrowserThread::IO, FROM_HERE, |
| + base::Bind(&DeviceSocketManager::StopListeningOnIO, |
| + base::Unretained(this), socket_path, listener)); |
| +} |
| + |
| +void DeviceSocketManager::OnDataAvailable(const std::string& socket_path, |
| + const void* data) { |
| + CHECK_GT(socket_data_.count(socket_path), 0UL); |
| + DeviceSocketListeners& listeners = socket_data_[socket_path]->listeners; |
| + DeviceSocketListeners::iterator i = listeners.begin(); |
| + for (; i != listeners.end(); ++i) { |
| + (*i)->OnDataAvailableOnIO(data); |
| + } |
| +} |
| + |
| +void DeviceSocketManager::CloseSocket(const std::string& socket_path) { |
| + if (!socket_data_.count(socket_path)) |
| + return; |
| + SocketData* socket_data = socket_data_[socket_path]; |
| + close(socket_data->fd); |
| + delete socket_data; |
| + socket_data_.erase(socket_path); |
| +} |
| + |
| +void DeviceSocketManager::OnError(const std::string& socket_path, int err) { |
| + LOG(ERROR) << "Error reading from socket: " << socket_path << ": " |
| + << strerror(err); |
| + CloseSocket(socket_path); |
|
oshima
2014/08/01 17:15:11
probably not now, but maybe useful to notify liste
flackr
2014/08/06 14:13:51
Added TODO.
|
| +} |
| + |
| +void DeviceSocketManager::OnEOF(const std::string& socket_path) { |
| + LOG(ERROR) << "EOF reading from socket: " << socket_path; |
| + CloseSocket(socket_path); |
| +} |
| + |
| +void DeviceSocketManager::StartListeningOnIO(const std::string& socket_path, |
| + size_t data_size, |
| + DeviceSocketListener* listener) { |
| + CHECK(content::BrowserThread::CurrentlyOn(content::BrowserThread::IO)); |
| + SocketData* socket_data = NULL; |
| + if (!socket_data_.count(socket_path)) { |
| + int socket_fd = -1; |
| + if (!IPC::CreateClientUnixDomainSocket(base::FilePath(socket_path), |
| + &socket_fd)) { |
| + LOG(ERROR) << "Error connecting to socket: " << socket_path; |
| + return; |
| + } |
| + |
| + socket_data = new SocketData; |
| + socket_data_[socket_path] = socket_data; |
| + |
| + socket_data->fd = socket_fd; |
| + |
| + socket_data->controller.reset( |
| + new base::MessagePumpLibevent::FileDescriptorWatcher()); |
| + socket_data->watcher.reset( |
| + new DeviceSocketWatcher(socket_path, data_size)); |
| + |
| + base::MessageLoopForIO::current()->WatchFileDescriptor( |
| + socket_fd, |
| + true, |
| + base::MessageLoopForIO::WATCH_READ, |
| + socket_data->controller.get(), |
| + socket_data->watcher.get()); |
| + } else { |
| + socket_data = socket_data_[socket_path]; |
| + } |
| + |
| + socket_data->listeners.push_back(listener); |
| +} |
| + |
| +void DeviceSocketManager::StopListeningOnIO(const std::string& socket_path, |
| + DeviceSocketListener* listener) { |
| + if (!socket_data_.count(socket_path)) |
| + return; // Happens if unable to create a socket. |
| + |
| + CHECK(content::BrowserThread::CurrentlyOn(content::BrowserThread::IO)); |
| + DeviceSocketListeners& listeners = socket_data_[socket_path]->listeners; |
| + DeviceSocketListeners::iterator i = |
| + std::find(listeners.begin(), listeners.end(), listener); |
| + if (i != listeners.end()) { |
| + listeners.erase(i); |
| + |
| + if (listeners.size() == 0) { |
| + // All listeners for this socket has been removed. Close the socket. |
| + CloseSocket(socket_path); |
| + } |
| + } |
| +} |
| + |
| +} // namespace |
| + |
| +DeviceSocketListener::DeviceSocketListener(const std::string& socket_path, |
| + size_t data_size) |
| + : socket_path_(socket_path), |
| + data_size_(data_size) { |
| +} |
| + |
| +DeviceSocketListener::~DeviceSocketListener() { |
| + StopListening(); |
| +} |
| + |
| +void DeviceSocketListener::StartListening() { |
| + DeviceSocketManager::GetInstance()->StartListening(socket_path_, |
| + data_size_, |
| + this); |
| +} |
| + |
| +void DeviceSocketListener::StopListening() { |
| + DeviceSocketManager::GetInstance()->StopListening(socket_path_, this); |
| +} |
| + |
| +} // namespace athena |