OLD | NEW |
1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package archivist | 5 package archivist |
6 | 6 |
7 import ( | 7 import ( |
8 "fmt" | 8 "fmt" |
9 "strings" | 9 "strings" |
10 "sync" | 10 "sync" |
11 "testing" | 11 "testing" |
12 "time" | 12 "time" |
13 | 13 |
14 "github.com/luci/luci-go/common/clock/testclock" | 14 "github.com/luci/luci-go/common/clock/testclock" |
15 "github.com/luci/luci-go/common/errors" | 15 "github.com/luci/luci-go/common/errors" |
16 "github.com/luci/luci-go/common/gcloud/gs" | 16 "github.com/luci/luci-go/common/gcloud/gs" |
17 "github.com/luci/luci-go/common/proto/google" | 17 "github.com/luci/luci-go/common/proto/google" |
| 18 "github.com/luci/luci-go/common/retry/transient" |
18 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" | 19 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" |
19 "github.com/luci/luci-go/logdog/api/logpb" | 20 "github.com/luci/luci-go/logdog/api/logpb" |
20 "github.com/luci/luci-go/logdog/common/storage" | 21 "github.com/luci/luci-go/logdog/common/storage" |
21 "github.com/luci/luci-go/logdog/common/storage/memory" | 22 "github.com/luci/luci-go/logdog/common/storage/memory" |
22 "github.com/luci/luci-go/logdog/common/types" | 23 "github.com/luci/luci-go/logdog/common/types" |
23 "github.com/luci/luci-go/luci_config/common/cfgtypes" | 24 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
24 | 25 |
25 "github.com/golang/protobuf/proto" | 26 "github.com/golang/protobuf/proto" |
26 "github.com/golang/protobuf/ptypes/empty" | 27 "github.com/golang/protobuf/ptypes/empty" |
27 "golang.org/x/net/context" | 28 "golang.org/x/net/context" |
(...skipping 435 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
463 TerminalIndex: 3, | 464 TerminalIndex: 3, |
464 | 465 |
465 StreamUrl: gsURL(project, "logstream.ent
ries"), | 466 StreamUrl: gsURL(project, "logstream.ent
ries"), |
466 IndexUrl: gsURL(project, "logstream.ind
ex"), | 467 IndexUrl: gsURL(project, "logstream.ind
ex"), |
467 DataUrl: gsURL(project, "data.bin"), | 468 DataUrl: gsURL(project, "data.bin"), |
468 }) | 469 }) |
469 }) | 470 }) |
470 | 471 |
471 Convey(`When a transient archival error occurs, will not
consume the task.`, func() { | 472 Convey(`When a transient archival error occurs, will not
consume the task.`, func() { |
472 addTestEntry(project, 0, 1, 2, 3, 4) | 473 addTestEntry(project, 0, 1, 2, 3, 4) |
473 » » » » gsc.newWriterErr = func(*testGSWriter) error { r
eturn errors.WrapTransient(errors.New("test error")) } | 474 » » » » gsc.newWriterErr = func(*testGSWriter) error { r
eturn errors.New("test error", transient.Tag) } |
474 | 475 |
475 So(ar.archiveTaskImpl(c, task), ShouldErrLike, "
test error") | 476 So(ar.archiveTaskImpl(c, task), ShouldErrLike, "
test error") |
476 So(task.consumed, ShouldBeFalse) | 477 So(task.consumed, ShouldBeFalse) |
477 }) | 478 }) |
478 | 479 |
479 Convey(`When a non-transient archival error occurs`, fun
c() { | 480 Convey(`When a non-transient archival error occurs`, fun
c() { |
480 addTestEntry(project, 0, 1, 2, 3, 4) | 481 addTestEntry(project, 0, 1, 2, 3, 4) |
481 archiveErr := errors.New("archive failure error"
) | 482 archiveErr := errors.New("archive failure error"
) |
482 gsc.newWriterErr = func(*testGSWriter) error { r
eturn archiveErr } | 483 gsc.newWriterErr = func(*testGSWriter) error { r
eturn archiveErr } |
483 | 484 |
(...skipping 179 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
663 addTestEntry(project, 0, 1, 2, 3) | 664 addTestEntry(project, 0, 1, 2, 3) |
664 | 665 |
665 for _, failName := range []string{"/logstream.entries",
"/logstream.index", "/data.bin"} { | 666 for _, failName := range []string{"/logstream.entries",
"/logstream.index", "/data.bin"} { |
666 for _, testCase := range []struct { | 667 for _, testCase := range []struct { |
667 name string | 668 name string |
668 setup func() | 669 setup func() |
669 }{ | 670 }{ |
670 {"writer create failure", func() { | 671 {"writer create failure", func() { |
671 gsc.newWriterErr = func(w *testG
SWriter) error { | 672 gsc.newWriterErr = func(w *testG
SWriter) error { |
672 if strings.HasSuffix(str
ing(w.path), failName) { | 673 if strings.HasSuffix(str
ing(w.path), failName) { |
673 » » » » » » » » return errors.Wr
apTransient(errors.New("test error")) | 674 » » » » » » » » return errors.Ne
w("test error", transient.Tag) |
674 } | 675 } |
675 return nil | 676 return nil |
676 } | 677 } |
677 }}, | 678 }}, |
678 | 679 |
679 {"write failure", func() { | 680 {"write failure", func() { |
680 gsc.newWriterErr = func(w *testG
SWriter) error { | 681 gsc.newWriterErr = func(w *testG
SWriter) error { |
681 if strings.HasSuffix(str
ing(w.path), failName) { | 682 if strings.HasSuffix(str
ing(w.path), failName) { |
682 » » » » » » » » w.writeErr = err
ors.WrapTransient(errors.New("test error")) | 683 » » » » » » » » w.writeErr = err
ors.New("test error", transient.Tag) |
683 } | 684 } |
684 return nil | 685 return nil |
685 } | 686 } |
686 }}, | 687 }}, |
687 | 688 |
688 {"rename failure", func() { | 689 {"rename failure", func() { |
689 gsc.renameErr = func(src, dst gs
.Path) error { | 690 gsc.renameErr = func(src, dst gs
.Path) error { |
690 if strings.HasSuffix(str
ing(src), failName) { | 691 if strings.HasSuffix(str
ing(src), failName) { |
691 » » » » » » » » return errors.Wr
apTransient(errors.New("test error")) | 692 » » » » » » » » return errors.Ne
w("test error", transient.Tag) |
692 } | 693 } |
693 return nil | 694 return nil |
694 } | 695 } |
695 }}, | 696 }}, |
696 | 697 |
697 {"close failure", func() { | 698 {"close failure", func() { |
698 gsc.newWriterErr = func(w *testG
SWriter) error { | 699 gsc.newWriterErr = func(w *testG
SWriter) error { |
699 if strings.HasSuffix(str
ing(w.path), failName) { | 700 if strings.HasSuffix(str
ing(w.path), failName) { |
700 » » » » » » » » w.closeErr = err
ors.WrapTransient(errors.New("test error")) | 701 » » » » » » » » w.closeErr = err
ors.New("test error", transient.Tag) |
701 } | 702 } |
702 return nil | 703 return nil |
703 } | 704 } |
704 }}, | 705 }}, |
705 | 706 |
706 {"delete failure after other failure", f
unc() { | 707 {"delete failure after other failure", f
unc() { |
707 // Simulate a write failure. Thi
s is the error that will actually | 708 // Simulate a write failure. Thi
s is the error that will actually |
708 // be returned. | 709 // be returned. |
709 gsc.newWriterErr = func(w *testG
SWriter) error { | 710 gsc.newWriterErr = func(w *testG
SWriter) error { |
710 if strings.HasSuffix(str
ing(w.path), failName) { | 711 if strings.HasSuffix(str
ing(w.path), failName) { |
711 » » » » » » » » w.writeErr = err
ors.WrapTransient(errors.New("test error")) | 712 » » » » » » » » w.writeErr = err
ors.New("test error", transient.Tag) |
712 } | 713 } |
713 return nil | 714 return nil |
714 } | 715 } |
715 | 716 |
716 // This will trigger whe NewWrit
er fails from the above | 717 // This will trigger whe NewWrit
er fails from the above |
717 // instrumentation. | 718 // instrumentation. |
718 gsc.deleteErr = func(p gs.Path)
error { | 719 gsc.deleteErr = func(p gs.Path)
error { |
719 if strings.HasSuffix(str
ing(p), failName) { | 720 if strings.HasSuffix(str
ing(p), failName) { |
720 return errors.Ne
w("other error") | 721 return errors.Ne
w("other error") |
721 } | 722 } |
722 return nil | 723 return nil |
723 } | 724 } |
724 }}, | 725 }}, |
725 } { | 726 } { |
726 Convey(fmt.Sprintf(`Can handle %s for %s
, and will not archive.`, testCase.name, failName), func() { | 727 Convey(fmt.Sprintf(`Can handle %s for %s
, and will not archive.`, testCase.name, failName), func() { |
727 testCase.setup() | 728 testCase.setup() |
728 | 729 |
729 So(ar.archiveTaskImpl(c, task),
ShouldErrLike, "test error") | 730 So(ar.archiveTaskImpl(c, task),
ShouldErrLike, "test error") |
730 So(task.consumed, ShouldBeFalse) | 731 So(task.consumed, ShouldBeFalse) |
731 So(archiveRequest, ShouldBeNil) | 732 So(archiveRequest, ShouldBeNil) |
732 }) | 733 }) |
733 } | 734 } |
734 } | 735 } |
735 }) | 736 }) |
736 }) | 737 }) |
737 } | 738 } |
OLD | NEW |