Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(66)

Side by Side Diff: appengine/logdog/coordinator/endpoints/services/archiveStream.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Fix proto comment. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package services 5 package services
6 6
7 import ( 7 import (
8 ds "github.com/luci/gae/service/datastore" 8 ds "github.com/luci/gae/service/datastore"
9 "github.com/luci/luci-go/appengine/logdog/coordinator" 9 "github.com/luci/luci-go/appengine/logdog/coordinator"
10 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 10 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
11 "github.com/luci/luci-go/common/clock" 11 "github.com/luci/luci-go/common/clock"
12 "github.com/luci/luci-go/common/grpcutil" 12 "github.com/luci/luci-go/common/grpcutil"
13 "github.com/luci/luci-go/common/logdog/types" 13 "github.com/luci/luci-go/common/logdog/types"
14 log "github.com/luci/luci-go/common/logging" 14 log "github.com/luci/luci-go/common/logging"
15 "github.com/luci/luci-go/common/proto/google" 15 "github.com/luci/luci-go/common/proto/google"
16 "golang.org/x/net/context" 16 "golang.org/x/net/context"
17 "google.golang.org/grpc/codes" 17 "google.golang.org/grpc/codes"
18 ) 18 )
19 19
20 // ArchiveStream implements the logdog.ServicesServer interface. 20 // ArchiveStream implements the logdog.ServicesServer interface.
21 func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque st) (*google.Empty, error) { 21 func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque st) (*google.Empty, error) {
22 » if err := Auth(c); err != nil { 22 » svc := b.GetServices()
23 » if err := Auth(c, svc); err != nil {
23 return nil, err 24 return nil, err
24 } 25 }
25 26
26 log.Fields{ 27 log.Fields{
27 » » "path": req.Path, 28 » » "path": req.Path,
28 » }.Infof(c, "Marking log stream as archived.") 29 » » "complete": req.Complete(),
30 » » "terminalIndex": req.TerminalIndex,
31 » » "logEntryCount": req.LogEntryCount,
32 » » "error": req.Error,
33 » }.Infof(c, "Received archival request.")
29 34
30 // Verify that the request is minimially valid. 35 // Verify that the request is minimially valid.
31 path := types.StreamPath(req.Path) 36 path := types.StreamPath(req.Path)
32 if err := path.Validate(); err != nil { 37 if err := path.Validate(); err != nil {
33 return nil, grpcutil.Errf(codes.InvalidArgument, "invalid log st ream path: %v", err) 38 return nil, grpcutil.Errf(codes.InvalidArgument, "invalid log st ream path: %v", err)
34 } 39 }
35 40
36 switch { 41 switch {
37 case req.IndexUrl == "": 42 case req.IndexUrl == "":
38 return nil, grpcutil.Errf(codes.InvalidArgument, "missing requir ed index archive URL") 43 return nil, grpcutil.Errf(codes.InvalidArgument, "missing requir ed index archive URL")
39 case req.StreamUrl == "": 44 case req.StreamUrl == "":
40 return nil, grpcutil.Errf(codes.InvalidArgument, "missing requir ed stream archive URL") 45 return nil, grpcutil.Errf(codes.InvalidArgument, "missing requir ed stream archive URL")
41 } 46 }
42 47
43 ls := coordinator.LogStreamFromPath(path) 48 ls := coordinator.LogStreamFromPath(path)
44 49
45 » // (Non-transactional) Is the log stream already archived? 50 » log.Fields{
46 » switch err := ds.Get(c).Get(ls); err { 51 » » "id": ls.HashID,
47 » case nil: 52 » }.Infof(c, "Log stream ID.")
48 » » if ls.Archived() {
49 » » » log.Infof(c, "Log stream already marked as archived (non -transactional).")
50 » » » return &google.Empty{}, nil
51 » » }
52
53 » case ds.ErrNoSuchEntity:
54 » » break
55
56 » default:
57 » » log.WithError(err).Errorf(c, "Failed to check for log stream arc hvial state.")
58 » » return nil, grpcutil.Internal
59 » }
60 53
61 // Post the archival results to the Coordinator. 54 // Post the archival results to the Coordinator.
62 now := clock.Now(c).UTC() 55 now := clock.Now(c).UTC()
56 var ierr error
63 err := ds.Get(c).RunInTransaction(func(c context.Context) error { 57 err := ds.Get(c).RunInTransaction(func(c context.Context) error {
58 ierr = nil
59
60 // Note that within this transaction, we have two return values:
61 // - Non-nil to abort the transaction.
62 // - Specific error via "ierr".
64 di := ds.Get(c) 63 di := ds.Get(c)
65 if err := di.Get(ls); err != nil { 64 if err := di.Get(ls); err != nil {
66 return err 65 return err
67 } 66 }
68 » » if ls.Archived() { 67
69 » » » log.Infof(c, "Log stream already marked as archived.") 68 » » // If our log stream is not in LSArchiveTasked, we will reject t his archive
69 » » // request with FailedPrecondition.
70 » » switch {
71 » » case ls.Archived():
72 » » » // Return nil if the log stream is already archived (ide mpotent).
73 » » » log.Warningf(c, "Log stream is already archived.")
70 return nil 74 return nil
75
76 case ls.State != coordinator.LSArchiveTasked:
77 log.Fields{
78 "state": ls.State,
79 }.Errorf(c, "Log stream is not in archival tasked state. ")
80 ierr = grpcutil.Errf(codes.FailedPrecondition, "Log stre am has not tasked an archival.")
81 return ierr
82 }
83
84 // If this request contained an error, we will record an empty a rchival and
85 // log a warning.
86 if req.Error != "" {
87 log.Fields{
88 "archiveError": req.Error,
89 }.Warningf(c, "Log stream archival indicated error. Arch iving empty stream.")
90
91 req.TerminalIndex = -1
92 req.LogEntryCount = 0
71 } 93 }
72 94
73 // Update archival information. Make sure this actually marks th e stream as 95 // Update archival information. Make sure this actually marks th e stream as
74 // archived. 96 // archived.
75 ls.Updated = now
76 ls.State = coordinator.LSArchived 97 ls.State = coordinator.LSArchived
77 » » ls.ArchiveWhole = req.Complete 98 » » ls.ArchivedTime = now
99 » » ls.ArchivalKey = nil // No point in wasting datastore space on t his.
100
101 » » if ls.TerminalIndex < 0 {
102 » » » // Also set the terminated time.
103 » » » ls.TerminatedTime = now
104 » » }
78 ls.TerminalIndex = req.TerminalIndex 105 ls.TerminalIndex = req.TerminalIndex
106
107 ls.ArchiveLogEntryCount = req.LogEntryCount
79 ls.ArchiveStreamURL = req.StreamUrl 108 ls.ArchiveStreamURL = req.StreamUrl
80 ls.ArchiveStreamSize = req.StreamSize 109 ls.ArchiveStreamSize = req.StreamSize
81 ls.ArchiveIndexURL = req.IndexUrl 110 ls.ArchiveIndexURL = req.IndexUrl
82 ls.ArchiveIndexSize = req.IndexSize 111 ls.ArchiveIndexSize = req.IndexSize
83 ls.ArchiveDataURL = req.DataUrl 112 ls.ArchiveDataURL = req.DataUrl
84 ls.ArchiveDataSize = req.DataSize 113 ls.ArchiveDataSize = req.DataSize
85 114
86 // Update the log stream. 115 // Update the log stream.
87 » » if err := ls.Put(di); err != nil { 116 » » if err := di.Put(ls); err != nil {
88 log.WithError(err).Errorf(c, "Failed to update log strea m.") 117 log.WithError(err).Errorf(c, "Failed to update log strea m.")
89 return err 118 return err
90 } 119 }
91 120
92 log.Infof(c, "Successfully marked stream as archived.") 121 log.Infof(c, "Successfully marked stream as archived.")
93 return nil 122 return nil
94 }, nil) 123 }, nil)
124 if ierr != nil {
125 log.WithError(ierr).Errorf(c, "Failed to mark stream as archived .")
126 return nil, ierr
127 }
95 if err != nil { 128 if err != nil {
96 » » log.WithError(err).Errorf(c, "Failed to mark stream as archived. ") 129 » » log.WithError(err).Errorf(c, "Internal error.")
97 return nil, grpcutil.Internal 130 return nil, grpcutil.Internal
98 } 131 }
99 132
100 return &google.Empty{}, nil 133 return &google.Empty{}, nil
101 } 134 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698