| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package services | 5 package services |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "crypto/subtle" | 8 "crypto/subtle" |
| 9 | 9 |
| 10 ds "github.com/luci/gae/service/datastore" | 10 ds "github.com/luci/gae/service/datastore" |
| 11 "github.com/luci/gae/service/info" | 11 "github.com/luci/gae/service/info" |
| 12 "github.com/luci/luci-go/appengine/logdog/coordinator" | 12 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 13 "github.com/luci/luci-go/appengine/logdog/coordinator/mutations" |
| 14 "github.com/luci/luci-go/appengine/tumble" |
| 13 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 15 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 14 "github.com/luci/luci-go/common/clock" | 16 "github.com/luci/luci-go/common/clock" |
| 15 "github.com/luci/luci-go/common/grpcutil" | 17 "github.com/luci/luci-go/common/grpcutil" |
| 16 "github.com/luci/luci-go/common/logdog/types" | 18 "github.com/luci/luci-go/common/logdog/types" |
| 17 log "github.com/luci/luci-go/common/logging" | 19 log "github.com/luci/luci-go/common/logging" |
| 18 "github.com/luci/luci-go/common/proto/google" | 20 "github.com/luci/luci-go/common/proto/google" |
| 19 "golang.org/x/net/context" | 21 "golang.org/x/net/context" |
| 20 "google.golang.org/grpc/codes" | 22 "google.golang.org/grpc/codes" |
| 21 ) | 23 ) |
| 22 | 24 |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 55 | 57 |
| 56 // Initialize our archival parameters. | 58 // Initialize our archival parameters. |
| 57 params := coordinator.ArchivalParams{ | 59 params := coordinator.ArchivalParams{ |
| 58 RequestID: info.Get(c).RequestID(), | 60 RequestID: info.Get(c).RequestID(), |
| 59 SettleDelay: cfg.Coordinator.ArchiveSettleDelay.Duration(), | 61 SettleDelay: cfg.Coordinator.ArchiveSettleDelay.Duration(), |
| 60 CompletePeriod: cfg.Coordinator.ArchiveDelayMax.Duration(), | 62 CompletePeriod: cfg.Coordinator.ArchiveDelayMax.Duration(), |
| 61 } | 63 } |
| 62 | 64 |
| 63 // Transactionally validate and update the terminal index. | 65 // Transactionally validate and update the terminal index. |
| 64 err = ds.Get(c).RunInTransaction(func(c context.Context) error { | 66 err = ds.Get(c).RunInTransaction(func(c context.Context) error { |
| 65 » » if err := ds.Get(c).Get(ls); err != nil { | 67 » » di := ds.Get(c) |
| 68 |
| 69 » » if err := di.Get(ls); err != nil { |
| 66 if err == ds.ErrNoSuchEntity { | 70 if err == ds.ErrNoSuchEntity { |
| 67 log.Debugf(c, "LogEntry not found.") | 71 log.Debugf(c, "LogEntry not found.") |
| 68 return grpcutil.Errf(codes.NotFound, "Log stream
%q is not registered", req.Path) | 72 return grpcutil.Errf(codes.NotFound, "Log stream
%q is not registered", req.Path) |
| 69 } | 73 } |
| 70 | 74 |
| 71 log.WithError(err).Errorf(c, "Failed to load LogEntry.") | 75 log.WithError(err).Errorf(c, "Failed to load LogEntry.") |
| 72 return grpcutil.Internal | 76 return grpcutil.Internal |
| 73 } | 77 } |
| 74 | 78 |
| 75 switch { | 79 switch { |
| (...skipping 26 matching lines...) Expand all Loading... |
| 102 if err := params.PublishTask(c, ap, ls); err != nil { | 106 if err := params.PublishTask(c, ap, ls); err != nil { |
| 103 if err == coordinator.ErrArchiveTasked { | 107 if err == coordinator.ErrArchiveTasked { |
| 104 log.Warningf(c, "Archival has already be
en tasked for this stream.") | 108 log.Warningf(c, "Archival has already be
en tasked for this stream.") |
| 105 return nil | 109 return nil |
| 106 } | 110 } |
| 107 | 111 |
| 108 log.WithError(err).Errorf(c, "Failed to create a
rchive task.") | 112 log.WithError(err).Errorf(c, "Failed to create a
rchive task.") |
| 109 return grpcutil.Internal | 113 return grpcutil.Internal |
| 110 } | 114 } |
| 111 | 115 |
| 112 » » » if err := ds.Get(c).Put(ls); err != nil { | 116 » » » if err := di.Put(ls); err != nil { |
| 113 log.Fields{ | 117 log.Fields{ |
| 114 log.ErrorKey: err, | 118 log.ErrorKey: err, |
| 115 }.Errorf(c, "Failed to Put() LogStream.") | 119 }.Errorf(c, "Failed to Put() LogStream.") |
| 116 return grpcutil.Internal | 120 return grpcutil.Internal |
| 117 } | 121 } |
| 118 | 122 |
| 123 // Delete the archive expiration mutation, since we have
just dispatched |
| 124 // an archive request. |
| 125 aeParent, aeName := (&mutations.CreateArchiveTask{Path:
path}).TaskName(di) |
| 126 if err := tumble.CancelNamedMutations(c, aeParent, aeNam
e); err != nil { |
| 127 log.WithError(err).Errorf(c, "Failed to cancel a
rchive expiration mutation.") |
| 128 return grpcutil.Internal |
| 129 } |
| 130 |
| 119 log.Fields{ | 131 log.Fields{ |
| 120 "terminalIndex": ls.TerminalIndex, | 132 "terminalIndex": ls.TerminalIndex, |
| 121 }.Infof(c, "Terminal index was set and archival was disp
atched.") | 133 }.Infof(c, "Terminal index was set and archival was disp
atched.") |
| 122 return nil | 134 return nil |
| 123 } | 135 } |
| 124 }, nil) | 136 }, nil) |
| 125 if err != nil { | 137 if err != nil { |
| 126 log.Fields{ | 138 log.Fields{ |
| 127 log.ErrorKey: err, | 139 log.ErrorKey: err, |
| 128 }.Errorf(c, "Failed to update LogStream.") | 140 }.Errorf(c, "Failed to update LogStream.") |
| 129 return nil, err | 141 return nil, err |
| 130 } | 142 } |
| 131 | 143 |
| 132 return &google.Empty{}, nil | 144 return &google.Empty{}, nil |
| 133 } | 145 } |
| OLD | NEW |