Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package services | 5 package services |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 ds "github.com/luci/gae/service/datastore" | 8 ds "github.com/luci/gae/service/datastore" |
| 9 "github.com/luci/luci-go/appengine/logdog/coordinator" | 9 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 10 "github.com/luci/luci-go/appengine/logdog/coordinator/config" | |
| 10 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 11 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 11 "github.com/luci/luci-go/common/clock" | 12 "github.com/luci/luci-go/common/clock" |
| 12 "github.com/luci/luci-go/common/grpcutil" | 13 "github.com/luci/luci-go/common/grpcutil" |
| 13 "github.com/luci/luci-go/common/logdog/types" | 14 "github.com/luci/luci-go/common/logdog/types" |
| 14 log "github.com/luci/luci-go/common/logging" | 15 log "github.com/luci/luci-go/common/logging" |
| 15 "github.com/luci/luci-go/common/proto/google" | 16 "github.com/luci/luci-go/common/proto/google" |
| 16 "golang.org/x/net/context" | 17 "golang.org/x/net/context" |
| 17 "google.golang.org/grpc/codes" | 18 "google.golang.org/grpc/codes" |
| 18 ) | 19 ) |
| 19 | 20 |
| 20 // ArchiveStream implements the logdog.ServicesServer interface. | 21 // ArchiveStream implements the logdog.ServicesServer interface. |
| 21 func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque st) (*google.Empty, error) { | 22 func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque st) (*google.Empty, error) { |
| 22 if err := Auth(c); err != nil { | 23 if err := Auth(c); err != nil { |
| 23 return nil, err | 24 return nil, err |
| 24 } | 25 } |
| 25 | 26 |
| 27 cfg, err := config.Load(c) | |
| 28 if err != nil { | |
| 29 log.WithError(err).Errorf(c, "Failed to load configuration.") | |
| 30 return nil, grpcutil.Internal | |
| 31 } | |
| 32 ccfg := cfg.GetCoordinator() // config.Load ensures this is not nil. | |
| 33 | |
| 26 log.Fields{ | 34 log.Fields{ |
| 27 » » "path": req.Path, | 35 » » "path": req.Path, |
| 28 » }.Infof(c, "Marking log stream as archived.") | 36 » » "complete": req.Complete(), |
| 37 » » "terminalIndex": req.TerminalIndex, | |
| 38 » » "logEntryCount": req.LogEntryCount, | |
| 39 » » "error": req.Error, | |
| 40 » }.Infof(c, "Received archival request.") | |
| 29 | 41 |
| 30 // Verify that the request is minimially valid. | 42 // Verify that the request is minimially valid. |
| 31 path := types.StreamPath(req.Path) | 43 path := types.StreamPath(req.Path) |
| 32 if err := path.Validate(); err != nil { | 44 if err := path.Validate(); err != nil { |
| 33 return nil, grpcutil.Errf(codes.InvalidArgument, "invalid log st ream path: %v", err) | 45 return nil, grpcutil.Errf(codes.InvalidArgument, "invalid log st ream path: %v", err) |
| 34 } | 46 } |
| 35 | 47 |
| 36 switch { | 48 switch { |
| 37 case req.IndexUrl == "": | 49 case req.IndexUrl == "": |
| 38 return nil, grpcutil.Errf(codes.InvalidArgument, "missing requir ed index archive URL") | 50 return nil, grpcutil.Errf(codes.InvalidArgument, "missing requir ed index archive URL") |
| 39 case req.StreamUrl == "": | 51 case req.StreamUrl == "": |
| 40 return nil, grpcutil.Errf(codes.InvalidArgument, "missing requir ed stream archive URL") | 52 return nil, grpcutil.Errf(codes.InvalidArgument, "missing requir ed stream archive URL") |
| 41 } | 53 } |
| 42 | 54 |
| 43 ls := coordinator.LogStreamFromPath(path) | 55 ls := coordinator.LogStreamFromPath(path) |
| 44 | 56 |
| 45 » // (Non-transactional) Is the log stream already archived? | 57 » log.Fields{ |
| 46 » switch err := ds.Get(c).Get(ls); err { | 58 » » "id": ls.HashID(), |
| 47 » case nil: | 59 » }.Infof(c, "Log stream ID.") |
| 48 » » if ls.Archived() { | |
| 49 » » » log.Infof(c, "Log stream already marked as archived (non -transactional).") | |
| 50 » » » return &google.Empty{}, nil | |
| 51 » » } | |
| 52 | |
| 53 » case ds.ErrNoSuchEntity: | |
| 54 » » break | |
| 55 | |
| 56 » default: | |
| 57 » » log.WithError(err).Errorf(c, "Failed to check for log stream arc hvial state.") | |
| 58 » » return nil, grpcutil.Internal | |
| 59 » } | |
| 60 | 60 |
| 61 // Post the archival results to the Coordinator. | 61 // Post the archival results to the Coordinator. |
| 62 now := clock.Now(c).UTC() | 62 now := clock.Now(c).UTC() |
| 63 » err := ds.Get(c).RunInTransaction(func(c context.Context) error { | 63 » var ierr error |
|
Vadim Sh.
2016/04/07 01:21:32
nit: reset ierr to nil at the beginning of a trans
dnj
2016/04/11 17:20:03
Done.
| |
| 64 » err = ds.Get(c).RunInTransaction(func(c context.Context) error { | |
| 65 » » // Note that within this transaction, we have two return values: | |
| 66 » » // - Non-nil to abort the transaction. | |
| 67 » » // - Specific error via "ierr". | |
| 64 di := ds.Get(c) | 68 di := ds.Get(c) |
| 65 if err := di.Get(ls); err != nil { | 69 if err := di.Get(ls); err != nil { |
| 66 return err | 70 return err |
| 67 } | 71 } |
| 68 » » if ls.Archived() { | 72 |
| 69 » » » log.Infof(c, "Log stream already marked as archived.") | 73 » » // If our log stream is not in LSArchiveTasked, we will reject t his archive |
| 74 » » // request with FailedPrecondition. | |
| 75 » » switch { | |
| 76 » » case ls.Archived(): | |
| 77 » » » // Return nil if the log stream is already archived (ide mpotent). | |
| 78 » » » log.Warningf(c, "Log stream is already archived.") | |
| 70 return nil | 79 return nil |
| 80 | |
| 81 case ls.State != coordinator.LSArchiveTasked: | |
| 82 log.Fields{ | |
| 83 "state": ls.State, | |
| 84 }.Errorf(c, "Log stream is not in archival tasked state. ") | |
| 85 ierr = grpcutil.Errf(codes.FailedPrecondition, "Log stre am has not tasked an archival.") | |
| 86 return ierr | |
| 87 } | |
| 88 | |
| 89 // If this request contained an error, we will reject it with Ab orted status | |
| 90 // if we are below our error threshold. | |
| 91 if req.Error { | |
| 92 // Increment our error count. | |
| 93 ls.ArchiveErrors++ | |
| 94 if err := di.Put(ls); err != nil { | |
| 95 log.WithError(err).Errorf(c, "Failed to update l og stream error count.") | |
| 96 return err | |
| 97 } | |
| 98 | |
| 99 retries := int(ccfg.ArchiveRetries) | |
| 100 if ls.ArchiveErrors < retries { | |
| 101 log.Fields{ | |
| 102 "path": req.Path, | |
| 103 "errorCount": ls.ArchiveErrors, | |
| 104 "archiveRetries": retries, | |
| 105 }.Warningf(c, "Rejecting failed archival: below error threshold.") | |
| 106 | |
| 107 // Fail this RPC call to keep the archival task in the queue. | |
| 108 // | |
| 109 // We return this via "ierr" because we want the transaction to succeed. | |
| 110 ierr = grpcutil.Errf(codes.Aborted, "below error threshold (%d < %d)", ls.ArchiveErrors, retries) | |
| 111 return nil | |
| 112 } | |
| 113 | |
| 114 // We have exceeded our error threshold, so we will cont inue to archive | |
| 115 // this log stream. | |
| 116 log.Fields{ | |
| 117 "path": req.Path, | |
| 118 "errorCount": ls.ArchiveErrors, | |
| 119 "archiveRetries": retries, | |
| 120 }.Warningf(c, "Log stream has exceeded archive error thr eshold. Archiving empty stream.") | |
| 121 | |
| 122 req.TerminalIndex = -1 | |
| 123 req.LogEntryCount = 0 | |
| 71 } | 124 } |
| 72 | 125 |
| 73 // Update archival information. Make sure this actually marks th e stream as | 126 // Update archival information. Make sure this actually marks th e stream as |
| 74 // archived. | 127 // archived. |
| 75 ls.Updated = now | |
| 76 ls.State = coordinator.LSArchived | 128 ls.State = coordinator.LSArchived |
| 77 » » ls.ArchiveWhole = req.Complete | 129 » » ls.ArchivedTime = now |
| 130 | |
| 131 » » if ls.TerminalIndex < 0 { | |
| 132 » » » // Also set the terminated time. | |
| 133 » » » ls.TerminatedTime = now | |
| 134 » » } | |
| 78 ls.TerminalIndex = req.TerminalIndex | 135 ls.TerminalIndex = req.TerminalIndex |
| 136 | |
| 137 ls.ArchiveLogEntryCount = req.LogEntryCount | |
| 79 ls.ArchiveStreamURL = req.StreamUrl | 138 ls.ArchiveStreamURL = req.StreamUrl |
| 80 ls.ArchiveStreamSize = req.StreamSize | 139 ls.ArchiveStreamSize = req.StreamSize |
| 81 ls.ArchiveIndexURL = req.IndexUrl | 140 ls.ArchiveIndexURL = req.IndexUrl |
| 82 ls.ArchiveIndexSize = req.IndexSize | 141 ls.ArchiveIndexSize = req.IndexSize |
| 83 ls.ArchiveDataURL = req.DataUrl | 142 ls.ArchiveDataURL = req.DataUrl |
| 84 ls.ArchiveDataSize = req.DataSize | 143 ls.ArchiveDataSize = req.DataSize |
| 85 | 144 |
| 86 // Update the log stream. | 145 // Update the log stream. |
| 87 » » if err := ls.Put(di); err != nil { | 146 » » if err := di.Put(ls); err != nil { |
| 88 log.WithError(err).Errorf(c, "Failed to update log strea m.") | 147 log.WithError(err).Errorf(c, "Failed to update log strea m.") |
| 89 return err | 148 return err |
| 90 } | 149 } |
| 91 | 150 |
| 92 log.Infof(c, "Successfully marked stream as archived.") | 151 log.Infof(c, "Successfully marked stream as archived.") |
| 93 return nil | 152 return nil |
| 94 }, nil) | 153 }, nil) |
| 154 if ierr != nil { | |
| 155 log.WithError(err).Errorf(c, "Failed to mark stream as archived. ") | |
| 156 return nil, ierr | |
| 157 } | |
| 95 if err != nil { | 158 if err != nil { |
| 96 » » log.WithError(err).Errorf(c, "Failed to mark stream as archived. ") | 159 » » log.WithError(err).Errorf(c, "Internal error.") |
| 97 return nil, grpcutil.Internal | 160 return nil, grpcutil.Internal |
| 98 } | 161 } |
| 99 | 162 |
| 100 return &google.Empty{}, nil | 163 return &google.Empty{}, nil |
| 101 } | 164 } |
| OLD | NEW |