Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(831)

Side by Side Diff: appengine/logdog/coordinator/endpoints/services/terminateStream.go

Issue 1971493003: LogDog: Project READ access for user endpoints. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-service-config
Patch Set: Updated patchset dependency Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package services 5 package services
6 6
7 import ( 7 import (
8 "crypto/subtle" 8 "crypto/subtle"
9 9
10 ds "github.com/luci/gae/service/datastore" 10 ds "github.com/luci/gae/service/datastore"
11 "github.com/luci/gae/service/info" 11 "github.com/luci/gae/service/info"
12 "github.com/luci/luci-go/appengine/logdog/coordinator" 12 "github.com/luci/luci-go/appengine/logdog/coordinator"
13 "github.com/luci/luci-go/appengine/logdog/coordinator/endpoints"
13 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations" 14 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations"
14 "github.com/luci/luci-go/appengine/tumble" 15 "github.com/luci/luci-go/appengine/tumble"
15 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
16 "github.com/luci/luci-go/common/clock" 17 "github.com/luci/luci-go/common/clock"
17 "github.com/luci/luci-go/common/grpcutil" 18 "github.com/luci/luci-go/common/grpcutil"
18 log "github.com/luci/luci-go/common/logging" 19 log "github.com/luci/luci-go/common/logging"
19 "github.com/luci/luci-go/common/proto/google" 20 "github.com/luci/luci-go/common/proto/google"
20 "golang.org/x/net/context" 21 "golang.org/x/net/context"
21 "google.golang.org/grpc/codes" 22 "google.golang.org/grpc/codes"
22 ) 23 )
23 24
24 // TerminateStream is an idempotent stream state terminate operation. 25 // TerminateStream is an idempotent stream state terminate operation.
25 func (s *server) TerminateStream(c context.Context, req *logdog.TerminateStreamR equest) (*google.Empty, error) { 26 func (s *server) TerminateStream(c context.Context, req *logdog.TerminateStreamR equest) (*google.Empty, error) {
26 log.Fields{ 27 log.Fields{
27 "project": req.Project, 28 "project": req.Project,
28 "id": req.Id, 29 "id": req.Id,
29 "terminalIndex": req.TerminalIndex, 30 "terminalIndex": req.TerminalIndex,
30 }.Infof(c, "Request to terminate log stream.") 31 }.Infof(c, "Request to terminate log stream.")
31 32
32 if req.TerminalIndex < 0 { 33 if req.TerminalIndex < 0 {
33 return nil, grpcutil.Errf(codes.InvalidArgument, "Negative termi nal index.") 34 return nil, grpcutil.Errf(codes.InvalidArgument, "Negative termi nal index.")
34 } 35 }
35 36
36 id := coordinator.HashID(req.Id) 37 id := coordinator.HashID(req.Id)
37 if err := id.Normalize(); err != nil { 38 if err := id.Normalize(); err != nil {
38 return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid ID (%s ): %s", id, err) 39 return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid ID (%s ): %s", id, err)
39 } 40 }
40 41
42 // Load our service and project configs.
41 svc := coordinator.GetServices(c) 43 svc := coordinator.GetServices(c)
42 cfg, err := svc.Config(c) 44 cfg, err := svc.Config(c)
43 if err != nil { 45 if err != nil {
44 log.WithError(err).Errorf(c, "Failed to load configuration.") 46 log.WithError(err).Errorf(c, "Failed to load configuration.")
45 return nil, grpcutil.Internal 47 return nil, grpcutil.Internal
46 } 48 }
47 49
50 pcfg, err := coordinator.CurrentProjectConfig(c)
51 if err != nil {
52 log.WithError(err).Errorf(c, "Failed to load current project con figuration.")
53 return nil, grpcutil.Internal
54 }
55
56 // Initialize our archival parameters.
57 params := coordinator.ArchivalParams{
58 RequestID: info.Get(c).RequestID(),
59 SettleDelay: cfg.Coordinator.ArchiveSettleDelay.Duration(),
60 CompletePeriod: endpoints.MinDuration(cfg.Coordinator.ArchiveDel ayMax, pcfg.MaxStreamAge),
61 }
62
48 ap, err := svc.ArchivalPublisher(c) 63 ap, err := svc.ArchivalPublisher(c)
49 if err != nil { 64 if err != nil {
50 log.WithError(err).Errorf(c, "Failed to get archival publisher i nstance.") 65 log.WithError(err).Errorf(c, "Failed to get archival publisher i nstance.")
51 return nil, grpcutil.Internal 66 return nil, grpcutil.Internal
52 } 67 }
53 68
54 // Initialize our log stream state. 69 // Initialize our log stream state.
55 di := ds.Get(c) 70 di := ds.Get(c)
56 lst := coordinator.NewLogStreamState(di, id) 71 lst := coordinator.NewLogStreamState(di, id)
57 72
58 // Initialize our archival parameters.
59 params := coordinator.ArchivalParams{
60 RequestID: info.Get(c).RequestID(),
61 SettleDelay: cfg.Coordinator.ArchiveSettleDelay.Duration(),
62 CompletePeriod: cfg.Coordinator.ArchiveDelayMax.Duration(),
63 }
64
65 // Transactionally validate and update the terminal index. 73 // Transactionally validate and update the terminal index.
66 err = di.RunInTransaction(func(c context.Context) error { 74 err = di.RunInTransaction(func(c context.Context) error {
67 di := ds.Get(c) 75 di := ds.Get(c)
68 76
69 if err := di.Get(lst); err != nil { 77 if err := di.Get(lst); err != nil {
70 if err == ds.ErrNoSuchEntity { 78 if err == ds.ErrNoSuchEntity {
71 log.Debugf(c, "Log stream state not found.") 79 log.Debugf(c, "Log stream state not found.")
72 return grpcutil.Errf(codes.NotFound, "Log stream %q is not registered", id) 80 return grpcutil.Errf(codes.NotFound, "Log stream %q is not registered", id)
73 } 81 }
74 82
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
136 }, nil) 144 }, nil)
137 if err != nil { 145 if err != nil {
138 log.Fields{ 146 log.Fields{
139 log.ErrorKey: err, 147 log.ErrorKey: err,
140 }.Errorf(c, "Failed to update LogStream.") 148 }.Errorf(c, "Failed to update LogStream.")
141 return nil, err 149 return nil, err
142 } 150 }
143 151
144 return &google.Empty{}, nil 152 return &google.Empty{}, nil
145 } 153 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698