OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package services | 5 package services |
6 | 6 |
7 import ( | 7 import ( |
8 "crypto/subtle" | 8 "crypto/subtle" |
9 | 9 |
10 ds "github.com/luci/gae/service/datastore" | 10 ds "github.com/luci/gae/service/datastore" |
11 "github.com/luci/gae/service/info" | 11 "github.com/luci/gae/service/info" |
12 "github.com/luci/luci-go/appengine/logdog/coordinator" | 12 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 13 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations" |
| 14 "github.com/luci/luci-go/appengine/tumble" |
13 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 15 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
14 "github.com/luci/luci-go/common/clock" | 16 "github.com/luci/luci-go/common/clock" |
15 "github.com/luci/luci-go/common/grpcutil" | 17 "github.com/luci/luci-go/common/grpcutil" |
16 "github.com/luci/luci-go/common/logdog/types" | 18 "github.com/luci/luci-go/common/logdog/types" |
17 log "github.com/luci/luci-go/common/logging" | 19 log "github.com/luci/luci-go/common/logging" |
18 "github.com/luci/luci-go/common/proto/google" | 20 "github.com/luci/luci-go/common/proto/google" |
19 "golang.org/x/net/context" | 21 "golang.org/x/net/context" |
20 "google.golang.org/grpc/codes" | 22 "google.golang.org/grpc/codes" |
21 ) | 23 ) |
22 | 24 |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
55 | 57 |
56 // Initialize our archival parameters. | 58 // Initialize our archival parameters. |
57 params := coordinator.ArchivalParams{ | 59 params := coordinator.ArchivalParams{ |
58 RequestID: info.Get(c).RequestID(), | 60 RequestID: info.Get(c).RequestID(), |
59 SettleDelay: cfg.Coordinator.ArchiveSettleDelay.Duration(), | 61 SettleDelay: cfg.Coordinator.ArchiveSettleDelay.Duration(), |
60 CompletePeriod: cfg.Coordinator.ArchiveDelayMax.Duration(), | 62 CompletePeriod: cfg.Coordinator.ArchiveDelayMax.Duration(), |
61 } | 63 } |
62 | 64 |
63 // Transactionally validate and update the terminal index. | 65 // Transactionally validate and update the terminal index. |
64 err = ds.Get(c).RunInTransaction(func(c context.Context) error { | 66 err = ds.Get(c).RunInTransaction(func(c context.Context) error { |
65 » » if err := ds.Get(c).Get(ls); err != nil { | 67 » » di := ds.Get(c) |
| 68 |
| 69 » » if err := di.Get(ls); err != nil { |
66 if err == ds.ErrNoSuchEntity { | 70 if err == ds.ErrNoSuchEntity { |
67 log.Debugf(c, "LogEntry not found.") | 71 log.Debugf(c, "LogEntry not found.") |
68 return grpcutil.Errf(codes.NotFound, "Log stream
%q is not registered", req.Path) | 72 return grpcutil.Errf(codes.NotFound, "Log stream
%q is not registered", req.Path) |
69 } | 73 } |
70 | 74 |
71 log.WithError(err).Errorf(c, "Failed to load LogEntry.") | 75 log.WithError(err).Errorf(c, "Failed to load LogEntry.") |
72 return grpcutil.Internal | 76 return grpcutil.Internal |
73 } | 77 } |
74 | 78 |
75 switch { | 79 switch { |
(...skipping 26 matching lines...) Expand all Loading... |
102 if err := params.PublishTask(c, ap, ls); err != nil { | 106 if err := params.PublishTask(c, ap, ls); err != nil { |
103 if err == coordinator.ErrArchiveTasked { | 107 if err == coordinator.ErrArchiveTasked { |
104 log.Warningf(c, "Archival has already be
en tasked for this stream.") | 108 log.Warningf(c, "Archival has already be
en tasked for this stream.") |
105 return nil | 109 return nil |
106 } | 110 } |
107 | 111 |
108 log.WithError(err).Errorf(c, "Failed to create a
rchive task.") | 112 log.WithError(err).Errorf(c, "Failed to create a
rchive task.") |
109 return grpcutil.Internal | 113 return grpcutil.Internal |
110 } | 114 } |
111 | 115 |
112 » » » if err := ds.Get(c).Put(ls); err != nil { | 116 » » » if err := di.Put(ls); err != nil { |
113 log.Fields{ | 117 log.Fields{ |
114 log.ErrorKey: err, | 118 log.ErrorKey: err, |
115 }.Errorf(c, "Failed to Put() LogStream.") | 119 }.Errorf(c, "Failed to Put() LogStream.") |
116 return grpcutil.Internal | 120 return grpcutil.Internal |
117 } | 121 } |
118 | 122 |
| 123 // Delete the archive expiration mutation, since we have
just dispatched |
| 124 // an archive request. |
| 125 aeParent, aeName := (&mutations.CreateArchiveTask{Path:
path}).TaskName(di) |
| 126 if err := tumble.CancelNamedMutations(c, aeParent, aeNam
e); err != nil { |
| 127 log.WithError(err).Errorf(c, "Failed to cancel a
rchive expiration mutation.") |
| 128 return grpcutil.Internal |
| 129 } |
| 130 |
119 log.Fields{ | 131 log.Fields{ |
120 "terminalIndex": ls.TerminalIndex, | 132 "terminalIndex": ls.TerminalIndex, |
121 }.Infof(c, "Terminal index was set and archival was disp
atched.") | 133 }.Infof(c, "Terminal index was set and archival was disp
atched.") |
122 return nil | 134 return nil |
123 } | 135 } |
124 }, nil) | 136 }, nil) |
125 if err != nil { | 137 if err != nil { |
126 log.Fields{ | 138 log.Fields{ |
127 log.ErrorKey: err, | 139 log.ErrorKey: err, |
128 }.Errorf(c, "Failed to update LogStream.") | 140 }.Errorf(c, "Failed to update LogStream.") |
129 return nil, err | 141 return nil, err |
130 } | 142 } |
131 | 143 |
132 return &google.Empty{}, nil | 144 return &google.Empty{}, nil |
133 } | 145 } |
OLD | NEW |