| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package collector | 5 package collector |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "time" | 9 "time" |
| 10 | 10 |
| 11 "github.com/golang/protobuf/proto" | 11 "github.com/golang/protobuf/proto" |
| 12 "github.com/luci/luci-go/common/clock" | 12 "github.com/luci/luci-go/common/clock" |
| 13 "github.com/luci/luci-go/common/config" | |
| 14 "github.com/luci/luci-go/common/errors" | 13 "github.com/luci/luci-go/common/errors" |
| 15 log "github.com/luci/luci-go/common/logging" | 14 log "github.com/luci/luci-go/common/logging" |
| 16 "github.com/luci/luci-go/common/sync/parallel" | 15 "github.com/luci/luci-go/common/sync/parallel" |
| 17 "github.com/luci/luci-go/common/tsmon/distribution" | 16 "github.com/luci/luci-go/common/tsmon/distribution" |
| 18 "github.com/luci/luci-go/common/tsmon/field" | 17 "github.com/luci/luci-go/common/tsmon/field" |
| 19 "github.com/luci/luci-go/common/tsmon/metric" | 18 "github.com/luci/luci-go/common/tsmon/metric" |
| 20 tsmon_types "github.com/luci/luci-go/common/tsmon/types" | 19 tsmon_types "github.com/luci/luci-go/common/tsmon/types" |
| 21 "github.com/luci/luci-go/logdog/api/logpb" | 20 "github.com/luci/luci-go/logdog/api/logpb" |
| 22 "github.com/luci/luci-go/logdog/client/butlerproto" | 21 "github.com/luci/luci-go/logdog/client/butlerproto" |
| 23 "github.com/luci/luci-go/logdog/common/storage" | 22 "github.com/luci/luci-go/logdog/common/storage" |
| 24 "github.com/luci/luci-go/logdog/common/types" | 23 "github.com/luci/luci-go/logdog/common/types" |
| 25 "github.com/luci/luci-go/logdog/server/collector/coordinator" | 24 "github.com/luci/luci-go/logdog/server/collector/coordinator" |
| 25 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
| 26 "golang.org/x/net/context" | 26 "golang.org/x/net/context" |
| 27 ) | 27 ) |
| 28 | 28 |
| 29 const ( | 29 const ( |
| 30 // DefaultMaxMessageWorkers is the default number of concurrent worker | 30 // DefaultMaxMessageWorkers is the default number of concurrent worker |
| 31 // goroutones to employ for a single message. | 31 // goroutones to employ for a single message. |
| 32 DefaultMaxMessageWorkers = 4 | 32 DefaultMaxMessageWorkers = 4 |
| 33 ) | 33 ) |
| 34 | 34 |
| 35 var ( | 35 var ( |
| (...skipping 119 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 155 fields.Infof(ctx, "Processing log bundle entry.") | 155 fields.Infof(ctx, "Processing log bundle entry.") |
| 156 } | 156 } |
| 157 } | 157 } |
| 158 | 158 |
| 159 lw := bundleHandler{ | 159 lw := bundleHandler{ |
| 160 msg: msg, | 160 msg: msg, |
| 161 md: pr.Metadata, | 161 md: pr.Metadata, |
| 162 b: pr.Bundle, | 162 b: pr.Bundle, |
| 163 } | 163 } |
| 164 | 164 |
| 165 » lw.project = config.ProjectName(lw.b.Project) | 165 » lw.project = cfgtypes.ProjectName(lw.b.Project) |
| 166 if err := lw.project.Validate(); err != nil { | 166 if err := lw.project.Validate(); err != nil { |
| 167 log.Fields{ | 167 log.Fields{ |
| 168 log.ErrorKey: err, | 168 log.ErrorKey: err, |
| 169 "project": lw.b.Project, | 169 "project": lw.b.Project, |
| 170 }.Errorf(ctx, "Failed to validate bundle project name.") | 170 }.Errorf(ctx, "Failed to validate bundle project name.") |
| 171 return errors.New("invalid bundle project name") | 171 return errors.New("invalid bundle project name") |
| 172 } | 172 } |
| 173 | 173 |
| 174 if err := types.StreamName(lw.b.Prefix).Validate(); err != nil { | 174 if err := types.StreamName(lw.b.Prefix).Validate(); err != nil { |
| 175 log.Fields{ | 175 log.Fields{ |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 213 // value for log processing. | 213 // value for log processing. |
| 214 type bundleHandler struct { | 214 type bundleHandler struct { |
| 215 // msg is the original message bytes. | 215 // msg is the original message bytes. |
| 216 msg []byte | 216 msg []byte |
| 217 // md is the metadata associated with the overall message. | 217 // md is the metadata associated with the overall message. |
| 218 md *logpb.ButlerMetadata | 218 md *logpb.ButlerMetadata |
| 219 // b is the Butler bundle. | 219 // b is the Butler bundle. |
| 220 b *logpb.ButlerLogBundle | 220 b *logpb.ButlerLogBundle |
| 221 | 221 |
| 222 // project is the validated project name. | 222 // project is the validated project name. |
| 223 » project config.ProjectName | 223 » project cfgtypes.ProjectName |
| 224 } | 224 } |
| 225 | 225 |
| 226 type bundleEntryHandler struct { | 226 type bundleEntryHandler struct { |
| 227 *bundleHandler | 227 *bundleHandler |
| 228 | 228 |
| 229 // be is the Bundle entry. | 229 // be is the Bundle entry. |
| 230 be *logpb.ButlerLogBundle_Entry | 230 be *logpb.ButlerLogBundle_Entry |
| 231 // path is the constructed path of the stream being processed. | 231 // path is the constructed path of the stream being processed. |
| 232 path types.StreamPath | 232 path types.StreamPath |
| 233 } | 233 } |
| (...skipping 212 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 446 } | 446 } |
| 447 }) | 447 }) |
| 448 } | 448 } |
| 449 | 449 |
| 450 func streamType(desc *logpb.LogStreamDescriptor) string { | 450 func streamType(desc *logpb.LogStreamDescriptor) string { |
| 451 if desc == nil { | 451 if desc == nil { |
| 452 return "UNKNOWN" | 452 return "UNKNOWN" |
| 453 } | 453 } |
| 454 return desc.StreamType.String() | 454 return desc.StreamType.String() |
| 455 } | 455 } |
| OLD | NEW |