/*
 *
 * Copyright 2015-2016, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include "src/core/iomgr/timer.h"

#include "src/core/iomgr/timer_heap.h"
#include "src/core/iomgr/time_averaged_stats.h"
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>

#define INVALID_HEAP_INDEX 0xffffffffu

#define LOG2_NUM_SHARDS 5
#define NUM_SHARDS (1 << LOG2_NUM_SHARDS)
#define ADD_DEADLINE_SCALE 0.33
#define MIN_QUEUE_WINDOW_DURATION 0.01
#define MAX_QUEUE_WINDOW_DURATION 1

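/* Timers are distributed across NUM_SHARDS shards by a hash of the timer
   pointer, so that concurrent timer operations mostly contend on different
   mutexes. Each shard keeps near-term timers in a heap and parks far-future
   timers on an unordered list; the boundary between the two is
   queue_deadline_cap, which refill_queue() advances by a window derived from
   the recent average timer duration, clamped to
   [MIN_QUEUE_WINDOW_DURATION, MAX_QUEUE_WINDOW_DURATION] seconds. */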
typedef struct {
  gpr_mu mu;
  grpc_time_averaged_stats stats;
  /* All and only timers with deadlines < this will be in the heap. */
  gpr_timespec queue_deadline_cap;
  gpr_timespec min_deadline;
  /* Index in the g_shard_queue */
  uint32_t shard_queue_index;
  /* This holds all timers with deadlines < queue_deadline_cap, ordered by
     deadline. */
  grpc_timer_heap heap;
  /* This holds timers whose deadline is >= queue_deadline_cap. */
  grpc_timer list;
} shard_type;

/* Protects g_shard_queue */
static gpr_mu g_mu;
/* Allow only one run_some_expired_timers at once */
static gpr_mu g_checker_mu;
static gpr_clock_type g_clock_type;
static shard_type g_shards[NUM_SHARDS];
/* Protected by g_mu */
static shard_type *g_shard_queue[NUM_SHARDS];

static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
                                   gpr_timespec *next, int success);

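/* Returns the earliest deadline this shard is tracking: the top of the heap
   if it is non-empty, otherwise queue_deadline_cap (every timer still on the
   overflow list has a deadline at or past the cap). */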
static gpr_timespec compute_min_deadline(shard_type *shard) {
  return grpc_timer_heap_is_empty(&shard->heap)
             ? shard->queue_deadline_cap
             : grpc_timer_heap_top(&shard->heap)->deadline;
}

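/* Initializes the global mutexes, records the clock type that all deadlines
   must use, and sets up each shard with an empty heap, an empty circular
   overflow list, and a deadline cap of "now". */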
void grpc_timer_list_init(gpr_timespec now) {
  uint32_t i;

  gpr_mu_init(&g_mu);
  gpr_mu_init(&g_checker_mu);
  g_clock_type = now.clock_type;

  for (i = 0; i < NUM_SHARDS; i++) {
    shard_type *shard = &g_shards[i];
    gpr_mu_init(&shard->mu);
    grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
                                  0.5);
    shard->queue_deadline_cap = now;
    shard->shard_queue_index = i;
    grpc_timer_heap_init(&shard->heap);
    shard->list.next = shard->list.prev = &shard->list;
    shard->min_deadline = compute_min_deadline(shard);
    g_shard_queue[i] = shard;
  }
}

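/* Flushes every remaining timer with success == 0 (running the expiry pass
   with an infinitely-far-future "now" expires them all), then tears down the
   shard and global mutexes. */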
void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {
  int i;
  run_some_expired_timers(exec_ctx, gpr_inf_future(g_clock_type), NULL, 0);
  for (i = 0; i < NUM_SHARDS; i++) {
    shard_type *shard = &g_shards[i];
    gpr_mu_destroy(&shard->mu);
    grpc_timer_heap_destroy(&shard->heap);
  }
  gpr_mu_destroy(&g_mu);
  gpr_mu_destroy(&g_checker_mu);
}

/* This is a cheap, but good enough, pointer hash for sharding the timers: */
static size_t shard_idx(const grpc_timer *info) {
  size_t x = (size_t)info;
  return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (NUM_SHARDS - 1);
}

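/* Conversions between gpr_timespec and a double count of seconds, used when
   mixing deadlines with the (double-valued) time-averaged stats. */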
static double ts_to_dbl(gpr_timespec ts) {
  return (double)ts.tv_sec + 1e-9 * ts.tv_nsec;
}

static gpr_timespec dbl_to_ts(double d) {
  gpr_timespec ts;
  ts.tv_sec = (int64_t)d;
  ts.tv_nsec = (int32_t)(1e9 * (d - (double)ts.tv_sec));
  ts.clock_type = GPR_TIMESPAN;
  return ts;
}

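/* Inserts "timer" at the tail of the circular doubly-linked list rooted at
   "head" (i.e., just before head). */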
static void list_join(grpc_timer *head, grpc_timer *timer) {
  timer->next = head;
  timer->prev = head->prev;
  timer->next->prev = timer->prev->next = timer;
}

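/* Unlinks "timer" from the overflow list it is on. */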
static void list_remove(grpc_timer *timer) {
  timer->next->prev = timer->prev;
  timer->prev->next = timer->next;
}

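/* Swaps two adjacent entries in g_shard_queue and fixes up both shards'
   cached queue indices. REQUIRES: g_mu locked */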
static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) {
  shard_type *temp;
  temp = g_shard_queue[first_shard_queue_index];
  g_shard_queue[first_shard_queue_index] =
      g_shard_queue[first_shard_queue_index + 1];
  g_shard_queue[first_shard_queue_index + 1] = temp;
  g_shard_queue[first_shard_queue_index]->shard_queue_index =
      first_shard_queue_index;
  g_shard_queue[first_shard_queue_index + 1]->shard_queue_index =
      first_shard_queue_index + 1;
}

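/* Restores the invariant that g_shard_queue is sorted by min_deadline after
   "shard"'s min_deadline has changed, bubbling the shard toward the front or
   back with adjacent swaps. REQUIRES: g_mu locked */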
static void note_deadline_change(shard_type *shard) {
  while (shard->shard_queue_index > 0 &&
         gpr_time_cmp(
             shard->min_deadline,
             g_shard_queue[shard->shard_queue_index - 1]->min_deadline) < 0) {
    swap_adjacent_shards_in_queue(shard->shard_queue_index - 1);
  }
  while (shard->shard_queue_index < NUM_SHARDS - 1 &&
         gpr_time_cmp(
             shard->min_deadline,
             g_shard_queue[shard->shard_queue_index + 1]->min_deadline) > 0) {
    swap_adjacent_shards_in_queue(shard->shard_queue_index);
  }
}

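/* Registers a timer: initializes its closure with timer_cb/timer_cb_arg,
   records it in the shard selected by shard_idx(timer), and, if it became
   that shard's earliest deadline, updates the global shard ordering and
   kicks the poller so a sleeping thread re-evaluates its wakeup time. */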
void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
                     gpr_timespec deadline, grpc_iomgr_cb_func timer_cb,
                     void *timer_cb_arg, gpr_timespec now) {
  int is_first_timer = 0;
  shard_type *shard = &g_shards[shard_idx(timer)];
  GPR_ASSERT(deadline.clock_type == g_clock_type);
  GPR_ASSERT(now.clock_type == g_clock_type);
  grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg);
  timer->deadline = deadline;
  timer->triggered = 0;

  /* TODO(ctiller): check deadline expired */

  gpr_mu_lock(&shard->mu);
  grpc_time_averaged_stats_add_sample(&shard->stats,
                                      ts_to_dbl(gpr_time_sub(deadline, now)));
  if (gpr_time_cmp(deadline, shard->queue_deadline_cap) < 0) {
    is_first_timer = grpc_timer_heap_add(&shard->heap, timer);
  } else {
    timer->heap_index = INVALID_HEAP_INDEX;
    list_join(&shard->list, timer);
  }
  gpr_mu_unlock(&shard->mu);

  /* The deadline may have decreased, so we may need to adjust the master
     queue. Note that there is a potentially racy unlocked region here:
     multiple grpc_timer_init calls could be reordered at this point, but the
     "<" test below ensures that we err on the side of caution. There could
     also be a race with grpc_timer_check, which might beat us to the lock; in
     that case, it is possible that the timer we added will already have run
     by the time we hold the lock, but that too is a safe error. Finally, it's
     possible that an intervening grpc_timer_check failed to trigger the new
     timer because min_deadline hadn't yet been reduced. In that case, the
     timer simply has to wait for the next grpc_timer_check. */
  if (is_first_timer) {
    gpr_mu_lock(&g_mu);
    if (gpr_time_cmp(deadline, shard->min_deadline) < 0) {
      gpr_timespec old_min_deadline = g_shard_queue[0]->min_deadline;
      shard->min_deadline = deadline;
      note_deadline_change(shard);
      if (shard->shard_queue_index == 0 &&
          gpr_time_cmp(deadline, old_min_deadline) < 0) {
        grpc_kick_poller();
      }
    }
    gpr_mu_unlock(&g_mu);
  }
}

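/* Cancels a timer that has not yet fired: its closure is enqueued
   immediately with success == false, and it is removed from either the heap
   or the overflow list, whichever currently holds it. Cancelling a timer
   that has already triggered is a no-op. */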
void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
  shard_type *shard = &g_shards[shard_idx(timer)];
  gpr_mu_lock(&shard->mu);
  if (!timer->triggered) {
    grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, false, NULL);
    timer->triggered = 1;
    if (timer->heap_index == INVALID_HEAP_INDEX) {
      list_remove(timer);
    } else {
      grpc_timer_heap_remove(&shard->heap, timer);
    }
  }
  gpr_mu_unlock(&shard->mu);
}

/* This is called when the queue is empty and "now" has reached the
   queue_deadline_cap. We compute a new queue deadline and then scan the
   overflow list for timers that fall under it. Returns true if the queue is
   no longer empty.
   REQUIRES: shard->mu locked */
static int refill_queue(shard_type *shard, gpr_timespec now) {
  /* Compute the new queue window width and bound it by the limits: */
  double computed_deadline_delta =
      grpc_time_averaged_stats_update_average(&shard->stats) *
      ADD_DEADLINE_SCALE;
  double deadline_delta =
      GPR_CLAMP(computed_deadline_delta, MIN_QUEUE_WINDOW_DURATION,
                MAX_QUEUE_WINDOW_DURATION);
  grpc_timer *timer, *next;

  /* Compute the new cap and move all timers under it into the heap: */
  shard->queue_deadline_cap = gpr_time_add(
      gpr_time_max(now, shard->queue_deadline_cap), dbl_to_ts(deadline_delta));
  for (timer = shard->list.next; timer != &shard->list; timer = next) {
    next = timer->next;

    if (gpr_time_cmp(timer->deadline, shard->queue_deadline_cap) < 0) {
      list_remove(timer);
      grpc_timer_heap_add(&shard->heap, timer);
    }
  }
  return !grpc_timer_heap_is_empty(&shard->heap);
}

/* This pops the next non-cancelled timer with deadline <= now from the queue,
   or returns NULL if there isn't one.
   REQUIRES: shard->mu locked */
static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) {
  grpc_timer *timer;
  for (;;) {
    if (grpc_timer_heap_is_empty(&shard->heap)) {
      if (gpr_time_cmp(now, shard->queue_deadline_cap) < 0) return NULL;
      if (!refill_queue(shard, now)) return NULL;
    }
    timer = grpc_timer_heap_top(&shard->heap);
    if (gpr_time_cmp(timer->deadline, now) > 0) return NULL;
    timer->triggered = 1;
    grpc_timer_heap_pop(&shard->heap);
    return timer;
  }
}

/* Pops every timer in "shard" that has expired as of "now", enqueuing each
   one's closure with the given success value, and reports the shard's new
   minimum deadline through *new_min_deadline. Returns the number of timers
   popped.
   REQUIRES: shard->mu unlocked */
static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
                         gpr_timespec now, gpr_timespec *new_min_deadline,
                         int success) {
  size_t n = 0;
  grpc_timer *timer;
  gpr_mu_lock(&shard->mu);
  while ((timer = pop_one(shard, now))) {
    grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, success, NULL);
    n++;
  }
  *new_min_deadline = compute_min_deadline(shard);
  gpr_mu_unlock(&shard->mu);
  return n;
}

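/* Runs expired timers across all shards in min-deadline order, draining the
   front of g_shard_queue until its head's min_deadline has passed "now".
   Only one thread may do this at a time (g_checker_mu); others return 0
   immediately. If "next" is non-NULL, it is lowered to the earliest pending
   deadline so the caller can bound its sleep. Returns the number of timers
   run. */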
static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
                                   gpr_timespec *next, int success) {
  size_t n = 0;

  /* TODO(ctiller): verify that there are any timers (atomically) here */

  if (gpr_mu_trylock(&g_checker_mu)) {
    gpr_mu_lock(&g_mu);

    while (gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) {
      gpr_timespec new_min_deadline;

      /* For efficiency, we pop as many available timers as we can from the
         shard. This may violate perfect timer deadline ordering, but that
         shouldn't be a big deal because we don't make ordering guarantees. */
      n += pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline,
                      success);

      /* A grpc_timer_init() on the shard could intervene here, adding a new
         timer that is earlier than new_min_deadline. However,
         grpc_timer_init() blocks on g_mu before it can update the shard's
         min_deadline, so this thread will complete first, and the
         timer-adding thread will then reduce min_deadline (perhaps
         unnecessarily). */
      g_shard_queue[0]->min_deadline = new_min_deadline;
      note_deadline_change(g_shard_queue[0]);
    }

    if (next) {
      *next = gpr_time_min(*next, g_shard_queue[0]->min_deadline);
    }

    gpr_mu_unlock(&g_mu);
    gpr_mu_unlock(&g_checker_mu);
  }

  return (int)n;
}

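/* Entry point called by the polling code: runs any expired timers with
   success == true, except during shutdown (when "now" is the infinite
   future, every remaining timer is flushed with success == false). Returns
   true if any timers were run. */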
bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
                      gpr_timespec *next) {
  GPR_ASSERT(now.clock_type == g_clock_type);
  return run_some_expired_timers(
      exec_ctx, now, next,
      gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0);
}