OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package services | 5 package services |
6 | 6 |
7 import ( | 7 import ( |
8 "crypto/subtle" | 8 "crypto/subtle" |
9 | 9 |
10 ds "github.com/luci/gae/service/datastore" | 10 ds "github.com/luci/gae/service/datastore" |
11 "github.com/luci/gae/service/info" | 11 "github.com/luci/gae/service/info" |
12 "github.com/luci/luci-go/appengine/logdog/coordinator" | 12 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 13 "github.com/luci/luci-go/appengine/logdog/coordinator/endpoints" |
13 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations" | 14 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations" |
14 "github.com/luci/luci-go/appengine/tumble" | 15 "github.com/luci/luci-go/appengine/tumble" |
15 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
16 "github.com/luci/luci-go/common/clock" | 17 "github.com/luci/luci-go/common/clock" |
17 "github.com/luci/luci-go/common/grpcutil" | 18 "github.com/luci/luci-go/common/grpcutil" |
18 log "github.com/luci/luci-go/common/logging" | 19 log "github.com/luci/luci-go/common/logging" |
19 "github.com/luci/luci-go/common/proto/google" | 20 "github.com/luci/luci-go/common/proto/google" |
20 "golang.org/x/net/context" | 21 "golang.org/x/net/context" |
21 "google.golang.org/grpc/codes" | 22 "google.golang.org/grpc/codes" |
22 ) | 23 ) |
23 | 24 |
24 // TerminateStream is an idempotent stream state terminate operation. | 25 // TerminateStream is an idempotent stream state terminate operation. |
25 func (s *server) TerminateStream(c context.Context, req *logdog.TerminateStreamR
equest) (*google.Empty, error) { | 26 func (s *server) TerminateStream(c context.Context, req *logdog.TerminateStreamR
equest) (*google.Empty, error) { |
26 log.Fields{ | 27 log.Fields{ |
27 "project": req.Project, | 28 "project": req.Project, |
28 "id": req.Id, | 29 "id": req.Id, |
29 "terminalIndex": req.TerminalIndex, | 30 "terminalIndex": req.TerminalIndex, |
30 }.Infof(c, "Request to terminate log stream.") | 31 }.Infof(c, "Request to terminate log stream.") |
31 | 32 |
32 if req.TerminalIndex < 0 { | 33 if req.TerminalIndex < 0 { |
33 return nil, grpcutil.Errf(codes.InvalidArgument, "Negative termi
nal index.") | 34 return nil, grpcutil.Errf(codes.InvalidArgument, "Negative termi
nal index.") |
34 } | 35 } |
35 | 36 |
36 id := coordinator.HashID(req.Id) | 37 id := coordinator.HashID(req.Id) |
37 if err := id.Normalize(); err != nil { | 38 if err := id.Normalize(); err != nil { |
38 return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid ID (%s
): %s", id, err) | 39 return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid ID (%s
): %s", id, err) |
39 } | 40 } |
40 | 41 |
| 42 // Load our service and project configs. |
41 svc := coordinator.GetServices(c) | 43 svc := coordinator.GetServices(c) |
42 cfg, err := svc.Config(c) | 44 cfg, err := svc.Config(c) |
43 if err != nil { | 45 if err != nil { |
44 log.WithError(err).Errorf(c, "Failed to load configuration.") | 46 log.WithError(err).Errorf(c, "Failed to load configuration.") |
45 return nil, grpcutil.Internal | 47 return nil, grpcutil.Internal |
46 } | 48 } |
47 | 49 |
| 50 pcfg, err := coordinator.CurrentProjectConfig(c) |
| 51 if err != nil { |
| 52 log.WithError(err).Errorf(c, "Failed to load current project con
figuration.") |
| 53 return nil, grpcutil.Internal |
| 54 } |
| 55 |
| 56 // Initialize our archival parameters. |
| 57 params := coordinator.ArchivalParams{ |
| 58 RequestID: info.Get(c).RequestID(), |
| 59 SettleDelay: cfg.Coordinator.ArchiveSettleDelay.Duration(), |
| 60 CompletePeriod: endpoints.MinDuration(cfg.Coordinator.ArchiveDel
ayMax, pcfg.MaxStreamAge), |
| 61 } |
| 62 |
48 ap, err := svc.ArchivalPublisher(c) | 63 ap, err := svc.ArchivalPublisher(c) |
49 if err != nil { | 64 if err != nil { |
50 log.WithError(err).Errorf(c, "Failed to get archival publisher i
nstance.") | 65 log.WithError(err).Errorf(c, "Failed to get archival publisher i
nstance.") |
51 return nil, grpcutil.Internal | 66 return nil, grpcutil.Internal |
52 } | 67 } |
53 | 68 |
54 // Initialize our log stream state. | 69 // Initialize our log stream state. |
55 di := ds.Get(c) | 70 di := ds.Get(c) |
56 lst := coordinator.NewLogStreamState(di, id) | 71 lst := coordinator.NewLogStreamState(di, id) |
57 | 72 |
58 // Initialize our archival parameters. | |
59 params := coordinator.ArchivalParams{ | |
60 RequestID: info.Get(c).RequestID(), | |
61 SettleDelay: cfg.Coordinator.ArchiveSettleDelay.Duration(), | |
62 CompletePeriod: cfg.Coordinator.ArchiveDelayMax.Duration(), | |
63 } | |
64 | |
65 // Transactionally validate and update the terminal index. | 73 // Transactionally validate and update the terminal index. |
66 err = di.RunInTransaction(func(c context.Context) error { | 74 err = di.RunInTransaction(func(c context.Context) error { |
67 di := ds.Get(c) | 75 di := ds.Get(c) |
68 | 76 |
69 if err := di.Get(lst); err != nil { | 77 if err := di.Get(lst); err != nil { |
70 if err == ds.ErrNoSuchEntity { | 78 if err == ds.ErrNoSuchEntity { |
71 log.Debugf(c, "Log stream state not found.") | 79 log.Debugf(c, "Log stream state not found.") |
72 return grpcutil.Errf(codes.NotFound, "Log stream
%q is not registered", id) | 80 return grpcutil.Errf(codes.NotFound, "Log stream
%q is not registered", id) |
73 } | 81 } |
74 | 82 |
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
136 }, nil) | 144 }, nil) |
137 if err != nil { | 145 if err != nil { |
138 log.Fields{ | 146 log.Fields{ |
139 log.ErrorKey: err, | 147 log.ErrorKey: err, |
140 }.Errorf(c, "Failed to update LogStream.") | 148 }.Errorf(c, "Failed to update LogStream.") |
141 return nil, err | 149 return nil, err |
142 } | 150 } |
143 | 151 |
144 return &google.Empty{}, nil | 152 return &google.Empty{}, nil |
145 } | 153 } |
OLD | NEW |