Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package services | 5 package services |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 ds "github.com/luci/gae/service/datastore" | 8 ds "github.com/luci/gae/service/datastore" |
| 9 "github.com/luci/luci-go/appengine/logdog/coordinator" | 9 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 10 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 10 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 11 "github.com/luci/luci-go/common/clock" | 11 "github.com/luci/luci-go/common/clock" |
| 12 "github.com/luci/luci-go/common/grpcutil" | 12 "github.com/luci/luci-go/common/grpcutil" |
| 13 "github.com/luci/luci-go/common/logdog/types" | 13 "github.com/luci/luci-go/common/logdog/types" |
| 14 log "github.com/luci/luci-go/common/logging" | 14 log "github.com/luci/luci-go/common/logging" |
| 15 "github.com/luci/luci-go/common/proto/google" | 15 "github.com/luci/luci-go/common/proto/google" |
| 16 "golang.org/x/net/context" | 16 "golang.org/x/net/context" |
| 17 "google.golang.org/grpc/codes" | 17 "google.golang.org/grpc/codes" |
| 18 ) | 18 ) |
| 19 | 19 |
| 20 // maxArchiveErrors is the maximum number of archival errors that we will | |
| 21 // tolerate before accepting a failed archival. | |
| 22 const maxArchiveErrors = 2 | |
| 23 | |
| 20 // ArchiveStream implements the logdog.ServicesServer interface. | 24 // ArchiveStream implements the logdog.ServicesServer interface. |
| 21 func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque st) (*google.Empty, error) { | 25 func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque st) (*google.Empty, error) { |
| 22 if err := Auth(c); err != nil { | 26 if err := Auth(c); err != nil { |
| 23 return nil, err | 27 return nil, err |
| 24 } | 28 } |
| 25 | 29 |
| 26 log.Fields{ | 30 log.Fields{ |
| 27 "path": req.Path, | 31 "path": req.Path, |
| 28 }.Infof(c, "Marking log stream as archived.") | 32 }.Infof(c, "Marking log stream as archived.") |
| 29 | 33 |
| (...skipping 23 matching lines...) Expand all Loading... | |
| 53 case ds.ErrNoSuchEntity: | 57 case ds.ErrNoSuchEntity: |
| 54 break | 58 break |
| 55 | 59 |
| 56 default: | 60 default: |
| 57 log.WithError(err).Errorf(c, "Failed to check for log stream arc hvial state.") | 61 log.WithError(err).Errorf(c, "Failed to check for log stream arc hvial state.") |
| 58 return nil, grpcutil.Internal | 62 return nil, grpcutil.Internal |
| 59 } | 63 } |
| 60 | 64 |
| 61 // Post the archival results to the Coordinator. | 65 // Post the archival results to the Coordinator. |
| 62 now := clock.Now(c).UTC() | 66 now := clock.Now(c).UTC() |
| 67 var ierr error | |
| 63 err := ds.Get(c).RunInTransaction(func(c context.Context) error { | 68 err := ds.Get(c).RunInTransaction(func(c context.Context) error { |
| 64 di := ds.Get(c) | 69 di := ds.Get(c) |
| 65 if err := di.Get(ls); err != nil { | 70 if err := di.Get(ls); err != nil { |
| 66 return err | 71 return err |
| 67 } | 72 } |
| 68 if ls.Archived() { | 73 if ls.Archived() { |
| 69 log.Infof(c, "Log stream already marked as archived.") | 74 log.Infof(c, "Log stream already marked as archived.") |
| 70 return nil | 75 return nil |
| 71 } | 76 } |
| 72 | 77 |
| 78 // If this request contained an error, we will reject it with | |
| 79 // FailedPrecondition status if we are below our error threshold . | |
| 80 if req.Error { | |
| 81 if ls.ArchiveErrors < maxArchiveErrors { | |
| 82 log.Fields{ | |
| 83 "path": req.Path, | |
| 84 "errorCount": ls.ArchiveErrors, | |
| 85 "maxArchiveErrors": maxArchiveErrors, | |
| 86 }.Warningf(c, "Rejecting failed archival: below error threshold.") | |
| 87 | |
| 88 // Increment our error count. | |
| 89 ls.ArchiveErrors++ | |
| 90 if err := ls.Put(di); err != nil { | |
| 91 log.WithError(err).Errorf(c, "Failed to update log stream error count.") | |
| 92 return err | |
| 93 } | |
| 94 | |
| 95 // Fail this RPC call to keep the archival task in the queue. | |
| 96 ierr = grpcutil.Errf(codes.FailedPrecondition, " below error threshold (%d < %d)", ls.ArchiveErrors, maxArchiveErrors) | |
| 97 return nil | |
| 98 } | |
| 99 | |
| 100 // We have exceeded our error threshold, so we will cont inue to archive | |
| 101 // this log stream. | |
| 102 log.Fields{ | |
|
Vadim Sh.
2016/03/31 22:29:59
maybe mark the entity with some indexable boolean,
dnj
2016/04/01 22:57:04
Done. I introduced "ArchiveState", which can be ei
| |
| 103 "path": req.Path, | |
| 104 "errorCount": ls.ArchiveErrors, | |
| 105 "maxArchiveErrors": maxArchiveErrors, | |
| 106 }.Warningf(c, "Log stream has exceeded archive error thr eshold.") | |
| 107 } | |
| 108 | |
| 73 // Update archival information. Make sure this actually marks th e stream as | 109 // Update archival information. Make sure this actually marks th e stream as |
| 74 // archived. | 110 // archived. |
| 75 ls.Updated = now | 111 ls.Updated = now |
| 76 ls.State = coordinator.LSArchived | 112 ls.State = coordinator.LSArchived |
| 77 ls.ArchiveWhole = req.Complete | 113 ls.ArchiveWhole = req.Complete |
| 78 ls.TerminalIndex = req.TerminalIndex | 114 ls.TerminalIndex = req.TerminalIndex |
| 79 ls.ArchiveStreamURL = req.StreamUrl | 115 ls.ArchiveStreamURL = req.StreamUrl |
| 80 ls.ArchiveStreamSize = req.StreamSize | 116 ls.ArchiveStreamSize = req.StreamSize |
| 81 ls.ArchiveIndexURL = req.IndexUrl | 117 ls.ArchiveIndexURL = req.IndexUrl |
| 82 ls.ArchiveIndexSize = req.IndexSize | 118 ls.ArchiveIndexSize = req.IndexSize |
| 83 ls.ArchiveDataURL = req.DataUrl | 119 ls.ArchiveDataURL = req.DataUrl |
| 84 ls.ArchiveDataSize = req.DataSize | 120 ls.ArchiveDataSize = req.DataSize |
| 85 | 121 |
| 86 // Update the log stream. | 122 // Update the log stream. |
| 87 if err := ls.Put(di); err != nil { | 123 if err := ls.Put(di); err != nil { |
|
Vadim Sh.
2016/03/31 22:29:59
this line got me confused :) usually it's di.Put(l
dnj
2016/04/01 22:57:03
Yeah LogStream.Put is the same as di.Put, but forc
| |
| 88 log.WithError(err).Errorf(c, "Failed to update log strea m.") | 124 log.WithError(err).Errorf(c, "Failed to update log strea m.") |
| 89 return err | 125 return err |
| 90 } | 126 } |
| 91 | 127 |
| 92 log.Infof(c, "Successfully marked stream as archived.") | 128 log.Infof(c, "Successfully marked stream as archived.") |
| 93 return nil | 129 return nil |
| 94 }, nil) | 130 }, nil) |
| 95 if err != nil { | 131 if err != nil { |
| 96 log.WithError(err).Errorf(c, "Failed to mark stream as archived. ") | 132 log.WithError(err).Errorf(c, "Failed to mark stream as archived. ") |
| 97 return nil, grpcutil.Internal | 133 return nil, grpcutil.Internal |
| 98 } | 134 } |
| 135 if ierr != nil { | |
| 136 log.WithError(ierr).Errorf(c, "Failed to mark stream as archived (inner).") | |
| 137 return nil, ierr | |
| 138 } | |
| 99 | 139 |
| 100 return &google.Empty{}, nil | 140 return &google.Empty{}, nil |
| 101 } | 141 } |
| OLD | NEW |