OLD | NEW |
| (Empty) |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "athena/system/device_socket_listener.h" | |
6 | |
7 #include <errno.h> | |
8 #include <map> | |
9 #include <sys/socket.h> | |
10 #include <sys/types.h> | |
11 #include <vector> | |
12 | |
13 #include "base/bind.h" | |
14 #include "base/files/file_path.h" | |
15 #include "base/message_loop/message_loop.h" | |
16 #include "base/observer_list.h" | |
17 #include "base/stl_util.h" | |
18 #include "ipc/unix_domain_socket_util.h" | |
19 | |
20 namespace athena { | |
21 | |
22 namespace { | |
23 | |
24 typedef ObserverList<DeviceSocketListener> DeviceSocketListeners; | |
25 | |
26 // Reads from a device socket blocks of a particular size. When that amount of | |
27 // data is read DeviceSocketManager::OnDataAvailable is called on the singleton | |
28 // instance which then informs all of the listeners on that socket. | |
29 class DeviceSocketReader : public base::MessagePumpLibevent::Watcher { | |
30 public: | |
31 DeviceSocketReader(const std::string& socket_path, | |
32 size_t data_size) | |
33 : socket_path_(socket_path), | |
34 data_size_(data_size), | |
35 data_(new char[data_size]) { | |
36 } | |
37 virtual ~DeviceSocketReader() {} | |
38 | |
39 private: | |
40 // Overidden from base::MessagePumpLibevent::Watcher. | |
41 virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; | |
42 virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; | |
43 | |
44 std::string socket_path_; | |
45 size_t data_size_; | |
46 scoped_ptr<char[]> data_; | |
47 | |
48 DISALLOW_COPY_AND_ASSIGN(DeviceSocketReader); | |
49 }; | |
50 | |
51 class DeviceSocketManager; | |
52 DeviceSocketManager* device_socket_manager_instance_ = NULL; | |
53 | |
54 // A singleton instance for managing all connections to sockets. | |
55 class DeviceSocketManager { | |
56 public: | |
57 static void Create(scoped_refptr<base::TaskRunner> file_task_runner) { | |
58 device_socket_manager_instance_ = new DeviceSocketManager(file_task_runner); | |
59 } | |
60 | |
61 static void Shutdown() { | |
62 CHECK(device_socket_manager_instance_); | |
63 device_socket_manager_instance_->ScheduleDelete(); | |
64 // Once scheduled to be deleted, no-one should be | |
65 // able to access it. | |
66 device_socket_manager_instance_ = NULL; | |
67 } | |
68 | |
69 static DeviceSocketManager* GetInstanceUnsafe() { | |
70 return device_socket_manager_instance_; | |
71 } | |
72 | |
73 static DeviceSocketManager* GetInstance() { | |
74 CHECK(device_socket_manager_instance_); | |
75 return device_socket_manager_instance_; | |
76 } | |
77 | |
78 // If there isn't an existing connection to |socket_path|, then opens a | |
79 // connection to |socket_path| and starts listening for data. All listeners | |
80 // for |socket_path| receives data when data is available on the socket. | |
81 void StartListening(const std::string& socket_path, | |
82 size_t data_size, | |
83 DeviceSocketListener* listener); | |
84 | |
85 // Removes |listener| from the list of listeners that receive data from | |
86 // |socket_path|. If this is the last listener, then this closes the | |
87 // connection to the socket. | |
88 void StopListening(const std::string& socket_path, | |
89 DeviceSocketListener* listener); | |
90 | |
91 // Sends data to all the listeners registered to receive data from | |
92 // |socket_path|. | |
93 void OnDataAvailable(const std::string& socket_path, | |
94 const void* data); | |
95 | |
96 // Notifies listeners of errors reading from the socket and closes it. | |
97 void OnError(const std::string& socket_path, int err); | |
98 void OnEOF(const std::string& socket_path); | |
99 | |
100 private: | |
101 struct SocketData { | |
102 SocketData() | |
103 : fd(-1) { | |
104 } | |
105 | |
106 int fd; | |
107 DeviceSocketListeners observers; | |
108 scoped_ptr<base::MessagePumpLibevent::FileDescriptorWatcher> controller; | |
109 scoped_ptr<DeviceSocketReader> watcher; | |
110 }; | |
111 | |
112 static void DeleteOnFILE(DeviceSocketManager* manager) { delete manager; } | |
113 | |
114 DeviceSocketManager(scoped_refptr<base::TaskRunner> file_task_runner) | |
115 : file_task_runner_(file_task_runner) {} | |
116 | |
117 ~DeviceSocketManager() { | |
118 STLDeleteContainerPairSecondPointers(socket_data_.begin(), | |
119 socket_data_.end()); | |
120 } | |
121 | |
122 void ScheduleDelete(); | |
123 | |
124 void StartListeningOnFILE(const std::string& socket_path, | |
125 size_t data_size, | |
126 DeviceSocketListener* listener); | |
127 | |
128 void StopListeningOnFILE(const std::string& socket_path, | |
129 DeviceSocketListener* listener); | |
130 | |
131 void CloseSocket(const std::string& socket_path); | |
132 | |
133 std::map<std::string, SocketData*> socket_data_; | |
134 scoped_refptr<base::TaskRunner> file_task_runner_; | |
135 | |
136 DISALLOW_COPY_AND_ASSIGN(DeviceSocketManager); | |
137 }; | |
138 | |
139 //////////////////////////////////////////////////////////////////////////////// | |
140 // DeviceSocketReader | |
141 | |
142 void DeviceSocketReader::OnFileCanReadWithoutBlocking(int fd) { | |
143 ssize_t read_size = recv(fd, data_.get(), data_size_, 0); | |
144 if (read_size < 0) { | |
145 if (errno == EINTR) | |
146 return; | |
147 DeviceSocketManager::GetInstance()->OnError(socket_path_, errno); | |
148 return; | |
149 } | |
150 if (read_size == 0) { | |
151 DeviceSocketManager::GetInstance()->OnEOF(socket_path_); | |
152 return; | |
153 } | |
154 if (read_size != static_cast<ssize_t>(data_size_)) | |
155 return; | |
156 DeviceSocketManager::GetInstance()->OnDataAvailable(socket_path_, | |
157 data_.get()); | |
158 } | |
159 | |
160 void DeviceSocketReader::OnFileCanWriteWithoutBlocking(int fd) { | |
161 NOTREACHED(); | |
162 } | |
163 | |
164 //////////////////////////////////////////////////////////////////////////////// | |
165 // DeviceSocketManager | |
166 | |
167 void DeviceSocketManager::StartListening(const std::string& socket_path, | |
168 size_t data_size, | |
169 DeviceSocketListener* listener) { | |
170 file_task_runner_->PostTask( | |
171 FROM_HERE, | |
172 base::Bind(&DeviceSocketManager::StartListeningOnFILE, | |
173 base::Unretained(this), | |
174 socket_path, | |
175 data_size, | |
176 listener)); | |
177 } | |
178 | |
179 void DeviceSocketManager::StopListening(const std::string& socket_path, | |
180 DeviceSocketListener* listener) { | |
181 file_task_runner_->PostTask( | |
182 FROM_HERE, | |
183 base::Bind(&DeviceSocketManager::StopListeningOnFILE, | |
184 base::Unretained(this), | |
185 socket_path, | |
186 listener)); | |
187 } | |
188 | |
189 void DeviceSocketManager::OnDataAvailable(const std::string& socket_path, | |
190 const void* data) { | |
191 CHECK_GT(socket_data_.count(socket_path), 0UL); | |
192 DeviceSocketListeners& listeners = socket_data_[socket_path]->observers; | |
193 FOR_EACH_OBSERVER( | |
194 DeviceSocketListener, listeners, OnDataAvailableOnFILE(data)); | |
195 } | |
196 | |
197 void DeviceSocketManager::CloseSocket(const std::string& socket_path) { | |
198 if (!socket_data_.count(socket_path)) | |
199 return; | |
200 SocketData* socket_data = socket_data_[socket_path]; | |
201 close(socket_data->fd); | |
202 delete socket_data; | |
203 socket_data_.erase(socket_path); | |
204 } | |
205 | |
206 void DeviceSocketManager::OnError(const std::string& socket_path, int err) { | |
207 LOG(ERROR) << "Error reading from socket: " << socket_path << ": " | |
208 << strerror(err); | |
209 CloseSocket(socket_path); | |
210 // TODO(flackr): Notify listeners that the socket was closed unexpectedly. | |
211 } | |
212 | |
213 void DeviceSocketManager::OnEOF(const std::string& socket_path) { | |
214 LOG(ERROR) << "EOF reading from socket: " << socket_path; | |
215 CloseSocket(socket_path); | |
216 } | |
217 | |
218 void DeviceSocketManager::StartListeningOnFILE(const std::string& socket_path, | |
219 size_t data_size, | |
220 DeviceSocketListener* listener) { | |
221 CHECK(file_task_runner_->RunsTasksOnCurrentThread()); | |
222 SocketData* socket_data = NULL; | |
223 if (!socket_data_.count(socket_path)) { | |
224 int socket_fd = -1; | |
225 if (!IPC::CreateClientUnixDomainSocket(base::FilePath(socket_path), | |
226 &socket_fd)) { | |
227 LOG(ERROR) << "Error connecting to socket: " << socket_path; | |
228 return; | |
229 } | |
230 | |
231 socket_data = new SocketData; | |
232 socket_data_[socket_path] = socket_data; | |
233 | |
234 socket_data->fd = socket_fd; | |
235 | |
236 socket_data->controller.reset( | |
237 new base::MessagePumpLibevent::FileDescriptorWatcher()); | |
238 socket_data->watcher.reset( | |
239 new DeviceSocketReader(socket_path, data_size)); | |
240 | |
241 base::MessageLoopForIO::current()->WatchFileDescriptor( | |
242 socket_fd, | |
243 true, | |
244 base::MessageLoopForIO::WATCH_READ, | |
245 socket_data->controller.get(), | |
246 socket_data->watcher.get()); | |
247 } else { | |
248 socket_data = socket_data_[socket_path]; | |
249 } | |
250 socket_data->observers.AddObserver(listener); | |
251 } | |
252 | |
253 void DeviceSocketManager::StopListeningOnFILE(const std::string& socket_path, | |
254 DeviceSocketListener* listener) { | |
255 if (!socket_data_.count(socket_path)) | |
256 return; // Happens if unable to create a socket. | |
257 | |
258 CHECK(file_task_runner_->RunsTasksOnCurrentThread()); | |
259 DeviceSocketListeners& listeners = socket_data_[socket_path]->observers; | |
260 listeners.RemoveObserver(listener); | |
261 if (!listeners.might_have_observers()) { | |
262 // All listeners for this socket has been removed. Close the socket. | |
263 CloseSocket(socket_path); | |
264 } | |
265 } | |
266 | |
267 void DeviceSocketManager::ScheduleDelete() { | |
268 // Schedule a task to delete on FILE thread because | |
269 // there may be a task scheduled on |file_task_runner_|. | |
270 file_task_runner_->PostTask( | |
271 FROM_HERE, | |
272 base::Bind(&DeleteOnFILE, base::Unretained(this))); | |
273 } | |
274 | |
275 } // namespace | |
276 | |
277 DeviceSocketListener::DeviceSocketListener(const std::string& socket_path, | |
278 size_t data_size) | |
279 : socket_path_(socket_path), | |
280 data_size_(data_size) { | |
281 } | |
282 | |
283 DeviceSocketListener::~DeviceSocketListener() { | |
284 StopListening(); | |
285 } | |
286 | |
287 // static | |
288 void DeviceSocketListener::CreateSocketManager( | |
289 scoped_refptr<base::TaskRunner> file_task_runner) { | |
290 DeviceSocketManager::Create(file_task_runner); | |
291 } | |
292 | |
293 // static | |
294 void DeviceSocketListener::ShutdownSocketManager() { | |
295 DeviceSocketManager::Shutdown(); | |
296 } | |
297 | |
298 void DeviceSocketListener::StartListening() { | |
299 DeviceSocketManager::GetInstance()->StartListening(socket_path_, | |
300 data_size_, | |
301 this); | |
302 } | |
303 | |
304 void DeviceSocketListener::StopListening() { | |
305 DeviceSocketManager* instance = DeviceSocketManager::GetInstanceUnsafe(); | |
306 if (instance) | |
307 instance->StopListening(socket_path_, this); | |
308 } | |
309 | |
310 } // namespace athena | |
OLD | NEW |