Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(40)

Side by Side Diff: logdog/server/archivist/archivist.go

Issue 2321173002: LogDog/Archivist: Conditionally render data. (Closed)
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | logdog/server/archivist/archivist_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The LUCI Authors. All rights reserved. 1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package archivist 5 package archivist
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "encoding/hex" 9 "encoding/hex"
10 "fmt" 10 "fmt"
(...skipping 430 matching lines...) Expand 10 before | Expand all | Expand 10 after
441 }.Errorf(c, "Invalid storage staging base.") 441 }.Errorf(c, "Invalid storage staging base.")
442 return nil, errors.New("invalid storage staging base") 442 return nil, errors.New("invalid storage staging base")
443 443
444 default: 444 default:
445 return st, nil 445 return st, nil
446 } 446 }
447 } 447 }
448 448
449 func (a *Archivist) makeStagedArchival(c context.Context, project config.ProjectName, 449 func (a *Archivist) makeStagedArchival(c context.Context, project config.ProjectName,
450 	st *Settings, ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) { 450 	st *Settings, ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) {
451
451 sa := stagedArchival{ 452 sa := stagedArchival{
452 Archivist: a, 453 Archivist: a,
453 Settings: st, 454 Settings: st,
454 project: project, 455 project: project,
455 456
456 terminalIndex: types.MessageIndex(ls.State.TerminalIndex), 457 terminalIndex: types.MessageIndex(ls.State.TerminalIndex),
457 } 458 }
458 459
459 	// Deserialize and validate the descriptor protobuf. If this fails, it is a 460 	// Deserialize and validate the descriptor protobuf. If this fails, it is a
460 // non-transient error. 461 // non-transient error.
461 if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil { 462 if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil {
462 log.Fields{ 463 log.Fields{
463 log.ErrorKey: err, 464 log.ErrorKey: err,
464 "protoVersion": ls.State.ProtoVersion, 465 "protoVersion": ls.State.ProtoVersion,
465 }.Errorf(c, "Failed to unmarshal descriptor protobuf.") 466 }.Errorf(c, "Failed to unmarshal descriptor protobuf.")
466 return nil, err 467 return nil, err
467 } 468 }
468 sa.path = sa.desc.Path() 469 sa.path = sa.desc.Path()
469 470
470 bext := sa.desc.BinaryFileExt
471 if bext == "" {
472 bext = "bin"
473 }
474
475 // Construct our staged archival paths. 471 // Construct our staged archival paths.
476 sa.stream = sa.makeStagingPaths("logstream.entries", uid) 472 sa.stream = sa.makeStagingPaths("logstream.entries", uid)
477 sa.index = sa.makeStagingPaths("logstream.index", uid) 473 sa.index = sa.makeStagingPaths("logstream.index", uid)
478 » sa.data = sa.makeStagingPaths(fmt.Sprintf("data.%s", bext), uid) 474
475 » // If we're emitting binary files, construct that too.
476 » bext := sa.desc.BinaryFileExt
477 » if bext != "" || sa.AlwaysRender {
478 » » // If no binary file extension was supplied, choose a default.
479 » » if bext == "" {
480 » » » bext = "bin"
481 » » }
482
483 » » sa.data = sa.makeStagingPaths(fmt.Sprintf("data.%s", bext), uid)
484 » }
479 return &sa, nil 485 return &sa, nil
480 } 486 }
481 487
482 type stagedArchival struct { 488 type stagedArchival struct {
483 *Archivist 489 *Archivist
484 *Settings 490 *Settings
485 491
486 project config.ProjectName 492 project config.ProjectName
487 path types.StreamPath 493 path types.StreamPath
488 desc logpb.LogStreamDescriptor 494 desc logpb.LogStreamDescriptor
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after
610 if streamWriter, err = createWriter(sa.stream.staged); err != nil { 616 if streamWriter, err = createWriter(sa.stream.staged); err != nil {
611 return 617 return
612 } 618 }
613 defer closeWriter(streamWriter, sa.stream.staged) 619 defer closeWriter(streamWriter, sa.stream.staged)
614 620
615 if indexWriter, err = createWriter(sa.index.staged); err != nil { 621 if indexWriter, err = createWriter(sa.index.staged); err != nil {
616 return err 622 return err
617 } 623 }
618 defer closeWriter(indexWriter, sa.index.staged) 624 defer closeWriter(indexWriter, sa.index.staged)
619 625
620 » if dataWriter, err = createWriter(sa.data.staged); err != nil { 626 » if sa.data.enabled() {
621 » » return err 627 » » // Only emit a data stream if we are configured to do so.
628 » » if dataWriter, err = createWriter(sa.data.staged); err != nil {
629 » » » return err
630 » » }
631 » » defer closeWriter(dataWriter, sa.data.staged)
622 } 632 }
623 defer closeWriter(dataWriter, sa.data.staged)
624 633
625 // Read our log entries from intermediate storage. 634 // Read our log entries from intermediate storage.
626 ss := storageSource{ 635 ss := storageSource{
627 Context: c, 636 Context: c,
628 st: sa.Storage, 637 st: sa.Storage,
629 project: sa.project, 638 project: sa.project,
630 path: sa.path, 639 path: sa.path,
631 terminalIndex: sa.terminalIndex, 640 terminalIndex: sa.terminalIndex,
632 lastIndex: -1, 641 lastIndex: -1,
633 } 642 }
(...skipping 26 matching lines...) Expand all
660 "terminalIndex": ss.lastIndex, 669 "terminalIndex": ss.lastIndex,
661 "logEntryCount": ss.logEntryCount, 670 "logEntryCount": ss.logEntryCount,
662 }.Debugf(c, "Finished archiving log stream.") 671 }.Debugf(c, "Finished archiving log stream.")
663 } 672 }
664 673
665 // Update our state with archival results. 674 // Update our state with archival results.
666 sa.terminalIndex = ss.lastIndex 675 sa.terminalIndex = ss.lastIndex
667 sa.logEntryCount = ss.logEntryCount 676 sa.logEntryCount = ss.logEntryCount
668 sa.stream.bytesWritten = streamWriter.Count() 677 sa.stream.bytesWritten = streamWriter.Count()
669 sa.index.bytesWritten = indexWriter.Count() 678 sa.index.bytesWritten = indexWriter.Count()
670 » sa.data.bytesWritten = dataWriter.Count() 679 » if dataWriter != nil {
680 » » sa.data.bytesWritten = dataWriter.Count()
681 » }
671 return 682 return
672 } 683 }
673 684
674 type stagingPaths struct { 685 type stagingPaths struct {
675 staged gs.Path 686 staged gs.Path
676 final gs.Path 687 final gs.Path
677 bytesWritten int64 688 bytesWritten int64
678 } 689 }
679 690
680 func (d *stagingPaths) clearStaged() { 691 func (d *stagingPaths) clearStaged() { d.staged = "" }
681 » d.staged = "" 692
682 } 693 func (d *stagingPaths) enabled() bool { return d.final != "" }
683 694
684 func (d *stagingPaths) addMetrics(c context.Context, archiveField, streamField string) { 695 func (d *stagingPaths) addMetrics(c context.Context, archiveField, streamField string) {
685 tsSize.Add(c, float64(d.bytesWritten), archiveField, streamField) 696 tsSize.Add(c, float64(d.bytesWritten), archiveField, streamField)
686 tsTotalBytes.Add(c, d.bytesWritten, archiveField, streamField) 697 tsTotalBytes.Add(c, d.bytesWritten, archiveField, streamField)
687 } 698 }
688 699
689 func (sa *stagedArchival) finalize(c context.Context, client gs.Client, ar *logdog.ArchiveStreamRequest) error { 700 func (sa *stagedArchival) finalize(c context.Context, client gs.Client, ar *logdog.ArchiveStreamRequest) error {
690 err := parallel.FanOutIn(func(taskC chan<- func() error) { 701 err := parallel.FanOutIn(func(taskC chan<- func() error) {
691 for _, d := range sa.getStagingPaths() { 702 for _, d := range sa.getStagingPaths() {
692 d := d 703 d := d
693 704
694 » » » // Don't copy zero-sized streams. 705 » » » // Don't finalize zero-sized streams.
695 » » » if d.bytesWritten == 0 { 706 » » » if !d.enabled() || d.bytesWritten == 0 {
696 continue 707 continue
697 } 708 }
698 709
699 taskC <- func() error { 710 taskC <- func() error {
700 if err := client.Rename(d.staged, d.final); err != nil { 711 if err := client.Rename(d.staged, d.final); err != nil {
701 log.Fields{ 712 log.Fields{
702 log.ErrorKey: err, 713 log.ErrorKey: err,
703 "stagedPath": d.staged, 714 "stagedPath": d.staged,
704 "finalPath": d.final, 715 "finalPath": d.final,
705 					}.Errorf(c, "Failed to rename GS object.") 716 					}.Errorf(c, "Failed to rename GS object.")
706 return err 717 return err
707 } 718 }
708 719
709 // Clear the staged value to indicate that it no longer exists. 720 // Clear the staged value to indicate that it no longer exists.
710 d.clearStaged() 721 d.clearStaged()
711 return nil 722 return nil
712 } 723 }
713 } 724 }
714 }) 725 })
715 if err != nil { 726 if err != nil {
716 return err 727 return err
717 } 728 }
718 729
719 ar.TerminalIndex = int64(sa.terminalIndex) 730 ar.TerminalIndex = int64(sa.terminalIndex)
720 ar.LogEntryCount = sa.logEntryCount 731 ar.LogEntryCount = sa.logEntryCount
721 ar.StreamUrl = string(sa.stream.final) 732 ar.StreamUrl = string(sa.stream.final)
722 ar.StreamSize = sa.stream.bytesWritten 733 ar.StreamSize = sa.stream.bytesWritten
723 ar.IndexUrl = string(sa.index.final) 734 ar.IndexUrl = string(sa.index.final)
724 ar.IndexSize = sa.index.bytesWritten 735 ar.IndexSize = sa.index.bytesWritten
725 » ar.DataUrl = string(sa.data.final) 736 » if sa.data.enabled() {
726 » ar.DataSize = sa.data.bytesWritten 737 » » ar.DataUrl = string(sa.data.final)
738 » » ar.DataSize = sa.data.bytesWritten
739 » }
727 return nil 740 return nil
728 } 741 }
729 742
730 func (sa *stagedArchival) cleanup(c context.Context) { 743 func (sa *stagedArchival) cleanup(c context.Context) {
731 for _, d := range sa.getStagingPaths() { 744 for _, d := range sa.getStagingPaths() {
732 if d.staged == "" { 745 if d.staged == "" {
733 continue 746 continue
734 } 747 }
735 748
736 if err := sa.GSClient.Delete(d.staged); err != nil { 749 if err := sa.GSClient.Delete(d.staged); err != nil {
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
779 return e.inner 792 return e.inner
780 } 793 }
781 794
782 func isFailure(err error) bool { 795 func isFailure(err error) bool {
783 if err == nil { 796 if err == nil {
784 return false 797 return false
785 } 798 }
786 _, ok := err.(*statusErrorWrapper) 799 _, ok := err.(*statusErrorWrapper)
787 return !ok 800 return !ok
788 } 801 }
OLDNEW
« no previous file with comments | « no previous file | logdog/server/archivist/archivist_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698