OLD | NEW |
1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package archivist | 5 package archivist |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "encoding/hex" | 9 "encoding/hex" |
10 "fmt" | 10 "fmt" |
11 "io" | 11 "io" |
12 "time" | 12 "time" |
13 | 13 |
14 "github.com/golang/protobuf/proto" | 14 "github.com/golang/protobuf/proto" |
15 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
16 | 16 |
17 "github.com/luci/luci-go/common/clock" | 17 "github.com/luci/luci-go/common/clock" |
18 "github.com/luci/luci-go/common/errors" | 18 "github.com/luci/luci-go/common/errors" |
19 "github.com/luci/luci-go/common/gcloud/gs" | 19 "github.com/luci/luci-go/common/gcloud/gs" |
20 log "github.com/luci/luci-go/common/logging" | 20 log "github.com/luci/luci-go/common/logging" |
21 "github.com/luci/luci-go/common/proto/google" | 21 "github.com/luci/luci-go/common/proto/google" |
| 22 "github.com/luci/luci-go/common/retry/transient" |
22 "github.com/luci/luci-go/common/sync/parallel" | 23 "github.com/luci/luci-go/common/sync/parallel" |
23 "github.com/luci/luci-go/common/tsmon/distribution" | 24 "github.com/luci/luci-go/common/tsmon/distribution" |
24 "github.com/luci/luci-go/common/tsmon/field" | 25 "github.com/luci/luci-go/common/tsmon/field" |
25 "github.com/luci/luci-go/common/tsmon/metric" | 26 "github.com/luci/luci-go/common/tsmon/metric" |
26 tsmon_types "github.com/luci/luci-go/common/tsmon/types" | 27 tsmon_types "github.com/luci/luci-go/common/tsmon/types" |
27 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" | 28 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" |
28 "github.com/luci/luci-go/logdog/api/logpb" | 29 "github.com/luci/luci-go/logdog/api/logpb" |
29 "github.com/luci/luci-go/logdog/common/archive" | 30 "github.com/luci/luci-go/logdog/common/archive" |
30 "github.com/luci/luci-go/logdog/common/storage" | 31 "github.com/luci/luci-go/logdog/common/storage" |
31 "github.com/luci/luci-go/logdog/common/types" | 32 "github.com/luci/luci-go/logdog/common/types" |
(...skipping 305 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
337 } | 338 } |
338 | 339 |
339 // Archive to staging. | 340 // Archive to staging. |
340 // | 341 // |
341 // If a non-transient failure occurs here, we will report it to the Arch
ivist | 342 // If a non-transient failure occurs here, we will report it to the Arch
ivist |
342 // under the assumption that it will continue occurring. | 343 // under the assumption that it will continue occurring. |
343 // | 344 // |
344 // We will handle error creating the plan and executing the plan in the
same | 345 // We will handle error creating the plan and executing the plan in the
same |
345 // switch statement below. | 346 // switch statement below. |
346 switch err = staged.stage(c); { | 347 switch err = staged.stage(c); { |
347 » case errors.IsTransient(err): | 348 » case transient.Tag.In(err): |
348 // If this is a transient error, exit immediately and do not del
ete the | 349 // If this is a transient error, exit immediately and do not del
ete the |
349 // archival task. | 350 // archival task. |
350 log.WithError(err).Warningf(c, "TRANSIENT error during archival
operation.") | 351 log.WithError(err).Warningf(c, "TRANSIENT error during archival
operation.") |
351 return err | 352 return err |
352 | 353 |
353 case err != nil: | 354 case err != nil: |
354 // This is a non-transient error, so we are confident that any f
uture | 355 // This is a non-transient error, so we are confident that any f
uture |
355 // Archival will also encounter this error. We will mark this ar
chival | 356 // Archival will also encounter this error. We will mark this ar
chival |
356 // as an error and report it to the Coordinator. | 357 // as an error and report it to the Coordinator. |
357 log.WithError(err).Errorf(c, "Archival failed with non-transient
error.") | 358 log.WithError(err).Errorf(c, "Archival failed with non-transient
error.") |
(...skipping 214 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
572 "streamURL": sa.stream.staged, | 573 "streamURL": sa.stream.staged, |
573 "indexURL": sa.index.staged, | 574 "indexURL": sa.index.staged, |
574 "dataURL": sa.data.staged, | 575 "dataURL": sa.data.staged, |
575 }.Debugf(c, "Staging log stream...") | 576 }.Debugf(c, "Staging log stream...") |
576 | 577 |
577 // Group any transient errors that occur during cleanup. If we aren't | 578 // Group any transient errors that occur during cleanup. If we aren't |
578 // returning a non-transient error, return a transient "terr". | 579 // returning a non-transient error, return a transient "terr". |
579 var terr errors.MultiError | 580 var terr errors.MultiError |
580 defer func() { | 581 defer func() { |
581 if err == nil && len(terr) > 0 { | 582 if err == nil && len(terr) > 0 { |
582 » » » err = errors.WrapTransient(terr) | 583 » » » err = transient.Tag.Apply(terr) |
583 } | 584 } |
584 }() | 585 }() |
585 | 586 |
586 // Close our writers on exit. If any of them fail to close, mark the arc
hival | 587 // Close our writers on exit. If any of them fail to close, mark the arc
hival |
587 // as a transient failure. | 588 // as a transient failure. |
588 closeWriter := func(closer io.Closer, path gs.Path) { | 589 closeWriter := func(closer io.Closer, path gs.Path) { |
589 // Close the Writer. If this results in an error, append it to o
ur transient | 590 // Close the Writer. If this results in an error, append it to o
ur transient |
590 // error MultiError. | 591 // error MultiError. |
591 if ierr := closer.Close(); ierr != nil { | 592 if ierr := closer.Close(); ierr != nil { |
592 terr = append(terr, ierr) | 593 terr = append(terr, ierr) |
(...skipping 206 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
799 return e.inner | 800 return e.inner |
800 } | 801 } |
801 | 802 |
802 func isFailure(err error) bool { | 803 func isFailure(err error) bool { |
803 if err == nil { | 804 if err == nil { |
804 return false | 805 return false |
805 } | 806 } |
806 _, ok := err.(*statusErrorWrapper) | 807 _, ok := err.(*statusErrorWrapper) |
807 return !ok | 808 return !ok |
808 } | 809 } |
OLD | NEW |