| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package backend | 5 package backend |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "net/http" | 9 "net/http" |
| 10 "time" | 10 "time" |
| (...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 111 | 111 |
| 112 queueName, err := archiveTaskQueueName(cfg) | 112 queueName, err := archiveTaskQueueName(cfg) |
| 113 if err != nil { | 113 if err != nil { |
| 114 log.Errorf(c, "Failed to get task queue name.") | 114 log.Errorf(c, "Failed to get task queue name.") |
| 115 return err | 115 return err |
| 116 } | 116 } |
| 117 | 117 |
| 118 now := clock.Now(c).UTC() | 118 now := clock.Now(c).UTC() |
| 119 q := ds.NewQuery("LogStream") | 119 q := ds.NewQuery("LogStream") |
| 120 | 120 |
| 121 // TODO(dnj): Next schema migration, remove this State constraint in fav
or |
| 122 // of equality filters on _Terminated and ArchiveState. |
| 123 // It is necessary for now since otherwise, *all* archived streams will
match |
| 124 // this query! |
| 121 var threshold time.Duration | 125 var threshold time.Duration |
| 122 if complete { | 126 if complete { |
| 123 threshold = cfg.GetCoordinator().ArchiveDelay.Duration() | 127 threshold = cfg.GetCoordinator().ArchiveDelay.Duration() |
| 124 q = q.Eq("State", coordinator.LSTerminated) | 128 q = q.Eq("State", coordinator.LSTerminated) |
| 125 } else { | 129 } else { |
| 126 threshold = cfg.GetCoordinator().ArchiveDelayMax.Duration() | 130 threshold = cfg.GetCoordinator().ArchiveDelayMax.Duration() |
| 127 q = q.Eq("State", coordinator.LSPending) | 131 q = q.Eq("State", coordinator.LSPending) |
| 128 } | 132 } |
| 129 q = q.Lte("Updated", now.Add(-threshold)) | 133 q = q.Lte("Updated", now.Add(-threshold)) |
| 130 | 134 |
| (...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 215 // Dispatch any remaining enqueued tasks. | 219 // Dispatch any remaining enqueued tasks. |
| 216 if err := addAndMaybeDispatchTasks(nil); err != nil { | 220 if err := addAndMaybeDispatchTasks(nil); err != nil { |
| 217 return err | 221 return err |
| 218 } | 222 } |
| 219 | 223 |
| 220 log.Fields{ | 224 log.Fields{ |
| 221 "scheduledTaskCount": totalScheduledTasks, | 225 "scheduledTaskCount": totalScheduledTasks, |
| 222 }.Debugf(c, "Archive sweep completed successfully.") | 226 }.Debugf(c, "Archive sweep completed successfully.") |
| 223 return nil | 227 return nil |
| 224 } | 228 } |
| OLD | NEW |