Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(163)

Side by Side Diff: ppapi/proxy/ppb_flash_proxy.cc

Issue 10540057: Cleanup - remove ModuleLocalThreadAdapter. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src/
Patch Set: Created 8 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ppapi/proxy/ppb_flash_proxy.h" 5 #include "ppapi/proxy/ppb_flash_proxy.h"
6 6
7 #include <map> 7 #include <map>
8 #include <set> 8 #include <set>
yzshen1 2012/06/07 17:38:34 Remove includes that are not needed any more. For
9 9
10 #include "base/logging.h" 10 #include "base/logging.h"
11 #include "base/memory/ref_counted.h" 11 #include "base/memory/ref_counted.h"
12 #include "base/message_loop.h" 12 #include "base/message_loop.h"
13 #include "base/synchronization/lock.h" 13 #include "base/synchronization/lock.h"
14 #include "base/time.h" 14 #include "base/time.h"
15 #include "ipc/ipc_channel_proxy.h" 15 #include "ipc/ipc_channel_proxy.h"
16 #include "ppapi/c/dev/ppb_font_dev.h" 16 #include "ppapi/c/dev/ppb_font_dev.h"
17 #include "ppapi/c/dev/ppb_var_deprecated.h" 17 #include "ppapi/c/dev/ppb_var_deprecated.h"
18 #include "ppapi/c/pp_errors.h" 18 #include "ppapi/c/pp_errors.h"
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
55 base::PlatformFile file) { 55 base::PlatformFile file) {
56 if (*error != PP_OK) 56 if (*error != PP_OK)
57 return IPC::InvalidPlatformFileForTransit(); 57 return IPC::InvalidPlatformFileForTransit();
58 IPC::PlatformFileForTransit out_handle = 58 IPC::PlatformFileForTransit out_handle =
59 dispatcher->ShareHandleWithRemote(file, true); 59 dispatcher->ShareHandleWithRemote(file, true);
60 if (out_handle == IPC::InvalidPlatformFileForTransit()) 60 if (out_handle == IPC::InvalidPlatformFileForTransit())
61 *error = PP_ERROR_NOACCESS; 61 *error = PP_ERROR_NOACCESS;
62 return out_handle; 62 return out_handle;
63 } 63 }
64 64
65 // ModuleLocalThreadAdapter ----------------------------------------------------
66 // TODO(yzshen): Refactor to use IPC::SyncMessageFilter.
67 class ModuleLocalThreadAdapter
68 : public base::RefCountedThreadSafe<ModuleLocalThreadAdapter> {
69 class Filter;
70 public:
71 ModuleLocalThreadAdapter();
72
73 void AddInstanceRouting(PP_Instance instance, Dispatcher* dispatcher);
74 void ClearInstanceRouting(PP_Instance instance);
75 void ClearFilter(Dispatcher* dispatcher, Filter* filter);
76
77 bool OnModuleLocalMessageReceived(const IPC::Message& msg);
78
79 // Called on the I/O thread when the channel is being destroyed and the
80 // given message will never be issued a reply.
81 void OnModuleLocalMessageFailed(int message_id);
82
83 bool Send(PP_Instance instance, IPC::Message* msg);
84
85 private:
86 class Filter : public IPC::ChannelProxy::MessageFilter {
87 public:
88 explicit Filter(Dispatcher* dispatcher);
89 ~Filter();
90
91 void Send(IPC::Message* msg);
92
93 virtual void OnFilterAdded(IPC::Channel* channel);
94 virtual void OnFilterRemoved();
95 virtual bool OnMessageReceived(const IPC::Message& message);
96
97 private:
98 // DO NOT DEREFERENCE! This is used only for tracking.
99 Dispatcher* dispatcher_;
100
101 IPC::Channel* channel_;
102
103 // Holds the IPC messages that were sent before the channel was connected.
104 // These will be sent ASAP.
105 std::vector<IPC::Message*> pre_connect_pending_messages_;
106
107 // Holds the IDs of the sync messages we're currently waiting on for this
108 // channel. This tracking allows us to cancel those requests if the
109 // remote process crashes and we're cleaning up this filter (without just
110 // deadlocking the waiting thread(s).
111 std::set<int> pending_requests_for_filter_;
112 };
113
114 void SendFromIOThread(Dispatcher* dispatcher, IPC::Message* msg);
115
116 // Internal version of OnModuleLocalMessageFailed which assumes the lock
117 // is already held.
118 void OnModuleLocalMessageFailedLocked(int message_id);
119
120 base::Lock lock_;
121
122 scoped_refptr<base::MessageLoopProxy> main_thread_;
123
124 // Will be NULL before an instance routing is added.
125 scoped_refptr<base::MessageLoopProxy> io_thread_;
126
127 typedef std::map<PP_Instance, Dispatcher*> InstanceToDispatcher;
128 InstanceToDispatcher instance_to_dispatcher_;
129
130 // The filters are owned by the channel.
131 typedef std::map<Dispatcher*, Filter*> DispatcherToFilter;
132 DispatcherToFilter dispatcher_to_filter_;
133
134 // Tracks all messages with currently waiting threads. This does not own
135 // the pointer, the pointer lifetime is managed by Send().
136 typedef std::map<int, IPC::PendingSyncMsg*> SyncRequestMap;
137 SyncRequestMap pending_sync_requests_;
138 };
139
140 ModuleLocalThreadAdapter* g_module_local_thread_adapter = NULL;
141
142 ModuleLocalThreadAdapter::Filter::Filter(Dispatcher* dispatcher)
143 : dispatcher_(dispatcher), channel_(NULL) {
144 }
145
146 ModuleLocalThreadAdapter::Filter::~Filter() {
147 }
148
149 void ModuleLocalThreadAdapter::Filter::Send(IPC::Message* msg) {
150 if (channel_) {
151 int message_id = IPC::SyncMessage::GetMessageId(*msg);
152 if (channel_->Send(msg))
153 pending_requests_for_filter_.insert(message_id);
154 else // Message lost, notify adapter so it can unblock.
155 g_module_local_thread_adapter->OnModuleLocalMessageFailed(message_id);
156 } else {
157 // No channel, save this message for when it's connected.
158 pre_connect_pending_messages_.push_back(msg);
159 }
160 }
161
162 void ModuleLocalThreadAdapter::Filter::OnFilterAdded(IPC::Channel* channel) {
163 DCHECK(!channel_);
164 channel_ = channel;
165
166 // Now that we have a channel, process all pending messages.
167 for (size_t i = 0; i < pre_connect_pending_messages_.size(); i++)
168 Send(pre_connect_pending_messages_[i]);
169 pre_connect_pending_messages_.clear();
170 }
171
172 void ModuleLocalThreadAdapter::Filter::OnFilterRemoved() {
173 DCHECK(channel_);
174 channel_ = NULL;
175 g_module_local_thread_adapter->ClearFilter(dispatcher_, this);
176
177 for (std::set<int>::iterator i = pending_requests_for_filter_.begin();
178 i != pending_requests_for_filter_.end(); ++i) {
179 g_module_local_thread_adapter->OnModuleLocalMessageFailed(*i);
180 }
181 }
182
183 bool ModuleLocalThreadAdapter::Filter::OnMessageReceived(
184 const IPC::Message& message) {
185 if (!message.is_reply() ||
186 message.routing_id() != API_ID_PPB_FLASH)
187 return false;
188
189 if (g_module_local_thread_adapter->OnModuleLocalMessageReceived(message)) {
190 // The message was consumed, this means we can remove the message ID from
191 // the list of messages this channel is waiting on.
192 pending_requests_for_filter_.erase(IPC::SyncMessage::GetMessageId(message));
193 return true;
194 }
195 return false;
196 }
197
198 ModuleLocalThreadAdapter::ModuleLocalThreadAdapter()
199 : main_thread_(base::MessageLoopProxy::current()) {
200 }
201
202 void ModuleLocalThreadAdapter::AddInstanceRouting(PP_Instance instance,
203 Dispatcher* dispatcher) {
204 base::AutoLock lock(lock_);
205
206 // Now that we've had contact with a dispatcher, we can set up the IO thread.
207 DCHECK(main_thread_->BelongsToCurrentThread());
208 if (!io_thread_.get())
209 io_thread_ = dispatcher->GetIPCMessageLoop();
210
211 // Set up the instance -> dispatcher routing.
212 DCHECK(instance_to_dispatcher_.find(instance) ==
213 instance_to_dispatcher_.end());
214 instance_to_dispatcher_[instance] = dispatcher;
215
216 DispatcherToFilter::iterator found_filter =
217 dispatcher_to_filter_.find(dispatcher);
218 if (found_filter == dispatcher_to_filter_.end()) {
219 // Need to set up a filter for this dispatcher to intercept the messages.
220 Filter* filter = new Filter(dispatcher);
221 dispatcher_to_filter_[dispatcher] = filter;
222 dispatcher->AddIOThreadMessageFilter(filter);
223 }
224 }
225
226 void ModuleLocalThreadAdapter::ClearInstanceRouting(PP_Instance instance) {
227 // The dispatcher->filter mapping is cleaned up by ClearFilter which is
228 // initiated by the channel.
229 instance_to_dispatcher_.erase(instance);
230 }
231
232 void ModuleLocalThreadAdapter::ClearFilter(Dispatcher* dispatcher,
233 Filter* filter) {
234 // DANGER! Don't dereference the dispatcher, it's just used to identify
235 // which filter to remove. The dispatcher may not even exist any more.
236 //
237 // Since the dispatcher may be gone, there's a potential for ambiguity if
238 // another one is created on the main thread before this code runs on the
239 // I/O thread. So we check that the filter matches to avoid this rare case.
240 base::AutoLock lock(lock_);
241 if (dispatcher_to_filter_[dispatcher] == filter)
242 dispatcher_to_filter_.erase(dispatcher);
243 }
244
245 bool ModuleLocalThreadAdapter::OnModuleLocalMessageReceived(
246 const IPC::Message& msg) {
247 base::AutoLock lock(lock_);
248
249 int message_id = IPC::SyncMessage::GetMessageId(msg);
250 SyncRequestMap::iterator found = pending_sync_requests_.find(message_id);
251 if (found == pending_sync_requests_.end()) {
252 // Not waiting for this event. This will happen for sync messages to the
253 // main thread which use the "regular" sync channel code path.
254 return false;
255 }
256
257 IPC::PendingSyncMsg& info = *found->second;
258
259 if (!msg.is_reply_error())
260 info.deserializer->SerializeOutputParameters(msg);
261 info.done_event->Signal();
262 return true;
263 }
264
265 void ModuleLocalThreadAdapter::OnModuleLocalMessageFailed(int message_id) {
266 base::AutoLock lock(lock_);
267 OnModuleLocalMessageFailedLocked(message_id);
268 }
269
270 bool ModuleLocalThreadAdapter::Send(PP_Instance instance, IPC::Message* msg) {
271 // Compute the dispatcher corresponding to this message.
272 Dispatcher* dispatcher = NULL;
273 {
274 base::AutoLock lock(lock_);
275 InstanceToDispatcher::iterator found =
276 instance_to_dispatcher_.find(instance);
277 if (found == instance_to_dispatcher_.end()) {
278 NOTREACHED();
279 delete msg;
280 return false;
281 }
282 dispatcher = found->second;
283 }
284
285 if (main_thread_->BelongsToCurrentThread()) {
286 // Easy case: We're on the same thread as the dispatcher, so we don't need
287 // a lock to access it, and we can just use the normal sync channel stuff
288 // to handle the message. Actually, we MUST use the normal sync channel
289 // stuff since there may be incoming sync messages that need processing.
290 // The code below doesn't handle any nested message loops.
291 return dispatcher->Send(msg);
292 }
293
294 // Background thread case
295 // ----------------------
296 // 1. Generate tracking info, stick in pending_sync_messages_map.
297 // 2. Kick off the request. This is done on the I/O thread.
298 // 3. Filter on the I/O thread notices reply, writes the reply data and
299 // signals the event. We block on the event while this is happening.
300 // 4. Remove tracking info.
301
302 // Generate the tracking info. and copied
303 IPC::SyncMessage* sync_msg = static_cast<IPC::SyncMessage*>(msg);
304 int message_id = IPC::SyncMessage::GetMessageId(*sync_msg);
305 base::WaitableEvent event(true, false);
306 scoped_ptr<IPC::MessageReplyDeserializer> deserializer(
307 sync_msg->GetReplyDeserializer()); // We own this pointer once retrieved.
308 IPC::PendingSyncMsg info(message_id, deserializer.get(), &event);
309
310 // Add the tracking information to our map.
311 {
312 base::AutoLock lock(lock_);
313 pending_sync_requests_[message_id] = &info;
314 }
315
316 // This is a bit dangerous. We use the dispatcher pointer as the routing
317 // ID for this message. While we don't dereference it, there is an
318 // exceedingly remote possibility that while this is going to the background
319 // thread the connection will be shut down and a new one will be created with
320 // a dispatcher at the same address. It could potentially get sent to a
321 // random place, but it should actually still work (since the Flash file
322 // operations are global).
323 io_thread_->PostTask(FROM_HERE,
324 base::Bind(&ModuleLocalThreadAdapter::SendFromIOThread, this,
325 dispatcher, msg));
326
327 // Now we block the current thread waiting for the reply.
328 event.Wait();
329
330 {
331 // Clear our tracking info for this message now that we're done.
332 base::AutoLock lock(lock_);
333 DCHECK(pending_sync_requests_.find(message_id) !=
334 pending_sync_requests_.end());
335 pending_sync_requests_.erase(message_id);
336 }
337
338 return true;
339 }
340
341 void ModuleLocalThreadAdapter::SendFromIOThread(Dispatcher* dispatcher,
342 IPC::Message* msg) {
343 // DO NOT DEREFERENCE DISPATCHER. Used as a lookup only.
344 base::AutoLock lock(lock_);
345 DispatcherToFilter::iterator found = dispatcher_to_filter_.find(dispatcher);
346
347 // The dispatcher could have been destroyed by the time we got here since
348 // we're on another thread. Need to unblock the caller.
349 if (found == dispatcher_to_filter_.end()) {
350 OnModuleLocalMessageFailedLocked(IPC::SyncMessage::GetMessageId(*msg));
351 delete msg;
352 return;
353 }
354
355 // Takes ownership of pointer.
356 found->second->Send(msg);
357 }
358
359 void ModuleLocalThreadAdapter::OnModuleLocalMessageFailedLocked(
360 int message_id) {
361 lock_.AssertAcquired();
362
363 // Unblock the thread waiting for the message that will never come.
364 SyncRequestMap::iterator found = pending_sync_requests_.find(message_id);
365 if (found == pending_sync_requests_.end()) {
366 NOTREACHED();
367 return;
368 }
369 found->second->done_event->Signal();
370 }
371
372 void InvokePrinting(PP_Instance instance) { 65 void InvokePrinting(PP_Instance instance) {
373 PluginDispatcher* dispatcher = PluginDispatcher::GetForInstance(instance); 66 PluginDispatcher* dispatcher = PluginDispatcher::GetForInstance(instance);
374 if (dispatcher) { 67 if (dispatcher) {
375 dispatcher->Send(new PpapiHostMsg_PPBFlash_InvokePrinting( 68 dispatcher->Send(new PpapiHostMsg_PPBFlash_InvokePrinting(
376 API_ID_PPB_FLASH, instance)); 69 API_ID_PPB_FLASH, instance));
377 } 70 }
378 } 71 }
379 72
380 const PPB_Flash_Print_1_0 g_flash_print_interface = { 73 const PPB_Flash_Print_1_0 g_flash_print_interface = {
381 &InvokePrinting 74 &InvokePrinting
(...skipping 269 matching lines...) Expand 10 before | Expand all | Expand 10 after
651 dispatcher()->Send(new PpapiHostMsg_PPBFlash_WriteClipboardData( 344 dispatcher()->Send(new PpapiHostMsg_PPBFlash_WriteClipboardData(
652 API_ID_PPB_FLASH, 345 API_ID_PPB_FLASH,
653 instance, 346 instance,
654 static_cast<int>(clipboard_type), 347 static_cast<int>(clipboard_type),
655 formats_vector, 348 formats_vector,
656 data_items_vector)); 349 data_items_vector));
657 // Assume success, since it allows us to avoid a sync IPC. 350 // Assume success, since it allows us to avoid a sync IPC.
658 return PP_OK; 351 return PP_OK;
659 } 352 }
660 353
661 bool PPB_Flash_Proxy::CreateThreadAdapterForInstance(PP_Instance instance) {
662 if (!g_module_local_thread_adapter) {
663 g_module_local_thread_adapter = new ModuleLocalThreadAdapter();
664 g_module_local_thread_adapter->AddRef(); // Leaked, this object is global.
665 }
666
667 PluginDispatcher* dispatcher = PluginDispatcher::GetForInstance(instance);
668 if (!dispatcher) {
669 NOTREACHED();
670 return false;
671 }
672 g_module_local_thread_adapter->AddInstanceRouting(instance, dispatcher);
673 return true;
674 }
675
676 void PPB_Flash_Proxy::ClearThreadAdapterForInstance(PP_Instance instance) {
677 if (g_module_local_thread_adapter)
678 g_module_local_thread_adapter->ClearInstanceRouting(instance);
679 }
680
681 int32_t PPB_Flash_Proxy::OpenFile(PP_Instance, 354 int32_t PPB_Flash_Proxy::OpenFile(PP_Instance,
682 const char* path, 355 const char* path,
683 int32_t mode, 356 int32_t mode,
684 PP_FileHandle* file) { 357 PP_FileHandle* file) {
685 int flags = 0; 358 int flags = 0;
686 if (!path || 359 if (!path ||
687 !ppapi::PepperFileOpenFlagsToPlatformFileFlags(mode, &flags) || 360 !ppapi::PepperFileOpenFlagsToPlatformFileFlags(mode, &flags) ||
688 !file) 361 !file)
689 return PP_ERROR_BADARGUMENT; 362 return PP_ERROR_BADARGUMENT;
690 363
(...skipping 424 matching lines...) Expand 10 before | Expand all | Expand 10 after
1115 // It's rarely used enough that we just request this interface when needed. 788 // It's rarely used enough that we just request this interface when needed.
1116 const PPB_Flash_Print_1_0* print_interface = 789 const PPB_Flash_Print_1_0* print_interface =
1117 static_cast<const PPB_Flash_Print_1_0*>( 790 static_cast<const PPB_Flash_Print_1_0*>(
1118 dispatcher()->local_get_interface()(PPB_FLASH_PRINT_INTERFACE_1_0)); 791 dispatcher()->local_get_interface()(PPB_FLASH_PRINT_INTERFACE_1_0));
1119 if (print_interface) 792 if (print_interface)
1120 print_interface->InvokePrinting(instance); 793 print_interface->InvokePrinting(instance);
1121 } 794 }
1122 795
1123 } // namespace proxy 796 } // namespace proxy
1124 } // namespace ppapi 797 } // namespace ppapi
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698