| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package archivist | 5 package archivist |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" |
| 9 "encoding/hex" |
| 8 "fmt" | 10 "fmt" |
| 11 "io" |
| 9 | 12 |
| 10 "github.com/golang/protobuf/proto" | 13 "github.com/golang/protobuf/proto" |
| 11 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 14 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 12 "github.com/luci/luci-go/common/errors" | 15 "github.com/luci/luci-go/common/errors" |
| 13 "github.com/luci/luci-go/common/gcloud/gs" | 16 "github.com/luci/luci-go/common/gcloud/gs" |
| 14 "github.com/luci/luci-go/common/logdog/types" | 17 "github.com/luci/luci-go/common/logdog/types" |
| 15 log "github.com/luci/luci-go/common/logging" | 18 log "github.com/luci/luci-go/common/logging" |
| 19 "github.com/luci/luci-go/common/parallel" |
| 16 "github.com/luci/luci-go/common/proto/logdog/logpb" | 20 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 17 "github.com/luci/luci-go/server/logdog/archive" | 21 "github.com/luci/luci-go/server/logdog/archive" |
| 18 "github.com/luci/luci-go/server/logdog/storage" | 22 "github.com/luci/luci-go/server/logdog/storage" |
| 19 "golang.org/x/net/context" | 23 "golang.org/x/net/context" |
| 20 ) | 24 ) |
| 21 | 25 |
| 26 // Task is a single archive task. |
| 27 type Task interface { |
| 28 // UniqueID returns a task-unique value. Other tasks, and other retries
of |
| 29 // this task, should (try to) not reuse this ID. |
| 30 UniqueID() string |
| 31 |
| 32 // Task is the archive task to execute. |
| 33 Task() *logdog.ArchiveTask |
| 34 |
| 35 // AssertLease asserts that the lease for this Task is still held. |
| 36 // |
| 37 // On failure, it will return an error. If successful, the Archivist may |
| 38 // assume that it holds the lease longer. |
| 39 AssertLease(context.Context) error |
| 40 } |
| 41 |
| 22 // Archivist is a stateless configuration capable of archiving individual log | 42 // Archivist is a stateless configuration capable of archiving individual log |
| 23 // streams. | 43 // streams. |
| 24 type Archivist struct { | 44 type Archivist struct { |
| 25 // Service is the client to use to communicate with Coordinator's Servic
es | 45 // Service is the client to use to communicate with Coordinator's Servic
es |
| 26 // endpoint. | 46 // endpoint. |
| 27 Service logdog.ServicesClient | 47 Service logdog.ServicesClient |
| 28 | 48 |
| 29 » // Storage is the intermediate storage instance to use to pull log entri
es for | 49 » // Storage is the archival source Storage instance. |
| 30 » // archival. | |
| 31 Storage storage.Storage | 50 Storage storage.Storage |
| 32 | |
| 33 // GSClient is the Google Storage client for archive generation. | 51 // GSClient is the Google Storage client for archive generation. |
| 34 GSClient gs.Client | 52 GSClient gs.Client |
| 35 | 53 |
| 36 // GSBase is the base Google Storage path. This includes the bucket name | 54 // GSBase is the base Google Storage path. This includes the bucket name |
| 37 // and any associated path. | 55 // and any associated path. |
| 38 GSBase gs.Path | 56 GSBase gs.Path |
| 57 // GSStagingBase is the base Google Storage path for archive staging. Th
is |
| 58 // includes the bucket name and any associated path. |
| 59 GSStagingBase gs.Path |
| 60 |
| 39 // StreamIndexRange is the maximum number of stream indexes in between i
ndex | 61 // StreamIndexRange is the maximum number of stream indexes in between i
ndex |
| 40 // entries. See archive.Manifest for more information. | 62 // entries. See archive.Manifest for more information. |
| 41 StreamIndexRange int | 63 StreamIndexRange int |
| 42 // PrefixIndexRange is the maximum number of prefix indexes in between i
ndex | 64 // PrefixIndexRange is the maximum number of prefix indexes in between i
ndex |
| 43 // entries. See archive.Manifest for more information. | 65 // entries. See archive.Manifest for more information. |
| 44 PrefixIndexRange int | 66 PrefixIndexRange int |
| 45 // ByteRange is the maximum number of stream data bytes in between index | 67 // ByteRange is the maximum number of stream data bytes in between index |
| 46 // entries. See archive.Manifest for more information. | 68 // entries. See archive.Manifest for more information. |
| 47 ByteRange int | 69 ByteRange int |
| 48 } | 70 } |
| 49 | 71 |
| 50 // storageBufferSize is the size, in bytes, of the LogEntry buffer that is used | 72 // storageBufferSize is the size, in bytes, of the LogEntry buffer that is used |
| 51 // during archival. This should be greater than the maximum LogEntry size. | 73 // during archival. This should be greater than the maximum LogEntry size. |
| 52 const storageBufferSize = types.MaxLogEntryDataSize * 64 | 74 const storageBufferSize = types.MaxLogEntryDataSize * 64 |
| 53 | 75 |
| 54 // ArchiveTask processes and executes a single log stream archive task. | 76 // ArchiveTask processes and executes a single log stream archive task. |
| 55 func (a *Archivist) ArchiveTask(c context.Context, desc []byte) error { | |
| 56 var task logdog.ArchiveTask | |
| 57 if err := proto.Unmarshal(desc, &task); err != nil { | |
| 58 log.WithError(err).Errorf(c, "Failed to decode archive task.") | |
| 59 return err | |
| 60 } | |
| 61 return a.Archive(c, &task) | |
| 62 } | |
| 63 | |
| 64 // Archive archives a single log stream. If unsuccessful, an error is returned. | |
| 65 // | 77 // |
| 66 // This error may be wrapped in errors.Transient if it is believed to have been | 78 // It returns true on success (delete the task) and false on failure (don't |
| 67 // caused by a transient failure. | 79 // delete the task). The return value of true should only be used if the task |
| 80 // is truly complete and acknowledged by the Coordinator. |
| 68 // | 81 // |
| 69 // If the supplied Context is Done, operation may terminate before completion, | 82 // If the supplied Context is Done, operation may terminate before completion, |
| 70 // returning the Context's error. | 83 // returning the Context's error. |
| 71 func (a *Archivist) Archive(c context.Context, t *logdog.ArchiveTask) error { | 84 func (a *Archivist) ArchiveTask(c context.Context, task Task) bool { |
| 85 » delete, err := a.archiveTaskImpl(c, task) |
| 86 » log.Fields{ |
| 87 » » log.ErrorKey: err, |
| 88 » » "delete": delete, |
| 89 » » "path": task.Task().Path, |
| 90 » }.Infof(c, "Finished archive task.") |
| 91 » return delete |
| 92 } |
| 93 |
| 94 // archiveTaskImpl returns the same boolean value as ArchiveTask, but includes |
| 95 // an error. The error is useful for testing to assert that certain conditions |
| 96 // were hit. |
| 97 func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error)
{ |
| 98 » at := task.Task() |
| 99 » log.Fields{ |
| 100 » » "path": at.Path, |
| 101 » }.Debugf(c, "Received archival task.") |
| 102 |
| 72 // Load the log stream's current state. If it is already archived, we wi
ll | 103 // Load the log stream's current state. If it is already archived, we wi
ll |
| 73 // return an immediate success. | 104 // return an immediate success. |
| 74 ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{ | 105 ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{ |
| 75 » » Path: t.Path, | 106 » » Path: at.Path, |
| 76 Desc: true, | 107 Desc: true, |
| 77 }) | 108 }) |
| 78 switch { | 109 switch { |
| 79 case err != nil: | 110 case err != nil: |
| 80 log.WithError(err).Errorf(c, "Failed to load log stream.") | 111 log.WithError(err).Errorf(c, "Failed to load log stream.") |
| 81 » » return err | 112 » » return false, err |
| 113 |
| 82 case ls.State == nil: | 114 case ls.State == nil: |
| 83 » » return errors.New("missing state") | 115 » » log.Errorf(c, "Log stream did not include state.") |
| 116 » » return false, errors.New("log stream did not include state") |
| 117 |
| 118 » case ls.State.Purged: |
| 119 » » log.Warningf(c, "Log stream is purged. Discarding archival reque
st.") |
| 120 » » return true, errors.New("log stream is purged") |
| 121 |
| 122 » case ls.State.Archived: |
| 123 » » log.Infof(c, "Log stream is already archived. Discarding archiva
l request.") |
| 124 » » return true, errors.New("log stream is archived") |
| 125 |
| 126 » case !bytes.Equal(ls.ArchivalKey, at.Key): |
| 127 » » if len(ls.ArchivalKey) == 0 { |
| 128 » » » // The log stream is not registering as "archive pending
" state. |
| 129 » » » // |
| 130 » » » // This can happen if the eventually-consistent datastor
e hasn't updated |
| 131 » » » // its log stream state by the time this Pub/Sub task is
received. In |
| 132 » » » // this case, we will continue retrying the task until d
atastore registers |
| 133 » » » // that some key is associated with it. |
| 134 » » » log.Fields{ |
| 135 » » » » "logStreamArchivalKey": hex.EncodeToString(ls.Ar
chivalKey), |
| 136 » » » » "requestArchivalKey": hex.EncodeToString(at.Ke
y), |
| 137 » » » }.Infof(c, "Archival request received before log stream
has its key.") |
| 138 » » » return false, errors.New("premature archival request") |
| 139 » » } |
| 140 |
| 141 » » // This can happen if a Pub/Sub message is dispatched during a t
ransaction, |
| 142 » » // but that specific transaction failed. In this case, the Pub/S
ub message |
| 143 » » // will have a key that doesn't match the key that was transacti
onally |
| 144 » » // encoded, and can be discarded. |
| 145 » » log.Fields{ |
| 146 » » » "logStreamArchivalKey": hex.EncodeToString(ls.ArchivalKe
y), |
| 147 » » » "requestArchivalKey": hex.EncodeToString(at.Key), |
| 148 » » }.Infof(c, "Superfluous archival request (keys do not match). Di
scarding.") |
| 149 » » return true, errors.New("superfluous archival request") |
| 150 |
| 84 case ls.State.ProtoVersion != logpb.Version: | 151 case ls.State.ProtoVersion != logpb.Version: |
| 85 log.Fields{ | 152 log.Fields{ |
| 86 "protoVersion": ls.State.ProtoVersion, | 153 "protoVersion": ls.State.ProtoVersion, |
| 87 "expectedVersion": logpb.Version, | 154 "expectedVersion": logpb.Version, |
| 88 }.Errorf(c, "Unsupported log stream protobuf version.") | 155 }.Errorf(c, "Unsupported log stream protobuf version.") |
| 89 » » return errors.New("unsupported protobuf version") | 156 » » return false, errors.New("unsupported log stream protobuf versio
n") |
| 157 |
| 90 case ls.Desc == nil: | 158 case ls.Desc == nil: |
| 91 » » return errors.New("missing descriptor") | 159 » » log.Errorf(c, "Log stream did not include a descriptor.") |
| 92 | 160 » » return false, errors.New("log stream did not include a descripto
r") |
| 93 » case ls.State.Purged: | 161 » } |
| 94 » » log.Warningf(c, "Log stream is purged.") | 162 |
| 95 » » return nil | 163 » // If the archival request is younger than the settle delay, kick it bac
k to |
| 96 » case ls.State.Archived: | 164 » // retry later. |
| 97 » » log.Infof(c, "Log stream is already archived.") | 165 » age := ls.Age.Duration() |
| 98 » » return nil | 166 » if age < at.SettleDelay.Duration() { |
| 99 » } | 167 » » log.Fields{ |
| 100 | 168 » » » "age": age, |
| 101 » // Deserialize and validate the descriptor protobuf. | 169 » » » "settleDelay": at.SettleDelay.Duration(), |
| 102 » var desc logpb.LogStreamDescriptor | 170 » » }.Infof(c, "Log stream is younger than the settle delay. Returni
ng task to queue.") |
| 103 » if err := proto.Unmarshal(ls.Desc, &desc); err != nil { | 171 » » return false, errors.New("log stream is within settle delay") |
| 172 » } |
| 173 |
| 174 » // Are we required to archive a complete log stream? |
| 175 » complete := (age <= at.CompletePeriod.Duration()) |
| 176 » if complete && ls.State.TerminalIndex < 0 { |
| 177 » » log.Warningf(c, "Cannot archive complete stream with no terminal
index.") |
| 178 » » return false, errors.New("completeness required, but stream has
no terminal index") |
| 179 » } |
| 180 |
| 181 » ar := logdog.ArchiveStreamRequest{ |
| 182 » » Path: at.Path, |
| 183 » } |
| 184 |
| 185 » // Archive to staging. |
| 186 » // |
| 187 » // If a non-transient failure occurs here, we will report it to the Arch
ivist |
| 188 » // under the assumption that it will continue occurring. |
| 189 » // |
| 190 » // We will handle error creating the plan and executing the plan in the
same |
| 191 » // switch statement below. |
| 192 » staged, err := a.makeStagedArchival(c, types.StreamPath(at.Path), ls, ta
sk.UniqueID()) |
| 193 » if err != nil { |
| 194 » » log.WithError(err).Errorf(c, "Failed to create staged archival p
lan.") |
| 195 » } else { |
| 196 » » err = staged.stage(c, complete) |
| 197 » } |
| 198 |
| 199 » switch { |
| 200 » case errors.IsTransient(err): |
| 201 » » // If this is a transient error, exit immediately and do not del
ete the |
| 202 » » // archival task. |
| 203 » » log.WithError(err).Warningf(c, "TRANSIENT error during archival
operation.") |
| 204 » » return false, err |
| 205 |
| 206 » case err != nil: |
| 207 » » // This is a non-transient error, so we are confident that any f
uture |
| 208 » » // Archival will also encounter this error. We will mark this ar
chival |
| 209 » » // as an error and report it to the Coordinator. |
| 210 » » log.WithError(err).Errorf(c, "Archival failed with non-transient
error.") |
| 211 » » ar.Error = err.Error() |
| 212 » » if ar.Error == "" { |
| 213 » » » // This needs to be non-nil, so if our actual error has
 an empty string, |
| 214 » » » // fill in a generic message. |
| 215 » » » ar.Error = "archival error" |
| 216 » » } |
| 217 |
| 218 » default: |
| 219 » » // In case something fails, clean up our staged archival (best e
ffort). |
| 220 » » defer staged.cleanup(c) |
| 221 |
| 222 » » // Finalize the archival. First, extend our lease to confirm tha
t we still |
| 223 » » // hold it. |
| 224 » » if err := task.AssertLease(c); err != nil { |
| 225 » » » log.WithError(err).Errorf(c, "Failed to extend task leas
e before finalizing.") |
| 226 » » » return false, err |
| 227 » » } |
| 228 |
| 229 » » // Finalize the archival. |
| 230 » » if err := staged.finalize(c, a.GSClient, &ar); err != nil { |
| 231 » » » log.WithError(err).Errorf(c, "Failed to finalize archiva
l.") |
| 232 » » » return false, err |
| 233 » » } |
| 234 » } |
| 235 |
| 236 » log.Fields{ |
| 237 » » "streamURL": ar.StreamUrl, |
| 238 » » "indexURL": ar.IndexUrl, |
| 239 » » "dataURL": ar.DataUrl, |
| 240 » » "terminalIndex": ar.TerminalIndex, |
| 241 » » "logEntryCount": ar.LogEntryCount, |
| 242 » » "hadError": ar.Error, |
| 243 » » "complete": ar.Complete(), |
| 244 » }.Debugf(c, "Finished archival round. Reporting archive state.") |
| 245 |
| 246 » // Extend the lease again to confirm that we still hold it. |
| 247 » if err := task.AssertLease(c); err != nil { |
| 248 » » log.WithError(err).Errorf(c, "Failed to extend task lease before
reporting.") |
| 249 » » return false, err |
| 250 » } |
| 251 |
| 252 » if _, err := a.Service.ArchiveStream(c, &ar); err != nil { |
| 253 » » log.WithError(err).Errorf(c, "Failed to report archive state.") |
| 254 » » return false, err |
| 255 » } |
| 256 |
| 257 » // Archival is complete and acknowledged by Coordinator. Consume the arc
hival |
| 258 » // task. |
| 259 » return true, nil |
| 260 } |
| 261 |
| 262 func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath,
ls *logdog.LoadStreamResponse, uid string) ( |
| 263 » *stagedArchival, error) { |
| 264 » sa := stagedArchival{ |
| 265 » » Archivist: a, |
| 266 » » path: path, |
| 267 |
| 268 » » terminalIndex: ls.State.TerminalIndex, |
| 269 » } |
| 270 |
| 271 » // Deserialize and validate the descriptor protobuf. If this fails, it i
s a |
| 272 » // non-transient error. |
| 273 » if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil { |
| 104 log.Fields{ | 274 log.Fields{ |
| 105 log.ErrorKey: err, | 275 log.ErrorKey: err, |
| 106 "protoVersion": ls.State.ProtoVersion, | 276 "protoVersion": ls.State.ProtoVersion, |
| 107 }.Errorf(c, "Failed to unmarshal descriptor protobuf.") | 277 }.Errorf(c, "Failed to unmarshal descriptor protobuf.") |
| 108 » » return err | 278 » » return nil, err |
| 109 » } | 279 » } |
| 110 | 280 |
| 111 » task := &archiveTask{ | 281 » bext := sa.desc.BinaryFileExt |
| 112 » » Archivist: a, | |
| 113 » » ArchiveTask: t, | |
| 114 » » ls: ls, | |
| 115 » » desc: &desc, | |
| 116 » } | |
| 117 » if err := task.archive(c); err != nil { | |
| 118 » » log.WithError(err).Errorf(c, "Failed to perform archival operati
on.") | |
| 119 » » return err | |
| 120 » } | |
| 121 » log.Fields{ | |
| 122 » » "streamURL": task.ar.StreamUrl, | |
| 123 » » "indexURL": task.ar.IndexUrl, | |
| 124 » » "dataURL": task.ar.DataUrl, | |
| 125 » » "terminalIndex": task.ar.TerminalIndex, | |
| 126 » » "complete": task.ar.Complete, | |
| 127 » }.Debugf(c, "Finished archive construction.") | |
| 128 | |
| 129 » if _, err := a.Service.ArchiveStream(c, &task.ar); err != nil { | |
| 130 » » log.WithError(err).Errorf(c, "Failed to mark log stream as archi
ved.") | |
| 131 » » return err | |
| 132 » } | |
| 133 » return nil | |
| 134 } | |
| 135 | |
| 136 // archiveTask is the set of parameters for a single archival. | |
| 137 type archiveTask struct { | |
| 138 » *Archivist | |
| 139 » *logdog.ArchiveTask | |
| 140 | |
| 141 » // ls is the log stream state. | |
| 142 » ls *logdog.LoadStreamResponse | |
| 143 » // desc is the unmarshaled log stream descriptor. | |
| 144 » desc *logpb.LogStreamDescriptor | |
| 145 | |
| 146 » // ar will be populated during archive construction. | |
| 147 » ar logdog.ArchiveStreamRequest | |
| 148 } | |
| 149 | |
| 150 // archiveState performs the archival operation on a stream described by a | |
| 151 // Coordinator State. Upon success, the State will be updated with the result | |
| 152 // of the archival operation. | |
| 153 func (t *archiveTask) archive(c context.Context) (err error) { | |
| 154 » // Generate our archival object managers. | |
| 155 » bext := t.desc.BinaryFileExt | |
| 156 if bext == "" { | 282 if bext == "" { |
| 157 bext = "bin" | 283 bext = "bin" |
| 158 } | 284 } |
| 159 | 285 |
| 160 » path := t.Path | 286 » // Construct our staged archival paths. |
| 161 » var streamO, indexO, dataO *gsObject | 287 » sa.stream = a.makeStagingPaths(path, "logstream.entries", uid) |
| 162 » streamO, err = t.newGSObject(c, path, "logstream.entries") | 288 » sa.index = a.makeStagingPaths(path, "logstream.index", uid) |
| 163 » if err != nil { | 289 » sa.data = a.makeStagingPaths(path, fmt.Sprintf("data.%s", bext), uid) |
| 164 » » log.WithError(err).Errorf(c, "Failed to create log object.") | 290 » return &sa, nil |
| 165 » » return | 291 } |
| 166 » } | 292 |
| 167 | 293 // makeStagingPaths returns a stagingPaths instance for the given path and |
| 168 » indexO, err = t.newGSObject(c, path, "logstream.index") | 294 // file name. It incorporates a unique ID into the staging name to differentiate |
| 169 » if err != nil { | 295 // it from other staging paths for the same path/name. |
| 170 » » log.WithError(err).Errorf(c, "Failed to create index object.") | 296 func (a *Archivist) makeStagingPaths(path types.StreamPath, name, uid string) st
agingPaths { |
| 171 » » return | 297 » return stagingPaths{ |
| 172 » } | 298 » » staged: a.GSStagingBase.Concat(string(path), uid, name), |
| 173 | 299 » » final: a.GSBase.Concat(string(path), name), |
| 174 » dataO, err = t.newGSObject(c, path, fmt.Sprintf("data.%s", bext)) | 300 » } |
| 175 » if err != nil { | 301 } |
| 176 » » log.WithError(err).Errorf(c, "Failed to create data object.") | 302 |
| 177 » » return | 303 type stagedArchival struct { |
| 178 » } | 304 » *Archivist |
| 179 | 305 |
| 180 » // Load the URLs into our state. | 306 » path types.StreamPath |
| 181 » t.ar.StreamUrl = streamO.url | 307 » desc logpb.LogStreamDescriptor |
| 182 » t.ar.IndexUrl = indexO.url | 308 |
| 183 » t.ar.DataUrl = dataO.url | 309 » stream stagingPaths |
| 184 | 310 » streamSize int64 |
| 311 |
| 312 » index stagingPaths |
| 313 » indexSize int64 |
| 314 |
| 315 » data stagingPaths |
| 316 » dataSize int64 |
| 317 |
| 318 » finalized bool |
| 319 » terminalIndex int64 |
| 320 » logEntryCount int64 |
| 321 } |
| 322 |
| 323 // stage executes the archival process, archiving to the staged storage paths. |
| 324 // |
| 325 // If stage fails, it may return a transient error. |
| 326 func (sa *stagedArchival) stage(c context.Context, complete bool) (err error) { |
| 185 log.Fields{ | 327 log.Fields{ |
| 186 » » "streamURL": t.ar.StreamUrl, | 328 » » "streamURL": sa.stream.staged, |
| 187 » » "indexURL": t.ar.IndexUrl, | 329 » » "indexURL": sa.index.staged, |
| 188 » » "dataURL": t.ar.DataUrl, | 330 » » "dataURL": sa.data.staged, |
| 189 » }.Infof(c, "Archiving log stream...") | 331 » }.Debugf(c, "Staging log stream...") |
| 190 | 332 |
| 191 » // We want to try and delete any GS objects that were created during a f
ailed | 333 » // Group any transient errors that occur during cleanup. If we aren't |
| 192 » // archival attempt. | 334 » // returning a non-transient error, return a transient "terr". |
| 193 » deleteOnFail := func(o *gsObject) { | 335 » var terr errors.MultiError |
| 194 » » if o == nil || err == nil { | 336 » defer func() { |
| 195 » » » return | 337 » » if err == nil && len(terr) > 0 { |
| 196 » » } | 338 » » » err = errors.WrapTransient(terr) |
| 197 » » if ierr := o.delete(); ierr != nil { | 339 » » } |
| 340 » }() |
| 341 |
| 342 » // Close our writers on exit. If any of them fail to close, mark the arc
hival |
| 343 » // as a transient failure. |
| 344 » closeWriter := func(closer io.Closer, path gs.Path) { |
| 345 » » // Close the Writer. If this results in an error, append it to o
ur transient |
| 346 » » // error MultiError. |
| 347 » » if ierr := closer.Close(); ierr != nil { |
| 348 » » » terr = append(terr, ierr) |
| 349 » » } |
| 350 |
| 351 » » // If we have an archival error, also delete the path associated
with this |
| 352 » » // stream. This is a non-fatal failure, since we've already hit
a fatal |
| 353 » » // one. |
| 354 » » if err != nil || len(terr) > 0 { |
| 355 » » » if ierr := sa.GSClient.Delete(path); ierr != nil { |
| 356 » » » » log.Fields{ |
| 357 » » » » » log.ErrorKey: ierr, |
| 358 » » » » » "path": path, |
| 359 » » » » }.Warningf(c, "Failed to delete stream on error.
") |
| 360 » » » } |
| 361 » » } |
| 362 » } |
| 363 |
| 364 » // createWriter is a shorthand function for creating a writer to a path
and |
| 365 » // reporting an error if it failed. |
| 366 » createWriter := func(p gs.Path) (gs.Writer, error) { |
| 367 » » w, ierr := sa.GSClient.NewWriter(p) |
| 368 » » if ierr != nil { |
| 198 log.Fields{ | 369 log.Fields{ |
| 199 log.ErrorKey: ierr, | 370 log.ErrorKey: ierr, |
| 200 » » » » "url": o.url, | 371 » » » » "path": p, |
| 201 » » » }.Warningf(c, "Failed to clean-up GS object on failure."
) | 372 » » » }.Errorf(c, "Failed to create writer.") |
| 202 » » } | 373 » » » return nil, ierr |
| 203 » } | 374 » » } |
| 204 » defer deleteOnFail(streamO) | 375 » » return w, nil |
| 205 » defer deleteOnFail(indexO) | 376 » } |
| 206 » defer deleteOnFail(dataO) | 377 |
| 207 | 378 » var streamWriter, indexWriter, dataWriter gs.Writer |
| 208 » // Close our GS object managers on exit. If any of them fail to close, m
arh | 379 » if streamWriter, err = createWriter(sa.stream.staged); err != nil { |
| 209 » // the archival as a failure. | 380 » » return |
| 210 » closeOM := func(o *gsObject) { | 381 » } |
| 211 » » if o == nil { | 382 » defer closeWriter(streamWriter, sa.stream.staged) |
| 212 » » » return | 383 |
| 213 » » } | 384 » if indexWriter, err = createWriter(sa.index.staged); err != nil { |
| 214 » » if ierr := o.Close(); ierr != nil { | 385 » » return err |
| 215 » » » err = ierr | 386 » } |
| 216 » » } | 387 » defer closeWriter(indexWriter, sa.index.staged) |
| 217 » } | 388 |
| 218 » defer closeOM(streamO) | 389 » if dataWriter, err = createWriter(sa.data.staged); err != nil { |
| 219 » defer closeOM(indexO) | 390 » » return err |
| 220 » defer closeOM(dataO) | 391 » } |
| 392 » defer closeWriter(dataWriter, sa.data.staged) |
| 221 | 393 |
| 222 // Read our log entries from intermediate storage. | 394 // Read our log entries from intermediate storage. |
| 223 ss := storageSource{ | 395 ss := storageSource{ |
| 224 Context: c, | 396 Context: c, |
| 225 » » st: t.Storage, | 397 » » st: sa.Storage, |
| 226 » » path: types.StreamPath(t.Path), | 398 » » path: sa.path, |
| 227 » » contiguous: t.Complete, | 399 » » contiguous: complete, |
| 228 » » terminalIndex: types.MessageIndex(t.ls.State.TerminalIndex), | 400 » » terminalIndex: types.MessageIndex(sa.terminalIndex), |
| 229 lastIndex: -1, | 401 lastIndex: -1, |
| 230 } | 402 } |
| 231 | 403 |
| 232 m := archive.Manifest{ | 404 m := archive.Manifest{ |
| 233 » » Desc: t.desc, | 405 » » Desc: &sa.desc, |
| 234 Source: &ss, | 406 Source: &ss, |
| 235 » » LogWriter: streamO, | 407 » » LogWriter: streamWriter, |
| 236 » » IndexWriter: indexO, | 408 » » IndexWriter: indexWriter, |
| 237 » » DataWriter: dataO, | 409 » » DataWriter: dataWriter, |
| 238 » » StreamIndexRange: t.StreamIndexRange, | 410 » » StreamIndexRange: sa.StreamIndexRange, |
| 239 » » PrefixIndexRange: t.PrefixIndexRange, | 411 » » PrefixIndexRange: sa.PrefixIndexRange, |
| 240 » » ByteRange: t.ByteRange, | 412 » » ByteRange: sa.ByteRange, |
| 241 | 413 |
| 242 Logger: log.Get(c), | 414 Logger: log.Get(c), |
| 243 } | 415 } |
| 244 » err = archive.Archive(m) | 416 » if err = archive.Archive(m); err != nil { |
| 245 » if err != nil { | |
| 246 log.WithError(err).Errorf(c, "Failed to archive log stream.") | 417 log.WithError(err).Errorf(c, "Failed to archive log stream.") |
| 247 return | 418 return |
| 248 } | 419 } |
| 249 | 420 |
| 250 » t.ar.TerminalIndex = int64(ss.lastIndex) | 421 » if tidx := sa.terminalIndex; tidx != int64(ss.lastIndex) { |
| 251 » if tidx := t.ls.State.TerminalIndex; tidx != t.ar.TerminalIndex { | 422 » » // Fail if we were requested to archive only the complete log. W
e consider |
| 252 » » // Fail, if we were requested to archive only the complete log. | 423 » » // this a transient error with the expectation that the missing
entries will |
| 253 » » if t.Complete { | 424 » » // show up in future retries. |
| 254 » » » log.Fields{ | 425 » » switch { |
| 255 » » » » "terminalIndex": tidx, | 426 » » case complete && ss.hasMissingEntries: |
| 256 » » » » "lastIndex": t.ar.TerminalIndex, | 427 » » » log.Errorf(c, "Log stream has missing entries, but compl
eteness is required.") |
| 257 » » » }.Errorf(c, "Log stream archival stopped prior to termin
al index.") | 428 » » » err = errors.WrapTransient(errors.New("stream has missin
g entries")) |
| 258 » » » return errors.New("stream finished short of terminal ind
ex") | 429 » » » return |
| 259 » » } | 430 |
| 260 | 431 » » case ss.logEntryCount == 0: |
| 261 » » if t.ar.TerminalIndex < 0 { | |
| 262 // If our last log index was <0, then no logs were archi
ved. | 432 // If our last log index was <0, then no logs were archi
ved. |
| 263 log.Warningf(c, "No log entries were archived.") | 433 log.Warningf(c, "No log entries were archived.") |
| 264 » » } else { | 434 |
| 435 » » default: |
| 265 // Update our terminal index. | 436 // Update our terminal index. |
| 266 log.Fields{ | 437 log.Fields{ |
| 267 » » » » "from": tidx, | 438 » » » » "terminalIndex": ss.lastIndex, |
| 268 » » » » "to": t.ar.TerminalIndex, | 439 » » » » "logEntryCount": ss.logEntryCount, |
| 269 » » » }.Infof(c, "Updated log stream terminal index.") | 440 » » » » "hasMissingEntries": ss.hasMissingEntries, |
| 441 » » » }.Debugf(c, "Finished archiving log stream.") |
| 270 } | 442 } |
| 271 } | 443 } |
| 272 | 444 |
| 273 // Update our state with archival results. | 445 // Update our state with archival results. |
| 274 » t.ar.Path = t.Path | 446 » sa.terminalIndex = int64(ss.lastIndex) |
| 275 » t.ar.StreamSize = streamO.Count() | 447 » sa.logEntryCount = ss.logEntryCount |
| 276 » t.ar.IndexSize = indexO.Count() | 448 » sa.stream.count = streamWriter.Count() |
| 277 » t.ar.DataSize = dataO.Count() | 449 » sa.index.count = indexWriter.Count() |
| 278 » t.ar.Complete = !ss.hasMissingEntries | 450 » sa.data.count = dataWriter.Count() |
| 279 return | 451 return |
| 280 } | 452 } |
| 281 | 453 |
| 282 func (t *archiveTask) newGSObject(c context.Context, path string, name string) (
*gsObject, error) { | 454 type stagingPaths struct { |
| 283 » p := t.GSBase.Concat(path, name) | 455 » staged gs.Path |
| 284 » o := gsObject{ | 456 » final gs.Path |
| 285 » » gs: t.GSClient, | 457 » count int64 |
| 286 » » bucket: p.Bucket(), | 458 } |
| 287 » » path: p.Filename(), | 459 |
| 288 » } | 460 func (d *stagingPaths) clearStaged() { |
| 289 | 461 » d.staged = "" |
| 290 » // Build our GS URL. Note that since buildGSPath joins with "/", the ini
tial | 462 } |
| 291 » // token, "gs:/", will become "gs://". | 463 |
| 292 » o.url = string(p) | 464 func (sa *stagedArchival) finalize(c context.Context, client gs.Client, ar *logd
og.ArchiveStreamRequest) error { |
| 293 | 465 » err := parallel.FanOutIn(func(taskC chan<- func() error) { |
| 294 » var err error | 466 » » for _, d := range sa.getStagingPaths() { |
| 295 » o.Writer, err = t.GSClient.NewWriter(o.bucket, o.path) | 467 » » » d := d |
| 468 |
| 469 » » » // Don't copy zero-sized streams. |
| 470 » » » if d.count == 0 { |
| 471 » » » » continue |
| 472 » » » } |
| 473 |
| 474 » » » taskC <- func() error { |
| 475 » » » » if err := client.Rename(d.staged, d.final); err
!= nil { |
| 476 » » » » » log.Fields{ |
| 477 » » » » » » log.ErrorKey: err, |
| 478 » » » » » » "stagedPath": d.staged, |
| 479 » » » » » » "finalPath": d.final, |
| 480 » » » » » }.Errorf(c, "Failed to rename GS object.
") |
| 481 » » » » » return err |
| 482 » » » » } |
| 483 |
| 484 » » » » // Clear the staged value to indicate that it no
longer exists. |
| 485 » » » » d.clearStaged() |
| 486 » » » » return nil |
| 487 » » » } |
| 488 » » } |
| 489 » }) |
| 296 if err != nil { | 490 if err != nil { |
| 297 » » log.Fields{ | 491 » » return err |
| 298 » » » log.ErrorKey: err, | 492 » } |
| 299 » » » "url": o.url, | 493 |
| 300 » » }.Errorf(c, "Failed to create Writer.") | 494 » ar.TerminalIndex = sa.terminalIndex |
| 301 » » return nil, err | 495 » ar.LogEntryCount = sa.logEntryCount |
| 302 » } | 496 » ar.StreamUrl = string(sa.stream.final) |
| 303 | 497 » ar.StreamSize = sa.stream.count |
| 304 » // Delete any existing object at this path. | 498 » ar.IndexUrl = string(sa.index.final) |
| 305 » if err := o.delete(); err != nil { | 499 » ar.IndexSize = sa.index.count |
| 306 » » closeErr := o.Close() | 500 » ar.DataUrl = string(sa.data.final) |
| 307 | 501 » ar.DataSize = sa.data.count |
| 308 » » log.Fields{ | 502 » return nil |
| 309 » » » log.ErrorKey: err, | 503 } |
| 310 » » » "closeErr": closeErr, | 504 |
| 311 » » » "url": o.url, | 505 func (sa *stagedArchival) cleanup(c context.Context) { |
| 312 » » }.Errorf(c, "Could not delete object during creation.") | 506 » for _, d := range sa.getStagingPaths() { |
| 313 » » return nil, err | 507 » » if d.staged == "" { |
| 314 » } | 508 » » » continue |
| 315 » return &o, nil | 509 » » } |
| 316 } | 510 |
| 317 | 511 » » if err := sa.GSClient.Delete(d.staged); err != nil { |
| 318 // gsObjectManger wraps a gsObject instance with metadata. | |
| 319 type gsObject struct { | |
| 320 » gs.Writer | |
| 321 | |
| 322 » // gs is the Client instance. | |
| 323 » gs gs.Client | |
| 324 » // bucket is the name of the object's bucket. | |
| 325 » bucket string | |
| 326 » // path is the bucket-relative path of the object. | |
| 327 » path string | |
| 328 » // url is the Google Storage URL (gs://) of this object. | |
| 329 » url string | |
| 330 } | |
| 331 | |
| 332 func (o *gsObject) delete() error { | |
| 333 » return o.gs.Delete(o.bucket, o.path) | |
| 334 } | |
| 335 | |
| 336 // storageSource is an archive.LogEntrySource that pulls log entries from | |
| 337 // intermediate storage via its storage.Storage instance. | |
| 338 type storageSource struct { | |
| 339 » context.Context | |
| 340 | |
| 341 » st storage.Storage // the storage instance to read from | |
| 342 » path types.StreamPath // the path of the log stream | |
| 343 » contiguous bool // if true, enforce contiguous entries | |
| 344 » terminalIndex types.MessageIndex // if >= 0, discard logs beyond this | |
| 345 | |
| 346 » buf []*logpb.LogEntry | |
| 347 » lastIndex types.MessageIndex | |
| 348 » hasMissingEntries bool // true if some log entries were missing. | |
| 349 } | |
| 350 | |
| 351 func (s *storageSource) bufferEntries(start types.MessageIndex) error { | |
| 352 » bytes := 0 | |
| 353 | |
| 354 » req := storage.GetRequest{ | |
| 355 » » Path: s.path, | |
| 356 » » Index: start, | |
| 357 » } | |
| 358 » return s.st.Get(req, func(idx types.MessageIndex, d []byte) bool { | |
| 359 » » le := logpb.LogEntry{} | |
| 360 » » if err := proto.Unmarshal(d, &le); err != nil { | |
| 361 log.Fields{ | 512 log.Fields{ |
| 362 » » » » log.ErrorKey: err, | 513 » » » » log.ErrorKey: err, |
| 363 » » » » "streamIndex": idx, | 514 » » » » "path": d.staged, |
| 364 » » » }.Errorf(s, "Failed to unmarshal LogEntry.") | 515 » » » }.Warningf(c, "Failed to clean up staged path.") |
| 365 » » » return false | 516 » » } |
| 366 » » } | 517 |
| 367 » » s.buf = append(s.buf, &le) | 518 » » d.clearStaged() |
| 368 | 519 » } |
| 369 » » // Stop loading if we've reached or exceeded our buffer size. | 520 } |
| 370 » » bytes += len(d) | 521 |
| 371 » » return bytes < storageBufferSize | 522 func (sa *stagedArchival) getStagingPaths() []*stagingPaths { |
| 372 » }) | 523 » return []*stagingPaths{ |
| 373 } | 524 » » &sa.stream, |
| 374 | 525 » » &sa.index, |
| 375 func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) { | 526 » » &sa.data, |
| 376 » if len(s.buf) == 0 { | 527 » } |
| 377 » » s.buf = s.buf[:0] | 528 } |
| 378 » » if err := s.bufferEntries(s.lastIndex + 1); err != nil { | |
| 379 » » » if err == storage.ErrDoesNotExist { | |
| 380 » » » » log.Warningf(s, "Archive target stream does not
exist in intermediate storage.") | |
| 381 » » » » return nil, archive.ErrEndOfStream | |
| 382 » » » } | |
| 383 | |
| 384 » » » log.WithError(err).Errorf(s, "Failed to retrieve log str
eam from storage.") | |
| 385 » » » return nil, err | |
| 386 » » } | |
| 387 » } | |
| 388 | |
| 389 » if len(s.buf) == 0 { | |
| 390 » » log.Fields{ | |
| 391 » » » "lastIndex": s.lastIndex, | |
| 392 » » }.Debugf(s, "Encountered end of stream.") | |
| 393 » » return nil, archive.ErrEndOfStream | |
| 394 » } | |
| 395 | |
| 396 » var le *logpb.LogEntry | |
| 397 » le, s.buf = s.buf[0], s.buf[1:] | |
| 398 | |
| 399 » // If we're enforcing a contiguous log stream, error if this LogEntry is
not | |
| 400 » // contiguous. | |
| 401 » sidx := types.MessageIndex(le.StreamIndex) | |
| 402 » nidx := (s.lastIndex + 1) | |
| 403 » if sidx != nidx { | |
| 404 » » s.hasMissingEntries = true | |
| 405 | |
| 406 » » if s.contiguous { | |
| 407 » » » log.Fields{ | |
| 408 » » » » "index": sidx, | |
| 409 » » » » "nextIndex": nidx, | |
| 410 » » » }.Errorf(s, "Non-contiguous log stream while enforcing."
) | |
| 411 » » » return nil, errors.New("non-contiguous log stream") | |
| 412 » » } | |
| 413 » } | |
| 414 | |
| 415 » // If we're enforcing a maximum terminal index, return end of stream if
this | |
| 416 » // LogEntry exceeds that index. | |
| 417 » if s.terminalIndex >= 0 && sidx > s.terminalIndex { | |
| 418 » » log.Fields{ | |
| 419 » » » "index": sidx, | |
| 420 » » » "terminalIndex": s.terminalIndex, | |
| 421 » » }.Warningf(s, "Discarding log entries beyond expected terminal i
ndex.") | |
| 422 » » return nil, archive.ErrEndOfStream | |
| 423 » } | |
| 424 | |
| 425 » s.lastIndex = sidx | |
| 426 » return le, nil | |
| 427 } | |
| OLD | NEW |