OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. | |
oshima
2014/08/01 17:15:11
2014
flackr
2014/08/06 14:13:51
Done.
| |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "athena/system/device_socket_listener.h" | |
6 | |
7 #include <errno.h> | |
8 #include <map> | |
9 #include <sys/socket.h> | |
10 #include <sys/types.h> | |
11 #include <vector> | |
12 | |
13 #include "base/bind.h" | |
14 #include "base/files/file_path.h" | |
15 #include "base/memory/singleton.h" | |
16 #include "base/message_loop/message_loop.h" | |
17 #include "base/stl_util.h" | |
18 #include "content/public/browser/browser_thread.h" | |
19 #include "ipc/unix_domain_socket_util.h" | |
20 | |
21 namespace athena { | |
22 | |
23 namespace { | |
24 | |
25 typedef std::vector<DeviceSocketListener*> DeviceSocketListeners; | |
oshima
2014/08/01 17:15:11
ObserverList<DeviceSocketListener> may help?
flackr
2014/08/06 14:13:51
Done.
| |
26 | |
27 class DeviceSocketWatcher : public base::MessagePumpLibevent::Watcher { | |
oshima
2014/08/01 17:15:11
document this class, and should this be called Rea
flackr
2014/08/06 14:13:51
Done.
| |
28 public: | |
29 DeviceSocketWatcher(const std::string& socket_path, | |
30 size_t data_size) | |
31 : socket_path_(socket_path), | |
32 data_size_(data_size), | |
33 data_(new char[data_size]) { | |
34 } | |
35 virtual ~DeviceSocketWatcher() {} | |
36 | |
37 private: | |
38 // Overidden from base::MessagePumpLibevent::Watcher. | |
39 virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; | |
40 virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; | |
41 | |
42 std::string socket_path_; | |
43 size_t data_size_; | |
44 scoped_ptr<char[]> data_; | |
oshima
2014/08/01 17:15:11
there is a StringPiece class. since it's small, I'
| |
45 | |
46 DISALLOW_COPY_AND_ASSIGN(DeviceSocketWatcher); | |
47 }; | |
48 | |
49 // A singleton instance for managing all connections to sockets. | |
50 class DeviceSocketManager { | |
51 public: | |
52 static DeviceSocketManager* GetInstance() { | |
53 return Singleton<DeviceSocketManager>::get(); | |
54 } | |
55 | |
56 // If there isn't an existing connection to |socket_path|, then opens a | |
57 // connection to |socket_path| and starts listening for data. All listeners | |
58 // for |socket_path| receives data when data is available on the socket. | |
59 void StartListening(const std::string& socket_path, | |
60 size_t data_size, | |
61 DeviceSocketListener* listener); | |
62 | |
63 // Removes |listener| from the list of listeners that receive data from | |
64 // |socket_path|. If this is the last listener, then this closes the | |
65 // connection to the socket. | |
66 void StopListening(const std::string& socket_path, | |
67 DeviceSocketListener* listener); | |
68 | |
69 // Sends data to all the listeners registered to receive data from | |
70 // |socket_path|. | |
71 void OnDataAvailable(const std::string& socket_path, | |
72 const void* data); | |
73 | |
74 // Notifies listeners of errors reading from the socket and closes it. | |
75 void OnError(const std::string& socket_path, int err); | |
76 void OnEOF(const std::string& socket_path); | |
77 | |
78 private: | |
79 friend struct DefaultSingletonTraits<DeviceSocketManager>; | |
80 | |
81 struct SocketData { | |
82 SocketData() | |
83 : fd(-1) { | |
84 } | |
85 | |
86 int fd; | |
87 DeviceSocketListeners listeners; | |
88 scoped_ptr<base::MessagePumpLibevent::FileDescriptorWatcher> controller; | |
89 scoped_ptr<DeviceSocketWatcher> watcher; | |
90 }; | |
91 | |
92 DeviceSocketManager() { | |
93 } | |
94 | |
95 ~DeviceSocketManager() { | |
96 STLDeleteContainerPairSecondPointers(socket_data_.begin(), | |
97 socket_data_.end()); | |
98 } | |
99 | |
100 void StartListeningOnIO(const std::string& socket_path, | |
101 size_t data_size, | |
102 DeviceSocketListener* listener); | |
103 | |
104 void StopListeningOnIO(const std::string& socket_path, | |
105 DeviceSocketListener* listener); | |
106 | |
107 void CloseSocket(const std::string& socket_path); | |
108 | |
109 std::map<std::string, SocketData*> socket_data_; | |
110 | |
111 DISALLOW_COPY_AND_ASSIGN(DeviceSocketManager); | |
112 }; | |
113 | |
114 //////////////////////////////////////////////////////////////////////////////// | |
115 // DeviceSocketWatcher | |
116 | |
117 void DeviceSocketWatcher::OnFileCanReadWithoutBlocking(int fd) { | |
118 ssize_t read_size = recv(fd, data_.get(), data_size_, 0); | |
119 if (read_size < 0) { | |
120 if (errno == EINTR) | |
121 return; | |
122 DeviceSocketManager::GetInstance()->OnError(socket_path_, errno); | |
123 return; | |
124 } | |
125 if (read_size == 0) { | |
126 DeviceSocketManager::GetInstance()->OnEOF(socket_path_); | |
127 return; | |
128 } | |
129 if (read_size != static_cast<ssize_t>(data_size_)) | |
130 return; | |
131 DeviceSocketManager::GetInstance()->OnDataAvailable(socket_path_, | |
132 data_.get()); | |
133 } | |
134 | |
135 void DeviceSocketWatcher::OnFileCanWriteWithoutBlocking(int fd) { | |
136 NOTREACHED(); | |
137 } | |
138 | |
139 //////////////////////////////////////////////////////////////////////////////// | |
140 // DeviceSocketManager | |
141 | |
142 void DeviceSocketManager::StartListening(const std::string& socket_path, | |
143 size_t data_size, | |
144 DeviceSocketListener* listener) { | |
145 content::BrowserThread::PostTask(content::BrowserThread::IO, FROM_HERE, | |
146 base::Bind(&DeviceSocketManager::StartListeningOnIO, | |
147 base::Unretained(this), socket_path, data_size, listener)); | |
148 } | |
149 | |
150 void DeviceSocketManager::StopListening(const std::string& socket_path, | |
151 DeviceSocketListener* listener) { | |
152 content::BrowserThread::PostTask(content::BrowserThread::IO, FROM_HERE, | |
153 base::Bind(&DeviceSocketManager::StopListeningOnIO, | |
154 base::Unretained(this), socket_path, listener)); | |
155 } | |
156 | |
157 void DeviceSocketManager::OnDataAvailable(const std::string& socket_path, | |
158 const void* data) { | |
159 CHECK_GT(socket_data_.count(socket_path), 0UL); | |
160 DeviceSocketListeners& listeners = socket_data_[socket_path]->listeners; | |
161 DeviceSocketListeners::iterator i = listeners.begin(); | |
162 for (; i != listeners.end(); ++i) { | |
163 (*i)->OnDataAvailableOnIO(data); | |
164 } | |
165 } | |
166 | |
167 void DeviceSocketManager::CloseSocket(const std::string& socket_path) { | |
168 if (!socket_data_.count(socket_path)) | |
169 return; | |
170 SocketData* socket_data = socket_data_[socket_path]; | |
171 close(socket_data->fd); | |
172 delete socket_data; | |
173 socket_data_.erase(socket_path); | |
174 } | |
175 | |
176 void DeviceSocketManager::OnError(const std::string& socket_path, int err) { | |
177 LOG(ERROR) << "Error reading from socket: " << socket_path << ": " | |
178 << strerror(err); | |
179 CloseSocket(socket_path); | |
oshima
2014/08/01 17:15:11
probably not now, but maybe useful to notify liste
flackr
2014/08/06 14:13:51
Added TODO.
| |
180 } | |
181 | |
182 void DeviceSocketManager::OnEOF(const std::string& socket_path) { | |
183 LOG(ERROR) << "EOF reading from socket: " << socket_path; | |
184 CloseSocket(socket_path); | |
185 } | |
186 | |
187 void DeviceSocketManager::StartListeningOnIO(const std::string& socket_path, | |
188 size_t data_size, | |
189 DeviceSocketListener* listener) { | |
190 CHECK(content::BrowserThread::CurrentlyOn(content::BrowserThread::IO)); | |
191 SocketData* socket_data = NULL; | |
192 if (!socket_data_.count(socket_path)) { | |
193 int socket_fd = -1; | |
194 if (!IPC::CreateClientUnixDomainSocket(base::FilePath(socket_path), | |
195 &socket_fd)) { | |
196 LOG(ERROR) << "Error connecting to socket: " << socket_path; | |
197 return; | |
198 } | |
199 | |
200 socket_data = new SocketData; | |
201 socket_data_[socket_path] = socket_data; | |
202 | |
203 socket_data->fd = socket_fd; | |
204 | |
205 socket_data->controller.reset( | |
206 new base::MessagePumpLibevent::FileDescriptorWatcher()); | |
207 socket_data->watcher.reset( | |
208 new DeviceSocketWatcher(socket_path, data_size)); | |
209 | |
210 base::MessageLoopForIO::current()->WatchFileDescriptor( | |
211 socket_fd, | |
212 true, | |
213 base::MessageLoopForIO::WATCH_READ, | |
214 socket_data->controller.get(), | |
215 socket_data->watcher.get()); | |
216 } else { | |
217 socket_data = socket_data_[socket_path]; | |
218 } | |
219 | |
220 socket_data->listeners.push_back(listener); | |
221 } | |
222 | |
223 void DeviceSocketManager::StopListeningOnIO(const std::string& socket_path, | |
224 DeviceSocketListener* listener) { | |
225 if (!socket_data_.count(socket_path)) | |
226 return; // Happens if unable to create a socket. | |
227 | |
228 CHECK(content::BrowserThread::CurrentlyOn(content::BrowserThread::IO)); | |
229 DeviceSocketListeners& listeners = socket_data_[socket_path]->listeners; | |
230 DeviceSocketListeners::iterator i = | |
231 std::find(listeners.begin(), listeners.end(), listener); | |
232 if (i != listeners.end()) { | |
233 listeners.erase(i); | |
234 | |
235 if (listeners.size() == 0) { | |
236 // All listeners for this socket has been removed. Close the socket. | |
237 CloseSocket(socket_path); | |
238 } | |
239 } | |
240 } | |
241 | |
242 } // namespace | |
243 | |
244 DeviceSocketListener::DeviceSocketListener(const std::string& socket_path, | |
245 size_t data_size) | |
246 : socket_path_(socket_path), | |
247 data_size_(data_size) { | |
248 } | |
249 | |
250 DeviceSocketListener::~DeviceSocketListener() { | |
251 StopListening(); | |
252 } | |
253 | |
254 void DeviceSocketListener::StartListening() { | |
255 DeviceSocketManager::GetInstance()->StartListening(socket_path_, | |
256 data_size_, | |
257 this); | |
258 } | |
259 | |
260 void DeviceSocketListener::StopListening() { | |
261 DeviceSocketManager::GetInstance()->StopListening(socket_path_, this); | |
262 } | |
263 | |
264 } // namespace athena | |
OLD | NEW |