OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package archivist | 5 package archivist |
6 | 6 |
7 import ( | 7 import ( |
8 "fmt" | 8 "fmt" |
9 "strings" | 9 "strings" |
10 "sync" | 10 "sync" |
11 "testing" | 11 "testing" |
12 "time" | 12 "time" |
13 | 13 |
14 "github.com/golang/protobuf/proto" | 14 "github.com/golang/protobuf/proto" |
15 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 15 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
16 "github.com/luci/luci-go/common/clock/testclock" | 16 "github.com/luci/luci-go/common/clock/testclock" |
| 17 "github.com/luci/luci-go/common/config" |
17 "github.com/luci/luci-go/common/errors" | 18 "github.com/luci/luci-go/common/errors" |
18 "github.com/luci/luci-go/common/gcloud/gs" | 19 "github.com/luci/luci-go/common/gcloud/gs" |
19 "github.com/luci/luci-go/common/logdog/types" | 20 "github.com/luci/luci-go/common/logdog/types" |
20 "github.com/luci/luci-go/common/proto/google" | 21 "github.com/luci/luci-go/common/proto/google" |
21 "github.com/luci/luci-go/common/proto/logdog/logpb" | 22 "github.com/luci/luci-go/common/proto/logdog/logpb" |
22 "github.com/luci/luci-go/server/logdog/storage" | 23 "github.com/luci/luci-go/server/logdog/storage" |
23 "github.com/luci/luci-go/server/logdog/storage/memory" | 24 "github.com/luci/luci-go/server/logdog/storage/memory" |
24 "golang.org/x/net/context" | 25 "golang.org/x/net/context" |
25 "google.golang.org/grpc" | 26 "google.golang.org/grpc" |
26 | 27 |
(...skipping 168 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
195 func TestHandleArchive(t *testing.T) { | 196 func TestHandleArchive(t *testing.T) { |
196 t.Parallel() | 197 t.Parallel() |
197 | 198 |
198 Convey(`A testing archive setup`, t, func() { | 199 Convey(`A testing archive setup`, t, func() { |
199 c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeUTC) | 200 c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeUTC) |
200 | 201 |
201 st := memory.Storage{} | 202 st := memory.Storage{} |
202 gsc := testGSClient{} | 203 gsc := testGSClient{} |
203 | 204 |
204 // Set up our test log stream. | 205 // Set up our test log stream. |
| 206 project := "test-project" |
205 desc := logpb.LogStreamDescriptor{ | 207 desc := logpb.LogStreamDescriptor{ |
206 Prefix: "testing", | 208 Prefix: "testing", |
207 Name: "foo", | 209 Name: "foo", |
208 BinaryFileExt: "bin", | 210 BinaryFileExt: "bin", |
209 } | 211 } |
210 descBytes, err := proto.Marshal(&desc) | 212 descBytes, err := proto.Marshal(&desc) |
211 if err != nil { | 213 if err != nil { |
212 panic(err) | 214 panic(err) |
213 } | 215 } |
214 | 216 |
215 // Utility function to add a log entry for "ls". | 217 // Utility function to add a log entry for "ls". |
216 » » addTestEntry := func(idxs ...int) { | 218 » » addTestEntry := func(p string, idxs ...int) { |
217 for _, v := range idxs { | 219 for _, v := range idxs { |
218 le := logpb.LogEntry{ | 220 le := logpb.LogEntry{ |
219 PrefixIndex: uint64(v), | 221 PrefixIndex: uint64(v), |
220 StreamIndex: uint64(v), | 222 StreamIndex: uint64(v), |
221 Content: &logpb.LogEntry_Text{&logpb.Tex
t{ | 223 Content: &logpb.LogEntry_Text{&logpb.Tex
t{ |
222 Lines: []*logpb.Text_Line{ | 224 Lines: []*logpb.Text_Line{ |
223 { | 225 { |
224 Value: fmt.S
printf("line #%d", v), | 226 Value: fmt.S
printf("line #%d", v), |
225 Delimiter: "\n", | 227 Delimiter: "\n", |
226 }, | 228 }, |
227 }, | 229 }, |
228 }}, | 230 }}, |
229 } | 231 } |
230 | 232 |
231 d, err := proto.Marshal(&le) | 233 d, err := proto.Marshal(&le) |
232 if err != nil { | 234 if err != nil { |
233 panic(err) | 235 panic(err) |
234 } | 236 } |
235 | 237 |
236 err = st.Put(storage.PutRequest{ | 238 err = st.Put(storage.PutRequest{ |
237 » » » » » Path: desc.Path(), | 239 » » » » » Project: config.ProjectName(p), |
238 » » » » » Index: types.MessageIndex(v), | 240 » » » » » Path: desc.Path(), |
239 » » » » » Values: [][]byte{d}, | 241 » » » » » Index: types.MessageIndex(v), |
| 242 » » » » » Values: [][]byte{d}, |
240 }) | 243 }) |
241 if err != nil { | 244 if err != nil { |
242 panic(err) | 245 panic(err) |
243 } | 246 } |
244 | 247 |
245 // Advance the time for each log entry. | 248 // Advance the time for each log entry. |
246 tc.Add(time.Second) | 249 tc.Add(time.Second) |
247 } | 250 } |
248 } | 251 } |
249 | 252 |
250 // Set up our testing archival task. | 253 // Set up our testing archival task. |
251 expired := 10 * time.Minute | 254 expired := 10 * time.Minute |
252 archiveTask := logdog.ArchiveTask{ | 255 archiveTask := logdog.ArchiveTask{ |
| 256 Project: project, |
253 Path: string(desc.Path()), | 257 Path: string(desc.Path()), |
254 SettleDelay: google.NewDuration(10 * time.Second), | 258 SettleDelay: google.NewDuration(10 * time.Second), |
255 CompletePeriod: google.NewDuration(expired), | 259 CompletePeriod: google.NewDuration(expired), |
256 Key: []byte("random archival key"), | 260 Key: []byte("random archival key"), |
257 } | 261 } |
258 expired++ // This represents a time PAST CompletePeriod. | 262 expired++ // This represents a time PAST CompletePeriod. |
259 | 263 |
260 task := &testTask{ | 264 task := &testTask{ |
261 task: &archiveTask, | 265 task: &archiveTask, |
262 } | 266 } |
(...skipping 27 matching lines...) Expand all Loading... |
290 } | 294 } |
291 | 295 |
292 ar := Archivist{ | 296 ar := Archivist{ |
293 Service: &sc, | 297 Service: &sc, |
294 Storage: &st, | 298 Storage: &st, |
295 GSClient: &gsc, | 299 GSClient: &gsc, |
296 GSBase: gs.Path("gs://archive-test/path/to/archiv
e/"), // Extra slashes to test concatenation. | 300 GSBase: gs.Path("gs://archive-test/path/to/archiv
e/"), // Extra slashes to test concatenation. |
297 GSStagingBase: gs.Path("gs://archive-test-staging/path/t
o/archive/"), // Extra slashes to test concatenation. | 301 GSStagingBase: gs.Path("gs://archive-test-staging/path/t
o/archive/"), // Extra slashes to test concatenation. |
298 } | 302 } |
299 | 303 |
300 » » gsURL := func(p string) string { | 304 » » gsURL := func(project, name string) string { |
301 » » » return fmt.Sprintf("gs://archive-test/path/to/archive/%s
/%s", desc.Path(), p) | 305 » » » return fmt.Sprintf("gs://archive-test/path/to/archive/%s
/%s/%s", project, desc.Path(), name) |
302 } | 306 } |
303 | 307 |
304 // hasStreams can be called to check that the retained archiveRe
quest had | 308 // hasStreams can be called to check that the retained archiveRe
quest had |
305 // data sizes for the named archive stream types. | 309 // data sizes for the named archive stream types. |
306 // | 310 // |
307 // After checking, the values are set to zero. This allows us to
use | 311 // After checking, the values are set to zero. This allows us to
use |
308 // ShouldEqual without hardcoding specific archival sizes into t
he results. | 312 // ShouldEqual without hardcoding specific archival sizes into t
he results. |
309 hasStreams := func(log, index, data bool) bool { | 313 hasStreams := func(log, index, data bool) bool { |
310 So(archiveRequest, ShouldNotBeNil) | 314 So(archiveRequest, ShouldNotBeNil) |
311 if (log && archiveRequest.StreamSize <= 0) || | 315 if (log && archiveRequest.StreamSize <= 0) || |
(...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
391 Convey(`With terminal index "3"`, func() { | 395 Convey(`With terminal index "3"`, func() { |
392 stream.State.TerminalIndex = 3 | 396 stream.State.TerminalIndex = 3 |
393 | 397 |
394 Convey(`Will fail not ACK a log stream with no entries.`
, func() { | 398 Convey(`Will fail not ACK a log stream with no entries.`
, func() { |
395 ack, err := ar.archiveTaskImpl(c, task) | 399 ack, err := ar.archiveTaskImpl(c, task) |
396 So(err, ShouldEqual, storage.ErrDoesNotExist) | 400 So(err, ShouldEqual, storage.ErrDoesNotExist) |
397 So(ack, ShouldBeFalse) | 401 So(ack, ShouldBeFalse) |
398 }) | 402 }) |
399 | 403 |
400 Convey(`Will fail to archive {0, 1, 2, 4} (incomplete).`
, func() { | 404 Convey(`Will fail to archive {0, 1, 2, 4} (incomplete).`
, func() { |
401 » » » » addTestEntry(0, 1, 2, 4) | 405 » » » » addTestEntry(project, 0, 1, 2, 4) |
402 | 406 |
403 ack, err := ar.archiveTaskImpl(c, task) | 407 ack, err := ar.archiveTaskImpl(c, task) |
404 So(err, ShouldErrLike, "missing log entry") | 408 So(err, ShouldErrLike, "missing log entry") |
405 So(ack, ShouldBeFalse) | 409 So(ack, ShouldBeFalse) |
406 }) | 410 }) |
407 | 411 |
408 Convey(`Will successfully archive {0, 1, 2, 3, 4}, stopp
ing at the terminal index.`, func() { | 412 Convey(`Will successfully archive {0, 1, 2, 3, 4}, stopp
ing at the terminal index.`, func() { |
409 » » » » addTestEntry(0, 1, 2, 3, 4) | 413 » » » » addTestEntry(project, 0, 1, 2, 3, 4) |
410 | 414 |
411 ack, err := ar.archiveTaskImpl(c, task) | 415 ack, err := ar.archiveTaskImpl(c, task) |
412 So(err, ShouldBeNil) | 416 So(err, ShouldBeNil) |
413 So(ack, ShouldBeTrue) | 417 So(ack, ShouldBeTrue) |
414 | 418 |
415 So(hasStreams(true, true, true), ShouldBeTrue) | 419 So(hasStreams(true, true, true), ShouldBeTrue) |
416 So(archiveRequest, ShouldResemble, &logdog.Archi
veStreamRequest{ | 420 So(archiveRequest, ShouldResemble, &logdog.Archi
veStreamRequest{ |
| 421 Project: project, |
417 Path: archiveTask.Path, | 422 Path: archiveTask.Path, |
418 LogEntryCount: 4, | 423 LogEntryCount: 4, |
419 TerminalIndex: 3, | 424 TerminalIndex: 3, |
420 | 425 |
421 » » » » » StreamUrl: gsURL("logstream.entries"), | 426 » » » » » StreamUrl: gsURL(project, "logstream.ent
ries"), |
422 » » » » » IndexUrl: gsURL("logstream.index"), | 427 » » » » » IndexUrl: gsURL(project, "logstream.ind
ex"), |
423 » » » » » DataUrl: gsURL("data.bin"), | 428 » » » » » DataUrl: gsURL(project, "data.bin"), |
424 }) | 429 }) |
425 }) | 430 }) |
426 | 431 |
427 Convey(`When a transient archival error occurs, will not
ACK it.`, func() { | 432 Convey(`When a transient archival error occurs, will not
ACK it.`, func() { |
428 » » » » addTestEntry(0, 1, 2, 3, 4) | 433 » » » » addTestEntry(project, 0, 1, 2, 3, 4) |
429 gsc.newWriterErr = func(*testGSWriter) error { r
eturn errors.WrapTransient(errors.New("test error")) } | 434 gsc.newWriterErr = func(*testGSWriter) error { r
eturn errors.WrapTransient(errors.New("test error")) } |
430 | 435 |
431 ack, err := ar.archiveTaskImpl(c, task) | 436 ack, err := ar.archiveTaskImpl(c, task) |
432 So(err, ShouldErrLike, "test error") | 437 So(err, ShouldErrLike, "test error") |
433 So(ack, ShouldBeFalse) | 438 So(ack, ShouldBeFalse) |
434 }) | 439 }) |
435 | 440 |
436 Convey(`When a non-transient archival error occurs`, fun
c() { | 441 Convey(`When a non-transient archival error occurs`, fun
c() { |
437 » » » » addTestEntry(0, 1, 2, 3, 4) | 442 » » » » addTestEntry(project, 0, 1, 2, 3, 4) |
438 archiveErr := errors.New("archive failure error"
) | 443 archiveErr := errors.New("archive failure error"
) |
439 gsc.newWriterErr = func(*testGSWriter) error { r
eturn archiveErr } | 444 gsc.newWriterErr = func(*testGSWriter) error { r
eturn archiveErr } |
440 | 445 |
441 Convey(`If remote report returns an error, do no
t ACK.`, func() { | 446 Convey(`If remote report returns an error, do no
t ACK.`, func() { |
442 archiveStreamErr = errors.New("test erro
r") | 447 archiveStreamErr = errors.New("test erro
r") |
443 | 448 |
444 ack, err := ar.archiveTaskImpl(c, task) | 449 ack, err := ar.archiveTaskImpl(c, task) |
445 So(err, ShouldErrLike, "test error") | 450 So(err, ShouldErrLike, "test error") |
446 So(ack, ShouldBeFalse) | 451 So(ack, ShouldBeFalse) |
447 | 452 |
448 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ | 453 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
449 » » » » » » Path: archiveTask.Path, | 454 » » » » » » Project: project, |
450 » » » » » » Error: "archive failure error", | 455 » » » » » » Path: archiveTask.Path, |
| 456 » » » » » » Error: "archive failure error"
, |
451 }) | 457 }) |
452 }) | 458 }) |
453 | 459 |
454 Convey(`If remote report returns success, ACK.`,
func() { | 460 Convey(`If remote report returns success, ACK.`,
func() { |
455 ack, err := ar.archiveTaskImpl(c, task) | 461 ack, err := ar.archiveTaskImpl(c, task) |
456 So(err, ShouldBeNil) | 462 So(err, ShouldBeNil) |
457 So(ack, ShouldBeTrue) | 463 So(ack, ShouldBeTrue) |
458 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ | 464 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
459 » » » » » » Path: archiveTask.Path, | 465 » » » » » » Project: project, |
460 » » » » » » Error: "archive failure error", | 466 » » » » » » Path: archiveTask.Path, |
| 467 » » » » » » Error: "archive failure error"
, |
461 }) | 468 }) |
462 }) | 469 }) |
463 | 470 |
464 Convey(`If an empty error string is supplied, th
e generic error will be filled in.`, func() { | 471 Convey(`If an empty error string is supplied, th
e generic error will be filled in.`, func() { |
465 archiveErr = errors.New("") | 472 archiveErr = errors.New("") |
466 | 473 |
467 ack, err := ar.archiveTaskImpl(c, task) | 474 ack, err := ar.archiveTaskImpl(c, task) |
468 So(err, ShouldBeNil) | 475 So(err, ShouldBeNil) |
469 So(ack, ShouldBeTrue) | 476 So(ack, ShouldBeTrue) |
470 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ | 477 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
471 » » » » » » Path: archiveTask.Path, | 478 » » » » » » Project: project, |
472 » » » » » » Error: "archival error", | 479 » » » » » » Path: archiveTask.Path, |
| 480 » » » » » » Error: "archival error", |
473 }) | 481 }) |
474 }) | 482 }) |
475 }) | 483 }) |
476 }) | 484 }) |
477 | 485 |
478 Convey(`When not enforcing stream completeness`, func() { | 486 Convey(`When not enforcing stream completeness`, func() { |
479 stream.Age = google.NewDuration(expired) | 487 stream.Age = google.NewDuration(expired) |
480 | 488 |
481 Convey(`With no terminal index`, func() { | 489 Convey(`With no terminal index`, func() { |
482 Convey(`Will successfully archive if there are n
o entries.`, func() { | 490 Convey(`Will successfully archive if there are n
o entries.`, func() { |
483 ack, err := ar.archiveTaskImpl(c, task) | 491 ack, err := ar.archiveTaskImpl(c, task) |
484 So(err, ShouldBeNil) | 492 So(err, ShouldBeNil) |
485 So(ack, ShouldBeTrue) | 493 So(ack, ShouldBeTrue) |
486 | 494 |
487 So(hasStreams(true, true, false), Should
BeTrue) | 495 So(hasStreams(true, true, false), Should
BeTrue) |
488 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ | 496 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 497 Project: project, |
489 Path: archiveTask.Path, | 498 Path: archiveTask.Path, |
490 LogEntryCount: 0, | 499 LogEntryCount: 0, |
491 TerminalIndex: -1, | 500 TerminalIndex: -1, |
492 | 501 |
493 » » » » » » StreamUrl: gsURL("logstream.entr
ies"), | 502 » » » » » » StreamUrl: gsURL(project, "logst
ream.entries"), |
494 » » » » » » IndexUrl: gsURL("logstream.inde
x"), | 503 » » » » » » IndexUrl: gsURL(project, "logst
ream.index"), |
495 » » » » » » DataUrl: gsURL("data.bin"), | 504 » » » » » » DataUrl: gsURL(project, "data.
bin"), |
496 }) | 505 }) |
497 }) | 506 }) |
498 | 507 |
499 Convey(`With {0, 1, 2, 4} (incomplete) will arch
ive the stream and update its terminal index.`, func() { | 508 Convey(`With {0, 1, 2, 4} (incomplete) will arch
ive the stream and update its terminal index.`, func() { |
500 » » » » » addTestEntry(0, 1, 2, 4) | 509 » » » » » addTestEntry(project, 0, 1, 2, 4) |
501 | 510 |
502 ack, err := ar.archiveTaskImpl(c, task) | 511 ack, err := ar.archiveTaskImpl(c, task) |
503 So(err, ShouldBeNil) | 512 So(err, ShouldBeNil) |
504 So(ack, ShouldBeTrue) | 513 So(ack, ShouldBeTrue) |
505 | 514 |
506 So(hasStreams(true, true, true), ShouldB
eTrue) | 515 So(hasStreams(true, true, true), ShouldB
eTrue) |
507 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ | 516 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 517 Project: project, |
508 Path: archiveTask.Path, | 518 Path: archiveTask.Path, |
509 LogEntryCount: 4, | 519 LogEntryCount: 4, |
510 TerminalIndex: 4, | 520 TerminalIndex: 4, |
511 | 521 |
512 » » » » » » StreamUrl: gsURL("logstream.entr
ies"), | 522 » » » » » » StreamUrl: gsURL(project, "logst
ream.entries"), |
513 » » » » » » IndexUrl: gsURL("logstream.inde
x"), | 523 » » » » » » IndexUrl: gsURL(project, "logst
ream.index"), |
514 » » » » » » DataUrl: gsURL("data.bin"), | 524 » » » » » » DataUrl: gsURL(project, "data.
bin"), |
515 }) | 525 }) |
516 }) | 526 }) |
517 }) | 527 }) |
518 | 528 |
519 Convey(`With terminal index 3`, func() { | 529 Convey(`With terminal index 3`, func() { |
520 stream.State.TerminalIndex = 3 | 530 stream.State.TerminalIndex = 3 |
521 | 531 |
522 Convey(`Will successfully archive if there are n
o entries.`, func() { | 532 Convey(`Will successfully archive if there are n
o entries.`, func() { |
523 ack, err := ar.archiveTaskImpl(c, task) | 533 ack, err := ar.archiveTaskImpl(c, task) |
524 So(err, ShouldBeNil) | 534 So(err, ShouldBeNil) |
525 So(ack, ShouldBeTrue) | 535 So(ack, ShouldBeTrue) |
526 | 536 |
527 So(hasStreams(true, true, false), Should
BeTrue) | 537 So(hasStreams(true, true, false), Should
BeTrue) |
528 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ | 538 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 539 Project: project, |
529 Path: archiveTask.Path, | 540 Path: archiveTask.Path, |
530 LogEntryCount: 0, | 541 LogEntryCount: 0, |
531 TerminalIndex: -1, | 542 TerminalIndex: -1, |
532 | 543 |
533 » » » » » » StreamUrl: gsURL("logstream.entr
ies"), | 544 » » » » » » StreamUrl: gsURL(project, "logst
ream.entries"), |
534 » » » » » » IndexUrl: gsURL("logstream.inde
x"), | 545 » » » » » » IndexUrl: gsURL(project, "logst
ream.index"), |
535 » » » » » » DataUrl: gsURL("data.bin"), | 546 » » » » » » DataUrl: gsURL(project, "data.
bin"), |
536 }) | 547 }) |
537 }) | 548 }) |
538 | 549 |
539 Convey(`With {0, 1, 2, 4} (incomplete) will arch
ive the stream and update its terminal index to 2.`, func() { | 550 Convey(`With {0, 1, 2, 4} (incomplete) will arch
ive the stream and update its terminal index to 2.`, func() { |
540 » » » » » addTestEntry(0, 1, 2, 4) | 551 » » » » » addTestEntry(project, 0, 1, 2, 4) |
541 | 552 |
542 ack, err := ar.archiveTaskImpl(c, task) | 553 ack, err := ar.archiveTaskImpl(c, task) |
543 So(err, ShouldBeNil) | 554 So(err, ShouldBeNil) |
544 So(ack, ShouldBeTrue) | 555 So(ack, ShouldBeTrue) |
545 | 556 |
546 So(hasStreams(true, true, true), ShouldB
eTrue) | 557 So(hasStreams(true, true, true), ShouldB
eTrue) |
547 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ | 558 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 559 Project: project, |
548 Path: archiveTask.Path, | 560 Path: archiveTask.Path, |
549 LogEntryCount: 3, | 561 LogEntryCount: 3, |
550 TerminalIndex: 2, | 562 TerminalIndex: 2, |
551 | 563 |
552 » » » » » » StreamUrl: gsURL("logstream.entr
ies"), | 564 » » » » » » StreamUrl: gsURL(project, "logst
ream.entries"), |
553 » » » » » » IndexUrl: gsURL("logstream.inde
x"), | 565 » » » » » » IndexUrl: gsURL(project, "logst
ream.index"), |
554 » » » » » » DataUrl: gsURL("data.bin"), | 566 » » » » » » DataUrl: gsURL(project, "data.
bin"), |
555 }) | 567 }) |
556 }) | 568 }) |
557 }) | 569 }) |
558 }) | 570 }) |
559 | 571 |
| 572 Convey(`With an empty project`, func() { |
| 573 archiveTask.Project = "" |
| 574 |
| 575 Convey(`Will successfully archive {0, 1, 2, 3} with term
inal index 3 using "_" for project archive path.`, func() { |
| 576 stream.State.TerminalIndex = 3 |
| 577 addTestEntry("", 0, 1, 2, 3) |
| 578 |
| 579 ack, err := ar.archiveTaskImpl(c, task) |
| 580 So(err, ShouldBeNil) |
| 581 So(ack, ShouldBeTrue) |
| 582 |
| 583 So(hasStreams(true, true, true), ShouldBeTrue) |
| 584 So(archiveRequest, ShouldResemble, &logdog.Archi
veStreamRequest{ |
| 585 Project: "", |
| 586 Path: archiveTask.Path, |
| 587 LogEntryCount: 4, |
| 588 TerminalIndex: 3, |
| 589 |
| 590 StreamUrl: gsURL("_", "logstream.entries
"), |
| 591 IndexUrl: gsURL("_", "logstream.index")
, |
| 592 DataUrl: gsURL("_", "data.bin"), |
| 593 }) |
| 594 }) |
| 595 }) |
| 596 |
560 // Simulate failures during the various stream generation operat
ions. | 597 // Simulate failures during the various stream generation operat
ions. |
561 Convey(`Stream generation failures`, func() { | 598 Convey(`Stream generation failures`, func() { |
562 stream.State.TerminalIndex = 3 | 599 stream.State.TerminalIndex = 3 |
563 » » » addTestEntry(0, 1, 2, 3) | 600 » » » addTestEntry(project, 0, 1, 2, 3) |
564 | 601 |
565 for _, failName := range []string{"/logstream.entries",
"/logstream.index", "/data.bin"} { | 602 for _, failName := range []string{"/logstream.entries",
"/logstream.index", "/data.bin"} { |
566 for _, testCase := range []struct { | 603 for _, testCase := range []struct { |
567 name string | 604 name string |
568 setup func() | 605 setup func() |
569 }{ | 606 }{ |
570 {"writer create failure", func() { | 607 {"writer create failure", func() { |
571 gsc.newWriterErr = func(w *testG
SWriter) error { | 608 gsc.newWriterErr = func(w *testG
SWriter) error { |
572 if strings.HasSuffix(str
ing(w.path), failName) { | 609 if strings.HasSuffix(str
ing(w.path), failName) { |
573 return errors.Wr
apTransient(errors.New("test error")) | 610 return errors.Wr
apTransient(errors.New("test error")) |
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
629 ack, err := ar.archiveTaskImpl(c
, task) | 666 ack, err := ar.archiveTaskImpl(c
, task) |
630 So(err, ShouldErrLike, "test err
or") | 667 So(err, ShouldErrLike, "test err
or") |
631 So(ack, ShouldBeFalse) | 668 So(ack, ShouldBeFalse) |
632 So(archiveRequest, ShouldBeNil) | 669 So(archiveRequest, ShouldBeNil) |
633 }) | 670 }) |
634 } | 671 } |
635 } | 672 } |
636 }) | 673 }) |
637 }) | 674 }) |
638 } | 675 } |
OLD | NEW |