Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package services | 5 package services |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 ds "github.com/luci/gae/service/datastore" | 8 ds "github.com/luci/gae/service/datastore" |
| 9 "github.com/luci/luci-go/appengine/logdog/coordinator" | 9 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 10 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 10 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 11 "github.com/luci/luci-go/common/clock" | 11 "github.com/luci/luci-go/common/clock" |
| 12 "github.com/luci/luci-go/common/grpcutil" | 12 "github.com/luci/luci-go/common/grpcutil" |
| 13 "github.com/luci/luci-go/common/logdog/types" | 13 "github.com/luci/luci-go/common/logdog/types" |
| 14 log "github.com/luci/luci-go/common/logging" | 14 log "github.com/luci/luci-go/common/logging" |
| 15 "github.com/luci/luci-go/common/proto/google" | 15 "github.com/luci/luci-go/common/proto/google" |
| 16 "golang.org/x/net/context" | 16 "golang.org/x/net/context" |
| 17 "google.golang.org/grpc/codes" | 17 "google.golang.org/grpc/codes" |
| 18 ) | 18 ) |
| 19 | 19 |
| 20 // ArchiveStream implements the logdog.ServicesServer interface. | 20 // ArchiveStream implements the logdog.ServicesServer interface. |
| 21 func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque st) (*google.Empty, error) { | 21 func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque st) (*google.Empty, error) { |
| 22 » if err := Auth(c); err != nil { | 22 » svc := b.GetServices() |
| 23 » if err := Auth(c, svc); err != nil { | |
| 23 return nil, err | 24 return nil, err |
| 24 } | 25 } |
| 25 | 26 |
| 26 log.Fields{ | 27 log.Fields{ |
| 27 » » "path": req.Path, | 28 » » "path": req.Path, |
| 28 » }.Infof(c, "Marking log stream as archived.") | 29 » » "complete": req.Complete(), |
| 30 » » "terminalIndex": req.TerminalIndex, | |
| 31 » » "logEntryCount": req.LogEntryCount, | |
| 32 » » "error": req.Error, | |
| 33 » }.Infof(c, "Received archival request.") | |
|
dnj
2016/04/11 17:20:04
Makes it much easier to find logs in GAE. I recomm
| |
| 29 | 34 |
| 30 // Verify that the request is minimially valid. | 35 // Verify that the request is minimially valid. |
| 31 path := types.StreamPath(req.Path) | 36 path := types.StreamPath(req.Path) |
| 32 if err := path.Validate(); err != nil { | 37 if err := path.Validate(); err != nil { |
| 33 return nil, grpcutil.Errf(codes.InvalidArgument, "invalid log st ream path: %v", err) | 38 return nil, grpcutil.Errf(codes.InvalidArgument, "invalid log st ream path: %v", err) |
| 34 } | 39 } |
| 35 | 40 |
| 36 switch { | 41 switch { |
| 37 case req.IndexUrl == "": | 42 case req.IndexUrl == "": |
| 38 return nil, grpcutil.Errf(codes.InvalidArgument, "missing requir ed index archive URL") | 43 return nil, grpcutil.Errf(codes.InvalidArgument, "missing requir ed index archive URL") |
| 39 case req.StreamUrl == "": | 44 case req.StreamUrl == "": |
| 40 return nil, grpcutil.Errf(codes.InvalidArgument, "missing requir ed stream archive URL") | 45 return nil, grpcutil.Errf(codes.InvalidArgument, "missing requir ed stream archive URL") |
| 41 } | 46 } |
| 42 | 47 |
| 43 ls := coordinator.LogStreamFromPath(path) | 48 ls := coordinator.LogStreamFromPath(path) |
| 44 | 49 |
| 45 » // (Non-transactional) Is the log stream already archived? | 50 » log.Fields{ |
| 46 » switch err := ds.Get(c).Get(ls); err { | 51 » » "id": ls.HashID, |
| 47 » case nil: | 52 » }.Infof(c, "Log stream ID.") |
| 48 » » if ls.Archived() { | |
| 49 » » » log.Infof(c, "Log stream already marked as archived (non -transactional).") | |
| 50 » » » return &google.Empty{}, nil | |
| 51 » » } | |
| 52 | |
| 53 » case ds.ErrNoSuchEntity: | |
| 54 » » break | |
| 55 | |
| 56 » default: | |
| 57 » » log.WithError(err).Errorf(c, "Failed to check for log stream arc hvial state.") | |
| 58 » » return nil, grpcutil.Internal | |
| 59 » } | |
| 60 | 53 |
| 61 // Post the archival results to the Coordinator. | 54 // Post the archival results to the Coordinator. |
| 62 now := clock.Now(c).UTC() | 55 now := clock.Now(c).UTC() |
| 56 var ierr error | |
| 63 err := ds.Get(c).RunInTransaction(func(c context.Context) error { | 57 err := ds.Get(c).RunInTransaction(func(c context.Context) error { |
| 58 ierr = nil | |
| 59 | |
| 60 // Note that within this transaction, we have two return values: | |
| 61 // - Non-nil to abort the transaction. | |
| 62 // - Specific error via "ierr". | |
| 64 di := ds.Get(c) | 63 di := ds.Get(c) |
| 65 if err := di.Get(ls); err != nil { | 64 if err := di.Get(ls); err != nil { |
| 66 return err | 65 return err |
| 67 } | 66 } |
| 68 » » if ls.Archived() { | 67 |
| 69 » » » log.Infof(c, "Log stream already marked as archived.") | 68 » » // If our log stream is not in LSArchiveTasked, we will reject t his archive |
| 69 » » // request with FailedPrecondition. | |
| 70 » » switch { | |
| 71 » » case ls.Archived(): | |
| 72 » » » // Return nil if the log stream is already archived (ide mpotent). | |
| 73 » » » log.Warningf(c, "Log stream is already archived.") | |
| 70 return nil | 74 return nil |
| 75 | |
| 76 case ls.State != coordinator.LSArchiveTasked: | |
| 77 log.Fields{ | |
| 78 "state": ls.State, | |
| 79 }.Errorf(c, "Log stream is not in archival tasked state. ") | |
| 80 ierr = grpcutil.Errf(codes.FailedPrecondition, "Log stre am has not tasked an archival.") | |
| 81 return ierr | |
| 82 } | |
| 83 | |
| 84 // If this request contained an error, we will record an empty a rchival and | |
| 85 // log a warning. | |
| 86 if req.Error != "" { | |
| 87 log.Fields{ | |
| 88 "archiveError": req.Error, | |
| 89 }.Warningf(c, "Log stream archival indicated error. Arch iving empty stream.") | |
| 90 | |
| 91 req.TerminalIndex = -1 | |
| 92 req.LogEntryCount = 0 | |
| 71 } | 93 } |
| 72 | 94 |
| 73 // Update archival information. Make sure this actually marks th e stream as | 95 // Update archival information. Make sure this actually marks th e stream as |
| 74 // archived. | 96 // archived. |
| 75 ls.Updated = now | |
| 76 ls.State = coordinator.LSArchived | 97 ls.State = coordinator.LSArchived |
| 77 » » ls.ArchiveWhole = req.Complete | 98 » » ls.ArchivedTime = now |
| 99 » » ls.ArchivalKey = nil // No point in wasting datastore space on t his. | |
| 100 | |
| 101 » » if ls.TerminalIndex < 0 { | |
| 102 » » » // Also set the terminated time. | |
| 103 » » » ls.TerminatedTime = now | |
| 104 » » } | |
| 78 ls.TerminalIndex = req.TerminalIndex | 105 ls.TerminalIndex = req.TerminalIndex |
| 106 | |
| 107 ls.ArchiveLogEntryCount = req.LogEntryCount | |
| 79 ls.ArchiveStreamURL = req.StreamUrl | 108 ls.ArchiveStreamURL = req.StreamUrl |
| 80 ls.ArchiveStreamSize = req.StreamSize | 109 ls.ArchiveStreamSize = req.StreamSize |
| 81 ls.ArchiveIndexURL = req.IndexUrl | 110 ls.ArchiveIndexURL = req.IndexUrl |
| 82 ls.ArchiveIndexSize = req.IndexSize | 111 ls.ArchiveIndexSize = req.IndexSize |
| 83 ls.ArchiveDataURL = req.DataUrl | 112 ls.ArchiveDataURL = req.DataUrl |
| 84 ls.ArchiveDataSize = req.DataSize | 113 ls.ArchiveDataSize = req.DataSize |
| 85 | 114 |
| 86 // Update the log stream. | 115 // Update the log stream. |
| 87 » » if err := ls.Put(di); err != nil { | 116 » » if err := di.Put(ls); err != nil { |
| 88 log.WithError(err).Errorf(c, "Failed to update log strea m.") | 117 log.WithError(err).Errorf(c, "Failed to update log strea m.") |
| 89 return err | 118 return err |
| 90 } | 119 } |
| 91 | 120 |
| 92 log.Infof(c, "Successfully marked stream as archived.") | 121 log.Infof(c, "Successfully marked stream as archived.") |
| 93 return nil | 122 return nil |
| 94 }, nil) | 123 }, nil) |
| 124 if ierr != nil { | |
| 125 log.WithError(ierr).Errorf(c, "Failed to mark stream as archived .") | |
| 126 return nil, ierr | |
| 127 } | |
| 95 if err != nil { | 128 if err != nil { |
| 96 » » log.WithError(err).Errorf(c, "Failed to mark stream as archived. ") | 129 » » log.WithError(err).Errorf(c, "Internal error.") |
| 97 return nil, grpcutil.Internal | 130 return nil, grpcutil.Internal |
| 98 } | 131 } |
| 99 | 132 |
| 100 return &google.Empty{}, nil | 133 return &google.Empty{}, nil |
| 101 } | 134 } |
| OLD | NEW |