| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package archivist | 5 package archivist |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "strings" | 9 "strings" |
| 10 "sync" | 10 "sync" |
| 11 "testing" | 11 "testing" |
| 12 "time" | 12 "time" |
| 13 | 13 |
| 14 "github.com/luci/luci-go/common/clock/testclock" | 14 "github.com/luci/luci-go/common/clock/testclock" |
| 15 "github.com/luci/luci-go/common/errors" | 15 "github.com/luci/luci-go/common/errors" |
| 16 "github.com/luci/luci-go/common/gcloud/gs" | 16 "github.com/luci/luci-go/common/gcloud/gs" |
| 17 "github.com/luci/luci-go/common/proto/google" | 17 "github.com/luci/luci-go/common/proto/google" |
| 18 "github.com/luci/luci-go/common/retry" |
| 18 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" | 19 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" |
| 19 "github.com/luci/luci-go/logdog/api/logpb" | 20 "github.com/luci/luci-go/logdog/api/logpb" |
| 20 "github.com/luci/luci-go/logdog/common/storage" | 21 "github.com/luci/luci-go/logdog/common/storage" |
| 21 "github.com/luci/luci-go/logdog/common/storage/memory" | 22 "github.com/luci/luci-go/logdog/common/storage/memory" |
| 22 "github.com/luci/luci-go/logdog/common/types" | 23 "github.com/luci/luci-go/logdog/common/types" |
| 23 "github.com/luci/luci-go/luci_config/common/cfgtypes" | 24 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
| 24 | 25 |
| 25 "github.com/golang/protobuf/proto" | 26 "github.com/golang/protobuf/proto" |
| 26 "github.com/golang/protobuf/ptypes/empty" | 27 "github.com/golang/protobuf/ptypes/empty" |
| 27 "golang.org/x/net/context" | 28 "golang.org/x/net/context" |
| (...skipping 435 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 463 TerminalIndex: 3, | 464 TerminalIndex: 3, |
| 464 | 465 |
| 465 StreamUrl: gsURL(project, "logstream.ent
ries"), | 466 StreamUrl: gsURL(project, "logstream.ent
ries"), |
| 466 IndexUrl: gsURL(project, "logstream.ind
ex"), | 467 IndexUrl: gsURL(project, "logstream.ind
ex"), |
| 467 DataUrl: gsURL(project, "data.bin"), | 468 DataUrl: gsURL(project, "data.bin"), |
| 468 }) | 469 }) |
| 469 }) | 470 }) |
| 470 | 471 |
| 471 Convey(`When a transient archival error occurs, will not
consume the task.`, func() { | 472 Convey(`When a transient archival error occurs, will not
consume the task.`, func() { |
| 472 addTestEntry(project, 0, 1, 2, 3, 4) | 473 addTestEntry(project, 0, 1, 2, 3, 4) |
| 473 » » » » gsc.newWriterErr = func(*testGSWriter) error { r
eturn errors.WrapTransient(errors.New("test error")) } | 474 » » » » gsc.newWriterErr = func(*testGSWriter) error { r
eturn errors.New("test error", retry.Tag) } |
| 474 | 475 |
| 475 So(ar.archiveTaskImpl(c, task), ShouldErrLike, "
test error") | 476 So(ar.archiveTaskImpl(c, task), ShouldErrLike, "
test error") |
| 476 So(task.consumed, ShouldBeFalse) | 477 So(task.consumed, ShouldBeFalse) |
| 477 }) | 478 }) |
| 478 | 479 |
| 479 Convey(`When a non-transient archival error occurs`, fun
c() { | 480 Convey(`When a non-transient archival error occurs`, fun
c() { |
| 480 addTestEntry(project, 0, 1, 2, 3, 4) | 481 addTestEntry(project, 0, 1, 2, 3, 4) |
| 481 archiveErr := errors.New("archive failure error"
) | 482 archiveErr := errors.New("archive failure error"
) |
| 482 gsc.newWriterErr = func(*testGSWriter) error { r
eturn archiveErr } | 483 gsc.newWriterErr = func(*testGSWriter) error { r
eturn archiveErr } |
| 483 | 484 |
| (...skipping 179 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 663 addTestEntry(project, 0, 1, 2, 3) | 664 addTestEntry(project, 0, 1, 2, 3) |
| 664 | 665 |
| 665 for _, failName := range []string{"/logstream.entries",
"/logstream.index", "/data.bin"} { | 666 for _, failName := range []string{"/logstream.entries",
"/logstream.index", "/data.bin"} { |
| 666 for _, testCase := range []struct { | 667 for _, testCase := range []struct { |
| 667 name string | 668 name string |
| 668 setup func() | 669 setup func() |
| 669 }{ | 670 }{ |
| 670 {"writer create failure", func() { | 671 {"writer create failure", func() { |
| 671 gsc.newWriterErr = func(w *testG
SWriter) error { | 672 gsc.newWriterErr = func(w *testG
SWriter) error { |
| 672 if strings.HasSuffix(str
ing(w.path), failName) { | 673 if strings.HasSuffix(str
ing(w.path), failName) { |
| 673 » » » » » » » » return errors.Wr
apTransient(errors.New("test error")) | 674 » » » » » » » » return errors.Ne
w("test error", retry.Tag) |
| 674 } | 675 } |
| 675 return nil | 676 return nil |
| 676 } | 677 } |
| 677 }}, | 678 }}, |
| 678 | 679 |
| 679 {"write failure", func() { | 680 {"write failure", func() { |
| 680 gsc.newWriterErr = func(w *testG
SWriter) error { | 681 gsc.newWriterErr = func(w *testG
SWriter) error { |
| 681 if strings.HasSuffix(str
ing(w.path), failName) { | 682 if strings.HasSuffix(str
ing(w.path), failName) { |
| 682 » » » » » » » » w.writeErr = err
ors.WrapTransient(errors.New("test error")) | 683 » » » » » » » » w.writeErr = err
ors.New("test error", retry.Tag) |
| 683 } | 684 } |
| 684 return nil | 685 return nil |
| 685 } | 686 } |
| 686 }}, | 687 }}, |
| 687 | 688 |
| 688 {"rename failure", func() { | 689 {"rename failure", func() { |
| 689 gsc.renameErr = func(src, dst gs
.Path) error { | 690 gsc.renameErr = func(src, dst gs
.Path) error { |
| 690 if strings.HasSuffix(str
ing(src), failName) { | 691 if strings.HasSuffix(str
ing(src), failName) { |
| 691 » » » » » » » » return errors.Wr
apTransient(errors.New("test error")) | 692 » » » » » » » » return errors.Ne
w("test error", retry.Tag) |
| 692 } | 693 } |
| 693 return nil | 694 return nil |
| 694 } | 695 } |
| 695 }}, | 696 }}, |
| 696 | 697 |
| 697 {"close failure", func() { | 698 {"close failure", func() { |
| 698 gsc.newWriterErr = func(w *testG
SWriter) error { | 699 gsc.newWriterErr = func(w *testG
SWriter) error { |
| 699 if strings.HasSuffix(str
ing(w.path), failName) { | 700 if strings.HasSuffix(str
ing(w.path), failName) { |
| 700 » » » » » » » » w.closeErr = err
ors.WrapTransient(errors.New("test error")) | 701 » » » » » » » » w.closeErr = err
ors.New("test error", retry.Tag) |
| 701 } | 702 } |
| 702 return nil | 703 return nil |
| 703 } | 704 } |
| 704 }}, | 705 }}, |
| 705 | 706 |
| 706 {"delete failure after other failure", f
unc() { | 707 {"delete failure after other failure", f
unc() { |
| 707 // Simulate a write failure. Thi
s is the error that will actually | 708 // Simulate a write failure. Thi
s is the error that will actually |
| 708 // be returned. | 709 // be returned. |
| 709 gsc.newWriterErr = func(w *testG
SWriter) error { | 710 gsc.newWriterErr = func(w *testG
SWriter) error { |
| 710 if strings.HasSuffix(str
ing(w.path), failName) { | 711 if strings.HasSuffix(str
ing(w.path), failName) { |
| 711 » » » » » » » » w.writeErr = err
ors.WrapTransient(errors.New("test error")) | 712 » » » » » » » » w.writeErr = err
ors.New("test error", retry.Tag) |
| 712 } | 713 } |
| 713 return nil | 714 return nil |
| 714 } | 715 } |
| 715 | 716 |
| 716 // This will trigger whe NewWrit
er fails from the above | 717 // This will trigger whe NewWrit
er fails from the above |
| 717 // instrumentation. | 718 // instrumentation. |
| 718 gsc.deleteErr = func(p gs.Path)
error { | 719 gsc.deleteErr = func(p gs.Path)
error { |
| 719 if strings.HasSuffix(str
ing(p), failName) { | 720 if strings.HasSuffix(str
ing(p), failName) { |
| 720 return errors.Ne
w("other error") | 721 return errors.Ne
w("other error") |
| 721 } | 722 } |
| 722 return nil | 723 return nil |
| 723 } | 724 } |
| 724 }}, | 725 }}, |
| 725 } { | 726 } { |
| 726 Convey(fmt.Sprintf(`Can handle %s for %s
, and will not archive.`, testCase.name, failName), func() { | 727 Convey(fmt.Sprintf(`Can handle %s for %s
, and will not archive.`, testCase.name, failName), func() { |
| 727 testCase.setup() | 728 testCase.setup() |
| 728 | 729 |
| 729 So(ar.archiveTaskImpl(c, task),
ShouldErrLike, "test error") | 730 So(ar.archiveTaskImpl(c, task),
ShouldErrLike, "test error") |
| 730 So(task.consumed, ShouldBeFalse) | 731 So(task.consumed, ShouldBeFalse) |
| 731 So(archiveRequest, ShouldBeNil) | 732 So(archiveRequest, ShouldBeNil) |
| 732 }) | 733 }) |
| 733 } | 734 } |
| 734 } | 735 } |
| 735 }) | 736 }) |
| 736 }) | 737 }) |
| 737 } | 738 } |
| OLD | NEW |