| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package archivist | 5 package archivist |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "encoding/hex" | 9 "encoding/hex" |
| 10 "fmt" | 10 "fmt" |
| (...skipping 430 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 441 }.Errorf(c, "Invalid storage staging base.") | 441 }.Errorf(c, "Invalid storage staging base.") |
| 442 return nil, errors.New("invalid storage staging base") | 442 return nil, errors.New("invalid storage staging base") |
| 443 | 443 |
| 444 default: | 444 default: |
| 445 return st, nil | 445 return st, nil |
| 446 } | 446 } |
| 447 } | 447 } |
| 448 | 448 |
| 449 func (a *Archivist) makeStagedArchival(c context.Context, project config.Project
Name, | 449 func (a *Archivist) makeStagedArchival(c context.Context, project config.Project
Name, |
| 450 st *Settings, ls *logdog.LoadStreamResponse, uid string) (*stagedArchiva
l, error) { | 450 st *Settings, ls *logdog.LoadStreamResponse, uid string) (*stagedArchiva
l, error) { |
| 451 |
| 451 sa := stagedArchival{ | 452 sa := stagedArchival{ |
| 452 Archivist: a, | 453 Archivist: a, |
| 453 Settings: st, | 454 Settings: st, |
| 454 project: project, | 455 project: project, |
| 455 | 456 |
| 456 terminalIndex: types.MessageIndex(ls.State.TerminalIndex), | 457 terminalIndex: types.MessageIndex(ls.State.TerminalIndex), |
| 457 } | 458 } |
| 458 | 459 |
| 459 // Deserialize and validate the descriptor protobuf. If this fails, it i
s a | 460 // Deserialize and validate the descriptor protobuf. If this fails, it i
s a |
| 460 // non-transient error. | 461 // non-transient error. |
| 461 if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil { | 462 if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil { |
| 462 log.Fields{ | 463 log.Fields{ |
| 463 log.ErrorKey: err, | 464 log.ErrorKey: err, |
| 464 "protoVersion": ls.State.ProtoVersion, | 465 "protoVersion": ls.State.ProtoVersion, |
| 465 }.Errorf(c, "Failed to unmarshal descriptor protobuf.") | 466 }.Errorf(c, "Failed to unmarshal descriptor protobuf.") |
| 466 return nil, err | 467 return nil, err |
| 467 } | 468 } |
| 468 sa.path = sa.desc.Path() | 469 sa.path = sa.desc.Path() |
| 469 | 470 |
| 470 bext := sa.desc.BinaryFileExt | |
| 471 if bext == "" { | |
| 472 bext = "bin" | |
| 473 } | |
| 474 | |
| 475 // Construct our staged archival paths. | 471 // Construct our staged archival paths. |
| 476 sa.stream = sa.makeStagingPaths("logstream.entries", uid) | 472 sa.stream = sa.makeStagingPaths("logstream.entries", uid) |
| 477 sa.index = sa.makeStagingPaths("logstream.index", uid) | 473 sa.index = sa.makeStagingPaths("logstream.index", uid) |
| 478 » sa.data = sa.makeStagingPaths(fmt.Sprintf("data.%s", bext), uid) | 474 |
| 475 » // If we're emitting binary files, construct that too. |
| 476 » bext := sa.desc.BinaryFileExt |
| 477 » if bext != "" || sa.AlwaysRender { |
| 478 » » // If no binary file extension was supplied, choose a default. |
| 479 » » if bext == "" { |
| 480 » » » bext = "bin" |
| 481 » » } |
| 482 |
| 483 » » sa.data = sa.makeStagingPaths(fmt.Sprintf("data.%s", bext), uid) |
| 484 » } |
| 479 return &sa, nil | 485 return &sa, nil |
| 480 } | 486 } |
| 481 | 487 |
| 482 type stagedArchival struct { | 488 type stagedArchival struct { |
| 483 *Archivist | 489 *Archivist |
| 484 *Settings | 490 *Settings |
| 485 | 491 |
| 486 project config.ProjectName | 492 project config.ProjectName |
| 487 path types.StreamPath | 493 path types.StreamPath |
| 488 desc logpb.LogStreamDescriptor | 494 desc logpb.LogStreamDescriptor |
| (...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 610 if streamWriter, err = createWriter(sa.stream.staged); err != nil { | 616 if streamWriter, err = createWriter(sa.stream.staged); err != nil { |
| 611 return | 617 return |
| 612 } | 618 } |
| 613 defer closeWriter(streamWriter, sa.stream.staged) | 619 defer closeWriter(streamWriter, sa.stream.staged) |
| 614 | 620 |
| 615 if indexWriter, err = createWriter(sa.index.staged); err != nil { | 621 if indexWriter, err = createWriter(sa.index.staged); err != nil { |
| 616 return err | 622 return err |
| 617 } | 623 } |
| 618 defer closeWriter(indexWriter, sa.index.staged) | 624 defer closeWriter(indexWriter, sa.index.staged) |
| 619 | 625 |
| 620 » if dataWriter, err = createWriter(sa.data.staged); err != nil { | 626 » if sa.data.enabled() { |
| 621 » » return err | 627 » » // Only emit a data stream if we are configured to do so. |
| 628 » » if dataWriter, err = createWriter(sa.data.staged); err != nil { |
| 629 » » » return err |
| 630 » » } |
| 631 » » defer closeWriter(dataWriter, sa.data.staged) |
| 622 } | 632 } |
| 623 defer closeWriter(dataWriter, sa.data.staged) | |
| 624 | 633 |
| 625 // Read our log entries from intermediate storage. | 634 // Read our log entries from intermediate storage. |
| 626 ss := storageSource{ | 635 ss := storageSource{ |
| 627 Context: c, | 636 Context: c, |
| 628 st: sa.Storage, | 637 st: sa.Storage, |
| 629 project: sa.project, | 638 project: sa.project, |
| 630 path: sa.path, | 639 path: sa.path, |
| 631 terminalIndex: sa.terminalIndex, | 640 terminalIndex: sa.terminalIndex, |
| 632 lastIndex: -1, | 641 lastIndex: -1, |
| 633 } | 642 } |
| (...skipping 26 matching lines...) Expand all Loading... |
| 660 "terminalIndex": ss.lastIndex, | 669 "terminalIndex": ss.lastIndex, |
| 661 "logEntryCount": ss.logEntryCount, | 670 "logEntryCount": ss.logEntryCount, |
| 662 }.Debugf(c, "Finished archiving log stream.") | 671 }.Debugf(c, "Finished archiving log stream.") |
| 663 } | 672 } |
| 664 | 673 |
| 665 // Update our state with archival results. | 674 // Update our state with archival results. |
| 666 sa.terminalIndex = ss.lastIndex | 675 sa.terminalIndex = ss.lastIndex |
| 667 sa.logEntryCount = ss.logEntryCount | 676 sa.logEntryCount = ss.logEntryCount |
| 668 sa.stream.bytesWritten = streamWriter.Count() | 677 sa.stream.bytesWritten = streamWriter.Count() |
| 669 sa.index.bytesWritten = indexWriter.Count() | 678 sa.index.bytesWritten = indexWriter.Count() |
| 670 » sa.data.bytesWritten = dataWriter.Count() | 679 » if dataWriter != nil { |
| 680 » » sa.data.bytesWritten = dataWriter.Count() |
| 681 » } |
| 671 return | 682 return |
| 672 } | 683 } |
| 673 | 684 |
| 674 type stagingPaths struct { | 685 type stagingPaths struct { |
| 675 staged gs.Path | 686 staged gs.Path |
| 676 final gs.Path | 687 final gs.Path |
| 677 bytesWritten int64 | 688 bytesWritten int64 |
| 678 } | 689 } |
| 679 | 690 |
| 680 func (d *stagingPaths) clearStaged() { | 691 func (d *stagingPaths) clearStaged() { d.staged = "" } |
| 681 » d.staged = "" | 692 |
| 682 } | 693 func (d *stagingPaths) enabled() bool { return d.final != "" } |
| 683 | 694 |
| 684 func (d *stagingPaths) addMetrics(c context.Context, archiveField, streamField s
tring) { | 695 func (d *stagingPaths) addMetrics(c context.Context, archiveField, streamField s
tring) { |
| 685 tsSize.Add(c, float64(d.bytesWritten), archiveField, streamField) | 696 tsSize.Add(c, float64(d.bytesWritten), archiveField, streamField) |
| 686 tsTotalBytes.Add(c, d.bytesWritten, archiveField, streamField) | 697 tsTotalBytes.Add(c, d.bytesWritten, archiveField, streamField) |
| 687 } | 698 } |
| 688 | 699 |
| 689 func (sa *stagedArchival) finalize(c context.Context, client gs.Client, ar *logd
og.ArchiveStreamRequest) error { | 700 func (sa *stagedArchival) finalize(c context.Context, client gs.Client, ar *logd
og.ArchiveStreamRequest) error { |
| 690 err := parallel.FanOutIn(func(taskC chan<- func() error) { | 701 err := parallel.FanOutIn(func(taskC chan<- func() error) { |
| 691 for _, d := range sa.getStagingPaths() { | 702 for _, d := range sa.getStagingPaths() { |
| 692 d := d | 703 d := d |
| 693 | 704 |
| 694 » » » // Don't copy zero-sized streams. | 705 » » » // Don't finalize zero-sized streams. |
| 695 » » » if d.bytesWritten == 0 { | 706 » » » if !d.enabled() || d.bytesWritten == 0 { |
| 696 continue | 707 continue |
| 697 } | 708 } |
| 698 | 709 |
| 699 taskC <- func() error { | 710 taskC <- func() error { |
| 700 if err := client.Rename(d.staged, d.final); err
!= nil { | 711 if err := client.Rename(d.staged, d.final); err
!= nil { |
| 701 log.Fields{ | 712 log.Fields{ |
| 702 log.ErrorKey: err, | 713 log.ErrorKey: err, |
| 703 "stagedPath": d.staged, | 714 "stagedPath": d.staged, |
| 704 "finalPath": d.final, | 715 "finalPath": d.final, |
| 705 }.Errorf(c, "Failed to rename GS object.
") | 716 }.Errorf(c, "Failed to rename GS object.
") |
| 706 return err | 717 return err |
| 707 } | 718 } |
| 708 | 719 |
| 709 // Clear the staged value to indicate that it no
longer exists. | 720 // Clear the staged value to indicate that it no
longer exists. |
| 710 d.clearStaged() | 721 d.clearStaged() |
| 711 return nil | 722 return nil |
| 712 } | 723 } |
| 713 } | 724 } |
| 714 }) | 725 }) |
| 715 if err != nil { | 726 if err != nil { |
| 716 return err | 727 return err |
| 717 } | 728 } |
| 718 | 729 |
| 719 ar.TerminalIndex = int64(sa.terminalIndex) | 730 ar.TerminalIndex = int64(sa.terminalIndex) |
| 720 ar.LogEntryCount = sa.logEntryCount | 731 ar.LogEntryCount = sa.logEntryCount |
| 721 ar.StreamUrl = string(sa.stream.final) | 732 ar.StreamUrl = string(sa.stream.final) |
| 722 ar.StreamSize = sa.stream.bytesWritten | 733 ar.StreamSize = sa.stream.bytesWritten |
| 723 ar.IndexUrl = string(sa.index.final) | 734 ar.IndexUrl = string(sa.index.final) |
| 724 ar.IndexSize = sa.index.bytesWritten | 735 ar.IndexSize = sa.index.bytesWritten |
| 725 » ar.DataUrl = string(sa.data.final) | 736 » if sa.data.enabled() { |
| 726 » ar.DataSize = sa.data.bytesWritten | 737 » » ar.DataUrl = string(sa.data.final) |
| 738 » » ar.DataSize = sa.data.bytesWritten |
| 739 » } |
| 727 return nil | 740 return nil |
| 728 } | 741 } |
| 729 | 742 |
| 730 func (sa *stagedArchival) cleanup(c context.Context) { | 743 func (sa *stagedArchival) cleanup(c context.Context) { |
| 731 for _, d := range sa.getStagingPaths() { | 744 for _, d := range sa.getStagingPaths() { |
| 732 if d.staged == "" { | 745 if d.staged == "" { |
| 733 continue | 746 continue |
| 734 } | 747 } |
| 735 | 748 |
| 736 if err := sa.GSClient.Delete(d.staged); err != nil { | 749 if err := sa.GSClient.Delete(d.staged); err != nil { |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 779 return e.inner | 792 return e.inner |
| 780 } | 793 } |
| 781 | 794 |
| 782 func isFailure(err error) bool { | 795 func isFailure(err error) bool { |
| 783 if err == nil { | 796 if err == nil { |
| 784 return false | 797 return false |
| 785 } | 798 } |
| 786 _, ok := err.(*statusErrorWrapper) | 799 _, ok := err.(*statusErrorWrapper) |
| 787 return !ok | 800 return !ok |
| 788 } | 801 } |
| OLD | NEW |