Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(151)

Side by Side Diff: appengine/logdog/coordinator/endpoints/services/archiveStream.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Minor fixes, works in dev now. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package services 5 package services
6 6
7 import ( 7 import (
8 ds "github.com/luci/gae/service/datastore" 8 ds "github.com/luci/gae/service/datastore"
9 "github.com/luci/luci-go/appengine/logdog/coordinator" 9 "github.com/luci/luci-go/appengine/logdog/coordinator"
10 "github.com/luci/luci-go/appengine/logdog/coordinator/config"
10 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 11 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
11 "github.com/luci/luci-go/common/clock" 12 "github.com/luci/luci-go/common/clock"
12 "github.com/luci/luci-go/common/grpcutil" 13 "github.com/luci/luci-go/common/grpcutil"
13 "github.com/luci/luci-go/common/logdog/types" 14 "github.com/luci/luci-go/common/logdog/types"
14 log "github.com/luci/luci-go/common/logging" 15 log "github.com/luci/luci-go/common/logging"
15 "github.com/luci/luci-go/common/proto/google" 16 "github.com/luci/luci-go/common/proto/google"
16 "golang.org/x/net/context" 17 "golang.org/x/net/context"
17 "google.golang.org/grpc/codes" 18 "google.golang.org/grpc/codes"
18 ) 19 )
19 20
20 // ArchiveStream implements the logdog.ServicesServer interface. 21 // ArchiveStream implements the logdog.ServicesServer interface.
21 func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque st) (*google.Empty, error) { 22 func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque st) (*google.Empty, error) {
22 if err := Auth(c); err != nil { 23 if err := Auth(c); err != nil {
23 return nil, err 24 return nil, err
24 } 25 }
25 26
27 cfg, err := config.Load(c)
28 if err != nil {
29 log.WithError(err).Errorf(c, "Failed to load configuration.")
30 return nil, grpcutil.Internal
31 }
32 ccfg := cfg.GetCoordinator() // config.Load ensures this is not nil.
33
26 log.Fields{ 34 log.Fields{
27 » » "path": req.Path, 35 » » "path": req.Path,
28 » }.Infof(c, "Marking log stream as archived.") 36 » » "complete": req.Complete(),
37 » » "terminalIndex": req.TerminalIndex,
38 » » "logEntryCount": req.LogEntryCount,
39 » » "error": req.Error,
40 » }.Infof(c, "Received archival request.")
29 41
30 // Verify that the request is minimially valid. 42 // Verify that the request is minimially valid.
31 path := types.StreamPath(req.Path) 43 path := types.StreamPath(req.Path)
32 if err := path.Validate(); err != nil { 44 if err := path.Validate(); err != nil {
33 return nil, grpcutil.Errf(codes.InvalidArgument, "invalid log st ream path: %v", err) 45 return nil, grpcutil.Errf(codes.InvalidArgument, "invalid log st ream path: %v", err)
34 } 46 }
35 47
36 switch { 48 switch {
37 case req.IndexUrl == "": 49 case req.IndexUrl == "":
38 return nil, grpcutil.Errf(codes.InvalidArgument, "missing requir ed index archive URL") 50 return nil, grpcutil.Errf(codes.InvalidArgument, "missing requir ed index archive URL")
39 case req.StreamUrl == "": 51 case req.StreamUrl == "":
40 return nil, grpcutil.Errf(codes.InvalidArgument, "missing requir ed stream archive URL") 52 return nil, grpcutil.Errf(codes.InvalidArgument, "missing requir ed stream archive URL")
41 } 53 }
42 54
43 ls := coordinator.LogStreamFromPath(path) 55 ls := coordinator.LogStreamFromPath(path)
44 56
45 » // (Non-transactional) Is the log stream already archived? 57 » log.Fields{
46 » switch err := ds.Get(c).Get(ls); err { 58 » » "id": ls.HashID(),
47 » case nil: 59 » }.Infof(c, "Log stream ID.")
48 » » if ls.Archived() {
49 » » » log.Infof(c, "Log stream already marked as archived (non -transactional).")
50 » » » return &google.Empty{}, nil
51 » » }
52
53 » case ds.ErrNoSuchEntity:
54 » » break
55
56 » default:
57 » » log.WithError(err).Errorf(c, "Failed to check for log stream arc hvial state.")
58 » » return nil, grpcutil.Internal
59 » }
60 60
61 // Post the archival results to the Coordinator. 61 // Post the archival results to the Coordinator.
62 now := clock.Now(c).UTC() 62 now := clock.Now(c).UTC()
63 » err := ds.Get(c).RunInTransaction(func(c context.Context) error { 63 » var ierr error
Vadim Sh. 2016/04/07 01:21:32 nit: reset ierr to nil at the beginning of a trans
dnj 2016/04/11 17:20:03 Done.
64 » err = ds.Get(c).RunInTransaction(func(c context.Context) error {
65 » » // Note that within this transaction, we have two return values:
66 » » // - Non-nil to abort the transaction.
67 » » // - Specific error via "ierr".
64 di := ds.Get(c) 68 di := ds.Get(c)
65 if err := di.Get(ls); err != nil { 69 if err := di.Get(ls); err != nil {
66 return err 70 return err
67 } 71 }
68 » » if ls.Archived() { 72
69 » » » log.Infof(c, "Log stream already marked as archived.") 73 » » // If our log stream is not in LSArchiveTasked, we will reject t his archive
74 » » // request with FailedPrecondition.
75 » » switch {
76 » » case ls.Archived():
77 » » » // Return nil if the log stream is already archived (ide mpotent).
78 » » » log.Warningf(c, "Log stream is already archived.")
70 return nil 79 return nil
80
81 case ls.State != coordinator.LSArchiveTasked:
82 log.Fields{
83 "state": ls.State,
84 }.Errorf(c, "Log stream is not in archival tasked state. ")
85 ierr = grpcutil.Errf(codes.FailedPrecondition, "Log stre am has not tasked an archival.")
86 return ierr
87 }
88
89 // If this request contained an error, we will reject it with Ab orted status
90 // if we are below our error threshold.
91 if req.Error {
92 // Increment our error count.
93 ls.ArchiveErrors++
94 if err := di.Put(ls); err != nil {
95 log.WithError(err).Errorf(c, "Failed to update l og stream error count.")
96 return err
97 }
98
99 retries := int(ccfg.ArchiveRetries)
100 if ls.ArchiveErrors < retries {
101 log.Fields{
102 "path": req.Path,
103 "errorCount": ls.ArchiveErrors,
104 "archiveRetries": retries,
105 }.Warningf(c, "Rejecting failed archival: below error threshold.")
106
107 // Fail this RPC call to keep the archival task in the queue.
108 //
109 // We return this via "ierr" because we want the transaction to succeed.
110 ierr = grpcutil.Errf(codes.Aborted, "below error threshold (%d < %d)", ls.ArchiveErrors, retries)
111 return nil
112 }
113
114 // We have exceeded our error threshold, so we will cont inue to archive
115 // this log stream.
116 log.Fields{
117 "path": req.Path,
118 "errorCount": ls.ArchiveErrors,
119 "archiveRetries": retries,
120 }.Warningf(c, "Log stream has exceeded archive error thr eshold. Archiving empty stream.")
121
122 req.TerminalIndex = -1
123 req.LogEntryCount = 0
71 } 124 }
72 125
73 // Update archival information. Make sure this actually marks th e stream as 126 // Update archival information. Make sure this actually marks th e stream as
74 // archived. 127 // archived.
75 ls.Updated = now
76 ls.State = coordinator.LSArchived 128 ls.State = coordinator.LSArchived
77 » » ls.ArchiveWhole = req.Complete 129 » » ls.ArchivedTime = now
130
131 » » if ls.TerminalIndex < 0 {
132 » » » // Also set the terminated time.
133 » » » ls.TerminatedTime = now
134 » » }
78 ls.TerminalIndex = req.TerminalIndex 135 ls.TerminalIndex = req.TerminalIndex
136
137 ls.ArchiveLogEntryCount = req.LogEntryCount
79 ls.ArchiveStreamURL = req.StreamUrl 138 ls.ArchiveStreamURL = req.StreamUrl
80 ls.ArchiveStreamSize = req.StreamSize 139 ls.ArchiveStreamSize = req.StreamSize
81 ls.ArchiveIndexURL = req.IndexUrl 140 ls.ArchiveIndexURL = req.IndexUrl
82 ls.ArchiveIndexSize = req.IndexSize 141 ls.ArchiveIndexSize = req.IndexSize
83 ls.ArchiveDataURL = req.DataUrl 142 ls.ArchiveDataURL = req.DataUrl
84 ls.ArchiveDataSize = req.DataSize 143 ls.ArchiveDataSize = req.DataSize
85 144
86 // Update the log stream. 145 // Update the log stream.
87 » » if err := ls.Put(di); err != nil { 146 » » if err := di.Put(ls); err != nil {
88 log.WithError(err).Errorf(c, "Failed to update log strea m.") 147 log.WithError(err).Errorf(c, "Failed to update log strea m.")
89 return err 148 return err
90 } 149 }
91 150
92 log.Infof(c, "Successfully marked stream as archived.") 151 log.Infof(c, "Successfully marked stream as archived.")
93 return nil 152 return nil
94 }, nil) 153 }, nil)
154 if ierr != nil {
155 log.WithError(err).Errorf(c, "Failed to mark stream as archived. ")
156 return nil, ierr
157 }
95 if err != nil { 158 if err != nil {
96 » » log.WithError(err).Errorf(c, "Failed to mark stream as archived. ") 159 » » log.WithError(err).Errorf(c, "Internal error.")
97 return nil, grpcutil.Internal 160 return nil, grpcutil.Internal
98 } 161 }
99 162
100 return &google.Empty{}, nil 163 return &google.Empty{}, nil
101 } 164 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698