| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package collector | 5 package collector |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "time" | 9 "time" |
| 10 | 10 |
| 11 "github.com/golang/protobuf/proto" | 11 "github.com/golang/protobuf/proto" |
| 12 "github.com/luci/luci-go/common/config" |
| 12 "github.com/luci/luci-go/common/errors" | 13 "github.com/luci/luci-go/common/errors" |
| 13 "github.com/luci/luci-go/common/logdog/butlerproto" | 14 "github.com/luci/luci-go/common/logdog/butlerproto" |
| 14 "github.com/luci/luci-go/common/logdog/types" | 15 "github.com/luci/luci-go/common/logdog/types" |
| 15 log "github.com/luci/luci-go/common/logging" | 16 log "github.com/luci/luci-go/common/logging" |
| 16 "github.com/luci/luci-go/common/parallel" | 17 "github.com/luci/luci-go/common/parallel" |
| 17 "github.com/luci/luci-go/common/proto/logdog/logpb" | 18 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" | 19 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" |
| 19 "github.com/luci/luci-go/server/logdog/storage" | 20 "github.com/luci/luci-go/server/logdog/storage" |
| 20 "golang.org/x/net/context" | 21 "golang.org/x/net/context" |
| 21 ) | 22 ) |
| (...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 74 if pr.Bundle == nil { | 75 if pr.Bundle == nil { |
| 75 log.Errorf(ctx, "Protocol message did not contain a Butler bundl
e.") | 76 log.Errorf(ctx, "Protocol message did not contain a Butler bundl
e.") |
| 76 return nil | 77 return nil |
| 77 } | 78 } |
| 78 | 79 |
| 79 // If we're logging INFO or higher, log the ranges that this bundle | 80 // If we're logging INFO or higher, log the ranges that this bundle |
| 80 // represents. | 81 // represents. |
| 81 if log.IsLogging(ctx, log.Info) { | 82 if log.IsLogging(ctx, log.Info) { |
| 82 for i, entry := range pr.Bundle.Entries { | 83 for i, entry := range pr.Bundle.Entries { |
| 83 fields := log.Fields{ | 84 fields := log.Fields{ |
| 84 » » » » "index": i, | 85 » » » » "index": i, |
| 85 » » » » "path": entry.GetDesc().Path(), | 86 » » » » "project": pr.Bundle.Project, |
| 87 » » » » "path": entry.GetDesc().Path(), |
| 86 } | 88 } |
| 87 if entry.Terminal { | 89 if entry.Terminal { |
| 88 fields["terminalIndex"] = entry.TerminalIndex | 90 fields["terminalIndex"] = entry.TerminalIndex |
| 89 } | 91 } |
| 90 if logs := entry.GetLogs(); len(logs) > 0 { | 92 if logs := entry.GetLogs(); len(logs) > 0 { |
| 91 fields["logStart"] = logs[0].StreamIndex | 93 fields["logStart"] = logs[0].StreamIndex |
| 92 fields["logEnd"] = logs[len(logs)-1].StreamIndex | 94 fields["logEnd"] = logs[len(logs)-1].StreamIndex |
| 93 } | 95 } |
| 94 | 96 |
| 95 fields.Infof(ctx, "Processing log bundle entry.") | 97 fields.Infof(ctx, "Processing log bundle entry.") |
| 96 } | 98 } |
| 97 } | 99 } |
| 98 | 100 |
| 99 // If there are no entries, there is nothing to do. | |
| 100 if len(pr.Bundle.Entries) == 0 { | |
| 101 return nil | |
| 102 } | |
| 103 | |
| 104 lw := bundleHandler{ | 101 lw := bundleHandler{ |
| 105 msg: msg, | 102 msg: msg, |
| 106 md: pr.Metadata, | 103 md: pr.Metadata, |
| 107 b: pr.Bundle, | 104 b: pr.Bundle, |
| 108 } | 105 } |
| 109 | 106 |
| 107 if err := types.PrefixSecret(lw.b.Secret).Validate(); err != nil { |
| 108 log.Fields{ |
| 109 log.ErrorKey: err, |
| 110 "secretLength": len(lw.b.Secret), |
| 111 }.Errorf(ctx, "Failed to validate prefix secret.") |
| 112 return errors.New("invalid prefix secret") |
| 113 } |
| 114 |
| 115 // TODO(dnj): Make this actually an fatal error, once project becomes |
| 116 // required. |
| 117 if lw.b.Project != "" { |
| 118 lw.project = config.ProjectName(lw.b.Project) |
| 119 if err := lw.project.Validate(); err != nil { |
| 120 log.Fields{ |
| 121 log.ErrorKey: err, |
| 122 "project": lw.b.Project, |
| 123 }.Errorf(ctx, "Failed to validate bundle project name.") |
| 124 return errors.New("invalid bundle project name") |
| 125 } |
| 126 } |
| 127 |
| 128 if err := types.StreamName(lw.b.Prefix).Validate(); err != nil { |
| 129 log.Fields{ |
| 130 log.ErrorKey: err, |
| 131 "prefix": lw.b.Prefix, |
| 132 }.Errorf(ctx, "Failed to validate bundle prefix.") |
| 133 return errors.New("invalid bundle prefix") |
| 134 } |
| 135 |
| 136 // If there are no entries, there is nothing to do. |
| 137 if len(pr.Bundle.Entries) == 0 { |
| 138 return nil |
| 139 } |
| 140 |
| 110 // Handle each bundle entry in parallel. We will use a separate work poo
l | 141 // Handle each bundle entry in parallel. We will use a separate work poo
l |
| 111 // here so that top-level bundle dispatch can't deadlock the processing
tasks. | 142 // here so that top-level bundle dispatch can't deadlock the processing
tasks. |
| 112 workers := c.MaxMessageWorkers | 143 workers := c.MaxMessageWorkers |
| 113 if workers <= 0 { | 144 if workers <= 0 { |
| 114 workers = DefaultMaxMessageWorkers | 145 workers = DefaultMaxMessageWorkers |
| 115 } | 146 } |
| 116 err := parallel.WorkPool(workers, func(taskC chan<- func() error) { | 147 err := parallel.WorkPool(workers, func(taskC chan<- func() error) { |
| 117 for _, be := range pr.Bundle.Entries { | 148 for _, be := range pr.Bundle.Entries { |
| 118 be := be | 149 be := be |
| 119 | 150 |
| (...skipping 22 matching lines...) Expand all Loading... |
| 142 | 173 |
| 143 // bundleHandler is a cumulative set of read-only state passed around by | 174 // bundleHandler is a cumulative set of read-only state passed around by |
| 144 // value for log processing. | 175 // value for log processing. |
| 145 type bundleHandler struct { | 176 type bundleHandler struct { |
| 146 // msg is the original message bytes. | 177 // msg is the original message bytes. |
| 147 msg []byte | 178 msg []byte |
| 148 // md is the metadata associated with the overall message. | 179 // md is the metadata associated with the overall message. |
| 149 md *logpb.ButlerMetadata | 180 md *logpb.ButlerMetadata |
| 150 // b is the Butler bundle. | 181 // b is the Butler bundle. |
| 151 b *logpb.ButlerLogBundle | 182 b *logpb.ButlerLogBundle |
| 183 |
| 184 // project is the validated project name. |
| 185 project config.ProjectName |
| 152 } | 186 } |
| 153 | 187 |
| 154 type bundleEntryHandler struct { | 188 type bundleEntryHandler struct { |
| 155 *bundleHandler | 189 *bundleHandler |
| 156 | 190 |
| 157 // be is the Bundle entry. | 191 // be is the Bundle entry. |
| 158 be *logpb.ButlerLogBundle_Entry | 192 be *logpb.ButlerLogBundle_Entry |
| 159 // path is the constructed path of the stream being processed. | 193 // path is the constructed path of the stream being processed. |
| 160 path types.StreamPath | 194 path types.StreamPath |
| 161 } | 195 } |
| 162 | 196 |
| 163 // processLogStream processes an individual set of log messages belonging to the | 197 // processLogStream processes an individual set of log messages belonging to the |
| 164 // same log stream. | 198 // same log stream. |
| 165 func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler)
error { | 199 func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler)
error { |
| 166 // If this bundle has neither log entries nor a terminal index, it is ju
nk and | 200 // If this bundle has neither log entries nor a terminal index, it is ju
nk and |
| 167 // must be discarded. | 201 // must be discarded. |
| 168 // | 202 // |
| 169 // This is more important than a basic optimization, as it enforces that
no | 203 // This is more important than a basic optimization, as it enforces that
no |
| 170 // zero-entry log streams can be ingested. Either some entries exist, or
there | 204 // zero-entry log streams can be ingested. Either some entries exist, or
there |
| 171 // is a promise of a terminal entry. | 205 // is a promise of a terminal entry. |
| 172 if len(h.be.Logs) == 0 && !h.be.Terminal { | 206 if len(h.be.Logs) == 0 && !h.be.Terminal { |
| 173 log.Warningf(ctx, "Bundle entry is non-terminal and contains no
logs; discarding.") | 207 log.Warningf(ctx, "Bundle entry is non-terminal and contains no
logs; discarding.") |
| 174 return nil | 208 return nil |
| 175 } | 209 } |
| 176 | 210 |
| 211 // If the descriptor has a Prefix, it must match the bundle's Prefix. |
| 212 if p := h.be.Desc.Prefix; p != "" { |
| 213 if p != h.b.Prefix { |
| 214 log.Fields{ |
| 215 "bundlePrefix": h.b.Prefix, |
| 216 "bundleEntryPrefix": p, |
| 217 }.Errorf(ctx, "Bundle prefix does not match entry prefix
.") |
| 218 return errors.New("mismatched bundle and entry prefixes"
) |
| 219 } |
| 220 } else { |
| 221 // Fill in the bundle's Prefix. |
| 222 h.be.Desc.Prefix = h.b.Prefix |
| 223 } |
| 224 |
| 177 if err := h.be.Desc.Validate(true); err != nil { | 225 if err := h.be.Desc.Validate(true); err != nil { |
| 178 log.Errorf(log.SetError(ctx, err), "Invalid log stream descripto
r.") | 226 log.Errorf(log.SetError(ctx, err), "Invalid log stream descripto
r.") |
| 179 return err | 227 return err |
| 180 } | 228 } |
| 181 h.path = types.StreamName(h.be.Desc.Prefix).Join(types.StreamName(h.be.D
esc.Name)) | 229 h.path = types.StreamName(h.be.Desc.Prefix).Join(types.StreamName(h.be.D
esc.Name)) |
| 182 » ctx = log.SetField(ctx, "path", h.path) | 230 » ctx = log.SetFields(ctx, log.Fields{ |
| 183 | 231 » » "project": h.project, |
| 184 » if len(h.be.Secret) == 0 { | 232 » » "path": h.path, |
| 185 » » log.Errorf(ctx, "Missing secret.") | 233 » }) |
| 186 » » return errors.New("missing stream secret") | |
| 187 » } | |
| 188 | 234 |
| 189 // Confirm that the log entries are valid and contiguous. Serialize the
log | 235 // Confirm that the log entries are valid and contiguous. Serialize the
log |
| 190 // entries for ingest as we validate them. | 236 // entries for ingest as we validate them. |
| 191 var logData [][]byte | 237 var logData [][]byte |
| 192 var blockIndex uint64 | 238 var blockIndex uint64 |
| 193 if logs := h.be.Logs; len(logs) > 0 { | 239 if logs := h.be.Logs; len(logs) > 0 { |
| 194 logData = make([][]byte, len(logs)) | 240 logData = make([][]byte, len(logs)) |
| 195 blockIndex = logs[0].StreamIndex | 241 blockIndex = logs[0].StreamIndex |
| 196 | 242 |
| 197 for i, le := range logs { | 243 for i, le := range logs { |
| (...skipping 25 matching lines...) Expand all Loading... |
| 223 }.Errorf(ctx, "Failed to marshal log entry.") | 269 }.Errorf(ctx, "Failed to marshal log entry.") |
| 224 return errors.New("failed to marshal log entries
") | 270 return errors.New("failed to marshal log entries
") |
| 225 } | 271 } |
| 226 } | 272 } |
| 227 } | 273 } |
| 228 | 274 |
| 229 // Fetch our cached/remote state. This will replace our state object wit
h the | 275 // Fetch our cached/remote state. This will replace our state object wit
h the |
| 230 // fetched state, so any future calls will need to re-set the Secret val
ue. | 276 // fetched state, so any future calls will need to re-set the Secret val
ue. |
| 231 // TODO: Use timeout? | 277 // TODO: Use timeout? |
| 232 state, err := c.Coordinator.RegisterStream(ctx, &coordinator.LogStreamSt
ate{ | 278 state, err := c.Coordinator.RegisterStream(ctx, &coordinator.LogStreamSt
ate{ |
| 279 Project: h.project, |
| 233 Path: h.path, | 280 Path: h.path, |
| 234 » » Secret: types.PrefixSecret(h.be.Secret), | 281 » » Secret: types.PrefixSecret(h.b.Secret), |
| 235 ProtoVersion: h.md.ProtoVersion, | 282 ProtoVersion: h.md.ProtoVersion, |
| 236 }, h.be.Desc) | 283 }, h.be.Desc) |
| 237 if err != nil { | 284 if err != nil { |
| 238 log.WithError(err).Errorf(ctx, "Failed to get/register current s
tream state.") | 285 log.WithError(err).Errorf(ctx, "Failed to get/register current s
tream state.") |
| 239 return err | 286 return err |
| 240 } | 287 } |
| 241 | 288 |
| 242 // Does the log stream's secret match the expected secret? | 289 // Does the log stream's secret match the expected secret? |
| 243 » if !bytes.Equal(h.be.Secret, []byte(state.Secret)) { | 290 » if !bytes.Equal(h.b.Secret, []byte(state.Secret)) { |
| 244 log.Errorf(log.SetFields(ctx, log.Fields{ | 291 log.Errorf(log.SetFields(ctx, log.Fields{ |
| 245 » » » "secret": h.be.Secret, | 292 » » » "secret": h.b.Secret, |
| 246 "expectedSecret": state.Secret, | 293 "expectedSecret": state.Secret, |
| 247 }), "Log entry has incorrect secret.") | 294 }), "Log entry has incorrect secret.") |
| 248 return nil | 295 return nil |
| 249 } | 296 } |
| 250 | 297 |
| 251 if state.Archived { | 298 if state.Archived { |
| 252 log.Infof(ctx, "Skipping message bundle for archived stream.") | 299 log.Infof(ctx, "Skipping message bundle for archived stream.") |
| 253 return nil | 300 return nil |
| 254 } | 301 } |
| 255 if state.Purged { | 302 if state.Purged { |
| (...skipping 23 matching lines...) Expand all Loading... |
| 279 } | 326 } |
| 280 | 327 |
| 281 // Perform stream processing operations. We can do these operations in | 328 // Perform stream processing operations. We can do these operations in |
| 282 // parallel. | 329 // parallel. |
| 283 return parallel.FanOutIn(func(taskC chan<- func() error) { | 330 return parallel.FanOutIn(func(taskC chan<- func() error) { |
| 284 // Store log data, if any was provided. It has already been vali
dated. | 331 // Store log data, if any was provided. It has already been vali
dated. |
| 285 if len(logData) > 0 { | 332 if len(logData) > 0 { |
| 286 taskC <- func() error { | 333 taskC <- func() error { |
| 287 // Post the log to storage. | 334 // Post the log to storage. |
| 288 err = c.Storage.Put(storage.PutRequest{ | 335 err = c.Storage.Put(storage.PutRequest{ |
| 289 » » » » » Path: h.path, | 336 » » » » » Project: h.project, |
| 290 » » » » » Index: types.MessageIndex(blockIndex), | 337 » » » » » Path: h.path, |
| 291 » » » » » Values: logData, | 338 » » » » » Index: types.MessageIndex(blockIndex), |
| 339 » » » » » Values: logData, |
| 292 }) | 340 }) |
| 293 | 341 |
| 294 // If the log entry already exists, consider the
"put" successful. | 342 // If the log entry already exists, consider the
"put" successful. |
| 295 // Storage will return a transient error if one
occurred. | 343 // Storage will return a transient error if one
occurred. |
| 296 if err != nil && err != storage.ErrExists { | 344 if err != nil && err != storage.ErrExists { |
| 297 log.Fields{ | 345 log.Fields{ |
| 298 log.ErrorKey: err, | 346 log.ErrorKey: err, |
| 299 "blockIndex": blockIndex, | 347 "blockIndex": blockIndex, |
| 300 }.Errorf(ctx, "Failed to load log entry
into Storage.") | 348 }.Errorf(ctx, "Failed to load log entry
into Storage.") |
| 301 return err | 349 return err |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 337 for _, e := range merr { | 385 for _, e := range merr { |
| 338 if hasTransientError(e) { | 386 if hasTransientError(e) { |
| 339 return true | 387 return true |
| 340 } | 388 } |
| 341 } | 389 } |
| 342 return false | 390 return false |
| 343 } | 391 } |
| 344 | 392 |
| 345 return errors.IsTransient(err) | 393 return errors.IsTransient(err) |
| 346 } | 394 } |
| OLD | NEW |