| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package archivist | 5 package archivist |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "encoding/hex" | 9 "encoding/hex" |
| 10 "fmt" | 10 "fmt" |
| 11 "io" | 11 "io" |
| 12 "time" | 12 "time" |
| 13 | 13 |
| 14 "github.com/golang/protobuf/proto" | 14 "github.com/golang/protobuf/proto" |
| 15 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
| 16 | 16 |
| 17 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 17 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 18 "github.com/luci/luci-go/common/clock" | 18 "github.com/luci/luci-go/common/clock" |
| 19 "github.com/luci/luci-go/common/config" | 19 "github.com/luci/luci-go/common/config" |
| 20 "github.com/luci/luci-go/common/errors" | 20 "github.com/luci/luci-go/common/errors" |
| 21 "github.com/luci/luci-go/common/gcloud/gs" | 21 "github.com/luci/luci-go/common/gcloud/gs" |
| 22 "github.com/luci/luci-go/common/logdog/types" | 22 "github.com/luci/luci-go/common/logdog/types" |
| 23 log "github.com/luci/luci-go/common/logging" | 23 log "github.com/luci/luci-go/common/logging" |
| 24 "github.com/luci/luci-go/common/parallel" | 24 "github.com/luci/luci-go/common/parallel" |
| 25 "github.com/luci/luci-go/common/proto/logdog/logpb" | 25 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 26 "github.com/luci/luci-go/common/tsmon/distribution" | 26 "github.com/luci/luci-go/common/tsmon/distribution" |
| 27 "github.com/luci/luci-go/common/tsmon/field" | 27 "github.com/luci/luci-go/common/tsmon/field" |
| 28 "github.com/luci/luci-go/common/tsmon/metric" | 28 "github.com/luci/luci-go/common/tsmon/metric" |
| 29 tsmon_types "github.com/luci/luci-go/common/tsmon/types" |
| 29 "github.com/luci/luci-go/server/logdog/archive" | 30 "github.com/luci/luci-go/server/logdog/archive" |
| 30 "github.com/luci/luci-go/server/logdog/storage" | 31 "github.com/luci/luci-go/server/logdog/storage" |
| 31 ) | 32 ) |
| 32 | 33 |
| 33 const ( | 34 const ( |
| 34 tsEntriesField = "entries" | 35 tsEntriesField = "entries" |
| 35 tsIndexField = "index" | 36 tsIndexField = "index" |
| 36 tsDataField = "data" | 37 tsDataField = "data" |
| 37 | 38 |
| 38 // If the archive dispatch is within this range of the current time, we
will | 39 // If the archive dispatch is within this range of the current time, we
will |
| 39 // avoid archival. | 40 // avoid archival. |
| 40 dispatchThreshold = 5 * time.Minute | 41 dispatchThreshold = 5 * time.Minute |
| 41 ) | 42 ) |
| 42 | 43 |
| 43 var ( | 44 var ( |
| 44 // tsCount counts the raw number of archival tasks that this instance ha
s | 45 // tsCount counts the raw number of archival tasks that this instance ha
s |
| 45 // processed, regardless of success/failure. | 46 // processed, regardless of success/failure. |
| 46 tsCount = metric.NewCounter("logdog/archivist/archive/count", | 47 tsCount = metric.NewCounter("logdog/archivist/archive/count", |
| 47 "The number of archival tasks processed.", | 48 "The number of archival tasks processed.", |
| 49 tsmon_types.MetricMetadata{}, |
| 48 field.Bool("successful")) | 50 field.Bool("successful")) |
| 49 | 51 |
| 50 // tsSize tracks the archive binary file size distribution of completed | 52 // tsSize tracks the archive binary file size distribution of completed |
| 51 // archives. | 53 // archives. |
| 52 // | 54 // |
| 53 // The "archive" field is the specific type of archive (entries, index,
data) | 55 // The "archive" field is the specific type of archive (entries, index,
data) |
| 54 // that is being tracked. | 56 // that is being tracked. |
| 55 // | 57 // |
| 56 // The "stream" field is the type of log stream that is being archived. | 58 // The "stream" field is the type of log stream that is being archived. |
| 57 tsSize = metric.NewCumulativeDistribution("logdog/archivist/archive/size
", | 59 tsSize = metric.NewCumulativeDistribution("logdog/archivist/archive/size
", |
| 58 "The size (in bytes) of each archive file.", | 60 "The size (in bytes) of each archive file.", |
| 61 tsmon_types.MetricMetadata{Units: tsmon_types.Bytes}, |
| 59 distribution.DefaultBucketer, | 62 distribution.DefaultBucketer, |
| 60 field.String("archive"), | 63 field.String("archive"), |
| 61 field.String("stream")) | 64 field.String("stream")) |
| 62 | 65 |
| 63 // tsTotalBytes tracks the cumulative total number of bytes that have | 66 // tsTotalBytes tracks the cumulative total number of bytes that have |
| 64 // been archived by this instance. | 67 // been archived by this instance. |
| 65 // | 68 // |
| 66 // The "archive" field is the specific type of archive (entries, index,
data) | 69 // The "archive" field is the specific type of archive (entries, index,
data) |
| 67 // that is being tracked. | 70 // that is being tracked. |
| 68 // | 71 // |
| 69 // The "stream" field is the type of log stream that is being archived. | 72 // The "stream" field is the type of log stream that is being archived. |
| 70 tsTotalBytes = metric.NewCounter("logdog/archivist/archive/total_bytes", | 73 tsTotalBytes = metric.NewCounter("logdog/archivist/archive/total_bytes", |
| 71 "The total number of archived bytes.", | 74 "The total number of archived bytes.", |
| 75 tsmon_types.MetricMetadata{Units: tsmon_types.Bytes}, |
| 72 field.String("archive"), | 76 field.String("archive"), |
| 73 field.String("stream")) | 77 field.String("stream")) |
| 74 | 78 |
| 75 // tsLogEntries tracks the number of log entries per individual | 79 // tsLogEntries tracks the number of log entries per individual |
| 76 // archival. | 80 // archival. |
| 77 // | 81 // |
| 78 // The "stream" field is the type of log stream that is being archived. | 82 // The "stream" field is the type of log stream that is being archived. |
| 79 tsLogEntries = metric.NewCumulativeDistribution("logdog/archivist/archiv
e/log_entries", | 83 tsLogEntries = metric.NewCumulativeDistribution("logdog/archivist/archiv
e/log_entries", |
| 80 "The total number of log entries per archive.", | 84 "The total number of log entries per archive.", |
| 85 tsmon_types.MetricMetadata{}, |
| 81 distribution.DefaultBucketer, | 86 distribution.DefaultBucketer, |
| 82 field.String("stream")) | 87 field.String("stream")) |
| 83 | 88 |
| 84 // tsTotalLogEntries tracks the total number of log entries that have | 89 // tsTotalLogEntries tracks the total number of log entries that have |
| 85 // been archived by this instance. | 90 // been archived by this instance. |
| 86 // | 91 // |
| 87 // The "stream" field is the type of log stream that is being archived. | 92 // The "stream" field is the type of log stream that is being archived. |
| 88 tsTotalLogEntries = metric.NewCounter("logdog/archivist/archive/total_lo
g_entries", | 93 tsTotalLogEntries = metric.NewCounter("logdog/archivist/archive/total_lo
g_entries", |
| 89 "The total number of log entries.", | 94 "The total number of log entries.", |
| 95 tsmon_types.MetricMetadata{}, |
| 90 field.String("stream")) | 96 field.String("stream")) |
| 91 ) | 97 ) |
| 92 | 98 |
| 93 // Task is a single archive task. | 99 // Task is a single archive task. |
| 94 type Task interface { | 100 type Task interface { |
| 95 // UniqueID returns a task-unique value. Other tasks, and other retries
of | 101 // UniqueID returns a task-unique value. Other tasks, and other retries
of |
| 96 // this task, should (try to) not reuse this ID. | 102 // this task, should (try to) not reuse this ID. |
| 97 UniqueID() string | 103 UniqueID() string |
| 98 | 104 |
| 99 // Task is the archive task to execute. | 105 // Task is the archive task to execute. |
| (...skipping 673 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 773 return e.inner | 779 return e.inner |
| 774 } | 780 } |
| 775 | 781 |
| 776 func isFailure(err error) bool { | 782 func isFailure(err error) bool { |
| 777 if err == nil { | 783 if err == nil { |
| 778 return false | 784 return false |
| 779 } | 785 } |
| 780 _, ok := err.(*statusErrorWrapper) | 786 _, ok := err.(*statusErrorWrapper) |
| 781 return !ok | 787 return !ok |
| 782 } | 788 } |
| OLD | NEW |