| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package archivist | 5 package archivist |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "strings" | 9 "strings" |
| 10 "sync" | 10 "sync" |
| (...skipping 192 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 203 | 203 |
| 204 Convey(`A testing archive setup`, t, func() { | 204 Convey(`A testing archive setup`, t, func() { |
| 205 c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeUTC) | 205 c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeUTC) |
| 206 | 206 |
| 207 st := memory.Storage{} | 207 st := memory.Storage{} |
| 208 gsc := testGSClient{} | 208 gsc := testGSClient{} |
| 209 | 209 |
| 210 // Set up our test log stream. | 210 // Set up our test log stream. |
| 211 project := "test-project" | 211 project := "test-project" |
| 212 desc := logpb.LogStreamDescriptor{ | 212 desc := logpb.LogStreamDescriptor{ |
| 213 » » » Prefix: "testing", | 213 » » » Prefix: "testing", |
| 214 » » » Name: "foo", | 214 » » » Name: "foo", |
| 215 » » » BinaryFileExt: "bin", | |
| 216 » » } | |
| 217 » » descBytes, err := proto.Marshal(&desc) | |
| 218 » » if err != nil { | |
| 219 » » » panic(err) | |
| 220 } | 215 } |
| 221 | 216 |
| 222 // Utility function to add a log entry for "ls". | 217 // Utility function to add a log entry for "ls". |
| 223 addTestEntry := func(p string, idxs ...int) { | 218 addTestEntry := func(p string, idxs ...int) { |
| 224 for _, v := range idxs { | 219 for _, v := range idxs { |
| 225 le := logpb.LogEntry{ | 220 le := logpb.LogEntry{ |
| 226 PrefixIndex: uint64(v), | 221 PrefixIndex: uint64(v), |
| 227 StreamIndex: uint64(v), | 222 StreamIndex: uint64(v), |
| 228 Content: &logpb.LogEntry_Text{&logpb.Tex
t{ | 223 Content: &logpb.LogEntry_Text{&logpb.Tex
t{ |
| 229 Lines: []*logpb.Text_Line{ | 224 Lines: []*logpb.Text_Line{ |
| (...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 273 } | 268 } |
| 274 | 269 |
| 275 // Set up our test Coordinator client stubs. | 270 // Set up our test Coordinator client stubs. |
| 276 stream := logdog.LoadStreamResponse{ | 271 stream := logdog.LoadStreamResponse{ |
| 277 State: &logdog.LogStreamState{ | 272 State: &logdog.LogStreamState{ |
| 278 ProtoVersion: logpb.Version, | 273 ProtoVersion: logpb.Version, |
| 279 TerminalIndex: -1, | 274 TerminalIndex: -1, |
| 280 Archived: false, | 275 Archived: false, |
| 281 Purged: false, | 276 Purged: false, |
| 282 }, | 277 }, |
| 283 Desc: descBytes, | |
| 284 | 278 |
| 285 // Age is ON the expiration threshold, so not expired. | 279 // Age is ON the expiration threshold, so not expired. |
| 286 Age: archiveTask.CompletePeriod, | 280 Age: archiveTask.CompletePeriod, |
| 287 ArchivalKey: archiveTask.Key, | 281 ArchivalKey: archiveTask.Key, |
| 288 } | 282 } |
| 289 | 283 |
| 284 // Allow tests to modify the log stream descriptor. |
| 285 reloadDesc := func() { |
| 286 descBytes, err := proto.Marshal(&desc) |
| 287 if err != nil { |
| 288 panic(err) |
| 289 } |
| 290 stream.Desc = descBytes |
| 291 } |
| 292 reloadDesc() |
| 293 |
| 290 var archiveRequest *logdog.ArchiveStreamRequest | 294 var archiveRequest *logdog.ArchiveStreamRequest |
| 291 var archiveStreamErr error | 295 var archiveStreamErr error |
| 292 sc := testServicesClient{ | 296 sc := testServicesClient{ |
| 293 lsCallback: func(req *logdog.LoadStreamRequest) (*logdog
.LoadStreamResponse, error) { | 297 lsCallback: func(req *logdog.LoadStreamRequest) (*logdog
.LoadStreamResponse, error) { |
| 294 return &stream, nil | 298 return &stream, nil |
| 295 }, | 299 }, |
| 296 asCallback: func(req *logdog.ArchiveStreamRequest) error
{ | 300 asCallback: func(req *logdog.ArchiveStreamRequest) error
{ |
| 297 archiveRequest = req | 301 archiveRequest = req |
| 298 return archiveStreamErr | 302 return archiveStreamErr |
| 299 }, | 303 }, |
| 300 } | 304 } |
| 301 | 305 |
| 302 » » stBase := Settings{} | 306 » » stBase := Settings{ |
| 307 » » » AlwaysRender: true, |
| 308 » » } |
| 303 | 309 |
| 304 ar := Archivist{ | 310 ar := Archivist{ |
| 305 Service: &sc, | 311 Service: &sc, |
| 306 SettingsLoader: func(c context.Context, proj config.Proj
ectName) (*Settings, error) { | 312 SettingsLoader: func(c context.Context, proj config.Proj
ectName) (*Settings, error) { |
| 307 // Extra slashes to test concatenation,. | 313 // Extra slashes to test concatenation,. |
| 308 st := stBase | 314 st := stBase |
| 309 st.GSBase = gs.Path(fmt.Sprintf("gs://archival/%
s/path/to/archive/", proj)) | 315 st.GSBase = gs.Path(fmt.Sprintf("gs://archival/%
s/path/to/archive/", proj)) |
| 310 st.GSStagingBase = gs.Path(fmt.Sprintf("gs://arc
hival-staging/%s/path/to/archive/", proj)) | 316 st.GSStagingBase = gs.Path(fmt.Sprintf("gs://arc
hival-staging/%s/path/to/archive/", proj)) |
| 311 return &st, nil | 317 return &st, nil |
| 312 }, | 318 }, |
| (...skipping 272 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 585 TerminalIndex: 2, | 591 TerminalIndex: 2, |
| 586 | 592 |
| 587 StreamUrl: gsURL(project, "logst
ream.entries"), | 593 StreamUrl: gsURL(project, "logst
ream.entries"), |
| 588 IndexUrl: gsURL(project, "logst
ream.index"), | 594 IndexUrl: gsURL(project, "logst
ream.index"), |
| 589 DataUrl: gsURL(project, "data.
bin"), | 595 DataUrl: gsURL(project, "data.
bin"), |
| 590 }) | 596 }) |
| 591 }) | 597 }) |
| 592 }) | 598 }) |
| 593 }) | 599 }) |
| 594 | 600 |
| 601 Convey(`When not configured to always render`, func() { |
| 602 stBase.AlwaysRender = false |
| 603 |
| 604 addTestEntry(project, 0, 1, 2, 3, 4) |
| 605 stream.State.TerminalIndex = 4 |
| 606 |
| 607 Convey(`Will not emit a data stream if no binary file ex
tension is specified.`, func() { |
| 608 So(ar.archiveTaskImpl(c, task), ShouldBeNil) |
| 609 So(task.consumed, ShouldBeTrue) |
| 610 |
| 611 So(hasStreams(true, true, false), ShouldBeTrue) |
| 612 So(archiveRequest, ShouldResemble, &logdog.Archi
veStreamRequest{ |
| 613 Project: project, |
| 614 Id: archiveTask.Id, |
| 615 LogEntryCount: 5, |
| 616 TerminalIndex: 4, |
| 617 |
| 618 StreamUrl: gsURL(project, "logstream.ent
ries"), |
| 619 IndexUrl: gsURL(project, "logstream.ind
ex"), |
| 620 }) |
| 621 }) |
| 622 |
| 623 Convey(`Will emit a data stream if a binary file extensi
on is specified.`, func() { |
| 624 desc.BinaryFileExt = "foobar" |
| 625 reloadDesc() |
| 626 |
| 627 So(ar.archiveTaskImpl(c, task), ShouldBeNil) |
| 628 So(task.consumed, ShouldBeTrue) |
| 629 |
| 630 So(hasStreams(true, true, true), ShouldBeTrue) |
| 631 So(archiveRequest, ShouldResemble, &logdog.Archi
veStreamRequest{ |
| 632 Project: project, |
| 633 Id: archiveTask.Id, |
| 634 LogEntryCount: 5, |
| 635 TerminalIndex: 4, |
| 636 |
| 637 StreamUrl: gsURL(project, "logstream.ent
ries"), |
| 638 IndexUrl: gsURL(project, "logstream.ind
ex"), |
| 639 DataUrl: gsURL(project, "data.foobar")
, |
| 640 }) |
| 641 }) |
| 642 }) |
| 643 |
| 595 Convey(`With an empty project name, will fail and consume the ta
sk.`, func() { | 644 Convey(`With an empty project name, will fail and consume the ta
sk.`, func() { |
| 596 archiveTask.Project = "" | 645 archiveTask.Project = "" |
| 597 | 646 |
| 598 So(ar.archiveTaskImpl(c, task), ShouldErrLike, "invalid
project name") | 647 So(ar.archiveTaskImpl(c, task), ShouldErrLike, "invalid
project name") |
| 599 So(task.consumed, ShouldBeTrue) | 648 So(task.consumed, ShouldBeTrue) |
| 600 }) | 649 }) |
| 601 | 650 |
| 602 Convey(`With an invalid project name, will fail and consume the
task.`, func() { | 651 Convey(`With an invalid project name, will fail and consume the
task.`, func() { |
| 603 archiveTask.Project = "!!! invalid project name !!!" | 652 archiveTask.Project = "!!! invalid project name !!!" |
| 604 | 653 |
| (...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 677 | 726 |
| 678 So(ar.archiveTaskImpl(c, task),
ShouldErrLike, "test error") | 727 So(ar.archiveTaskImpl(c, task),
ShouldErrLike, "test error") |
| 679 So(task.consumed, ShouldBeFalse) | 728 So(task.consumed, ShouldBeFalse) |
| 680 So(archiveRequest, ShouldBeNil) | 729 So(archiveRequest, ShouldBeNil) |
| 681 }) | 730 }) |
| 682 } | 731 } |
| 683 } | 732 } |
| 684 }) | 733 }) |
| 685 }) | 734 }) |
| 686 } | 735 } |
| OLD | NEW |