OLD | NEW |
| (Empty) |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "chrome/browser/extensions/api/sockets_tcp/tcp_socket_event_dispatcher.
h" | |
6 | |
7 #include "chrome/browser/extensions/api/socket/tcp_socket.h" | |
8 #include "extensions/browser/event_router.h" | |
9 #include "extensions/browser/extension_system.h" | |
10 #include "extensions/browser/extensions_browser_client.h" | |
11 #include "net/base/net_errors.h" | |
12 | |
13 namespace { | |
14 int kDefaultBufferSize = 4096; | |
15 } | |
16 | |
17 namespace extensions { | |
18 namespace api { | |
19 | |
20 using content::BrowserThread; | |
21 | |
22 static base::LazyInstance< | |
23 BrowserContextKeyedAPIFactory<TCPSocketEventDispatcher> > g_factory = | |
24 LAZY_INSTANCE_INITIALIZER; | |
25 | |
26 // static | |
27 BrowserContextKeyedAPIFactory<TCPSocketEventDispatcher>* | |
28 TCPSocketEventDispatcher::GetFactoryInstance() { | |
29 return g_factory.Pointer(); | |
30 } | |
31 | |
32 // static | |
33 TCPSocketEventDispatcher* TCPSocketEventDispatcher::Get( | |
34 content::BrowserContext* context) { | |
35 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | |
36 | |
37 return BrowserContextKeyedAPIFactory<TCPSocketEventDispatcher>::Get(context); | |
38 } | |
39 | |
40 TCPSocketEventDispatcher::TCPSocketEventDispatcher( | |
41 content::BrowserContext* context) | |
42 : thread_id_(Socket::kThreadId), browser_context_(context) { | |
43 ApiResourceManager<ResumableTCPSocket>* manager = | |
44 ApiResourceManager<ResumableTCPSocket>::Get(browser_context_); | |
45 DCHECK(manager) << "There is no socket manager. " | |
46 "If this assertion is failing during a test, then it is likely that " | |
47 "TestExtensionSystem is failing to provide an instance of " | |
48 "ApiResourceManager<ResumableTCPSocket>."; | |
49 sockets_ = manager->data_; | |
50 } | |
51 | |
52 TCPSocketEventDispatcher::~TCPSocketEventDispatcher() {} | |
53 | |
54 TCPSocketEventDispatcher::ReadParams::ReadParams() {} | |
55 | |
56 TCPSocketEventDispatcher::ReadParams::~ReadParams() {} | |
57 | |
58 void TCPSocketEventDispatcher::OnSocketConnect(const std::string& extension_id, | |
59 int socket_id) { | |
60 DCHECK(BrowserThread::CurrentlyOn(thread_id_)); | |
61 | |
62 StartSocketRead(extension_id, socket_id); | |
63 } | |
64 | |
65 void TCPSocketEventDispatcher::OnSocketResume(const std::string& extension_id, | |
66 int socket_id) { | |
67 DCHECK(BrowserThread::CurrentlyOn(thread_id_)); | |
68 | |
69 StartSocketRead(extension_id, socket_id); | |
70 } | |
71 | |
72 void TCPSocketEventDispatcher::StartSocketRead(const std::string& extension_id, | |
73 int socket_id) { | |
74 DCHECK(BrowserThread::CurrentlyOn(thread_id_)); | |
75 | |
76 ReadParams params; | |
77 params.thread_id = thread_id_; | |
78 params.browser_context_id = browser_context_; | |
79 params.extension_id = extension_id; | |
80 params.sockets = sockets_; | |
81 params.socket_id = socket_id; | |
82 | |
83 StartRead(params); | |
84 } | |
85 | |
86 // static | |
87 void TCPSocketEventDispatcher::StartRead(const ReadParams& params) { | |
88 DCHECK(BrowserThread::CurrentlyOn(params.thread_id)); | |
89 | |
90 ResumableTCPSocket* socket = | |
91 params.sockets->Get(params.extension_id, params.socket_id); | |
92 if (!socket) { | |
93 // This can happen if the socket is closed while our callback is active. | |
94 return; | |
95 } | |
96 DCHECK(params.extension_id == socket->owner_extension_id()) | |
97 << "Socket has wrong owner."; | |
98 | |
99 // Don't start another read if the socket has been paused. | |
100 if (socket->paused()) | |
101 return; | |
102 | |
103 int buffer_size = socket->buffer_size(); | |
104 if (buffer_size <= 0) | |
105 buffer_size = kDefaultBufferSize; | |
106 socket->Read(buffer_size, | |
107 base::Bind(&TCPSocketEventDispatcher::ReadCallback, | |
108 params)); | |
109 } | |
110 | |
111 // static | |
112 void TCPSocketEventDispatcher::ReadCallback( | |
113 const ReadParams& params, | |
114 int bytes_read, | |
115 scoped_refptr<net::IOBuffer> io_buffer) { | |
116 DCHECK(BrowserThread::CurrentlyOn(params.thread_id)); | |
117 | |
118 // If |bytes_read| == 0, the connection has been closed by the peer. | |
119 // If |bytes_read| < 0, there was a network error, and |bytes_read| is a value | |
120 // from "net::ERR_". | |
121 | |
122 if (bytes_read == 0) { | |
123 bytes_read = net::ERR_CONNECTION_CLOSED; | |
124 } | |
125 | |
126 if (bytes_read > 0) { | |
127 // Dispatch "onReceive" event. | |
128 sockets_tcp::ReceiveInfo receive_info; | |
129 receive_info.socket_id = params.socket_id; | |
130 receive_info.data = std::string(io_buffer->data(), bytes_read); | |
131 scoped_ptr<base::ListValue> args = | |
132 sockets_tcp::OnReceive::Create(receive_info); | |
133 scoped_ptr<Event> event( | |
134 new Event(sockets_tcp::OnReceive::kEventName, args.Pass())); | |
135 PostEvent(params, event.Pass()); | |
136 | |
137 // Post a task to delay the read until the socket is available, as | |
138 // calling StartReceive at this point would error with ERR_IO_PENDING. | |
139 BrowserThread::PostTask( | |
140 params.thread_id, FROM_HERE, | |
141 base::Bind(&TCPSocketEventDispatcher::StartRead, params)); | |
142 } else if (bytes_read == net::ERR_IO_PENDING) { | |
143 // This happens when resuming a socket which already had an | |
144 // active "read" callback. | |
145 } else { | |
146 // Dispatch "onReceiveError" event but don't start another read to avoid | |
147 // potential infinite reads if we have a persistent network error. | |
148 sockets_tcp::ReceiveErrorInfo receive_error_info; | |
149 receive_error_info.socket_id = params.socket_id; | |
150 receive_error_info.result_code = bytes_read; | |
151 scoped_ptr<base::ListValue> args = | |
152 sockets_tcp::OnReceiveError::Create(receive_error_info); | |
153 scoped_ptr<Event> event( | |
154 new Event(sockets_tcp::OnReceiveError::kEventName, args.Pass())); | |
155 PostEvent(params, event.Pass()); | |
156 | |
157 // Since we got an error, the socket is now "paused" until the application | |
158 // "resumes" it. | |
159 ResumableTCPSocket* socket = | |
160 params.sockets->Get(params.extension_id, params.socket_id); | |
161 if (socket) { | |
162 socket->set_paused(true); | |
163 } | |
164 } | |
165 } | |
166 | |
167 // static | |
168 void TCPSocketEventDispatcher::PostEvent(const ReadParams& params, | |
169 scoped_ptr<Event> event) { | |
170 DCHECK(BrowserThread::CurrentlyOn(params.thread_id)); | |
171 | |
172 BrowserThread::PostTask(BrowserThread::UI, | |
173 FROM_HERE, | |
174 base::Bind(&DispatchEvent, | |
175 params.browser_context_id, | |
176 params.extension_id, | |
177 base::Passed(event.Pass()))); | |
178 } | |
179 | |
180 // static | |
181 void TCPSocketEventDispatcher::DispatchEvent(void* browser_context_id, | |
182 const std::string& extension_id, | |
183 scoped_ptr<Event> event) { | |
184 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); | |
185 | |
186 content::BrowserContext* context = | |
187 reinterpret_cast<content::BrowserContext*>(browser_context_id); | |
188 if (!extensions::ExtensionsBrowserClient::Get()->IsValidContext(context)) | |
189 return; | |
190 | |
191 EventRouter* router = ExtensionSystem::Get(context)->event_router(); | |
192 if (router) | |
193 router->DispatchEventToExtension(extension_id, event.Pass()); | |
194 } | |
195 | |
196 } // namespace api | |
197 } // namespace extensions | |
OLD | NEW |