Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(232)

Side by Side Diff: logdog/appengine/coordinator/endpoints/services/terminateStream.go

Issue 2989333002: [logdog] Replace Tumble with push queues. (Closed)
Patch Set: comments Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. 1 // Copyright 2015 The LUCI Authors.
2 // 2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License. 4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at 5 // You may obtain a copy of the License at
6 // 6 //
7 // http://www.apache.org/licenses/LICENSE-2.0 7 // http://www.apache.org/licenses/LICENSE-2.0
8 // 8 //
9 // Unless required by applicable law or agreed to in writing, software 9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, 10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and 12 // See the License for the specific language governing permissions and
13 // limitations under the License. 13 // limitations under the License.
14 14
15 package services 15 package services
16 16
17 import ( 17 import (
18 "crypto/subtle" 18 "crypto/subtle"
19 19
20 "github.com/golang/protobuf/ptypes/empty" 20 "github.com/golang/protobuf/ptypes/empty"
21
21 ds "github.com/luci/gae/service/datastore" 22 ds "github.com/luci/gae/service/datastore"
23
22 "github.com/luci/luci-go/common/clock" 24 "github.com/luci/luci-go/common/clock"
23 log "github.com/luci/luci-go/common/logging" 25 log "github.com/luci/luci-go/common/logging"
24 "github.com/luci/luci-go/common/proto/google" 26 "github.com/luci/luci-go/common/proto/google"
25 "github.com/luci/luci-go/grpc/grpcutil" 27 "github.com/luci/luci-go/grpc/grpcutil"
26 "github.com/luci/luci-go/logdog/api/config/svcconfig" 28 "github.com/luci/luci-go/logdog/api/config/svcconfig"
27 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" 29 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1"
28 "github.com/luci/luci-go/logdog/appengine/coordinator" 30 "github.com/luci/luci-go/logdog/appengine/coordinator"
29 "github.com/luci/luci-go/logdog/appengine/coordinator/config" 31 "github.com/luci/luci-go/logdog/appengine/coordinator/config"
30 "github.com/luci/luci-go/logdog/appengine/coordinator/endpoints" 32 "github.com/luci/luci-go/logdog/appengine/coordinator/endpoints"
31 "github.com/luci/luci-go/logdog/appengine/coordinator/mutations" 33 "github.com/luci/luci-go/logdog/appengine/coordinator/mutations"
34 "github.com/luci/luci-go/logdog/appengine/coordinator/tasks"
32 "github.com/luci/luci-go/tumble" 35 "github.com/luci/luci-go/tumble"
36
33 "golang.org/x/net/context" 37 "golang.org/x/net/context"
34 "google.golang.org/grpc/codes" 38 "google.golang.org/grpc/codes"
35 ) 39 )
36 40
37 // TerminateStream is an idempotent stream state terminate operation. 41 // TerminateStream is an idempotent stream state terminate operation.
38 func (s *server) TerminateStream(c context.Context, req *logdog.TerminateStreamR equest) (*empty.Empty, error) { 42 func (s *server) TerminateStream(c context.Context, req *logdog.TerminateStreamR equest) (*empty.Empty, error) {
39 log.Fields{ 43 log.Fields{
40 "project": req.Project, 44 "project": req.Project,
41 "id": req.Id, 45 "id": req.Id,
42 "terminalIndex": req.TerminalIndex, 46 "terminalIndex": req.TerminalIndex,
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
109 lst.TerminalIndex = req.TerminalIndex 113 lst.TerminalIndex = req.TerminalIndex
110 lst.TerminatedTime = now 114 lst.TerminatedTime = now
111 115
112 if err := ds.Put(c, lst); err != nil { 116 if err := ds.Put(c, lst); err != nil {
113 log.Fields{ 117 log.Fields{
114 log.ErrorKey: err, 118 log.ErrorKey: err,
115 }.Errorf(c, "Failed to Put() LogStream.") 119 }.Errorf(c, "Failed to Put() LogStream.")
116 return grpcutil.Internal 120 return grpcutil.Internal
117 } 121 }
118 122
119 » » » // Replace the pessimistic archive expiration mutation s cheduled in 123 » » » // Replace the pessimistic archive expiration task sched uled in
120 » » » // RegisterStream with an optimistic archival mutation. 124 » » » // RegisterStream with an optimistic archival task.
121 » » » cat := mutations.CreateArchiveTask{ 125 » » » if err := tasks.CreateArchivalTask(c, id, logdog.Archive DispatchTask_TERMINATED,
122 » » » » ID: id, 126 » » » » params.SettleDelay, params); err != nil {
123 127
124 » » » » // Optimistic parameters. 128 » » » » log.WithError(err).Errorf(c, "Failed to create t erminated archival task.")
125 » » » » SettleDelay: params.SettleDelay, 129 » » » » return grpcutil.Internal
126 » » » » CompletePeriod: params.CompletePeriod,
127
128 » » » » // Schedule this mutation to execute after our s ettle delay.
129 » » » » Expiration: now.Add(params.SettleDelay),
130 } 130 }
131 131
132 » » » aeParent, aeName := ds.KeyForObj(c, lst), cat.TaskName(c ) 132 » » » if err := tasks.DeleteArchiveStreamExpiredTask(c, id); e rr != nil {
133 » » » if err := tumble.PutNamedMutations(c, aeParent, map[stri ng]tumble.Mutation{aeName: &cat}); err != nil { 133 » » » » // If we can't delete this task, it will just ru n, notice that the
134 » » » » log.WithError(err).Errorf(c, "Failed to replace archive expiration mutation.") 134 » » » » // stream is archived, and quit. No big deal.
135 » » » » return grpcutil.Internal 135 » » » » log.WithError(err).Warningf(c, "(Non-fatal) Fail ed to delete expired archival task.")
136 » » » }
137
138 » » » // In case the stream was *registered* with Tumble, but is now being
139 » » » // processed with task queue code, clear the Tumble arch ival mutation.
140 » » » //
141 » » » // TODO(dnj): Remove this once Tumble is drained.
142 » » » archiveMutation := mutations.CreateArchiveTask{ID: id}
143 » » » if err := tumble.CancelNamedMutations(c, archiveMutation .Root(c), archiveMutation.TaskName(c)); err != nil {
144 » » » » log.WithError(err).Warningf(c, "(Non-fatal) Fail ed to cancel archive mutation.")
136 } 145 }
137 146
138 log.Fields{ 147 log.Fields{
139 "terminalIndex": lst.TerminalIndex, 148 "terminalIndex": lst.TerminalIndex,
140 » » » » "settleDelay": cat.SettleDelay, 149 » » » » "settleDelay": params.SettleDelay,
141 » » » » "completePeriod": cat.CompletePeriod, 150 » » » » "completePeriod": params.CompletePeriod,
142 » » » » "scheduledAt": cat.Expiration, 151 » » » }.Debugf(c, "Terminal index was set, and archival task w as scheduled.")
143 » » » }.Debugf(c, "Terminal index was set, and archival mutati on was scheduled.")
144 return nil 152 return nil
145 } 153 }
146 }, nil) 154 }, nil)
147 if err != nil { 155 if err != nil {
148 log.Fields{ 156 log.Fields{
149 log.ErrorKey: err, 157 log.ErrorKey: err,
150 }.Errorf(c, "Failed to update LogStream.") 158 }.Errorf(c, "Failed to update LogStream.")
151 return nil, err 159 return nil, err
152 } 160 }
153 161
154 return &empty.Empty{}, nil 162 return &empty.Empty{}, nil
155 } 163 }
156 164
157 func standardArchivalParams(cfg *config.Config, pcfg *svcconfig.ProjectConfig) * coordinator.ArchivalParams { 165 func standardArchivalParams(cfg *config.Config, pcfg *svcconfig.ProjectConfig) * coordinator.ArchivalParams {
158 return &coordinator.ArchivalParams{ 166 return &coordinator.ArchivalParams{
159 SettleDelay: google.DurationFromProto(cfg.Coordinator.Archive SettleDelay), 167 SettleDelay: google.DurationFromProto(cfg.Coordinator.Archive SettleDelay),
160 CompletePeriod: endpoints.MinDuration(cfg.Coordinator.ArchiveDel ayMax, pcfg.MaxStreamAge), 168 CompletePeriod: endpoints.MinDuration(cfg.Coordinator.ArchiveDel ayMax, pcfg.MaxStreamAge),
161 } 169 }
162 } 170 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698