Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4)

Side by Side Diff: server/internal/logdog/archivist/archivist.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Minor fixes, works in dev now. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package archivist 5 package archivist
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "time"
9 10
10 "github.com/golang/protobuf/proto" 11 "github.com/golang/protobuf/proto"
11 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 12 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
12 "github.com/luci/luci-go/common/errors" 13 "github.com/luci/luci-go/common/errors"
13 "github.com/luci/luci-go/common/gcloud/gs" 14 "github.com/luci/luci-go/common/gcloud/gs"
14 "github.com/luci/luci-go/common/logdog/types" 15 "github.com/luci/luci-go/common/logdog/types"
15 log "github.com/luci/luci-go/common/logging" 16 log "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/common/proto/logdog/logpb" 17 "github.com/luci/luci-go/common/proto/logdog/logpb"
17 "github.com/luci/luci-go/server/logdog/archive" 18 "github.com/luci/luci-go/server/logdog/archive"
18 "github.com/luci/luci-go/server/logdog/storage" 19 "github.com/luci/luci-go/server/logdog/storage"
(...skipping 26 matching lines...) Expand all
45 // ByteRange is the maximum number of stream data bytes in between index 46 // ByteRange is the maximum number of stream data bytes in between index
46 // entries. See archive.Manifest for more information. 47 // entries. See archive.Manifest for more information.
47 ByteRange int 48 ByteRange int
48 } 49 }
49 50
50 // storageBufferSize is the size, in bytes, of the LogEntry buffer that is used 51 // storageBufferSize is the size, in bytes, of the LogEntry buffer that is used
51 // to during archival. This should be greater than the maximum LogEntry size. 52 // to during archival. This should be greater than the maximum LogEntry size.
52 const storageBufferSize = types.MaxLogEntryDataSize * 64 53 const storageBufferSize = types.MaxLogEntryDataSize * 64
53 54
54 // ArchiveTask processes and executes a single log stream archive task. 55 // ArchiveTask processes and executes a single log stream archive task.
55 func (a *Archivist) ArchiveTask(c context.Context, desc []byte) error { 56 //
57 // It returns true on success (delete the task) and false on failure (don't
58 // delete the task). The return value of true should only be used if the task
59 // is truely complete, as deleting a named task in the task queue will cause it
Vadim Sh. 2016/04/07 01:21:32 typo: truly also you don't use named task (anymor
dnj 2016/04/11 17:20:04 Done.
60 // to not be reschedulable for a long period of time (10 days).
61 //
62 // If the supplied Context is Done, operation may terminate before completion,
63 // returning the Context's error.
64 func (a *Archivist) ArchiveTask(c context.Context, desc []byte, age time.Duratio n) bool {
56 var task logdog.ArchiveTask 65 var task logdog.ArchiveTask
57 if err := proto.Unmarshal(desc, &task); err != nil { 66 if err := proto.Unmarshal(desc, &task); err != nil {
58 log.WithError(err).Errorf(c, "Failed to decode archive task.") 67 log.WithError(err).Errorf(c, "Failed to decode archive task.")
59 » » return err 68 » » return false
60 } 69 }
61 » return a.Archive(c, &task) 70 » if err := a.Archive(c, &task, age); err != nil {
71 » » log.WithError(err).Errorf(c, "Archival task failed.")
72 » » return false
73 » }
74 » return true
62 } 75 }
63 76
64 // Archive archives a single log stream. If unsuccessful, an error is returned. 77 // Archive processes and executes a single log stream archive task.
65 // 78 //
66 // This error may be wrapped in errors.Transient if it is believed to have been 79 // It returns an error if the archival isn't complete and confirmed by the
67 // caused by a transient failure. 80 // Coordinator.
68 // 81 //
69 // If the supplied Context is Done, operation may terminate before completion, 82 // If the supplied Context is Done, operation may terminate before completion,
70 // returning the Context's error. 83 // returning the Context's error.
71 func (a *Archivist) Archive(c context.Context, t *logdog.ArchiveTask) error { 84 func (a *Archivist) Archive(c context.Context, t *logdog.ArchiveTask, age time.D uration) error {
85 » complete := (age <= t.CompletePeriod.Duration())
86
72 // Load the log stream's current state. If it is already archived, we wi ll 87 // Load the log stream's current state. If it is already archived, we wi ll
73 // return an immediate success. 88 // return an immediate success.
74 ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{ 89 ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{
75 Path: t.Path, 90 Path: t.Path,
76 Desc: true, 91 Desc: true,
77 }) 92 })
78 switch { 93 switch {
79 case err != nil: 94 case err != nil:
80 » » log.WithError(err).Errorf(c, "Failed to load log stream.") 95 » » return fmt.Errorf("failed to load log stream: %v", err)
81 » » return err
82 case ls.State == nil: 96 case ls.State == nil:
83 » » return errors.New("missing state") 97 » » return errors.New("log stream did not include state")
84 case ls.State.ProtoVersion != logpb.Version: 98 case ls.State.ProtoVersion != logpb.Version:
85 log.Fields{ 99 log.Fields{
86 "protoVersion": ls.State.ProtoVersion, 100 "protoVersion": ls.State.ProtoVersion,
87 "expectedVersion": logpb.Version, 101 "expectedVersion": logpb.Version,
88 }.Errorf(c, "Unsupported log stream protobuf version.") 102 }.Errorf(c, "Unsupported log stream protobuf version.")
89 » » return errors.New("unsupported protobuf version") 103 » » return errors.New("unsupported log stream protobuf version")
90 case ls.Desc == nil: 104 case ls.Desc == nil:
91 » » return errors.New("missing descriptor") 105 » » return errors.New("log stream did not include a descriptor")
92 106
93 case ls.State.Purged: 107 case ls.State.Purged:
94 log.Warningf(c, "Log stream is purged.") 108 log.Warningf(c, "Log stream is purged.")
95 return nil 109 return nil
96 case ls.State.Archived: 110 case ls.State.Archived:
97 log.Infof(c, "Log stream is already archived.") 111 log.Infof(c, "Log stream is already archived.")
98 return nil 112 return nil
113
114 case complete && ls.State.TerminalIndex < 0:
115 return errors.WrapTransient(errors.New("cannot archive complete stream with no terminal index"))
99 } 116 }
100 117
101 // Deserialize and validate the descriptor protobuf. 118 // Deserialize and validate the descriptor protobuf.
102 var desc logpb.LogStreamDescriptor 119 var desc logpb.LogStreamDescriptor
103 if err := proto.Unmarshal(ls.Desc, &desc); err != nil { 120 if err := proto.Unmarshal(ls.Desc, &desc); err != nil {
104 log.Fields{ 121 log.Fields{
105 log.ErrorKey: err, 122 log.ErrorKey: err,
106 "protoVersion": ls.State.ProtoVersion, 123 "protoVersion": ls.State.ProtoVersion,
107 }.Errorf(c, "Failed to unmarshal descriptor protobuf.") 124 }.Errorf(c, "Failed to unmarshal descriptor protobuf.")
108 return err 125 return err
109 } 126 }
110 127
111 task := &archiveTask{ 128 task := &archiveTask{
112 Archivist: a, 129 Archivist: a,
113 ArchiveTask: t, 130 ArchiveTask: t,
131 complete: complete,
114 ls: ls, 132 ls: ls,
115 desc: &desc, 133 desc: &desc,
116 } 134 }
117 » if err := task.archive(c); err != nil { 135 » aerr := task.archive(c)
118 » » log.WithError(err).Errorf(c, "Failed to perform archival operati on.") 136 » if aerr != nil {
119 » » return err 137 » » // If this is a transient error, fail immediately with it. Use
138 » » // isTransientError because err can be an errors.MultiError.
139 » » //
140 » » // Returning a transient error here will cause task queue to res chedule.
141 » » if isTransientError(aerr) {
142 » » » log.WithError(aerr).Errorf(c, "TRANSIENT error during ar chival operation.")
143 » » » return aerr
144 » » }
145
146 » » // This is a non-transient error, so we will report it as an err or to the
147 » » // service. It may accept the archival (success), in which case we'll delete
148 » » // our task, or respond with an error code (Aborted) indicating that this
149 » » // archival run should be retried and that we should not delete the task.
150 » » log.WithError(aerr).Errorf(c, "Archival failed with non-transien t error.")
151 » » task.ar.Error = true
120 } 152 }
153
121 log.Fields{ 154 log.Fields{
122 "streamURL": task.ar.StreamUrl, 155 "streamURL": task.ar.StreamUrl,
123 "indexURL": task.ar.IndexUrl, 156 "indexURL": task.ar.IndexUrl,
124 "dataURL": task.ar.DataUrl, 157 "dataURL": task.ar.DataUrl,
125 "terminalIndex": task.ar.TerminalIndex, 158 "terminalIndex": task.ar.TerminalIndex,
126 » » "complete": task.ar.Complete, 159 » » "logEntryCount": task.ar.LogEntryCount,
127 » }.Debugf(c, "Finished archive construction.") 160 » » "hadError": task.ar.Error,
161 » » "complete": task.ar.Complete(),
162 » }.Debugf(c, "Finished archive construction; reporting archive state.")
128 163
129 if _, err := a.Service.ArchiveStream(c, &task.ar); err != nil { 164 if _, err := a.Service.ArchiveStream(c, &task.ar); err != nil {
130 » » log.WithError(err).Errorf(c, "Failed to mark log stream as archi ved.") 165 » » // All service errors will be reported and returned as transient . We will
131 » » return err 166 » » // NOT delete this archival task unless the service has explicit ly
167 » » // instructed us to via success code.
168 » » log.WithError(err).Errorf(c, "Failed to report archive state.")
169 » » return errors.WrapTransient(errors.New("failed to report archive state"))
132 } 170 }
133 return nil 171 return nil
134 } 172 }
135 173
136 // archiveTask is the set of parameters for a single archival. 174 // archiveTask is the set of parameters for a single archival.
137 type archiveTask struct { 175 type archiveTask struct {
138 *Archivist 176 *Archivist
139 *logdog.ArchiveTask 177 *logdog.ArchiveTask
140 178
179 // complete, if true, will cause incomplete streams to return transient
180 // errors.
181 complete bool
141 // ls is the log stream state. 182 // ls is the log stream state.
142 ls *logdog.LoadStreamResponse 183 ls *logdog.LoadStreamResponse
143 // desc is the unmarshaled log stream descriptor. 184 // desc is the unmarshaled log stream descriptor.
144 desc *logpb.LogStreamDescriptor 185 desc *logpb.LogStreamDescriptor
145 186
146 // ar will be populated during archive construction. 187 // ar will be populated during archive construction.
147 ar logdog.ArchiveStreamRequest 188 ar logdog.ArchiveStreamRequest
148 } 189 }
149 190
150 // archiveState performs the archival operation on a stream described by a 191 // archiveState performs the archival operation on a stream described by a
151 // Coordinator State. Upon success, the State will be updated with the result 192 // Coordinator State. Upon success, the State will be updated with the result
152 // of the archival operation. 193 // of the archival operation.
153 func (t *archiveTask) archive(c context.Context) (err error) { 194 func (t *archiveTask) archive(c context.Context) (err error) {
195 // Minimal ArchiveRequest parameters.
196 t.ar.Path = t.Path
197 t.ar.TerminalIndex = -1
198
154 // Generate our archival object managers. 199 // Generate our archival object managers.
155 bext := t.desc.BinaryFileExt 200 bext := t.desc.BinaryFileExt
156 if bext == "" { 201 if bext == "" {
157 bext = "bin" 202 bext = "bin"
158 } 203 }
159 204
160 path := t.Path 205 path := t.Path
161 var streamO, indexO, dataO *gsObject 206 var streamO, indexO, dataO *gsObject
162 streamO, err = t.newGSObject(c, path, "logstream.entries") 207 streamO, err = t.newGSObject(c, path, "logstream.entries")
163 if err != nil { 208 if err != nil {
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
205 defer deleteOnFail(indexO) 250 defer deleteOnFail(indexO)
206 defer deleteOnFail(dataO) 251 defer deleteOnFail(dataO)
207 252
208 // Close our GS object managers on exit. If any of them fail to close, m arh 253 // Close our GS object managers on exit. If any of them fail to close, m arh
209 // the archival as a failure. 254 // the archival as a failure.
210 closeOM := func(o *gsObject) { 255 closeOM := func(o *gsObject) {
211 if o == nil { 256 if o == nil {
212 return 257 return
213 } 258 }
214 if ierr := o.Close(); ierr != nil { 259 if ierr := o.Close(); ierr != nil {
215 » » » err = ierr 260 » » » err = joinArchiveErrors(err, ierr)
216 } 261 }
217 } 262 }
218 defer closeOM(streamO) 263 defer closeOM(streamO)
219 defer closeOM(indexO) 264 defer closeOM(indexO)
220 defer closeOM(dataO) 265 defer closeOM(dataO)
221 266
222 // Read our log entries from intermediate storage. 267 // Read our log entries from intermediate storage.
223 ss := storageSource{ 268 ss := storageSource{
224 Context: c, 269 Context: c,
225 st: t.Storage, 270 st: t.Storage,
226 path: types.StreamPath(t.Path), 271 path: types.StreamPath(t.Path),
227 » » contiguous: t.Complete, 272 » » contiguous: t.complete,
228 terminalIndex: types.MessageIndex(t.ls.State.TerminalIndex), 273 terminalIndex: types.MessageIndex(t.ls.State.TerminalIndex),
229 lastIndex: -1, 274 lastIndex: -1,
230 } 275 }
231 276
232 m := archive.Manifest{ 277 m := archive.Manifest{
233 Desc: t.desc, 278 Desc: t.desc,
234 Source: &ss, 279 Source: &ss,
235 LogWriter: streamO, 280 LogWriter: streamO,
236 IndexWriter: indexO, 281 IndexWriter: indexO,
237 DataWriter: dataO, 282 DataWriter: dataO,
238 StreamIndexRange: t.StreamIndexRange, 283 StreamIndexRange: t.StreamIndexRange,
239 PrefixIndexRange: t.PrefixIndexRange, 284 PrefixIndexRange: t.PrefixIndexRange,
240 ByteRange: t.ByteRange, 285 ByteRange: t.ByteRange,
241 286
242 Logger: log.Get(c), 287 Logger: log.Get(c),
243 } 288 }
244 » err = archive.Archive(m) 289 » if err = archive.Archive(m); err != nil {
245 » if err != nil {
246 log.WithError(err).Errorf(c, "Failed to archive log stream.") 290 log.WithError(err).Errorf(c, "Failed to archive log stream.")
247 return 291 return
248 } 292 }
249 293
250 t.ar.TerminalIndex = int64(ss.lastIndex)
251 if tidx := t.ls.State.TerminalIndex; tidx != t.ar.TerminalIndex { 294 if tidx := t.ls.State.TerminalIndex; tidx != t.ar.TerminalIndex {
252 » » // Fail, if we were requested to archive only the complete log. 295 » » // Fail if we were requested to archive only the complete log. W e consider
253 » » if t.Complete { 296 » » // this a transient error with the expectation that the missing entries will
254 » » » log.Fields{ 297 » » // show up in future retries.
255 » » » » "terminalIndex": tidx, 298 » » switch {
256 » » » » "lastIndex": t.ar.TerminalIndex, 299 » » case t.complete && ss.hasMissingEntries:
257 » » » }.Errorf(c, "Log stream archival stopped prior to termin al index.") 300 » » » log.Errorf(c, "Log stream has missing entries, but compl eteness is required.")
258 » » » return errors.New("stream finished short of terminal ind ex") 301 » » » return errors.WrapTransient(errors.New("stream has missi ng entries"))
259 » » }
260 302
261 » » if t.ar.TerminalIndex < 0 { 303 » » case ss.logEntryCount == 0:
262 // If our last log index was <0, then no logs were archi ved. 304 // If our last log index was <0, then no logs were archi ved.
263 log.Warningf(c, "No log entries were archived.") 305 log.Warningf(c, "No log entries were archived.")
264 » » } else { 306
307 » » default:
265 // Update our terminal index. 308 // Update our terminal index.
266 log.Fields{ 309 log.Fields{
267 » » » » "from": tidx, 310 » » » » "terminalIndex": ss.lastIndex,
268 » » » » "to": t.ar.TerminalIndex, 311 » » » » "logEntryCount": ss.logEntryCount,
269 » » » }.Infof(c, "Updated log stream terminal index.") 312 » » » » "hasMissingEntries": ss.hasMissingEntries,
313 » » » }.Debugf(c, "Finished archiving log stream.")
270 } 314 }
271 } 315 }
272 316
273 // Update our state with archival results. 317 // Update our state with archival results.
274 » t.ar.Path = t.Path 318 » t.ar.TerminalIndex = int64(ss.lastIndex)
319 » t.ar.LogEntryCount = ss.logEntryCount
275 t.ar.StreamSize = streamO.Count() 320 t.ar.StreamSize = streamO.Count()
276 t.ar.IndexSize = indexO.Count() 321 t.ar.IndexSize = indexO.Count()
277 t.ar.DataSize = dataO.Count() 322 t.ar.DataSize = dataO.Count()
278 t.ar.Complete = !ss.hasMissingEntries
279 return 323 return
280 } 324 }
281 325
282 func (t *archiveTask) newGSObject(c context.Context, path string, name string) ( *gsObject, error) { 326 func (t *archiveTask) newGSObject(c context.Context, path string, name string) ( *gsObject, error) {
283 p := t.GSBase.Concat(path, name) 327 p := t.GSBase.Concat(path, name)
284 o := gsObject{ 328 o := gsObject{
285 gs: t.GSClient, 329 gs: t.GSClient,
286 bucket: p.Bucket(), 330 bucket: p.Bucket(),
287 path: p.Filename(), 331 path: p.Filename(),
288 } 332 }
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
338 type storageSource struct { 382 type storageSource struct {
339 context.Context 383 context.Context
340 384
341 st storage.Storage // the storage instance to read from 385 st storage.Storage // the storage instance to read from
342 path types.StreamPath // the path of the log stream 386 path types.StreamPath // the path of the log stream
343 contiguous bool // if true, enforce contiguous entries 387 contiguous bool // if true, enforce contiguous entries
344 terminalIndex types.MessageIndex // if >= 0, discard logs beyond this 388 terminalIndex types.MessageIndex // if >= 0, discard logs beyond this
345 389
346 buf []*logpb.LogEntry 390 buf []*logpb.LogEntry
347 lastIndex types.MessageIndex 391 lastIndex types.MessageIndex
392 logEntryCount int64
348 hasMissingEntries bool // true if some log entries were missing. 393 hasMissingEntries bool // true if some log entries were missing.
349 } 394 }
350 395
351 func (s *storageSource) bufferEntries(start types.MessageIndex) error { 396 func (s *storageSource) bufferEntries(start types.MessageIndex) error {
352 bytes := 0 397 bytes := 0
353 398
354 req := storage.GetRequest{ 399 req := storage.GetRequest{
355 Path: s.path, 400 Path: s.path,
356 Index: start, 401 Index: start,
357 } 402 }
(...skipping 13 matching lines...) Expand all
371 return bytes < storageBufferSize 416 return bytes < storageBufferSize
372 }) 417 })
373 } 418 }
374 419
375 func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) { 420 func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) {
376 if len(s.buf) == 0 { 421 if len(s.buf) == 0 {
377 s.buf = s.buf[:0] 422 s.buf = s.buf[:0]
378 if err := s.bufferEntries(s.lastIndex + 1); err != nil { 423 if err := s.bufferEntries(s.lastIndex + 1); err != nil {
379 if err == storage.ErrDoesNotExist { 424 if err == storage.ErrDoesNotExist {
380 log.Warningf(s, "Archive target stream does not exist in intermediate storage.") 425 log.Warningf(s, "Archive target stream does not exist in intermediate storage.")
426 if s.terminalIndex >= 0 {
427 s.hasMissingEntries = true
428 }
381 return nil, archive.ErrEndOfStream 429 return nil, archive.ErrEndOfStream
382 } 430 }
383 431
384 log.WithError(err).Errorf(s, "Failed to retrieve log str eam from storage.") 432 log.WithError(err).Errorf(s, "Failed to retrieve log str eam from storage.")
385 return nil, err 433 return nil, err
386 } 434 }
387 } 435 }
388 436
437 // If we have no more buffered entries, we have exhausted our log stream .
389 if len(s.buf) == 0 { 438 if len(s.buf) == 0 {
390 » » log.Fields{ 439 » » // If we have a terminal index, but we didn't actually emit that index,
391 » » » "lastIndex": s.lastIndex, 440 » » // mark that we have missing entries.
392 » » }.Debugf(s, "Encountered end of stream.") 441 » » if s.terminalIndex >= 0 && s.lastIndex != s.terminalIndex {
442 » » » log.Fields{
443 » » » » "terminalIndex": s.terminalIndex,
444 » » » » "lastIndex": s.lastIndex,
445 » » » }.Warningf(s, "Log stream stopped before terminal index. ")
446 » » » s.hasMissingEntries = true
447 » » } else {
448 » » » log.Fields{
449 » » » » "lastIndex": s.lastIndex,
450 » » » }.Debugf(s, "Encountered end of stream.")
451 » » }
452
393 return nil, archive.ErrEndOfStream 453 return nil, archive.ErrEndOfStream
394 } 454 }
395 455
456 // Pop the next log entry and advance the stream.
396 var le *logpb.LogEntry 457 var le *logpb.LogEntry
397 le, s.buf = s.buf[0], s.buf[1:] 458 le, s.buf = s.buf[0], s.buf[1:]
398 459
399 // If we're enforcing a contiguous log stream, error if this LogEntry is not 460 // If we're enforcing a contiguous log stream, error if this LogEntry is not
400 // contiguous. 461 // contiguous.
401 sidx := types.MessageIndex(le.StreamIndex) 462 sidx := types.MessageIndex(le.StreamIndex)
402 nidx := (s.lastIndex + 1) 463 nidx := (s.lastIndex + 1)
403 if sidx != nidx { 464 if sidx != nidx {
404 s.hasMissingEntries = true 465 s.hasMissingEntries = true
405 466 » }
406 » » if s.contiguous { 467 » if s.contiguous && s.hasMissingEntries {
407 » » » log.Fields{ 468 » » log.Fields{
408 » » » » "index": sidx, 469 » » » "index": sidx,
409 » » » » "nextIndex": nidx, 470 » » » "nextIndex": nidx,
410 » » » }.Errorf(s, "Non-contiguous log stream while enforcing." ) 471 » » }.Warningf(s, "Non-contiguous log stream while enforcing.")
411 » » » return nil, errors.New("non-contiguous log stream") 472 » » return nil, archive.ErrEndOfStream
412 » » }
413 } 473 }
414 474
415 // If we're enforcing a maximum terminal index, return end of stream if this 475 // If we're enforcing a maximum terminal index, return end of stream if this
416 // LogEntry exceeds that index. 476 // LogEntry exceeds that index.
417 if s.terminalIndex >= 0 && sidx > s.terminalIndex { 477 if s.terminalIndex >= 0 && sidx > s.terminalIndex {
418 log.Fields{ 478 log.Fields{
419 "index": sidx, 479 "index": sidx,
420 "terminalIndex": s.terminalIndex, 480 "terminalIndex": s.terminalIndex,
421 }.Warningf(s, "Discarding log entries beyond expected terminal i ndex.") 481 }.Warningf(s, "Discarding log entries beyond expected terminal i ndex.")
422 return nil, archive.ErrEndOfStream 482 return nil, archive.ErrEndOfStream
423 } 483 }
424 484
425 s.lastIndex = sidx 485 s.lastIndex = sidx
486 s.logEntryCount++
426 return le, nil 487 return le, nil
427 } 488 }
489
490 func joinArchiveErrors(err error, add error) error {
491 switch {
492 case err == nil:
493 return add
494 case add == nil:
495 return err
496
497 default:
498 merr, ok := err.(errors.MultiError)
499 if !ok {
500 merr = errors.MultiError{err, nil}[:1]
501 }
502 merr = append(merr, add)
503 return merr
504 }
505 }
506
507 func isTransientError(err error) bool {
508 if errors.IsTransient(err) {
509 return true
510 }
511
512 if merr, ok := err.(errors.MultiError); ok {
513 for _, ierr := range merr {
514 if isTransientError(ierr) {
515 return true
516 }
517 }
518 }
519 return false
520 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698