Chromium Code Reviews
Diff: server/internal/logdog/archivist/archivist_test.go

Issue 1909053003: LogDog: Add project namespacing to Archivist. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-coordinator-logs
Patch Set: Rebase? Created 4 years, 7 months ago
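
This patch threads a project name through the Archivist test fixture: storage writes, the archival task, and the reported archive URLs all become project-scoped. As a rough sketch of the Google Storage path layout the updated test expects (the helper name gsArchiveURL is hypothetical; it assumes desc.Path() renders as "testing/+/foo", and the "_" placeholder for an empty project comes from the new "With an empty project" case below):

package main

import "fmt"

// gsArchiveURL mirrors the test's gsURL helper: the project segment sits
// between the archive base path and the stream path, and an empty project
// falls back to the "_" placeholder.
func gsArchiveURL(project, streamPath, name string) string {
	if project == "" {
		project = "_"
	}
	return fmt.Sprintf("gs://archive-test/path/to/archive/%s/%s/%s", project, streamPath, name)
}

func main() {
	fmt.Println(gsArchiveURL("test-project", "testing/+/foo", "logstream.entries"))
	// gs://archive-test/path/to/archive/test-project/testing/+/foo/logstream.entries
	fmt.Println(gsArchiveURL("", "testing/+/foo", "data.bin"))
	// gs://archive-test/path/to/archive/_/testing/+/foo/data.bin
}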
 // Copyright 2016 The Chromium Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.

 package archivist

 import (
 	"fmt"
 	"strings"
 	"sync"
 	"testing"
 	"time"

 	"github.com/golang/protobuf/proto"
 	"github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
 	"github.com/luci/luci-go/common/clock/testclock"
+	"github.com/luci/luci-go/common/config"
 	"github.com/luci/luci-go/common/errors"
 	"github.com/luci/luci-go/common/gcloud/gs"
 	"github.com/luci/luci-go/common/logdog/types"
 	"github.com/luci/luci-go/common/proto/google"
 	"github.com/luci/luci-go/common/proto/logdog/logpb"
 	"github.com/luci/luci-go/server/logdog/storage"
 	"github.com/luci/luci-go/server/logdog/storage/memory"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"

(...skipping 168 matching lines...)
 func TestHandleArchive(t *testing.T) {
 	t.Parallel()

 	Convey(`A testing archive setup`, t, func() {
 		c, tc := testclock.UseTime(context.Background(), testclock.TestTimeUTC)

 		st := memory.Storage{}
 		gsc := testGSClient{}

 		// Set up our test log stream.
+		project := "test-project"
 		desc := logpb.LogStreamDescriptor{
 			Prefix:        "testing",
 			Name:          "foo",
 			BinaryFileExt: "bin",
 		}
 		descBytes, err := proto.Marshal(&desc)
 		if err != nil {
 			panic(err)
 		}

 		// Utility function to add a log entry for "ls".
-		addTestEntry := func(idxs ...int) {
+		addTestEntry := func(p string, idxs ...int) {
 			for _, v := range idxs {
 				le := logpb.LogEntry{
 					PrefixIndex: uint64(v),
 					StreamIndex: uint64(v),
 					Content: &logpb.LogEntry_Text{&logpb.Text{
 						Lines: []*logpb.Text_Line{
 							{
 								Value:     fmt.Sprintf("line #%d", v),
 								Delimiter: "\n",
 							},
 						},
 					}},
 				}

 				d, err := proto.Marshal(&le)
 				if err != nil {
 					panic(err)
 				}

 				err = st.Put(storage.PutRequest{
-					Path:   desc.Path(),
-					Index:  types.MessageIndex(v),
-					Values: [][]byte{d},
+					Project: config.ProjectName(p),
+					Path:    desc.Path(),
+					Index:   types.MessageIndex(v),
+					Values:  [][]byte{d},
 				})
 				if err != nil {
 					panic(err)
 				}

 				// Advance the time for each log entry.
 				tc.Add(time.Second)
 			}
 		}

 		// Set up our testing archival task.
 		expired := 10 * time.Minute
 		archiveTask := logdog.ArchiveTask{
+			Project:        project,
 			Path:           string(desc.Path()),
 			SettleDelay:    google.NewDuration(10 * time.Second),
 			CompletePeriod: google.NewDuration(expired),
 			Key:            []byte("random archival key"),
 		}
 		expired++ // This represents a time PAST CompletePeriod.

 		task := &testTask{
 			task: &archiveTask,
 		}
(...skipping 27 matching lines...)
 		}

 		ar := Archivist{
 			Service:       &sc,
 			Storage:       &st,
 			GSClient:      &gsc,
 			GSBase:        gs.Path("gs://archive-test/path/to/archive/"),         // Extra slashes to test concatenation.
 			GSStagingBase: gs.Path("gs://archive-test-staging/path/to/archive/"), // Extra slashes to test concatenation.
 		}

-		gsURL := func(p string) string {
-			return fmt.Sprintf("gs://archive-test/path/to/archive/%s/%s", desc.Path(), p)
+		gsURL := func(project, name string) string {
+			return fmt.Sprintf("gs://archive-test/path/to/archive/%s/%s/%s", project, desc.Path(), name)
 		}

 		// hasStreams can be called to check that the retained archiveRequest had
 		// data sizes for the named archive stream types.
 		//
 		// After checking, the values are set to zero. This allows us to use
 		// ShouldEqual without hardcoding specific archival sizes into the results.
 		hasStreams := func(log, index, data bool) bool {
 			So(archiveRequest, ShouldNotBeNil)
 			if (log && archiveRequest.StreamSize <= 0) ||
(...skipping 79 matching lines...)
 		Convey(`With terminal index "3"`, func() {
 			stream.State.TerminalIndex = 3

 			Convey(`Will fail not ACK a log stream with no entries.`, func() {
 				ack, err := ar.archiveTaskImpl(c, task)
 				So(err, ShouldEqual, storage.ErrDoesNotExist)
 				So(ack, ShouldBeFalse)
 			})

 			Convey(`Will fail to archive {0, 1, 2, 4} (incomplete).`, func() {
-				addTestEntry(0, 1, 2, 4)
+				addTestEntry(project, 0, 1, 2, 4)

 				ack, err := ar.archiveTaskImpl(c, task)
 				So(err, ShouldErrLike, "missing log entry")
 				So(ack, ShouldBeFalse)
 			})

 			Convey(`Will successfully archive {0, 1, 2, 3, 4}, stopping at the terminal index.`, func() {
-				addTestEntry(0, 1, 2, 3, 4)
+				addTestEntry(project, 0, 1, 2, 3, 4)

 				ack, err := ar.archiveTaskImpl(c, task)
 				So(err, ShouldBeNil)
 				So(ack, ShouldBeTrue)

 				So(hasStreams(true, true, true), ShouldBeTrue)
 				So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
+					Project:       project,
 					Path:          archiveTask.Path,
 					LogEntryCount: 4,
 					TerminalIndex: 3,

-					StreamUrl: gsURL("logstream.entries"),
-					IndexUrl:  gsURL("logstream.index"),
-					DataUrl:   gsURL("data.bin"),
+					StreamUrl: gsURL(project, "logstream.entries"),
+					IndexUrl:  gsURL(project, "logstream.index"),
+					DataUrl:   gsURL(project, "data.bin"),
 				})
 			})

 			Convey(`When a transient archival error occurs, will not ACK it.`, func() {
-				addTestEntry(0, 1, 2, 3, 4)
+				addTestEntry(project, 0, 1, 2, 3, 4)
 				gsc.newWriterErr = func(*testGSWriter) error { return errors.WrapTransient(errors.New("test error")) }

 				ack, err := ar.archiveTaskImpl(c, task)
 				So(err, ShouldErrLike, "test error")
 				So(ack, ShouldBeFalse)
 			})

 			Convey(`When a non-transient archival error occurs`, func() {
-				addTestEntry(0, 1, 2, 3, 4)
+				addTestEntry(project, 0, 1, 2, 3, 4)
 				archiveErr := errors.New("archive failure error")
 				gsc.newWriterErr = func(*testGSWriter) error { return archiveErr }

 				Convey(`If remote report returns an error, do not ACK.`, func() {
 					archiveStreamErr = errors.New("test error")

 					ack, err := ar.archiveTaskImpl(c, task)
 					So(err, ShouldErrLike, "test error")
 					So(ack, ShouldBeFalse)

 					So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
-						Path:  archiveTask.Path,
-						Error: "archive failure error",
+						Project: project,
+						Path:    archiveTask.Path,
+						Error:   "archive failure error",
 					})
 				})

 				Convey(`If remote report returns success, ACK.`, func() {
 					ack, err := ar.archiveTaskImpl(c, task)
 					So(err, ShouldBeNil)
 					So(ack, ShouldBeTrue)
 					So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
-						Path:  archiveTask.Path,
-						Error: "archive failure error",
+						Project: project,
+						Path:    archiveTask.Path,
+						Error:   "archive failure error",
 					})
 				})

 				Convey(`If an empty error string is supplied, the generic error will be filled in.`, func() {
 					archiveErr = errors.New("")

 					ack, err := ar.archiveTaskImpl(c, task)
 					So(err, ShouldBeNil)
 					So(ack, ShouldBeTrue)
 					So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
-						Path:  archiveTask.Path,
-						Error: "archival error",
+						Project: project,
+						Path:    archiveTask.Path,
+						Error:   "archival error",
 					})
 				})
 			})
 		})

 		Convey(`When not enforcing stream completeness`, func() {
 			stream.Age = google.NewDuration(expired)

 			Convey(`With no terminal index`, func() {
 				Convey(`Will successfully archive if there are no entries.`, func() {
 					ack, err := ar.archiveTaskImpl(c, task)
 					So(err, ShouldBeNil)
 					So(ack, ShouldBeTrue)

 					So(hasStreams(true, true, false), ShouldBeTrue)
 					So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
+						Project:       project,
 						Path:          archiveTask.Path,
 						LogEntryCount: 0,
 						TerminalIndex: -1,

-						StreamUrl: gsURL("logstream.entries"),
-						IndexUrl:  gsURL("logstream.index"),
-						DataUrl:   gsURL("data.bin"),
+						StreamUrl: gsURL(project, "logstream.entries"),
+						IndexUrl:  gsURL(project, "logstream.index"),
+						DataUrl:   gsURL(project, "data.bin"),
 					})
 				})

 				Convey(`With {0, 1, 2, 4} (incomplete) will archive the stream and update its terminal index.`, func() {
-					addTestEntry(0, 1, 2, 4)
+					addTestEntry(project, 0, 1, 2, 4)

 					ack, err := ar.archiveTaskImpl(c, task)
 					So(err, ShouldBeNil)
 					So(ack, ShouldBeTrue)

 					So(hasStreams(true, true, true), ShouldBeTrue)
 					So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
+						Project:       project,
 						Path:          archiveTask.Path,
 						LogEntryCount: 4,
 						TerminalIndex: 4,

-						StreamUrl: gsURL("logstream.entries"),
-						IndexUrl:  gsURL("logstream.index"),
-						DataUrl:   gsURL("data.bin"),
+						StreamUrl: gsURL(project, "logstream.entries"),
+						IndexUrl:  gsURL(project, "logstream.index"),
+						DataUrl:   gsURL(project, "data.bin"),
 					})
 				})
 			})

 			Convey(`With terminal index 3`, func() {
 				stream.State.TerminalIndex = 3

 				Convey(`Will successfully archive if there are no entries.`, func() {
 					ack, err := ar.archiveTaskImpl(c, task)
 					So(err, ShouldBeNil)
 					So(ack, ShouldBeTrue)

 					So(hasStreams(true, true, false), ShouldBeTrue)
 					So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
+						Project:       project,
 						Path:          archiveTask.Path,
 						LogEntryCount: 0,
 						TerminalIndex: -1,

-						StreamUrl: gsURL("logstream.entries"),
-						IndexUrl:  gsURL("logstream.index"),
-						DataUrl:   gsURL("data.bin"),
+						StreamUrl: gsURL(project, "logstream.entries"),
+						IndexUrl:  gsURL(project, "logstream.index"),
+						DataUrl:   gsURL(project, "data.bin"),
 					})
 				})

 				Convey(`With {0, 1, 2, 4} (incomplete) will archive the stream and update its terminal index to 2.`, func() {
-					addTestEntry(0, 1, 2, 4)
+					addTestEntry(project, 0, 1, 2, 4)

 					ack, err := ar.archiveTaskImpl(c, task)
 					So(err, ShouldBeNil)
 					So(ack, ShouldBeTrue)

 					So(hasStreams(true, true, true), ShouldBeTrue)
 					So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
+						Project:       project,
 						Path:          archiveTask.Path,
 						LogEntryCount: 3,
 						TerminalIndex: 2,

-						StreamUrl: gsURL("logstream.entries"),
-						IndexUrl:  gsURL("logstream.index"),
-						DataUrl:   gsURL("data.bin"),
+						StreamUrl: gsURL(project, "logstream.entries"),
+						IndexUrl:  gsURL(project, "logstream.index"),
+						DataUrl:   gsURL(project, "data.bin"),
 					})
 				})
 			})
 		})

+		Convey(`With an empty project`, func() {
+			archiveTask.Project = ""
+
+			Convey(`Will successfully archive {0, 1, 2, 3} with terminal index 3 using "_" for project archive path.`, func() {
+				stream.State.TerminalIndex = 3
+				addTestEntry("", 0, 1, 2, 3)
+
+				ack, err := ar.archiveTaskImpl(c, task)
+				So(err, ShouldBeNil)
+				So(ack, ShouldBeTrue)
+
+				So(hasStreams(true, true, true), ShouldBeTrue)
+				So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
+					Project:       "",
+					Path:          archiveTask.Path,
+					LogEntryCount: 4,
+					TerminalIndex: 3,
+
+					StreamUrl: gsURL("_", "logstream.entries"),
+					IndexUrl:  gsURL("_", "logstream.index"),
+					DataUrl:   gsURL("_", "data.bin"),
+				})
+			})
+		})
+
 		// Simulate failures during the various stream generation operations.
 		Convey(`Stream generation failures`, func() {
 			stream.State.TerminalIndex = 3
-			addTestEntry(0, 1, 2, 3)
+			addTestEntry(project, 0, 1, 2, 3)

 			for _, failName := range []string{"/logstream.entries", "/logstream.index", "/data.bin"} {
 				for _, testCase := range []struct {
 					name  string
 					setup func()
 				}{
 					{"writer create failure", func() {
 						gsc.newWriterErr = func(w *testGSWriter) error {
 							if strings.HasSuffix(string(w.path), failName) {
 								return errors.WrapTransient(errors.New("test error"))
(...skipping 55 matching lines...)
 							ack, err := ar.archiveTaskImpl(c, task)
 							So(err, ShouldErrLike, "test error")
 							So(ack, ShouldBeFalse)
 							So(archiveRequest, ShouldBeNil)
 						})
 					}
 				}
 			})
 		})
 }
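
For reference, a minimal, self-contained sketch of what a project-scoped storage write looks like after this change, mirroring the addTestEntry helper above (the project name and entry contents are placeholders, not part of the patch):

package main

import (
	"log"

	"github.com/golang/protobuf/proto"
	"github.com/luci/luci-go/common/config"
	"github.com/luci/luci-go/common/logdog/types"
	"github.com/luci/luci-go/common/proto/logdog/logpb"
	"github.com/luci/luci-go/server/logdog/storage"
	"github.com/luci/luci-go/server/logdog/storage/memory"
)

func main() {
	st := memory.Storage{}
	desc := logpb.LogStreamDescriptor{Prefix: "testing", Name: "foo"}

	// Marshal a single text log entry, as addTestEntry does in the test.
	d, err := proto.Marshal(&logpb.LogEntry{
		Content: &logpb.LogEntry_Text{&logpb.Text{
			Lines: []*logpb.Text_Line{{Value: "hello", Delimiter: "\n"}},
		}},
	})
	if err != nil {
		log.Fatal(err)
	}

	// Storage writes are now keyed by project in addition to the stream path.
	if err := st.Put(storage.PutRequest{
		Project: config.ProjectName("test-project"),
		Path:    desc.Path(),
		Index:   types.MessageIndex(0),
		Values:  [][]byte{d},
	}); err != nil {
		log.Fatal(err)
	}
}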