OLD | NEW |
1 /* | 1 /* |
2 * libjingle | 2 * libjingle |
3 * Copyright 2004--2005, Google Inc. | 3 * Copyright 2004--2005, Google Inc. |
4 * | 4 * |
5 * Redistribution and use in source and binary forms, with or without | 5 * Redistribution and use in source and binary forms, with or without |
6 * modification, are permitted provided that the following conditions are met: | 6 * modification, are permitted provided that the following conditions are met: |
7 * | 7 * |
8 * 1. Redistributions of source code must retain the above copyright notice, | 8 * 1. Redistributions of source code must retain the above copyright notice, |
9 * this list of conditions and the following disclaimer. | 9 * this list of conditions and the following disclaimer. |
10 * 2. Redistributions in binary form must reproduce the above copyright notice, | 10 * 2. Redistributions in binary form must reproduce the above copyright notice, |
(...skipping 16 matching lines...) Expand all Loading... |
27 | 27 |
28 #if defined(_MSC_VER) && _MSC_VER < 1300 | 28 #if defined(_MSC_VER) && _MSC_VER < 1300 |
29 #pragma warning(disable:4786) | 29 #pragma warning(disable:4786) |
30 #endif | 30 #endif |
31 | 31 |
32 #ifdef POSIX | 32 #ifdef POSIX |
33 #include <sys/time.h> | 33 #include <sys/time.h> |
34 #endif | 34 #endif |
35 | 35 |
36 #include "talk/base/common.h" | 36 #include "talk/base/common.h" |
| 37 #include "talk/base/event.h" |
37 #include "talk/base/logging.h" | 38 #include "talk/base/logging.h" |
38 #include "talk/base/messagequeue.h" | 39 #include "talk/base/messagequeue.h" |
39 #include "talk/base/physicalsocketserver.h" | 40 #include "talk/base/physicalsocketserver.h" |
40 | 41 |
| 42 namespace { |
| 43 //------------------------------------------------------------------ |
| 44 // NullSocketServer |
| 45 |
| 46 class NullSocketServer : public talk_base::SocketServer { |
| 47 public: |
| 48 NullSocketServer() : event_(false, false) {} |
| 49 |
| 50 virtual bool Wait(int cms, bool process_io) { |
| 51 return event_.Wait(talk_base::kForever); |
| 52 } |
| 53 |
| 54 virtual void WakeUp() { |
| 55 event_.Set(); |
| 56 } |
| 57 |
| 58 virtual talk_base::Socket* CreateSocket(int type) { |
| 59 ASSERT(false); |
| 60 return NULL; |
| 61 } |
| 62 |
| 63 // Returns a new socket for nonblocking communication. The type can be |
| 64 // SOCK_DGRAM and/or SOCK_STREAM. |
| 65 virtual talk_base::AsyncSocket* CreateAsyncSocket(int type) { |
| 66 ASSERT(false); |
| 67 return NULL; |
| 68 } |
| 69 |
| 70 private: |
| 71 talk_base::Event event_; |
| 72 }; |
| 73 |
| 74 } // namespace |
41 | 75 |
42 namespace talk_base { | 76 namespace talk_base { |
43 | 77 |
44 const uint32 kMaxMsgLatency = 150; // 150 ms | 78 const uint32 kMaxMsgLatency = 150; // 150 ms |
45 | 79 |
46 //------------------------------------------------------------------ | 80 //------------------------------------------------------------------ |
47 // MessageQueueManager | 81 // MessageQueueManager |
48 | 82 |
49 MessageQueueManager* MessageQueueManager::instance_; | 83 MessageQueueManager* MessageQueueManager::instance_; |
50 | 84 |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
98 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. | 132 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. |
99 CritScope cs(&crit_); | 133 CritScope cs(&crit_); |
100 std::vector<MessageQueue *>::iterator iter; | 134 std::vector<MessageQueue *>::iterator iter; |
101 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) | 135 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) |
102 (*iter)->Clear(handler); | 136 (*iter)->Clear(handler); |
103 } | 137 } |
104 | 138 |
105 //------------------------------------------------------------------ | 139 //------------------------------------------------------------------ |
106 // MessageQueue | 140 // MessageQueue |
107 | 141 |
108 MessageQueue::MessageQueue(SocketServer* ss) | 142 MessageQueue::MessageQueue() { |
109 : ss_(ss), fStop_(false), fPeekKeep_(false), active_(false), | 143 // TODO(ronghuawu): |
110 dmsgq_next_num_(0) { | 144 // Currently, MessageQueue holds a socket server, and is the base class for |
111 if (!ss_) { | 145 // Thread. It seems like it makes more sense for Thread to hold the socket |
112 // Currently, MessageQueue holds a socket server, and is the base class for | 146 // server, and provide it to the MessageQueue, since the Thread controls |
113 // Thread. It seems like it makes more sense for Thread to hold the socket | 147 // the I/O model, and MQ is agnostic to those details. Anyway, this causes |
114 // server, and provide it to the MessageQueue, since the Thread controls | 148 // messagequeue_unittest to depend on network libraries... yuck. |
115 // the I/O model, and MQ is agnostic to those details. Anyway, this causes | 149 owned_ss_.reset(new PhysicalSocketServer()); |
116 // messagequeue_unittest to depend on network libraries... yuck. | 150 ss_ = owned_ss_.get(); |
117 default_ss_.reset(new PhysicalSocketServer()); | 151 Construct(); |
118 ss_ = default_ss_.get(); | 152 } |
| 153 |
| 154 MessageQueue::MessageQueue(SocketServer* ss) { |
| 155 if (ss) { |
| 156 ss_ = ss; |
| 157 } else { |
| 158 owned_ss_.reset(new NullSocketServer()); |
| 159 ss_ = owned_ss_.get(); |
119 } | 160 } |
| 161 Construct(); |
| 162 } |
| 163 |
| 164 void MessageQueue::Construct() { |
| 165 fStop_ = false; |
| 166 fPeekKeep_ = false; |
| 167 active_ = false; |
| 168 dmsgq_next_num_ = 0; |
120 ss_->SetMessageQueue(this); | 169 ss_->SetMessageQueue(this); |
121 } | 170 } |
122 | 171 |
123 MessageQueue::~MessageQueue() { | 172 MessageQueue::~MessageQueue() { |
124 // The signal is done from here to ensure | 173 // The signal is done from here to ensure |
125 // that it always gets called when the queue | 174 // that it always gets called when the queue |
126 // is going away. | 175 // is going away. |
127 SignalQueueDestroyed(); | 176 SignalQueueDestroyed(); |
128 if (active_) { | 177 if (active_) { |
129 MessageQueueManager::Instance()->Remove(this); | 178 MessageQueueManager::Instance()->Remove(this); |
130 Clear(NULL); | 179 Clear(NULL); |
131 } | 180 } |
132 if (ss_) { | 181 if (ss_) { |
133 ss_->SetMessageQueue(NULL); | 182 ss_->SetMessageQueue(NULL); |
134 } | 183 } |
135 } | 184 } |
136 | 185 |
137 void MessageQueue::set_socketserver(SocketServer* ss) { | 186 void MessageQueue::set_socketserver(SocketServer* ss) { |
138 ss_ = ss ? ss : default_ss_.get(); | 187 ss_ = ss ? ss : owned_ss_.get(); |
139 ss_->SetMessageQueue(this); | 188 ss_->SetMessageQueue(this); |
140 } | 189 } |
141 | 190 |
142 void MessageQueue::Quit() { | 191 void MessageQueue::Quit() { |
143 fStop_ = true; | 192 fStop_ = true; |
144 ss_->WakeUp(); | 193 ss_->WakeUp(); |
145 } | 194 } |
146 | 195 |
147 bool MessageQueue::IsQuitting() { | 196 bool MessageQueue::IsQuitting() { |
148 return fStop_; | 197 return fStop_; |
(...skipping 225 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
374 | 423 |
375 void MessageQueue::EnsureActive() { | 424 void MessageQueue::EnsureActive() { |
376 ASSERT(crit_.CurrentThreadIsOwner()); | 425 ASSERT(crit_.CurrentThreadIsOwner()); |
377 if (!active_) { | 426 if (!active_) { |
378 active_ = true; | 427 active_ = true; |
379 MessageQueueManager::Instance()->Add(this); | 428 MessageQueueManager::Instance()->Add(this); |
380 } | 429 } |
381 } | 430 } |
382 | 431 |
383 } // namespace talk_base | 432 } // namespace talk_base |
OLD | NEW |