| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package mutations | 5 package mutations |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "time" | 9 "time" |
| 10 | 10 |
| (...skipping 29 matching lines...) Expand all Loading... |
| 40 // RollForward implements tumble.DelayedMutation. | 40 // RollForward implements tumble.DelayedMutation. |
| 41 func (m *CreateArchiveTask) RollForward(c context.Context) ([]tumble.Mutation, e
rror) { | 41 func (m *CreateArchiveTask) RollForward(c context.Context) ([]tumble.Mutation, e
rror) { |
| 42 c = log.SetField(c, "id", m.ID) | 42 c = log.SetField(c, "id", m.ID) |
| 43 | 43 |
| 44 svc := coordinator.GetServices(c) | 44 svc := coordinator.GetServices(c) |
| 45 ap, err := svc.ArchivalPublisher(c) | 45 ap, err := svc.ArchivalPublisher(c) |
| 46 if err != nil { | 46 if err != nil { |
| 47 log.WithError(err).Errorf(c, "Failed to get archival publisher."
) | 47 log.WithError(err).Errorf(c, "Failed to get archival publisher."
) |
| 48 return nil, err | 48 return nil, err |
| 49 } | 49 } |
| 50 defer func() { |
| 51 if err := ap.Close(); err != nil { |
| 52 log.WithError(err).Warningf(c, "Failed to close archival
publisher.") |
| 53 } |
| 54 }() |
| 50 | 55 |
| 51 // Get the log stream. | 56 // Get the log stream. |
| 52 state := m.logStream().State(c) | 57 state := m.logStream().State(c) |
| 53 | 58 |
| 54 if err := ds.Get(c, state); err != nil { | 59 if err := ds.Get(c, state); err != nil { |
| 55 if err == ds.ErrNoSuchEntity { | 60 if err == ds.ErrNoSuchEntity { |
| 56 log.Warningf(c, "Log stream no longer exists.") | 61 log.Warningf(c, "Log stream no longer exists.") |
| 57 return nil, nil | 62 return nil, nil |
| 58 } | 63 } |
| 59 | 64 |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 104 // logStream returns the log stream associated with this task. | 109 // logStream returns the log stream associated with this task. |
| 105 func (m *CreateArchiveTask) logStream() *coordinator.LogStream { | 110 func (m *CreateArchiveTask) logStream() *coordinator.LogStream { |
| 106 return &coordinator.LogStream{ | 111 return &coordinator.LogStream{ |
| 107 ID: m.ID, | 112 ID: m.ID, |
| 108 } | 113 } |
| 109 } | 114 } |
| 110 | 115 |
| 111 func init() { | 116 func init() { |
| 112 tumble.Register((*CreateArchiveTask)(nil)) | 117 tumble.Register((*CreateArchiveTask)(nil)) |
| 113 } | 118 } |
| OLD | NEW |