Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(442)

Side by Side Diff: third_party/grpc/src/core/iomgr/timer.c

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « third_party/grpc/src/core/iomgr/timer.h ('k') | third_party/grpc/src/core/iomgr/timer_heap.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2015-2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34 #include "src/core/iomgr/timer.h"
35
36 #include "src/core/iomgr/timer_heap.h"
37 #include "src/core/iomgr/time_averaged_stats.h"
38 #include <grpc/support/log.h>
39 #include <grpc/support/sync.h>
40 #include <grpc/support/useful.h>
41
42 #define INVALID_HEAP_INDEX 0xffffffffu
43
44 #define LOG2_NUM_SHARDS 5
45 #define NUM_SHARDS (1 << LOG2_NUM_SHARDS)
46 #define ADD_DEADLINE_SCALE 0.33
47 #define MIN_QUEUE_WINDOW_DURATION 0.01
48 #define MAX_QUEUE_WINDOW_DURATION 1
49
50 typedef struct {
51 gpr_mu mu;
52 grpc_time_averaged_stats stats;
53 /* All and only timers with deadlines <= this will be in the heap. */
54 gpr_timespec queue_deadline_cap;
55 gpr_timespec min_deadline;
56 /* Index in the g_shard_queue */
57 uint32_t shard_queue_index;
58 /* This holds all timers with deadlines < queue_deadline_cap. Timers in this
59 list have the top bit of their deadline set to 0. */
60 grpc_timer_heap heap;
61 /* This holds timers whose deadline is >= queue_deadline_cap. */
62 grpc_timer list;
63 } shard_type;
64
65 /* Protects g_shard_queue */
66 static gpr_mu g_mu;
67 /* Allow only one run_some_expired_timers at once */
68 static gpr_mu g_checker_mu;
69 static gpr_clock_type g_clock_type;
70 static shard_type g_shards[NUM_SHARDS];
71 /* Protected by g_mu */
72 static shard_type *g_shard_queue[NUM_SHARDS];
73
74 static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
75 gpr_timespec *next, int success);
76
77 static gpr_timespec compute_min_deadline(shard_type *shard) {
78 return grpc_timer_heap_is_empty(&shard->heap)
79 ? shard->queue_deadline_cap
80 : grpc_timer_heap_top(&shard->heap)->deadline;
81 }
82
83 void grpc_timer_list_init(gpr_timespec now) {
84 uint32_t i;
85
86 gpr_mu_init(&g_mu);
87 gpr_mu_init(&g_checker_mu);
88 g_clock_type = now.clock_type;
89
90 for (i = 0; i < NUM_SHARDS; i++) {
91 shard_type *shard = &g_shards[i];
92 gpr_mu_init(&shard->mu);
93 grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
94 0.5);
95 shard->queue_deadline_cap = now;
96 shard->shard_queue_index = i;
97 grpc_timer_heap_init(&shard->heap);
98 shard->list.next = shard->list.prev = &shard->list;
99 shard->min_deadline = compute_min_deadline(shard);
100 g_shard_queue[i] = shard;
101 }
102 }
103
104 void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {
105 int i;
106 run_some_expired_timers(exec_ctx, gpr_inf_future(g_clock_type), NULL, 0);
107 for (i = 0; i < NUM_SHARDS; i++) {
108 shard_type *shard = &g_shards[i];
109 gpr_mu_destroy(&shard->mu);
110 grpc_timer_heap_destroy(&shard->heap);
111 }
112 gpr_mu_destroy(&g_mu);
113 gpr_mu_destroy(&g_checker_mu);
114 }
115
116 /* This is a cheap, but good enough, pointer hash for sharding the tasks: */
117 static size_t shard_idx(const grpc_timer *info) {
118 size_t x = (size_t)info;
119 return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (NUM_SHARDS - 1);
120 }
121
122 static double ts_to_dbl(gpr_timespec ts) {
123 return (double)ts.tv_sec + 1e-9 * ts.tv_nsec;
124 }
125
126 static gpr_timespec dbl_to_ts(double d) {
127 gpr_timespec ts;
128 ts.tv_sec = (int64_t)d;
129 ts.tv_nsec = (int32_t)(1e9 * (d - (double)ts.tv_sec));
130 ts.clock_type = GPR_TIMESPAN;
131 return ts;
132 }
133
134 static void list_join(grpc_timer *head, grpc_timer *timer) {
135 timer->next = head;
136 timer->prev = head->prev;
137 timer->next->prev = timer->prev->next = timer;
138 }
139
140 static void list_remove(grpc_timer *timer) {
141 timer->next->prev = timer->prev;
142 timer->prev->next = timer->next;
143 }
144
145 static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) {
146 shard_type *temp;
147 temp = g_shard_queue[first_shard_queue_index];
148 g_shard_queue[first_shard_queue_index] =
149 g_shard_queue[first_shard_queue_index + 1];
150 g_shard_queue[first_shard_queue_index + 1] = temp;
151 g_shard_queue[first_shard_queue_index]->shard_queue_index =
152 first_shard_queue_index;
153 g_shard_queue[first_shard_queue_index + 1]->shard_queue_index =
154 first_shard_queue_index + 1;
155 }
156
157 static void note_deadline_change(shard_type *shard) {
158 while (shard->shard_queue_index > 0 &&
159 gpr_time_cmp(
160 shard->min_deadline,
161 g_shard_queue[shard->shard_queue_index - 1]->min_deadline) < 0) {
162 swap_adjacent_shards_in_queue(shard->shard_queue_index - 1);
163 }
164 while (shard->shard_queue_index < NUM_SHARDS - 1 &&
165 gpr_time_cmp(
166 shard->min_deadline,
167 g_shard_queue[shard->shard_queue_index + 1]->min_deadline) > 0) {
168 swap_adjacent_shards_in_queue(shard->shard_queue_index);
169 }
170 }
171
172 void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
173 gpr_timespec deadline, grpc_iomgr_cb_func timer_cb,
174 void *timer_cb_arg, gpr_timespec now) {
175 int is_first_timer = 0;
176 shard_type *shard = &g_shards[shard_idx(timer)];
177 GPR_ASSERT(deadline.clock_type == g_clock_type);
178 GPR_ASSERT(now.clock_type == g_clock_type);
179 grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg);
180 timer->deadline = deadline;
181 timer->triggered = 0;
182
183 /* TODO(ctiller): check deadline expired */
184
185 gpr_mu_lock(&shard->mu);
186 grpc_time_averaged_stats_add_sample(&shard->stats,
187 ts_to_dbl(gpr_time_sub(deadline, now)));
188 if (gpr_time_cmp(deadline, shard->queue_deadline_cap) < 0) {
189 is_first_timer = grpc_timer_heap_add(&shard->heap, timer);
190 } else {
191 timer->heap_index = INVALID_HEAP_INDEX;
192 list_join(&shard->list, timer);
193 }
194 gpr_mu_unlock(&shard->mu);
195
196 /* Deadline may have decreased, we need to adjust the master queue. Note
197 that there is a potential racy unlocked region here. There could be a
198 reordering of multiple grpc_timer_init calls, at this point, but the < test
199 below should ensure that we err on the side of caution. There could
200 also be a race with grpc_timer_check, which might beat us to the lock. In
201 that case, it is possible that the timer that we added will have already
202 run by the time we hold the lock, but that too is a safe error.
203 Finally, it's possible that the grpc_timer_check that intervened failed to
204 trigger the new timer because the min_deadline hadn't yet been reduced.
205 In that case, the timer will simply have to wait for the next
206 grpc_timer_check. */
207 if (is_first_timer) {
208 gpr_mu_lock(&g_mu);
209 if (gpr_time_cmp(deadline, shard->min_deadline) < 0) {
210 gpr_timespec old_min_deadline = g_shard_queue[0]->min_deadline;
211 shard->min_deadline = deadline;
212 note_deadline_change(shard);
213 if (shard->shard_queue_index == 0 &&
214 gpr_time_cmp(deadline, old_min_deadline) < 0) {
215 grpc_kick_poller();
216 }
217 }
218 gpr_mu_unlock(&g_mu);
219 }
220 }
221
222 void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
223 shard_type *shard = &g_shards[shard_idx(timer)];
224 gpr_mu_lock(&shard->mu);
225 if (!timer->triggered) {
226 grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, false, NULL);
227 timer->triggered = 1;
228 if (timer->heap_index == INVALID_HEAP_INDEX) {
229 list_remove(timer);
230 } else {
231 grpc_timer_heap_remove(&shard->heap, timer);
232 }
233 }
234 gpr_mu_unlock(&shard->mu);
235 }
236
237 /* This is called when the queue is empty and "now" has reached the
238 queue_deadline_cap. We compute a new queue deadline and then scan the map
239 for timers that fall at or under it. Returns true if the queue is no
240 longer empty.
241 REQUIRES: shard->mu locked */
242 static int refill_queue(shard_type *shard, gpr_timespec now) {
243 /* Compute the new queue window width and bound by the limits: */
244 double computed_deadline_delta =
245 grpc_time_averaged_stats_update_average(&shard->stats) *
246 ADD_DEADLINE_SCALE;
247 double deadline_delta =
248 GPR_CLAMP(computed_deadline_delta, MIN_QUEUE_WINDOW_DURATION,
249 MAX_QUEUE_WINDOW_DURATION);
250 grpc_timer *timer, *next;
251
252 /* Compute the new cap and put all timers under it into the queue: */
253 shard->queue_deadline_cap = gpr_time_add(
254 gpr_time_max(now, shard->queue_deadline_cap), dbl_to_ts(deadline_delta));
255 for (timer = shard->list.next; timer != &shard->list; timer = next) {
256 next = timer->next;
257
258 if (gpr_time_cmp(timer->deadline, shard->queue_deadline_cap) < 0) {
259 list_remove(timer);
260 grpc_timer_heap_add(&shard->heap, timer);
261 }
262 }
263 return !grpc_timer_heap_is_empty(&shard->heap);
264 }
265
266 /* This pops the next non-cancelled timer with deadline <= now from the queue,
267 or returns NULL if there isn't one.
268 REQUIRES: shard->mu locked */
269 static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) {
270 grpc_timer *timer;
271 for (;;) {
272 if (grpc_timer_heap_is_empty(&shard->heap)) {
273 if (gpr_time_cmp(now, shard->queue_deadline_cap) < 0) return NULL;
274 if (!refill_queue(shard, now)) return NULL;
275 }
276 timer = grpc_timer_heap_top(&shard->heap);
277 if (gpr_time_cmp(timer->deadline, now) > 0) return NULL;
278 timer->triggered = 1;
279 grpc_timer_heap_pop(&shard->heap);
280 return timer;
281 }
282 }
283
284 /* REQUIRES: shard->mu unlocked */
285 static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
286 gpr_timespec now, gpr_timespec *new_min_deadline,
287 int success) {
288 size_t n = 0;
289 grpc_timer *timer;
290 gpr_mu_lock(&shard->mu);
291 while ((timer = pop_one(shard, now))) {
292 grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, success, NULL);
293 n++;
294 }
295 *new_min_deadline = compute_min_deadline(shard);
296 gpr_mu_unlock(&shard->mu);
297 return n;
298 }
299
300 static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
301 gpr_timespec *next, int success) {
302 size_t n = 0;
303
304 /* TODO(ctiller): verify that there are any timers (atomically) here */
305
306 if (gpr_mu_trylock(&g_checker_mu)) {
307 gpr_mu_lock(&g_mu);
308
309 while (gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) {
310 gpr_timespec new_min_deadline;
311
312 /* For efficiency, we pop as many available timers as we can from the
313 shard. This may violate perfect timer deadline ordering, but that
314 shouldn't be a big deal because we don't make ordering guarantees. */
315 n += pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline,
316 success);
317
318 /* An grpc_timer_init() on the shard could intervene here, adding a new
319 timer that is earlier than new_min_deadline. However,
320 grpc_timer_init() will block on the master_lock before it can call
321 set_min_deadline, so this one will complete first and then the Addtimer
322 will reduce the min_deadline (perhaps unnecessarily). */
323 g_shard_queue[0]->min_deadline = new_min_deadline;
324 note_deadline_change(g_shard_queue[0]);
325 }
326
327 if (next) {
328 *next = gpr_time_min(*next, g_shard_queue[0]->min_deadline);
329 }
330
331 gpr_mu_unlock(&g_mu);
332 gpr_mu_unlock(&g_checker_mu);
333 }
334
335 return (int)n;
336 }
337
338 bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
339 gpr_timespec *next) {
340 GPR_ASSERT(now.clock_type == g_clock_type);
341 return run_some_expired_timers(
342 exec_ctx, now, next,
343 gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0);
344 }
OLDNEW
« no previous file with comments | « third_party/grpc/src/core/iomgr/timer.h ('k') | third_party/grpc/src/core/iomgr/timer_heap.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698