OLD | NEW |
1 /* | 1 /* |
2 * libjingle | 2 * libjingle |
3 * Copyright 2004--2005, Google Inc. | 3 * Copyright 2004--2005, Google Inc. |
4 * | 4 * |
5 * Redistribution and use in source and binary forms, with or without | 5 * Redistribution and use in source and binary forms, with or without |
6 * modification, are permitted provided that the following conditions are met: | 6 * modification, are permitted provided that the following conditions are met: |
7 * | 7 * |
8 * 1. Redistributions of source code must retain the above copyright notice, | 8 * 1. Redistributions of source code must retain the above copyright notice, |
9 * this list of conditions and the following disclaimer. | 9 * this list of conditions and the following disclaimer. |
10 * 2. Redistributions in binary form must reproduce the above copyright notice, | 10 * 2. Redistributions in binary form must reproduce the above copyright notice, |
(...skipping 26 matching lines...) Expand all Loading... |
37 #include "talk/base/logging.h" | 37 #include "talk/base/logging.h" |
38 #include "talk/base/messagequeue.h" | 38 #include "talk/base/messagequeue.h" |
39 #include "talk/base/physicalsocketserver.h" | 39 #include "talk/base/physicalsocketserver.h" |
40 | 40 |
41 | 41 |
42 namespace talk_base { | 42 namespace talk_base { |
43 | 43 |
44 const uint32 kMaxMsgLatency = 150; // 150 ms | 44 const uint32 kMaxMsgLatency = 150; // 150 ms |
45 | 45 |
46 //------------------------------------------------------------------ | 46 //------------------------------------------------------------------ |
| 47 // DummySocketServer |
| 48 |
| 49 class DummySocketServer : public talk_base::SocketServer { |
| 50 virtual bool Wait(int cms, bool process_io) OVERRIDE { |
| 51 return false; |
| 52 } |
| 53 |
| 54 virtual void WakeUp() OVERRIDE { |
| 55 } |
| 56 |
| 57 virtual talk_base::Socket* CreateSocket(int type) OVERRIDE { |
| 58 return NULL; |
| 59 } |
| 60 |
| 61 // Returns a new socket for nonblocking communication. The type can be |
| 62 // SOCK_DGRAM and SOCK_STREAM. |
| 63 virtual talk_base::AsyncSocket* CreateAsyncSocket(int type) OVERRIDE { |
| 64 return NULL; |
| 65 } |
| 66 }; |
| 67 |
| 68 //------------------------------------------------------------------ |
47 // MessageQueueManager | 69 // MessageQueueManager |
48 | 70 |
49 MessageQueueManager* MessageQueueManager::instance_; | 71 MessageQueueManager* MessageQueueManager::instance_; |
50 | 72 |
51 MessageQueueManager* MessageQueueManager::Instance() { | 73 MessageQueueManager* MessageQueueManager::Instance() { |
52 // Note: This is not thread safe, but it is first called before threads are | 74 // Note: This is not thread safe, but it is first called before threads are |
53 // spawned. | 75 // spawned. |
54 if (!instance_) | 76 if (!instance_) |
55 instance_ = new MessageQueueManager; | 77 instance_ = new MessageQueueManager; |
56 return instance_; | 78 return instance_; |
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
98 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. | 120 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. |
99 CritScope cs(&crit_); | 121 CritScope cs(&crit_); |
100 std::vector<MessageQueue *>::iterator iter; | 122 std::vector<MessageQueue *>::iterator iter; |
101 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) | 123 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) |
102 (*iter)->Clear(handler); | 124 (*iter)->Clear(handler); |
103 } | 125 } |
104 | 126 |
105 //------------------------------------------------------------------ | 127 //------------------------------------------------------------------ |
106 // MessageQueue | 128 // MessageQueue |
107 | 129 |
108 MessageQueue::MessageQueue(SocketServer* ss) | 130 MessageQueue::MessageQueue() { |
109 : ss_(ss), fStop_(false), fPeekKeep_(false), active_(false), | 131 // Currently, MessageQueue holds a socket server, and is the base class for |
110 dmsgq_next_num_(0) { | 132 // Thread. It seems like it makes more sense for Thread to hold the socket |
111 if (!ss_) { | 133 // server, and provide it to the MessageQueue, since the Thread controls |
112 // Currently, MessageQueue holds a socket server, and is the base class for | 134 // the I/O model, and MQ is agnostic to those details. Anyway, this causes |
113 // Thread. It seems like it makes more sense for Thread to hold the socket | 135 // messagequeue_unittest to depend on network libraries... yuck. |
114 // server, and provide it to the MessageQueue, since the Thread controls | 136 default_ss_.reset(new PhysicalSocketServer()); |
115 // the I/O model, and MQ is agnostic to those details. Anyway, this causes | 137 Construct(); |
116 // messagequeue_unittest to depend on network libraries... yuck. | 138 } |
117 default_ss_.reset(new PhysicalSocketServer()); | 139 |
118 ss_ = default_ss_.get(); | 140 MessageQueue::MessageQueue(SocketServer* ss) { |
| 141 if (!ss) { |
| 142 default_ss_.reset(new DummySocketServer()); |
119 } | 143 } |
| 144 Construct(); |
| 145 } |
| 146 |
| 147 void MessageQueue::Construct() { |
| 148 fStop_ = false; |
| 149 fPeekKeep_ = false; |
| 150 active_ = false; |
| 151 dmsgq_next_num_ = 0; |
| 152 ss_ = default_ss_.get(); |
120 ss_->SetMessageQueue(this); | 153 ss_->SetMessageQueue(this); |
121 } | 154 } |
122 | 155 |
123 MessageQueue::~MessageQueue() { | 156 MessageQueue::~MessageQueue() { |
124 // The signal is done from here to ensure | 157 // The signal is done from here to ensure |
125 // that it always gets called when the queue | 158 // that it always gets called when the queue |
126 // is going away. | 159 // is going away. |
127 SignalQueueDestroyed(); | 160 SignalQueueDestroyed(); |
128 if (active_) { | 161 if (active_) { |
129 MessageQueueManager::Instance()->Remove(this); | 162 MessageQueueManager::Instance()->Remove(this); |
(...skipping 244 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
374 | 407 |
375 void MessageQueue::EnsureActive() { | 408 void MessageQueue::EnsureActive() { |
376 ASSERT(crit_.CurrentThreadIsOwner()); | 409 ASSERT(crit_.CurrentThreadIsOwner()); |
377 if (!active_) { | 410 if (!active_) { |
378 active_ = true; | 411 active_ = true; |
379 MessageQueueManager::Instance()->Add(this); | 412 MessageQueueManager::Instance()->Add(this); |
380 } | 413 } |
381 } | 414 } |
382 | 415 |
383 } // namespace talk_base | 416 } // namespace talk_base |
OLD | NEW |