Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package archivist | 5 package archivist |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "time" | |
| 9 | 10 |
| 10 "github.com/golang/protobuf/proto" | 11 "github.com/golang/protobuf/proto" |
| 11 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 12 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 12 "github.com/luci/luci-go/common/errors" | 13 "github.com/luci/luci-go/common/errors" |
| 13 "github.com/luci/luci-go/common/gcloud/gs" | 14 "github.com/luci/luci-go/common/gcloud/gs" |
| 14 "github.com/luci/luci-go/common/logdog/types" | 15 "github.com/luci/luci-go/common/logdog/types" |
| 15 log "github.com/luci/luci-go/common/logging" | 16 log "github.com/luci/luci-go/common/logging" |
| 16 "github.com/luci/luci-go/common/proto/logdog/logpb" | 17 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 17 "github.com/luci/luci-go/server/logdog/archive" | 18 "github.com/luci/luci-go/server/logdog/archive" |
| 18 "github.com/luci/luci-go/server/logdog/storage" | 19 "github.com/luci/luci-go/server/logdog/storage" |
| (...skipping 26 matching lines...) Expand all Loading... | |
| 45 // ByteRange is the maximum number of stream data bytes in between index | 46 // ByteRange is the maximum number of stream data bytes in between index |
| 46 // entries. See archive.Manifest for more information. | 47 // entries. See archive.Manifest for more information. |
| 47 ByteRange int | 48 ByteRange int |
| 48 } | 49 } |
| 49 | 50 |
| 50 // storageBufferSize is the size, in bytes, of the LogEntry buffer that is used | 51 // storageBufferSize is the size, in bytes, of the LogEntry buffer that is used |
| 51 // to during archival. This should be greater than the maximum LogEntry size. | 52 // to during archival. This should be greater than the maximum LogEntry size. |
| 52 const storageBufferSize = types.MaxLogEntryDataSize * 64 | 53 const storageBufferSize = types.MaxLogEntryDataSize * 64 |
| 53 | 54 |
| 54 // ArchiveTask processes and executes a single log stream archive task. | 55 // ArchiveTask processes and executes a single log stream archive task. |
| 55 func (a *Archivist) ArchiveTask(c context.Context, desc []byte) error { | 56 // |
| 57 // It returns true on success (delete the task) and false on failure (don't | |
| 58 // delete the task). The return value of true should only be used if the task | |
| 59 // is truely complete, as deleting a named task in the task queue will cause it | |
|
Vadim Sh.
2016/04/07 01:21:32
typo: truly
also you don't use named task (anymore)
dnj
2016/04/11 17:20:04
Done.
| |
| 60 // to not be reschedulable for a long period of time (10 days). | |
| 61 // | |
| 62 // If the supplied Context is Done, operation may terminate before completion, | |
| 63 // returning the Context's error. | |
| 64 func (a *Archivist) ArchiveTask(c context.Context, desc []byte, age time.Duration) bool { | |
| 56 var task logdog.ArchiveTask | 65 var task logdog.ArchiveTask |
| 57 if err := proto.Unmarshal(desc, &task); err != nil { | 66 if err := proto.Unmarshal(desc, &task); err != nil { |
| 58 log.WithError(err).Errorf(c, "Failed to decode archive task.") | 67 log.WithError(err).Errorf(c, "Failed to decode archive task.") |
| 59 » » return err | 68 » » return false |
| 60 } | 69 } |
| 61 » return a.Archive(c, &task) | 70 » if err := a.Archive(c, &task, age); err != nil { |
| 71 » » log.WithError(err).Errorf(c, "Archival task failed.") | |
| 72 » » return false | |
| 73 » } | |
| 74 » return true | |
| 62 } | 75 } |
| 63 | 76 |
| 64 // Archive archives a single log stream. If unsuccessful, an error is returned. | 77 // Archive processes and executes a single log stream archive task. |
| 65 // | 78 // |
| 66 // This error may be wrapped in errors.Transient if it is believed to have been | 79 // It returns an error if the archival isn't complete and confirmed by the |
| 67 // caused by a transient failure. | 80 // Coordinator. |
| 68 // | 81 // |
| 69 // If the supplied Context is Done, operation may terminate before completion, | 82 // If the supplied Context is Done, operation may terminate before completion, |
| 70 // returning the Context's error. | 83 // returning the Context's error. |
| 71 func (a *Archivist) Archive(c context.Context, t *logdog.ArchiveTask) error { | 84 func (a *Archivist) Archive(c context.Context, t *logdog.ArchiveTask, age time.Duration) error { |
| 85 » complete := (age <= t.CompletePeriod.Duration()) | |
| 86 | |
| 72 // Load the log stream's current state. If it is already archived, we will | 87 // Load the log stream's current state. If it is already archived, we will |
| 73 // return an immediate success. | 88 // return an immediate success. |
| 74 ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{ | 89 ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{ |
| 75 Path: t.Path, | 90 Path: t.Path, |
| 76 Desc: true, | 91 Desc: true, |
| 77 }) | 92 }) |
| 78 switch { | 93 switch { |
| 79 case err != nil: | 94 case err != nil: |
| 80 » » log.WithError(err).Errorf(c, "Failed to load log stream.") | 95 » » return fmt.Errorf("failed to load log stream: %v", err) |
| 81 » » return err | |
| 82 case ls.State == nil: | 96 case ls.State == nil: |
| 83 » » return errors.New("missing state") | 97 » » return errors.New("log stream did not include state") |
| 84 case ls.State.ProtoVersion != logpb.Version: | 98 case ls.State.ProtoVersion != logpb.Version: |
| 85 log.Fields{ | 99 log.Fields{ |
| 86 "protoVersion": ls.State.ProtoVersion, | 100 "protoVersion": ls.State.ProtoVersion, |
| 87 "expectedVersion": logpb.Version, | 101 "expectedVersion": logpb.Version, |
| 88 }.Errorf(c, "Unsupported log stream protobuf version.") | 102 }.Errorf(c, "Unsupported log stream protobuf version.") |
| 89 » » return errors.New("unsupported protobuf version") | 103 » » return errors.New("unsupported log stream protobuf version") |
| 90 case ls.Desc == nil: | 104 case ls.Desc == nil: |
| 91 » » return errors.New("missing descriptor") | 105 » » return errors.New("log stream did not include a descriptor") |
| 92 | 106 |
| 93 case ls.State.Purged: | 107 case ls.State.Purged: |
| 94 log.Warningf(c, "Log stream is purged.") | 108 log.Warningf(c, "Log stream is purged.") |
| 95 return nil | 109 return nil |
| 96 case ls.State.Archived: | 110 case ls.State.Archived: |
| 97 log.Infof(c, "Log stream is already archived.") | 111 log.Infof(c, "Log stream is already archived.") |
| 98 return nil | 112 return nil |
| 113 | |
| 114 case complete && ls.State.TerminalIndex < 0: | |
| 115 return errors.WrapTransient(errors.New("cannot archive complete stream with no terminal index")) | |
| 99 } | 116 } |
| 100 | 117 |
| 101 // Deserialize and validate the descriptor protobuf. | 118 // Deserialize and validate the descriptor protobuf. |
| 102 var desc logpb.LogStreamDescriptor | 119 var desc logpb.LogStreamDescriptor |
| 103 if err := proto.Unmarshal(ls.Desc, &desc); err != nil { | 120 if err := proto.Unmarshal(ls.Desc, &desc); err != nil { |
| 104 log.Fields{ | 121 log.Fields{ |
| 105 log.ErrorKey: err, | 122 log.ErrorKey: err, |
| 106 "protoVersion": ls.State.ProtoVersion, | 123 "protoVersion": ls.State.ProtoVersion, |
| 107 }.Errorf(c, "Failed to unmarshal descriptor protobuf.") | 124 }.Errorf(c, "Failed to unmarshal descriptor protobuf.") |
| 108 return err | 125 return err |
| 109 } | 126 } |
| 110 | 127 |
| 111 task := &archiveTask{ | 128 task := &archiveTask{ |
| 112 Archivist: a, | 129 Archivist: a, |
| 113 ArchiveTask: t, | 130 ArchiveTask: t, |
| 131 complete: complete, | |
| 114 ls: ls, | 132 ls: ls, |
| 115 desc: &desc, | 133 desc: &desc, |
| 116 } | 134 } |
| 117 » if err := task.archive(c); err != nil { | 135 » aerr := task.archive(c) |
| 118 » » log.WithError(err).Errorf(c, "Failed to perform archival operation.") | 136 » if aerr != nil { |
| 119 » » return err | 137 » » // If this is a transient error, fail immediately with it. Use |
| 138 » » // isTransientError because err can be an errors.MultiError. | |
| 139 » » // | |
| 140 » » // Returning a transient error here will cause task queue to reschedule. | |
| 141 » » if isTransientError(aerr) { | |
| 142 » » » log.WithError(aerr).Errorf(c, "TRANSIENT error during archival operation.") | |
| 143 » » » return aerr | |
| 144 » » } | |
| 145 | |
| 146 » » // This is a non-transient error, so we will report it as an error to the | |
| 147 » » // service. It may accept the archival (success), in which case we'll delete | |
| 148 » » // our task, or respond with an error code (Aborted) indicating that this | |
| 149 » » // archival run should be retried and that we should not delete the task. | |
| 150 » » log.WithError(aerr).Errorf(c, "Archival failed with non-transient error.") | |
| 151 » » task.ar.Error = true | |
| 120 } | 152 } |
| 153 | |
| 121 log.Fields{ | 154 log.Fields{ |
| 122 "streamURL": task.ar.StreamUrl, | 155 "streamURL": task.ar.StreamUrl, |
| 123 "indexURL": task.ar.IndexUrl, | 156 "indexURL": task.ar.IndexUrl, |
| 124 "dataURL": task.ar.DataUrl, | 157 "dataURL": task.ar.DataUrl, |
| 125 "terminalIndex": task.ar.TerminalIndex, | 158 "terminalIndex": task.ar.TerminalIndex, |
| 126 » » "complete": task.ar.Complete, | 159 » » "logEntryCount": task.ar.LogEntryCount, |
| 127 » }.Debugf(c, "Finished archive construction.") | 160 » » "hadError": task.ar.Error, |
| 161 » » "complete": task.ar.Complete(), | |
| 162 » }.Debugf(c, "Finished archive construction; reporting archive state.") | |
| 128 | 163 |
| 129 if _, err := a.Service.ArchiveStream(c, &task.ar); err != nil { | 164 if _, err := a.Service.ArchiveStream(c, &task.ar); err != nil { |
| 130 » » log.WithError(err).Errorf(c, "Failed to mark log stream as archived.") | 165 » » // All service errors will be reported and returned as transient. We will |
| 131 » » return err | 166 » » // NOT delete this archival task unless the service has explicitly |
| 167 » » // instructed us to via success code. | |
| 168 » » log.WithError(err).Errorf(c, "Failed to report archive state.") | |
| 169 » » return errors.WrapTransient(errors.New("failed to report archive state")) | |
| 132 } | 170 } |
| 133 return nil | 171 return nil |
| 134 } | 172 } |
| 135 | 173 |
| 136 // archiveTask is the set of parameters for a single archival. | 174 // archiveTask is the set of parameters for a single archival. |
| 137 type archiveTask struct { | 175 type archiveTask struct { |
| 138 *Archivist | 176 *Archivist |
| 139 *logdog.ArchiveTask | 177 *logdog.ArchiveTask |
| 140 | 178 |
| 179 // complete, if true, will cause incomplete streams to return transient | |
| 180 // errors. | |
| 181 complete bool | |
| 141 // ls is the log stream state. | 182 // ls is the log stream state. |
| 142 ls *logdog.LoadStreamResponse | 183 ls *logdog.LoadStreamResponse |
| 143 // desc is the unmarshaled log stream descriptor. | 184 // desc is the unmarshaled log stream descriptor. |
| 144 desc *logpb.LogStreamDescriptor | 185 desc *logpb.LogStreamDescriptor |
| 145 | 186 |
| 146 // ar will be populated during archive construction. | 187 // ar will be populated during archive construction. |
| 147 ar logdog.ArchiveStreamRequest | 188 ar logdog.ArchiveStreamRequest |
| 148 } | 189 } |
| 149 | 190 |
| 150 // archiveState performs the archival operation on a stream described by a | 191 // archiveState performs the archival operation on a stream described by a |
| 151 // Coordinator State. Upon success, the State will be updated with the result | 192 // Coordinator State. Upon success, the State will be updated with the result |
| 152 // of the archival operation. | 193 // of the archival operation. |
| 153 func (t *archiveTask) archive(c context.Context) (err error) { | 194 func (t *archiveTask) archive(c context.Context) (err error) { |
| 195 // Minimal ArchiveRequest parameters. | |
| 196 t.ar.Path = t.Path | |
| 197 t.ar.TerminalIndex = -1 | |
| 198 | |
| 154 // Generate our archival object managers. | 199 // Generate our archival object managers. |
| 155 bext := t.desc.BinaryFileExt | 200 bext := t.desc.BinaryFileExt |
| 156 if bext == "" { | 201 if bext == "" { |
| 157 bext = "bin" | 202 bext = "bin" |
| 158 } | 203 } |
| 159 | 204 |
| 160 path := t.Path | 205 path := t.Path |
| 161 var streamO, indexO, dataO *gsObject | 206 var streamO, indexO, dataO *gsObject |
| 162 streamO, err = t.newGSObject(c, path, "logstream.entries") | 207 streamO, err = t.newGSObject(c, path, "logstream.entries") |
| 163 if err != nil { | 208 if err != nil { |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 205 defer deleteOnFail(indexO) | 250 defer deleteOnFail(indexO) |
| 206 defer deleteOnFail(dataO) | 251 defer deleteOnFail(dataO) |
| 207 | 252 |
| 208 // Close our GS object managers on exit. If any of them fail to close, mark | 253 // Close our GS object managers on exit. If any of them fail to close, mark |
| 209 // the archival as a failure. | 254 // the archival as a failure. |
| 210 closeOM := func(o *gsObject) { | 255 closeOM := func(o *gsObject) { |
| 211 if o == nil { | 256 if o == nil { |
| 212 return | 257 return |
| 213 } | 258 } |
| 214 if ierr := o.Close(); ierr != nil { | 259 if ierr := o.Close(); ierr != nil { |
| 215 » » » err = ierr | 260 » » » err = joinArchiveErrors(err, ierr) |
| 216 } | 261 } |
| 217 } | 262 } |
| 218 defer closeOM(streamO) | 263 defer closeOM(streamO) |
| 219 defer closeOM(indexO) | 264 defer closeOM(indexO) |
| 220 defer closeOM(dataO) | 265 defer closeOM(dataO) |
| 221 | 266 |
| 222 // Read our log entries from intermediate storage. | 267 // Read our log entries from intermediate storage. |
| 223 ss := storageSource{ | 268 ss := storageSource{ |
| 224 Context: c, | 269 Context: c, |
| 225 st: t.Storage, | 270 st: t.Storage, |
| 226 path: types.StreamPath(t.Path), | 271 path: types.StreamPath(t.Path), |
| 227 » » contiguous: t.Complete, | 272 » » contiguous: t.complete, |
| 228 terminalIndex: types.MessageIndex(t.ls.State.TerminalIndex), | 273 terminalIndex: types.MessageIndex(t.ls.State.TerminalIndex), |
| 229 lastIndex: -1, | 274 lastIndex: -1, |
| 230 } | 275 } |
| 231 | 276 |
| 232 m := archive.Manifest{ | 277 m := archive.Manifest{ |
| 233 Desc: t.desc, | 278 Desc: t.desc, |
| 234 Source: &ss, | 279 Source: &ss, |
| 235 LogWriter: streamO, | 280 LogWriter: streamO, |
| 236 IndexWriter: indexO, | 281 IndexWriter: indexO, |
| 237 DataWriter: dataO, | 282 DataWriter: dataO, |
| 238 StreamIndexRange: t.StreamIndexRange, | 283 StreamIndexRange: t.StreamIndexRange, |
| 239 PrefixIndexRange: t.PrefixIndexRange, | 284 PrefixIndexRange: t.PrefixIndexRange, |
| 240 ByteRange: t.ByteRange, | 285 ByteRange: t.ByteRange, |
| 241 | 286 |
| 242 Logger: log.Get(c), | 287 Logger: log.Get(c), |
| 243 } | 288 } |
| 244 » err = archive.Archive(m) | 289 » if err = archive.Archive(m); err != nil { |
| 245 » if err != nil { | |
| 246 log.WithError(err).Errorf(c, "Failed to archive log stream.") | 290 log.WithError(err).Errorf(c, "Failed to archive log stream.") |
| 247 return | 291 return |
| 248 } | 292 } |
| 249 | 293 |
| 250 t.ar.TerminalIndex = int64(ss.lastIndex) | |
| 251 if tidx := t.ls.State.TerminalIndex; tidx != t.ar.TerminalIndex { | 294 if tidx := t.ls.State.TerminalIndex; tidx != t.ar.TerminalIndex { |
| 252 » » // Fail, if we were requested to archive only the complete log. | 295 » » // Fail if we were requested to archive only the complete log. We consider |
| 253 » » if t.Complete { | 296 » » // this a transient error with the expectation that the missing entries will |
| 254 » » » log.Fields{ | 297 » » // show up in future retries. |
| 255 » » » » "terminalIndex": tidx, | 298 » » switch { |
| 256 » » » » "lastIndex": t.ar.TerminalIndex, | 299 » » case t.complete && ss.hasMissingEntries: |
| 257 » » » }.Errorf(c, "Log stream archival stopped prior to terminal index.") | 300 » » » log.Errorf(c, "Log stream has missing entries, but completeness is required.") |
| 258 » » » return errors.New("stream finished short of terminal index") | 301 » » » return errors.WrapTransient(errors.New("stream has missing entries")) |
| 259 » » } | |
| 260 | 302 |
| 261 » » if t.ar.TerminalIndex < 0 { | 303 » » case ss.logEntryCount == 0: |
| 262 // If our last log index was <0, then no logs were archived. | 304 // If our last log index was <0, then no logs were archived. |
| 263 log.Warningf(c, "No log entries were archived.") | 305 log.Warningf(c, "No log entries were archived.") |
| 264 » » } else { | 306 |
| 307 » » default: | |
| 265 // Update our terminal index. | 308 // Update our terminal index. |
| 266 log.Fields{ | 309 log.Fields{ |
| 267 » » » » "from": tidx, | 310 » » » » "terminalIndex": ss.lastIndex, |
| 268 » » » » "to": t.ar.TerminalIndex, | 311 » » » » "logEntryCount": ss.logEntryCount, |
| 269 » » » }.Infof(c, "Updated log stream terminal index.") | 312 » » » » "hasMissingEntries": ss.hasMissingEntries, |
| 313 » » » }.Debugf(c, "Finished archiving log stream.") | |
| 270 } | 314 } |
| 271 } | 315 } |
| 272 | 316 |
| 273 // Update our state with archival results. | 317 // Update our state with archival results. |
| 274 » t.ar.Path = t.Path | 318 » t.ar.TerminalIndex = int64(ss.lastIndex) |
| 319 » t.ar.LogEntryCount = ss.logEntryCount | |
| 275 t.ar.StreamSize = streamO.Count() | 320 t.ar.StreamSize = streamO.Count() |
| 276 t.ar.IndexSize = indexO.Count() | 321 t.ar.IndexSize = indexO.Count() |
| 277 t.ar.DataSize = dataO.Count() | 322 t.ar.DataSize = dataO.Count() |
| 278 t.ar.Complete = !ss.hasMissingEntries | |
| 279 return | 323 return |
| 280 } | 324 } |
| 281 | 325 |
| 282 func (t *archiveTask) newGSObject(c context.Context, path string, name string) (*gsObject, error) { | 326 func (t *archiveTask) newGSObject(c context.Context, path string, name string) (*gsObject, error) { |
| 283 p := t.GSBase.Concat(path, name) | 327 p := t.GSBase.Concat(path, name) |
| 284 o := gsObject{ | 328 o := gsObject{ |
| 285 gs: t.GSClient, | 329 gs: t.GSClient, |
| 286 bucket: p.Bucket(), | 330 bucket: p.Bucket(), |
| 287 path: p.Filename(), | 331 path: p.Filename(), |
| 288 } | 332 } |
| (...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 338 type storageSource struct { | 382 type storageSource struct { |
| 339 context.Context | 383 context.Context |
| 340 | 384 |
| 341 st storage.Storage // the storage instance to read from | 385 st storage.Storage // the storage instance to read from |
| 342 path types.StreamPath // the path of the log stream | 386 path types.StreamPath // the path of the log stream |
| 343 contiguous bool // if true, enforce contiguous entries | 387 contiguous bool // if true, enforce contiguous entries |
| 344 terminalIndex types.MessageIndex // if >= 0, discard logs beyond this | 388 terminalIndex types.MessageIndex // if >= 0, discard logs beyond this |
| 345 | 389 |
| 346 buf []*logpb.LogEntry | 390 buf []*logpb.LogEntry |
| 347 lastIndex types.MessageIndex | 391 lastIndex types.MessageIndex |
| 392 logEntryCount int64 | |
| 348 hasMissingEntries bool // true if some log entries were missing. | 393 hasMissingEntries bool // true if some log entries were missing. |
| 349 } | 394 } |
| 350 | 395 |
| 351 func (s *storageSource) bufferEntries(start types.MessageIndex) error { | 396 func (s *storageSource) bufferEntries(start types.MessageIndex) error { |
| 352 bytes := 0 | 397 bytes := 0 |
| 353 | 398 |
| 354 req := storage.GetRequest{ | 399 req := storage.GetRequest{ |
| 355 Path: s.path, | 400 Path: s.path, |
| 356 Index: start, | 401 Index: start, |
| 357 } | 402 } |
| (...skipping 13 matching lines...) Expand all Loading... | |
| 371 return bytes < storageBufferSize | 416 return bytes < storageBufferSize |
| 372 }) | 417 }) |
| 373 } | 418 } |
| 374 | 419 |
| 375 func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) { | 420 func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) { |
| 376 if len(s.buf) == 0 { | 421 if len(s.buf) == 0 { |
| 377 s.buf = s.buf[:0] | 422 s.buf = s.buf[:0] |
| 378 if err := s.bufferEntries(s.lastIndex + 1); err != nil { | 423 if err := s.bufferEntries(s.lastIndex + 1); err != nil { |
| 379 if err == storage.ErrDoesNotExist { | 424 if err == storage.ErrDoesNotExist { |
| 380 log.Warningf(s, "Archive target stream does not exist in intermediate storage.") | 425 log.Warningf(s, "Archive target stream does not exist in intermediate storage.") |
| 426 if s.terminalIndex >= 0 { | |
| 427 s.hasMissingEntries = true | |
| 428 } | |
| 381 return nil, archive.ErrEndOfStream | 429 return nil, archive.ErrEndOfStream |
| 382 } | 430 } |
| 383 | 431 |
| 384 log.WithError(err).Errorf(s, "Failed to retrieve log stream from storage.") | 432 log.WithError(err).Errorf(s, "Failed to retrieve log stream from storage.") |
| 385 return nil, err | 433 return nil, err |
| 386 } | 434 } |
| 387 } | 435 } |
| 388 | 436 |
| 437 // If we have no more buffered entries, we have exhausted our log stream. | |
| 389 if len(s.buf) == 0 { | 438 if len(s.buf) == 0 { |
| 390 » » log.Fields{ | 439 » » // If we have a terminal index, but we didn't actually emit that index, |
| 391 » » » "lastIndex": s.lastIndex, | 440 » » // mark that we have missing entries. |
| 392 » » }.Debugf(s, "Encountered end of stream.") | 441 » » if s.terminalIndex >= 0 && s.lastIndex != s.terminalIndex { |
| 442 » » » log.Fields{ | |
| 443 » » » » "terminalIndex": s.terminalIndex, | |
| 444 » » » » "lastIndex": s.lastIndex, | |
| 445 » » » }.Warningf(s, "Log stream stopped before terminal index.") | |
| 446 » » » s.hasMissingEntries = true | |
| 447 » » } else { | |
| 448 » » » log.Fields{ | |
| 449 » » » » "lastIndex": s.lastIndex, | |
| 450 » » » }.Debugf(s, "Encountered end of stream.") | |
| 451 » » } | |
| 452 | |
| 393 return nil, archive.ErrEndOfStream | 453 return nil, archive.ErrEndOfStream |
| 394 } | 454 } |
| 395 | 455 |
| 456 // Pop the next log entry and advance the stream. | |
| 396 var le *logpb.LogEntry | 457 var le *logpb.LogEntry |
| 397 le, s.buf = s.buf[0], s.buf[1:] | 458 le, s.buf = s.buf[0], s.buf[1:] |
| 398 | 459 |
| 399 // If we're enforcing a contiguous log stream, error if this LogEntry is not | 460 // If we're enforcing a contiguous log stream, error if this LogEntry is not |
| 400 // contiguous. | 461 // contiguous. |
| 401 sidx := types.MessageIndex(le.StreamIndex) | 462 sidx := types.MessageIndex(le.StreamIndex) |
| 402 nidx := (s.lastIndex + 1) | 463 nidx := (s.lastIndex + 1) |
| 403 if sidx != nidx { | 464 if sidx != nidx { |
| 404 s.hasMissingEntries = true | 465 s.hasMissingEntries = true |
| 405 | 466 » } |
| 406 » » if s.contiguous { | 467 » if s.contiguous && s.hasMissingEntries { |
| 407 » » » log.Fields{ | 468 » » log.Fields{ |
| 408 » » » » "index": sidx, | 469 » » » "index": sidx, |
| 409 » » » » "nextIndex": nidx, | 470 » » » "nextIndex": nidx, |
| 410 » » » }.Errorf(s, "Non-contiguous log stream while enforcing.") | 471 » » }.Warningf(s, "Non-contiguous log stream while enforcing.") |
| 411 » » » return nil, errors.New("non-contiguous log stream") | 472 » » return nil, archive.ErrEndOfStream |
| 412 » » } | |
| 413 } | 473 } |
| 414 | 474 |
| 415 // If we're enforcing a maximum terminal index, return end of stream if this | 475 // If we're enforcing a maximum terminal index, return end of stream if this |
| 416 // LogEntry exceeds that index. | 476 // LogEntry exceeds that index. |
| 417 if s.terminalIndex >= 0 && sidx > s.terminalIndex { | 477 if s.terminalIndex >= 0 && sidx > s.terminalIndex { |
| 418 log.Fields{ | 478 log.Fields{ |
| 419 "index": sidx, | 479 "index": sidx, |
| 420 "terminalIndex": s.terminalIndex, | 480 "terminalIndex": s.terminalIndex, |
| 421 }.Warningf(s, "Discarding log entries beyond expected terminal i ndex.") | 481 }.Warningf(s, "Discarding log entries beyond expected terminal i ndex.") |
| 422 return nil, archive.ErrEndOfStream | 482 return nil, archive.ErrEndOfStream |
| 423 } | 483 } |
| 424 | 484 |
| 425 s.lastIndex = sidx | 485 s.lastIndex = sidx |
| 486 s.logEntryCount++ | |
| 426 return le, nil | 487 return le, nil |
| 427 } | 488 } |
| 489 | |
| 490 func joinArchiveErrors(err error, add error) error { | |
| 491 switch { | |
| 492 case err == nil: | |
| 493 return add | |
| 494 case add == nil: | |
| 495 return err | |
| 496 | |
| 497 default: | |
| 498 merr, ok := err.(errors.MultiError) | |
| 499 if !ok { | |
| 500 merr = errors.MultiError{err, nil}[:1] | |
| 501 } | |
| 502 merr = append(merr, add) | |
| 503 return merr | |
| 504 } | |
| 505 } | |
| 506 | |
| 507 func isTransientError(err error) bool { | |
| 508 if errors.IsTransient(err) { | |
| 509 return true | |
| 510 } | |
| 511 | |
| 512 if merr, ok := err.(errors.MultiError); ok { | |
| 513 for _, ierr := range merr { | |
| 514 if isTransientError(ierr) { | |
| 515 return true | |
| 516 } | |
| 517 } | |
| 518 } | |
| 519 return false | |
| 520 } | |
| OLD | NEW |