Index: athena/system/device_socket_listener.cc |
diff --git a/athena/system/device_socket_listener.cc b/athena/system/device_socket_listener.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..8bb9970b405c5b21015b99d0c3bea5e8d190e214 |
--- /dev/null |
+++ b/athena/system/device_socket_listener.cc |
@@ -0,0 +1,264 @@ |
+// Copyright (c) 2013 The Chromium Authors. All rights reserved. |
oshima
2014/08/01 17:15:11
2014
flackr
2014/08/06 14:13:51
Done.
|
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "athena/system/device_socket_listener.h" |
+ |
+#include <errno.h> |
+#include <map> |
+#include <sys/socket.h> |
+#include <sys/types.h> |
+#include <vector> |
+ |
+#include "base/bind.h" |
+#include "base/files/file_path.h" |
+#include "base/memory/singleton.h" |
+#include "base/message_loop/message_loop.h" |
+#include "base/stl_util.h" |
+#include "content/public/browser/browser_thread.h" |
+#include "ipc/unix_domain_socket_util.h" |
+ |
+namespace athena { |
+ |
+namespace { |
+ |
+typedef std::vector<DeviceSocketListener*> DeviceSocketListeners; |
oshima
2014/08/01 17:15:11
ObserverList<DeviceSocketListener> may help?
flackr
2014/08/06 14:13:51
Done.
|
+ |
+class DeviceSocketWatcher : public base::MessagePumpLibevent::Watcher { |
oshima
2014/08/01 17:15:11
document this class, and should this be called Rea
flackr
2014/08/06 14:13:51
Done.
|
+ public: |
+ DeviceSocketWatcher(const std::string& socket_path, |
+ size_t data_size) |
+ : socket_path_(socket_path), |
+ data_size_(data_size), |
+ data_(new char[data_size]) { |
+ } |
+ virtual ~DeviceSocketWatcher() {} |
+ |
+ private: |
+ // Overidden from base::MessagePumpLibevent::Watcher. |
+ virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; |
+ virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; |
+ |
+ std::string socket_path_; |
+ size_t data_size_; |
+ scoped_ptr<char[]> data_; |
oshima
2014/08/01 17:15:11
there is a StringPiece class. since it's small, I'
|
+ |
+ DISALLOW_COPY_AND_ASSIGN(DeviceSocketWatcher); |
+}; |
+ |
+// A singleton instance for managing all connections to sockets. |
+class DeviceSocketManager { |
+ public: |
+ static DeviceSocketManager* GetInstance() { |
+ return Singleton<DeviceSocketManager>::get(); |
+ } |
+ |
+ // If there isn't an existing connection to |socket_path|, then opens a |
+ // connection to |socket_path| and starts listening for data. All listeners |
+ // for |socket_path| receives data when data is available on the socket. |
+ void StartListening(const std::string& socket_path, |
+ size_t data_size, |
+ DeviceSocketListener* listener); |
+ |
+ // Removes |listener| from the list of listeners that receive data from |
+ // |socket_path|. If this is the last listener, then this closes the |
+ // connection to the socket. |
+ void StopListening(const std::string& socket_path, |
+ DeviceSocketListener* listener); |
+ |
+ // Sends data to all the listeners registered to receive data from |
+ // |socket_path|. |
+ void OnDataAvailable(const std::string& socket_path, |
+ const void* data); |
+ |
+ // Notifies listeners of errors reading from the socket and closes it. |
+ void OnError(const std::string& socket_path, int err); |
+ void OnEOF(const std::string& socket_path); |
+ |
+ private: |
+ friend struct DefaultSingletonTraits<DeviceSocketManager>; |
+ |
+ struct SocketData { |
+ SocketData() |
+ : fd(-1) { |
+ } |
+ |
+ int fd; |
+ DeviceSocketListeners listeners; |
+ scoped_ptr<base::MessagePumpLibevent::FileDescriptorWatcher> controller; |
+ scoped_ptr<DeviceSocketWatcher> watcher; |
+ }; |
+ |
+ DeviceSocketManager() { |
+ } |
+ |
+ ~DeviceSocketManager() { |
+ STLDeleteContainerPairSecondPointers(socket_data_.begin(), |
+ socket_data_.end()); |
+ } |
+ |
+ void StartListeningOnIO(const std::string& socket_path, |
+ size_t data_size, |
+ DeviceSocketListener* listener); |
+ |
+ void StopListeningOnIO(const std::string& socket_path, |
+ DeviceSocketListener* listener); |
+ |
+ void CloseSocket(const std::string& socket_path); |
+ |
+ std::map<std::string, SocketData*> socket_data_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(DeviceSocketManager); |
+}; |
+ |
+//////////////////////////////////////////////////////////////////////////////// |
+// DeviceSocketWatcher |
+ |
+void DeviceSocketWatcher::OnFileCanReadWithoutBlocking(int fd) { |
+ ssize_t read_size = recv(fd, data_.get(), data_size_, 0); |
+ if (read_size < 0) { |
+ if (errno == EINTR) |
+ return; |
+ DeviceSocketManager::GetInstance()->OnError(socket_path_, errno); |
+ return; |
+ } |
+ if (read_size == 0) { |
+ DeviceSocketManager::GetInstance()->OnEOF(socket_path_); |
+ return; |
+ } |
+ if (read_size != static_cast<ssize_t>(data_size_)) |
+ return; |
+ DeviceSocketManager::GetInstance()->OnDataAvailable(socket_path_, |
+ data_.get()); |
+} |
+ |
+void DeviceSocketWatcher::OnFileCanWriteWithoutBlocking(int fd) { |
+ NOTREACHED(); |
+} |
+ |
+//////////////////////////////////////////////////////////////////////////////// |
+// DeviceSocketManager |
+ |
+void DeviceSocketManager::StartListening(const std::string& socket_path, |
+ size_t data_size, |
+ DeviceSocketListener* listener) { |
+ content::BrowserThread::PostTask(content::BrowserThread::IO, FROM_HERE, |
+ base::Bind(&DeviceSocketManager::StartListeningOnIO, |
+ base::Unretained(this), socket_path, data_size, listener)); |
+} |
+ |
+void DeviceSocketManager::StopListening(const std::string& socket_path, |
+ DeviceSocketListener* listener) { |
+ content::BrowserThread::PostTask(content::BrowserThread::IO, FROM_HERE, |
+ base::Bind(&DeviceSocketManager::StopListeningOnIO, |
+ base::Unretained(this), socket_path, listener)); |
+} |
+ |
+void DeviceSocketManager::OnDataAvailable(const std::string& socket_path, |
+ const void* data) { |
+ CHECK_GT(socket_data_.count(socket_path), 0UL); |
+ DeviceSocketListeners& listeners = socket_data_[socket_path]->listeners; |
+ DeviceSocketListeners::iterator i = listeners.begin(); |
+ for (; i != listeners.end(); ++i) { |
+ (*i)->OnDataAvailableOnIO(data); |
+ } |
+} |
+ |
+void DeviceSocketManager::CloseSocket(const std::string& socket_path) { |
+ if (!socket_data_.count(socket_path)) |
+ return; |
+ SocketData* socket_data = socket_data_[socket_path]; |
+ close(socket_data->fd); |
+ delete socket_data; |
+ socket_data_.erase(socket_path); |
+} |
+ |
+void DeviceSocketManager::OnError(const std::string& socket_path, int err) { |
+ LOG(ERROR) << "Error reading from socket: " << socket_path << ": " |
+ << strerror(err); |
+ CloseSocket(socket_path); |
oshima
2014/08/01 17:15:11
probably not now, but maybe useful to notify liste
flackr
2014/08/06 14:13:51
Added TODO.
|
+} |
+ |
+void DeviceSocketManager::OnEOF(const std::string& socket_path) { |
+ LOG(ERROR) << "EOF reading from socket: " << socket_path; |
+ CloseSocket(socket_path); |
+} |
+ |
+void DeviceSocketManager::StartListeningOnIO(const std::string& socket_path, |
+ size_t data_size, |
+ DeviceSocketListener* listener) { |
+ CHECK(content::BrowserThread::CurrentlyOn(content::BrowserThread::IO)); |
+ SocketData* socket_data = NULL; |
+ if (!socket_data_.count(socket_path)) { |
+ int socket_fd = -1; |
+ if (!IPC::CreateClientUnixDomainSocket(base::FilePath(socket_path), |
+ &socket_fd)) { |
+ LOG(ERROR) << "Error connecting to socket: " << socket_path; |
+ return; |
+ } |
+ |
+ socket_data = new SocketData; |
+ socket_data_[socket_path] = socket_data; |
+ |
+ socket_data->fd = socket_fd; |
+ |
+ socket_data->controller.reset( |
+ new base::MessagePumpLibevent::FileDescriptorWatcher()); |
+ socket_data->watcher.reset( |
+ new DeviceSocketWatcher(socket_path, data_size)); |
+ |
+ base::MessageLoopForIO::current()->WatchFileDescriptor( |
+ socket_fd, |
+ true, |
+ base::MessageLoopForIO::WATCH_READ, |
+ socket_data->controller.get(), |
+ socket_data->watcher.get()); |
+ } else { |
+ socket_data = socket_data_[socket_path]; |
+ } |
+ |
+ socket_data->listeners.push_back(listener); |
+} |
+ |
+void DeviceSocketManager::StopListeningOnIO(const std::string& socket_path, |
+ DeviceSocketListener* listener) { |
+ if (!socket_data_.count(socket_path)) |
+ return; // Happens if unable to create a socket. |
+ |
+ CHECK(content::BrowserThread::CurrentlyOn(content::BrowserThread::IO)); |
+ DeviceSocketListeners& listeners = socket_data_[socket_path]->listeners; |
+ DeviceSocketListeners::iterator i = |
+ std::find(listeners.begin(), listeners.end(), listener); |
+ if (i != listeners.end()) { |
+ listeners.erase(i); |
+ |
+ if (listeners.size() == 0) { |
+ // All listeners for this socket has been removed. Close the socket. |
+ CloseSocket(socket_path); |
+ } |
+ } |
+} |
+ |
+} // namespace |
+ |
+DeviceSocketListener::DeviceSocketListener(const std::string& socket_path, |
+ size_t data_size) |
+ : socket_path_(socket_path), |
+ data_size_(data_size) { |
+} |
+ |
+DeviceSocketListener::~DeviceSocketListener() { |
+ StopListening(); |
+} |
+ |
+void DeviceSocketListener::StartListening() { |
+ DeviceSocketManager::GetInstance()->StartListening(socket_path_, |
+ data_size_, |
+ this); |
+} |
+ |
+void DeviceSocketListener::StopListening() { |
+ DeviceSocketManager::GetInstance()->StopListening(socket_path_, this); |
+} |
+ |
+} // namespace athena |