OLD | NEW |
| (Empty) |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "athena/system/device_socket_listener.h" | |
6 | |
7 #include <errno.h> | |
8 #include <map> | |
9 #include <sys/socket.h> | |
10 #include <sys/types.h> | |
11 #include <vector> | |
12 | |
13 #include "base/bind.h" | |
14 #include "base/files/file_path.h" | |
15 #include "base/memory/singleton.h" | |
16 #include "base/message_loop/message_loop.h" | |
17 #include "base/observer_list.h" | |
18 #include "base/stl_util.h" | |
19 #include "ipc/unix_domain_socket_util.h" | |
20 | |
21 namespace athena { | |
22 | |
23 namespace { | |
24 | |
25 typedef ObserverList<DeviceSocketListener> DeviceSocketListeners; | |
26 | |
27 // Reads from a device socket blocks of a particular size. When that amount of | |
28 // data is read DeviceSocketManager::OnDataAvailable is called on the singleton | |
29 // instance which then informs all of the listeners on that socket. | |
30 class DeviceSocketReader : public base::MessagePumpLibevent::Watcher { | |
31 public: | |
32 DeviceSocketReader(const std::string& socket_path, | |
33 size_t data_size) | |
34 : socket_path_(socket_path), | |
35 data_size_(data_size), | |
36 data_(new char[data_size]) { | |
37 } | |
38 virtual ~DeviceSocketReader() {} | |
39 | |
40 private: | |
41 // Overidden from base::MessagePumpLibevent::Watcher. | |
42 virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; | |
43 virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; | |
44 | |
45 std::string socket_path_; | |
46 size_t data_size_; | |
47 scoped_ptr<char[]> data_; | |
48 | |
49 DISALLOW_COPY_AND_ASSIGN(DeviceSocketReader); | |
50 }; | |
51 | |
52 class DeviceSocketManager; | |
53 DeviceSocketManager* device_socket_manager_instance_ = NULL; | |
54 | |
55 // A singleton instance for managing all connections to sockets. | |
56 class DeviceSocketManager { | |
57 public: | |
58 static void Create(scoped_refptr<base::TaskRunner> io_task_runner) { | |
59 device_socket_manager_instance_ = | |
60 new DeviceSocketManager(io_task_runner); | |
61 } | |
62 | |
63 static void Shutdown() { | |
64 CHECK(device_socket_manager_instance_); | |
65 delete device_socket_manager_instance_; | |
66 device_socket_manager_instance_ = NULL; | |
67 } | |
68 | |
69 static DeviceSocketManager* GetInstance() { | |
70 CHECK(device_socket_manager_instance_); | |
71 return device_socket_manager_instance_; | |
72 } | |
73 | |
74 // If there isn't an existing connection to |socket_path|, then opens a | |
75 // connection to |socket_path| and starts listening for data. All listeners | |
76 // for |socket_path| receives data when data is available on the socket. | |
77 void StartListening(const std::string& socket_path, | |
78 size_t data_size, | |
79 DeviceSocketListener* listener); | |
80 | |
81 // Removes |listener| from the list of listeners that receive data from | |
82 // |socket_path|. If this is the last listener, then this closes the | |
83 // connection to the socket. | |
84 void StopListening(const std::string& socket_path, | |
85 DeviceSocketListener* listener); | |
86 | |
87 // Sends data to all the listeners registered to receive data from | |
88 // |socket_path|. | |
89 void OnDataAvailable(const std::string& socket_path, | |
90 const void* data); | |
91 | |
92 // Notifies listeners of errors reading from the socket and closes it. | |
93 void OnError(const std::string& socket_path, int err); | |
94 void OnEOF(const std::string& socket_path); | |
95 | |
96 private: | |
97 friend struct DefaultSingletonTraits<DeviceSocketManager>; | |
98 | |
99 struct SocketData { | |
100 SocketData() | |
101 : fd(-1) { | |
102 } | |
103 | |
104 int fd; | |
105 DeviceSocketListeners observers; | |
106 scoped_ptr<base::MessagePumpLibevent::FileDescriptorWatcher> controller; | |
107 scoped_ptr<DeviceSocketReader> watcher; | |
108 }; | |
109 | |
110 DeviceSocketManager(scoped_refptr<base::TaskRunner> io_task_runner) | |
111 : io_task_runner_(io_task_runner) { | |
112 } | |
113 | |
114 ~DeviceSocketManager() { | |
115 STLDeleteContainerPairSecondPointers(socket_data_.begin(), | |
116 socket_data_.end()); | |
117 } | |
118 | |
119 void StartListeningOnIO(const std::string& socket_path, | |
120 size_t data_size, | |
121 DeviceSocketListener* listener); | |
122 | |
123 void StopListeningOnIO(const std::string& socket_path, | |
124 DeviceSocketListener* listener); | |
125 | |
126 void CloseSocket(const std::string& socket_path); | |
127 | |
128 std::map<std::string, SocketData*> socket_data_; | |
129 scoped_refptr<base::TaskRunner> io_task_runner_; | |
130 | |
131 DISALLOW_COPY_AND_ASSIGN(DeviceSocketManager); | |
132 }; | |
133 | |
134 //////////////////////////////////////////////////////////////////////////////// | |
135 // DeviceSocketReader | |
136 | |
137 void DeviceSocketReader::OnFileCanReadWithoutBlocking(int fd) { | |
138 ssize_t read_size = recv(fd, data_.get(), data_size_, 0); | |
139 if (read_size < 0) { | |
140 if (errno == EINTR) | |
141 return; | |
142 DeviceSocketManager::GetInstance()->OnError(socket_path_, errno); | |
143 return; | |
144 } | |
145 if (read_size == 0) { | |
146 DeviceSocketManager::GetInstance()->OnEOF(socket_path_); | |
147 return; | |
148 } | |
149 if (read_size != static_cast<ssize_t>(data_size_)) | |
150 return; | |
151 DeviceSocketManager::GetInstance()->OnDataAvailable(socket_path_, | |
152 data_.get()); | |
153 } | |
154 | |
155 void DeviceSocketReader::OnFileCanWriteWithoutBlocking(int fd) { | |
156 NOTREACHED(); | |
157 } | |
158 | |
159 //////////////////////////////////////////////////////////////////////////////// | |
160 // DeviceSocketManager | |
161 | |
162 void DeviceSocketManager::StartListening(const std::string& socket_path, | |
163 size_t data_size, | |
164 DeviceSocketListener* listener) { | |
165 io_task_runner_->PostTask(FROM_HERE, | |
166 base::Bind(&DeviceSocketManager::StartListeningOnIO, | |
167 base::Unretained(this), socket_path, data_size, listener)); | |
168 } | |
169 | |
170 void DeviceSocketManager::StopListening(const std::string& socket_path, | |
171 DeviceSocketListener* listener) { | |
172 io_task_runner_->PostTask(FROM_HERE, | |
173 base::Bind(&DeviceSocketManager::StopListeningOnIO, | |
174 base::Unretained(this), socket_path, listener)); | |
175 } | |
176 | |
177 void DeviceSocketManager::OnDataAvailable(const std::string& socket_path, | |
178 const void* data) { | |
179 CHECK_GT(socket_data_.count(socket_path), 0UL); | |
180 DeviceSocketListeners& listeners = socket_data_[socket_path]->observers; | |
181 FOR_EACH_OBSERVER(DeviceSocketListener, listeners, OnDataAvailableOnIO(data)); | |
182 } | |
183 | |
184 void DeviceSocketManager::CloseSocket(const std::string& socket_path) { | |
185 if (!socket_data_.count(socket_path)) | |
186 return; | |
187 SocketData* socket_data = socket_data_[socket_path]; | |
188 close(socket_data->fd); | |
189 delete socket_data; | |
190 socket_data_.erase(socket_path); | |
191 } | |
192 | |
193 void DeviceSocketManager::OnError(const std::string& socket_path, int err) { | |
194 LOG(ERROR) << "Error reading from socket: " << socket_path << ": " | |
195 << strerror(err); | |
196 CloseSocket(socket_path); | |
197 // TODO(flackr): Notify listeners that the socket was closed unexpectedly. | |
198 } | |
199 | |
200 void DeviceSocketManager::OnEOF(const std::string& socket_path) { | |
201 LOG(ERROR) << "EOF reading from socket: " << socket_path; | |
202 CloseSocket(socket_path); | |
203 } | |
204 | |
205 void DeviceSocketManager::StartListeningOnIO(const std::string& socket_path, | |
206 size_t data_size, | |
207 DeviceSocketListener* listener) { | |
208 CHECK(io_task_runner_->RunsTasksOnCurrentThread()); | |
209 SocketData* socket_data = NULL; | |
210 if (!socket_data_.count(socket_path)) { | |
211 int socket_fd = -1; | |
212 if (!IPC::CreateClientUnixDomainSocket(base::FilePath(socket_path), | |
213 &socket_fd)) { | |
214 LOG(ERROR) << "Error connecting to socket: " << socket_path; | |
215 return; | |
216 } | |
217 | |
218 socket_data = new SocketData; | |
219 socket_data_[socket_path] = socket_data; | |
220 | |
221 socket_data->fd = socket_fd; | |
222 | |
223 socket_data->controller.reset( | |
224 new base::MessagePumpLibevent::FileDescriptorWatcher()); | |
225 socket_data->watcher.reset( | |
226 new DeviceSocketReader(socket_path, data_size)); | |
227 | |
228 base::MessageLoopForIO::current()->WatchFileDescriptor( | |
229 socket_fd, | |
230 true, | |
231 base::MessageLoopForIO::WATCH_READ, | |
232 socket_data->controller.get(), | |
233 socket_data->watcher.get()); | |
234 } else { | |
235 socket_data = socket_data_[socket_path]; | |
236 } | |
237 socket_data->observers.AddObserver(listener); | |
238 } | |
239 | |
240 void DeviceSocketManager::StopListeningOnIO(const std::string& socket_path, | |
241 DeviceSocketListener* listener) { | |
242 if (!socket_data_.count(socket_path)) | |
243 return; // Happens if unable to create a socket. | |
244 | |
245 CHECK(io_task_runner_->RunsTasksOnCurrentThread()); | |
246 DeviceSocketListeners& listeners = socket_data_[socket_path]->observers; | |
247 listeners.RemoveObserver(listener); | |
248 if (!listeners.might_have_observers()) { | |
249 // All listeners for this socket has been removed. Close the socket. | |
250 CloseSocket(socket_path); | |
251 } | |
252 } | |
253 | |
254 } // namespace | |
255 | |
256 DeviceSocketListener::DeviceSocketListener(const std::string& socket_path, | |
257 size_t data_size) | |
258 : socket_path_(socket_path), | |
259 data_size_(data_size) { | |
260 } | |
261 | |
262 DeviceSocketListener::~DeviceSocketListener() { | |
263 StopListening(); | |
264 } | |
265 | |
266 // static | |
267 void DeviceSocketListener::CreateSocketManager( | |
268 scoped_refptr<base::TaskRunner> io_task_runner) { | |
269 DeviceSocketManager::Create(io_task_runner); | |
270 } | |
271 | |
272 // static | |
273 void DeviceSocketListener::ShutdownSocketManager() { | |
274 DeviceSocketManager::Shutdown(); | |
275 } | |
276 | |
277 void DeviceSocketListener::StartListening() { | |
278 DeviceSocketManager::GetInstance()->StartListening(socket_path_, | |
279 data_size_, | |
280 this); | |
281 } | |
282 | |
283 void DeviceSocketListener::StopListening() { | |
284 DeviceSocketManager::GetInstance()->StopListening(socket_path_, this); | |
285 } | |
286 | |
287 } // namespace athena | |
OLD | NEW |