OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "net/websockets/websocket_throttle.h" | 5 #include "net/websockets/websocket_throttle.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <set> | 8 #include <set> |
9 #include <string> | 9 #include <string> |
10 | 10 |
11 #include "base/memory/singleton.h" | 11 #include "base/memory/singleton.h" |
12 #include "base/message_loop.h" | 12 #include "base/message_loop.h" |
13 #include "base/strings/string_number_conversions.h" | 13 #include "base/strings/string_number_conversions.h" |
14 #include "base/strings/string_util.h" | 14 #include "base/strings/string_util.h" |
15 #include "base/strings/stringprintf.h" | 15 #include "base/strings/stringprintf.h" |
16 #include "net/base/io_buffer.h" | 16 #include "net/base/io_buffer.h" |
17 #include "net/socket_stream/socket_stream.h" | 17 #include "net/socket_stream/socket_stream.h" |
18 #include "net/websockets/websocket_job.h" | 18 #include "net/websockets/websocket_job.h" |
19 | 19 |
20 namespace net { | 20 namespace net { |
21 | 21 |
22 namespace { | |
23 | |
24 const size_t kMaxWebSocketJobsThrottled = 1024; | |
25 | |
26 } // namespace | |
27 | |
22 WebSocketThrottle::WebSocketThrottle() { | 28 WebSocketThrottle::WebSocketThrottle() { |
23 } | 29 } |
24 | 30 |
25 WebSocketThrottle::~WebSocketThrottle() { | 31 WebSocketThrottle::~WebSocketThrottle() { |
26 DCHECK(queue_.empty()); | 32 DCHECK(queue_.empty()); |
27 DCHECK(addr_map_.empty()); | 33 DCHECK(addr_map_.empty()); |
28 } | 34 } |
29 | 35 |
30 // static | 36 // static |
31 WebSocketThrottle* WebSocketThrottle::GetInstance() { | 37 WebSocketThrottle* WebSocketThrottle::GetInstance() { |
32 return Singleton<WebSocketThrottle>::get(); | 38 return Singleton<WebSocketThrottle>::get(); |
33 } | 39 } |
34 | 40 |
35 void WebSocketThrottle::PutInQueue(WebSocketJob* job) { | 41 bool WebSocketThrottle::PutInQueue(WebSocketJob* job) { |
42 if (queue_.size() >= kMaxWebSocketJobsThrottled) | |
43 return false; | |
44 | |
36 queue_.push_back(job); | 45 queue_.push_back(job); |
37 const AddressList& address_list = job->address_list(); | 46 const AddressList& address_list = job->address_list(); |
38 std::set<IPEndPoint> address_set; | 47 std::set<IPEndPoint> address_set; |
39 for (AddressList::const_iterator addr_iter = address_list.begin(); | 48 for (AddressList::const_iterator addr_iter = address_list.begin(); |
40 addr_iter != address_list.end(); | 49 addr_iter != address_list.end(); |
41 ++addr_iter) { | 50 ++addr_iter) { |
42 const IPEndPoint& address = *addr_iter; | 51 const IPEndPoint& address = *addr_iter; |
43 // If |address| is already processed, don't do it again. | 52 // If |address| is already processed, don't do it again. |
44 if (!address_set.insert(address).second) | 53 if (!address_set.insert(address).second) |
45 continue; | 54 continue; |
46 | 55 |
47 ConnectingAddressMap::iterator iter = addr_map_.find(address); | 56 ConnectingAddressMap::iterator iter = addr_map_.find(address); |
48 if (iter == addr_map_.end()) { | 57 if (iter == addr_map_.end()) { |
49 ConnectingQueue* queue = new ConnectingQueue(); | 58 ConnectingQueue* queue = new ConnectingQueue(); |
50 queue->push_back(job); | 59 queue->push_back(job); |
51 addr_map_[address] = queue; | 60 addr_map_[address] = queue; |
52 } else { | 61 } else { |
yhirano
2013/07/16 02:08:56
DCHECK(!iter->second->empty()) would be helpful.
tyoshino (SeeGerritForStatus)
2013/07/17 06:58:26
Done.
| |
53 iter->second->push_back(job); | 62 iter->second->push_back(job); |
54 job->SetWaiting(); | 63 job->SetWaiting(); |
55 DVLOG(1) << "Waiting on " << address.ToString(); | 64 DVLOG(1) << "Waiting on " << address.ToString(); |
56 } | 65 } |
57 } | 66 } |
67 | |
68 return true; | |
58 } | 69 } |
59 | 70 |
60 void WebSocketThrottle::RemoveFromQueue(WebSocketJob* job) { | 71 void WebSocketThrottle::RemoveFromQueue(WebSocketJob* job) { |
61 ConnectingQueue::iterator queue_iter = | 72 ConnectingQueue::iterator queue_iter = |
62 std::find(queue_.begin(), queue_.end(), job); | 73 std::find(queue_.begin(), queue_.end(), job); |
63 if (queue_iter == queue_.end()) | 74 if (queue_iter == queue_.end()) |
64 return; | 75 return; |
65 queue_.erase(queue_iter); | 76 queue_.erase(queue_iter); |
66 const AddressList& address_list = job->address_list(); | 77 |
78 std::set<WebSocketJob*> wakeup_candidates; | |
79 | |
80 const AddressList& resolved_address_list = job->address_list(); | |
67 std::set<IPEndPoint> address_set; | 81 std::set<IPEndPoint> address_set; |
68 for (AddressList::const_iterator addr_iter = address_list.begin(); | 82 for (AddressList::const_iterator addr_iter = resolved_address_list.begin(); |
69 addr_iter != address_list.end(); | 83 addr_iter != resolved_address_list.end(); |
70 ++addr_iter) { | 84 ++addr_iter) { |
71 const IPEndPoint& address = *addr_iter; | 85 const IPEndPoint& address = *addr_iter; |
72 // If |address| is already processed, don't do it again. | 86 // If |address| is already processed, don't do it again. |
73 if (!address_set.insert(address).second) | 87 if (!address_set.insert(address).second) |
74 continue; | 88 continue; |
75 | 89 |
76 ConnectingAddressMap::iterator map_iter = addr_map_.find(address); | 90 ConnectingAddressMap::iterator map_iter = addr_map_.find(address); |
77 DCHECK(map_iter != addr_map_.end()); | 91 DCHECK(map_iter != addr_map_.end()); |
78 | 92 |
79 ConnectingQueue* queue = map_iter->second; | 93 ConnectingQueue* per_address_queue = map_iter->second; |
80 // Job may not be front of queue if the socket is closed while waiting. | 94 DCHECK(!per_address_queue->empty()); |
81 ConnectingQueue::iterator address_queue_iter = | 95 // Job may not be front of the queue if the socket is closed while waiting. |
82 std::find(queue->begin(), queue->end(), job); | 96 ConnectingQueue::iterator per_address_queue_iter = |
83 if (address_queue_iter != queue->end()) | 97 std::find(per_address_queue->begin(), per_address_queue->end(), job); |
84 queue->erase(address_queue_iter); | 98 bool was_front = false; |
85 if (queue->empty()) { | 99 if (per_address_queue_iter != per_address_queue->end()) { |
86 delete queue; | 100 was_front = per_address_queue_iter == per_address_queue->begin(); |
szym
2013/07/12 15:31:05
suggest parentheses
tyoshino (SeeGerritForStatus)
2013/07/17 06:58:26
Done.
| |
101 per_address_queue->erase(per_address_queue_iter); | |
102 } | |
103 if (per_address_queue->empty()) { | |
104 delete per_address_queue; | |
87 addr_map_.erase(map_iter); | 105 addr_map_.erase(map_iter); |
106 } else if (was_front) { | |
107 // The new front is a wake-up candidate. | |
108 wakeup_candidates.insert(per_address_queue->front()); | |
88 } | 109 } |
89 } | 110 } |
111 | |
112 WakeupSocketIfNecessary(wakeup_candidates); | |
90 } | 113 } |
91 | 114 |
92 void WebSocketThrottle::WakeupSocketIfNecessary() { | 115 void WebSocketThrottle::WakeupSocketIfNecessary( |
93 for (ConnectingQueue::iterator iter = queue_.begin(); | 116 const std::set<WebSocketJob*>& wakeup_candidates) { |
94 iter != queue_.end(); | 117 for (std::set<WebSocketJob*>::iterator iter = wakeup_candidates.begin(); |
118 iter != wakeup_candidates.end(); | |
95 ++iter) { | 119 ++iter) { |
96 WebSocketJob* job = *iter; | 120 WebSocketJob* job = *iter; |
97 if (!job->IsWaiting()) | 121 if (!job->IsWaiting()) |
98 continue; | 122 continue; |
99 | 123 |
100 bool should_wakeup = true; | 124 bool should_wakeup = true; |
101 const AddressList& address_list = job->address_list(); | 125 const AddressList& resolved_address_list = job->address_list(); |
102 for (AddressList::const_iterator addr_iter = address_list.begin(); | 126 for (AddressList::const_iterator addr_iter = resolved_address_list.begin(); |
103 addr_iter != address_list.end(); | 127 addr_iter != resolved_address_list.end(); |
104 ++addr_iter) { | 128 ++addr_iter) { |
105 const IPEndPoint& address = *addr_iter; | 129 const IPEndPoint& address = *addr_iter; |
106 ConnectingAddressMap::iterator map_iter = addr_map_.find(address); | 130 ConnectingAddressMap::iterator map_iter = addr_map_.find(address); |
107 DCHECK(map_iter != addr_map_.end()); | 131 DCHECK(map_iter != addr_map_.end()); |
108 ConnectingQueue* queue = map_iter->second; | 132 ConnectingQueue* per_address_queue = map_iter->second; |
109 if (job != queue->front()) { | 133 if (job != per_address_queue->front()) { |
110 should_wakeup = false; | 134 should_wakeup = false; |
111 break; | 135 break; |
112 } | 136 } |
113 } | 137 } |
114 if (should_wakeup) | 138 if (should_wakeup) |
115 job->Wakeup(); | 139 job->Wakeup(); |
116 } | 140 } |
117 } | 141 } |
118 | 142 |
119 } // namespace net | 143 } // namespace net |
OLD | NEW |