| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package services | 5 package services |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "crypto/subtle" | 8 "crypto/subtle" |
| 9 | 9 |
| 10 ds "github.com/luci/gae/service/datastore" | 10 ds "github.com/luci/gae/service/datastore" |
| 11 "github.com/luci/gae/service/info" | 11 "github.com/luci/gae/service/info" |
| 12 "github.com/luci/luci-go/appengine/logdog/coordinator" | 12 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 13 "github.com/luci/luci-go/appengine/logdog/coordinator/endpoints" |
| 13 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations" | 14 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations" |
| 14 "github.com/luci/luci-go/appengine/tumble" | 15 "github.com/luci/luci-go/appengine/tumble" |
| 15 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 16 "github.com/luci/luci-go/common/clock" | 17 "github.com/luci/luci-go/common/clock" |
| 17 "github.com/luci/luci-go/common/grpcutil" | 18 "github.com/luci/luci-go/common/grpcutil" |
| 18 log "github.com/luci/luci-go/common/logging" | 19 log "github.com/luci/luci-go/common/logging" |
| 19 "github.com/luci/luci-go/common/proto/google" | 20 "github.com/luci/luci-go/common/proto/google" |
| 20 "golang.org/x/net/context" | 21 "golang.org/x/net/context" |
| 21 "google.golang.org/grpc/codes" | 22 "google.golang.org/grpc/codes" |
| 22 ) | 23 ) |
| 23 | 24 |
| 24 // TerminateStream is an idempotent stream state terminate operation. | 25 // TerminateStream is an idempotent stream state terminate operation. |
| 25 func (s *server) TerminateStream(c context.Context, req *logdog.TerminateStreamR
equest) (*google.Empty, error) { | 26 func (s *server) TerminateStream(c context.Context, req *logdog.TerminateStreamR
equest) (*google.Empty, error) { |
| 26 log.Fields{ | 27 log.Fields{ |
| 27 "project": req.Project, | 28 "project": req.Project, |
| 28 "id": req.Id, | 29 "id": req.Id, |
| 29 "terminalIndex": req.TerminalIndex, | 30 "terminalIndex": req.TerminalIndex, |
| 30 }.Infof(c, "Request to terminate log stream.") | 31 }.Infof(c, "Request to terminate log stream.") |
| 31 | 32 |
| 32 if req.TerminalIndex < 0 { | 33 if req.TerminalIndex < 0 { |
| 33 return nil, grpcutil.Errf(codes.InvalidArgument, "Negative termi
nal index.") | 34 return nil, grpcutil.Errf(codes.InvalidArgument, "Negative termi
nal index.") |
| 34 } | 35 } |
| 35 | 36 |
| 36 id := coordinator.HashID(req.Id) | 37 id := coordinator.HashID(req.Id) |
| 37 if err := id.Normalize(); err != nil { | 38 if err := id.Normalize(); err != nil { |
| 38 return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid ID (%s
): %s", id, err) | 39 return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid ID (%s
): %s", id, err) |
| 39 } | 40 } |
| 40 | 41 |
| 42 // Load our service and project configs. |
| 41 svc := coordinator.GetServices(c) | 43 svc := coordinator.GetServices(c) |
| 42 cfg, err := svc.Config(c) | 44 cfg, err := svc.Config(c) |
| 43 if err != nil { | 45 if err != nil { |
| 44 log.WithError(err).Errorf(c, "Failed to load configuration.") | 46 log.WithError(err).Errorf(c, "Failed to load configuration.") |
| 45 return nil, grpcutil.Internal | 47 return nil, grpcutil.Internal |
| 46 } | 48 } |
| 47 | 49 |
| 50 pcfg, err := coordinator.CurrentProjectConfig(c) |
| 51 if err != nil { |
| 52 log.WithError(err).Errorf(c, "Failed to load current project con
figuration.") |
| 53 return nil, grpcutil.Internal |
| 54 } |
| 55 |
| 56 // Initialize our archival parameters. |
| 57 params := coordinator.ArchivalParams{ |
| 58 RequestID: info.Get(c).RequestID(), |
| 59 SettleDelay: cfg.Coordinator.ArchiveSettleDelay.Duration(), |
| 60 CompletePeriod: endpoints.MinDuration(cfg.Coordinator.ArchiveDel
ayMax, pcfg.MaxStreamAge), |
| 61 } |
| 62 |
| 48 ap, err := svc.ArchivalPublisher(c) | 63 ap, err := svc.ArchivalPublisher(c) |
| 49 if err != nil { | 64 if err != nil { |
| 50 log.WithError(err).Errorf(c, "Failed to get archival publisher i
nstance.") | 65 log.WithError(err).Errorf(c, "Failed to get archival publisher i
nstance.") |
| 51 return nil, grpcutil.Internal | 66 return nil, grpcutil.Internal |
| 52 } | 67 } |
| 53 | 68 |
| 54 // Initialize our log stream state. | 69 // Initialize our log stream state. |
| 55 di := ds.Get(c) | 70 di := ds.Get(c) |
| 56 lst := coordinator.NewLogStreamState(di, id) | 71 lst := coordinator.NewLogStreamState(di, id) |
| 57 | 72 |
| 58 // Initialize our archival parameters. | |
| 59 params := coordinator.ArchivalParams{ | |
| 60 RequestID: info.Get(c).RequestID(), | |
| 61 SettleDelay: cfg.Coordinator.ArchiveSettleDelay.Duration(), | |
| 62 CompletePeriod: cfg.Coordinator.ArchiveDelayMax.Duration(), | |
| 63 } | |
| 64 | |
| 65 // Transactionally validate and update the terminal index. | 73 // Transactionally validate and update the terminal index. |
| 66 err = di.RunInTransaction(func(c context.Context) error { | 74 err = di.RunInTransaction(func(c context.Context) error { |
| 67 di := ds.Get(c) | 75 di := ds.Get(c) |
| 68 | 76 |
| 69 if err := di.Get(lst); err != nil { | 77 if err := di.Get(lst); err != nil { |
| 70 if err == ds.ErrNoSuchEntity { | 78 if err == ds.ErrNoSuchEntity { |
| 71 log.Debugf(c, "Log stream state not found.") | 79 log.Debugf(c, "Log stream state not found.") |
| 72 return grpcutil.Errf(codes.NotFound, "Log stream
%q is not registered", id) | 80 return grpcutil.Errf(codes.NotFound, "Log stream
%q is not registered", id) |
| 73 } | 81 } |
| 74 | 82 |
| (...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 136 }, nil) | 144 }, nil) |
| 137 if err != nil { | 145 if err != nil { |
| 138 log.Fields{ | 146 log.Fields{ |
| 139 log.ErrorKey: err, | 147 log.ErrorKey: err, |
| 140 }.Errorf(c, "Failed to update LogStream.") | 148 }.Errorf(c, "Failed to update LogStream.") |
| 141 return nil, err | 149 return nil, err |
| 142 } | 150 } |
| 143 | 151 |
| 144 return &google.Empty{}, nil | 152 return &google.Empty{}, nil |
| 145 } | 153 } |
| OLD | NEW |