OLD | NEW |
1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package archivist | 5 package archivist |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "encoding/hex" | 9 "encoding/hex" |
10 "fmt" | 10 "fmt" |
11 "io" | 11 "io" |
12 "time" | 12 "time" |
13 | 13 |
14 "github.com/golang/protobuf/proto" | 14 "github.com/golang/protobuf/proto" |
15 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
16 | 16 |
17 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 17 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
18 "github.com/luci/luci-go/common/clock" | 18 "github.com/luci/luci-go/common/clock" |
19 "github.com/luci/luci-go/common/config" | 19 "github.com/luci/luci-go/common/config" |
20 "github.com/luci/luci-go/common/errors" | 20 "github.com/luci/luci-go/common/errors" |
21 "github.com/luci/luci-go/common/gcloud/gs" | 21 "github.com/luci/luci-go/common/gcloud/gs" |
22 "github.com/luci/luci-go/common/logdog/types" | 22 "github.com/luci/luci-go/common/logdog/types" |
23 log "github.com/luci/luci-go/common/logging" | 23 log "github.com/luci/luci-go/common/logging" |
24 "github.com/luci/luci-go/common/parallel" | 24 "github.com/luci/luci-go/common/parallel" |
25 "github.com/luci/luci-go/common/proto/logdog/logpb" | 25 "github.com/luci/luci-go/common/proto/logdog/logpb" |
26 "github.com/luci/luci-go/common/tsmon/distribution" | 26 "github.com/luci/luci-go/common/tsmon/distribution" |
27 "github.com/luci/luci-go/common/tsmon/field" | 27 "github.com/luci/luci-go/common/tsmon/field" |
28 "github.com/luci/luci-go/common/tsmon/metric" | 28 "github.com/luci/luci-go/common/tsmon/metric" |
| 29 tsmon_types "github.com/luci/luci-go/common/tsmon/types" |
29 "github.com/luci/luci-go/server/logdog/archive" | 30 "github.com/luci/luci-go/server/logdog/archive" |
30 "github.com/luci/luci-go/server/logdog/storage" | 31 "github.com/luci/luci-go/server/logdog/storage" |
31 ) | 32 ) |
32 | 33 |
33 const ( | 34 const ( |
34 tsEntriesField = "entries" | 35 tsEntriesField = "entries" |
35 tsIndexField = "index" | 36 tsIndexField = "index" |
36 tsDataField = "data" | 37 tsDataField = "data" |
37 | 38 |
38 // If the archive dispatch is within this range of the current time, we
will | 39 // If the archive dispatch is within this range of the current time, we
will |
39 // avoid archival. | 40 // avoid archival. |
40 dispatchThreshold = 5 * time.Minute | 41 dispatchThreshold = 5 * time.Minute |
41 ) | 42 ) |
42 | 43 |
43 var ( | 44 var ( |
44 // tsCount counts the raw number of archival tasks that this instance ha
s | 45 // tsCount counts the raw number of archival tasks that this instance ha
s |
45 // processed, regardless of success/failure. | 46 // processed, regardless of success/failure. |
46 tsCount = metric.NewCounter("logdog/archivist/archive/count", | 47 tsCount = metric.NewCounter("logdog/archivist/archive/count", |
47 "The number of archival tasks processed.", | 48 "The number of archival tasks processed.", |
| 49 tsmon_types.MetricMetadata{}, |
48 field.Bool("successful")) | 50 field.Bool("successful")) |
49 | 51 |
50 // tsSize tracks the archive binary file size distribution of completed | 52 // tsSize tracks the archive binary file size distribution of completed |
51 // archives. | 53 // archives. |
52 // | 54 // |
53 // The "archive" field is the specific type of archive (entries, index,
data) | 55 // The "archive" field is the specific type of archive (entries, index,
data) |
54 // that is being tracked. | 56 // that is being tracked. |
55 // | 57 // |
56 // The "stream" field is the type of log stream that is being archived. | 58 // The "stream" field is the type of log stream that is being archived. |
57 tsSize = metric.NewCumulativeDistribution("logdog/archivist/archive/size
", | 59 tsSize = metric.NewCumulativeDistribution("logdog/archivist/archive/size
", |
58 "The size (in bytes) of each archive file.", | 60 "The size (in bytes) of each archive file.", |
| 61 tsmon_types.MetricMetadata{Units: tsmon_types.Bytes}, |
59 distribution.DefaultBucketer, | 62 distribution.DefaultBucketer, |
60 field.String("archive"), | 63 field.String("archive"), |
61 field.String("stream")) | 64 field.String("stream")) |
62 | 65 |
63 // tsTotalBytes tracks the cumulative total number of bytes that have | 66 // tsTotalBytes tracks the cumulative total number of bytes that have |
64 // been archived by this instance. | 67 // been archived by this instance. |
65 // | 68 // |
66 // The "archive" field is the specific type of archive (entries, index,
data) | 69 // The "archive" field is the specific type of archive (entries, index,
data) |
67 // that is being tracked. | 70 // that is being tracked. |
68 // | 71 // |
69 // The "stream" field is the type of log stream that is being archived. | 72 // The "stream" field is the type of log stream that is being archived. |
70 tsTotalBytes = metric.NewCounter("logdog/archivist/archive/total_bytes", | 73 tsTotalBytes = metric.NewCounter("logdog/archivist/archive/total_bytes", |
71 "The total number of archived bytes.", | 74 "The total number of archived bytes.", |
| 75 tsmon_types.MetricMetadata{Units: tsmon_types.Bytes}, |
72 field.String("archive"), | 76 field.String("archive"), |
73 field.String("stream")) | 77 field.String("stream")) |
74 | 78 |
75 // tsLogEntries tracks the number of log entries per individual | 79 // tsLogEntries tracks the number of log entries per individual |
76 // archival. | 80 // archival. |
77 // | 81 // |
78 // The "stream" field is the type of log stream that is being archived. | 82 // The "stream" field is the type of log stream that is being archived. |
79 tsLogEntries = metric.NewCumulativeDistribution("logdog/archivist/archiv
e/log_entries", | 83 tsLogEntries = metric.NewCumulativeDistribution("logdog/archivist/archiv
e/log_entries", |
80 "The total number of log entries per archive.", | 84 "The total number of log entries per archive.", |
| 85 tsmon_types.MetricMetadata{}, |
81 distribution.DefaultBucketer, | 86 distribution.DefaultBucketer, |
82 field.String("stream")) | 87 field.String("stream")) |
83 | 88 |
84 // tsTotalLogEntries tracks the total number of log entries that have | 89 // tsTotalLogEntries tracks the total number of log entries that have |
85 // been archived by this instance. | 90 // been archived by this instance. |
86 // | 91 // |
87 // The "stream" field is the type of log stream that is being archived. | 92 // The "stream" field is the type of log stream that is being archived. |
88 tsTotalLogEntries = metric.NewCounter("logdog/archivist/archive/total_lo
g_entries", | 93 tsTotalLogEntries = metric.NewCounter("logdog/archivist/archive/total_lo
g_entries", |
89 "The total number of log entries.", | 94 "The total number of log entries.", |
| 95 tsmon_types.MetricMetadata{}, |
90 field.String("stream")) | 96 field.String("stream")) |
91 ) | 97 ) |
92 | 98 |
93 // Task is a single archive task. | 99 // Task is a single archive task. |
94 type Task interface { | 100 type Task interface { |
95 // UniqueID returns a task-unique value. Other tasks, and other retries
of | 101 // UniqueID returns a task-unique value. Other tasks, and other retries
of |
96 // this task, should (try to) not reuse this ID. | 102 // this task, should (try to) not reuse this ID. |
97 UniqueID() string | 103 UniqueID() string |
98 | 104 |
99 // Task is the archive task to execute. | 105 // Task is the archive task to execute. |
(...skipping 673 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
773 return e.inner | 779 return e.inner |
774 } | 780 } |
775 | 781 |
776 func isFailure(err error) bool { | 782 func isFailure(err error) bool { |
777 if err == nil { | 783 if err == nil { |
778 return false | 784 return false |
779 } | 785 } |
780 _, ok := err.(*statusErrorWrapper) | 786 _, ok := err.(*statusErrorWrapper) |
781 return !ok | 787 return !ok |
782 } | 788 } |
OLD | NEW |