| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package archivist | 5 package archivist |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | |
| 9 "fmt" | 8 "fmt" |
| 10 "strings" | 9 "strings" |
| 11 "sync" | 10 "sync" |
| 12 "testing" | 11 "testing" |
| 13 "time" | 12 "time" |
| 14 | 13 |
| 15 "github.com/golang/protobuf/proto" | 14 "github.com/golang/protobuf/proto" |
| 16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 15 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 17 "github.com/luci/luci-go/common/clock/testclock" | 16 "github.com/luci/luci-go/common/clock/testclock" |
| 17 "github.com/luci/luci-go/common/errors" |
| 18 "github.com/luci/luci-go/common/gcloud/gs" | 18 "github.com/luci/luci-go/common/gcloud/gs" |
| 19 "github.com/luci/luci-go/common/logdog/types" | 19 "github.com/luci/luci-go/common/logdog/types" |
| 20 "github.com/luci/luci-go/common/proto/google" | 20 "github.com/luci/luci-go/common/proto/google" |
| 21 "github.com/luci/luci-go/common/proto/logdog/logpb" | 21 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 22 "github.com/luci/luci-go/server/logdog/storage" | 22 "github.com/luci/luci-go/server/logdog/storage" |
| 23 "github.com/luci/luci-go/server/logdog/storage/memory" | 23 "github.com/luci/luci-go/server/logdog/storage/memory" |
| 24 "golang.org/x/net/context" | 24 "golang.org/x/net/context" |
| 25 "google.golang.org/grpc" | 25 "google.golang.org/grpc" |
| 26 | 26 |
| 27 . "github.com/luci/luci-go/common/testing/assertions" | 27 . "github.com/luci/luci-go/common/testing/assertions" |
| (...skipping 190 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 218 Path: string(desc.Path()), | 218 Path: string(desc.Path()), |
| 219 ProtoVersion: logpb.Version, | 219 ProtoVersion: logpb.Version, |
| 220 TerminalIndex: -1, | 220 TerminalIndex: -1, |
| 221 Archived: false, | 221 Archived: false, |
| 222 Purged: false, | 222 Purged: false, |
| 223 }, | 223 }, |
| 224 Desc: descBytes, | 224 Desc: descBytes, |
| 225 } | 225 } |
| 226 | 226 |
| 227 var archiveRequest *logdog.ArchiveStreamRequest | 227 var archiveRequest *logdog.ArchiveStreamRequest |
| 228 var archiveStreamErr error |
| 228 sc := testServicesClient{ | 229 sc := testServicesClient{ |
| 229 lsCallback: func(req *logdog.LoadStreamRequest) (*logdog
.LoadStreamResponse, error) { | 230 lsCallback: func(req *logdog.LoadStreamRequest) (*logdog
.LoadStreamResponse, error) { |
| 230 return &stream, nil | 231 return &stream, nil |
| 231 }, | 232 }, |
| 232 asCallback: func(req *logdog.ArchiveStreamRequest) error
{ | 233 asCallback: func(req *logdog.ArchiveStreamRequest) error
{ |
| 233 archiveRequest = req | 234 archiveRequest = req |
| 234 » » » » return nil | 235 » » » » return archiveStreamErr |
| 235 }, | 236 }, |
| 236 } | 237 } |
| 237 | 238 |
| 238 ar := Archivist{ | 239 ar := Archivist{ |
| 239 Service: &sc, | 240 Service: &sc, |
| 240 Storage: &st, | 241 Storage: &st, |
| 241 GSClient: &gsc, | 242 GSClient: &gsc, |
| 242 GSBase: gs.Path("gs://archive-test/path/to/archive/"),
// Extra slashes to test concatenation. | 243 GSBase: gs.Path("gs://archive-test/path/to/archive/"),
// Extra slashes to test concatenation. |
| 243 } | 244 } |
| 244 | 245 |
| 246 expired := 10 * time.Minute |
| 245 task := logdog.ArchiveTask{ | 247 task := logdog.ArchiveTask{ |
| 246 » » » Path: stream.State.Path, | 248 » » » Path: stream.State.Path, |
| 247 » » » Complete: true, | 249 » » » CompletePeriod: google.NewDuration(expired), |
| 248 } | 250 } |
| 251 expired++ // This represents a time PAST CompletePeriod. |
| 249 | 252 |
| 250 gsURL := func(p string) string { | 253 gsURL := func(p string) string { |
| 251 return fmt.Sprintf("gs://archive-test/path/to/archive/%s
/%s", desc.Path(), p) | 254 return fmt.Sprintf("gs://archive-test/path/to/archive/%s
/%s", desc.Path(), p) |
| 252 } | 255 } |
| 253 | 256 |
| 254 // hasStreams can be called to check that the retained archiveRe
quest had | 257 // hasStreams can be called to check that the retained archiveRe
quest had |
| 255 // data sizes for the named archive stream types. | 258 // data sizes for the named archive stream types. |
| 256 // | 259 // |
| 257 // After checking, the values are set to zero. This allows us to
use | 260 // After checking, the values are set to zero. This allows us to
use |
| 258 // ShouldEqual without hardcoding specific archival sizes into t
he results. | 261 // ShouldEqual without hardcoding specific archival sizes into t
he results. |
| 259 hasStreams := func(log, index, data bool) bool { | 262 hasStreams := func(log, index, data bool) bool { |
| 260 So(archiveRequest, ShouldNotBeNil) | 263 So(archiveRequest, ShouldNotBeNil) |
| 261 if (log && archiveRequest.StreamSize <= 0) || | 264 if (log && archiveRequest.StreamSize <= 0) || |
| 262 (index && archiveRequest.IndexSize <= 0) || | 265 (index && archiveRequest.IndexSize <= 0) || |
| 263 (data && archiveRequest.DataSize <= 0) { | 266 (data && archiveRequest.DataSize <= 0) { |
| 264 return false | 267 return false |
| 265 } | 268 } |
| 266 | 269 |
| 267 archiveRequest.StreamSize = 0 | 270 archiveRequest.StreamSize = 0 |
| 268 archiveRequest.IndexSize = 0 | 271 archiveRequest.IndexSize = 0 |
| 269 archiveRequest.DataSize = 0 | 272 archiveRequest.DataSize = 0 |
| 270 return true | 273 return true |
| 271 } | 274 } |
| 272 | 275 |
| 273 Convey(`Will fail to archive if the specified stream state could
not be loaded.`, func() { | 276 Convey(`Will fail to archive if the specified stream state could
not be loaded.`, func() { |
| 274 sc.lsCallback = func(*logdog.LoadStreamRequest) (*logdog
.LoadStreamResponse, error) { | 277 sc.lsCallback = func(*logdog.LoadStreamRequest) (*logdog
.LoadStreamResponse, error) { |
| 275 return nil, errors.New("does not exist") | 278 return nil, errors.New("does not exist") |
| 276 } | 279 } |
| 277 » » » So(ar.Archive(c, &task), ShouldErrLike, "does not exist"
) | 280 » » » So(ar.Archive(c, &task, 0), ShouldErrLike, "does not exi
st") |
| 278 }) | 281 }) |
| 279 | 282 |
| 280 Convey(`Will refrain from archiving if the stream is already arc
hived.`, func() { | 283 Convey(`Will refrain from archiving if the stream is already arc
hived.`, func() { |
| 281 stream.State.Archived = true | 284 stream.State.Archived = true |
| 282 » » » So(ar.Archive(c, &task), ShouldBeNil) | 285 » » » So(ar.Archive(c, &task, 0), ShouldBeNil) |
| 283 So(archiveRequest, ShouldBeNil) | 286 So(archiveRequest, ShouldBeNil) |
| 284 }) | 287 }) |
| 285 | 288 |
| 286 Convey(`Will refrain from archiving if the stream is purged.`, f
unc() { | 289 Convey(`Will refrain from archiving if the stream is purged.`, f
unc() { |
| 287 stream.State.Purged = true | 290 stream.State.Purged = true |
| 288 » » » So(ar.Archive(c, &task), ShouldBeNil) | 291 » » » So(ar.Archive(c, &task, 0), ShouldBeNil) |
| 289 So(archiveRequest, ShouldBeNil) | 292 So(archiveRequest, ShouldBeNil) |
| 290 }) | 293 }) |
| 291 | 294 |
| 292 // Weird case: the log has been marked for archival, has not bee
n | 295 // Weird case: the log has been marked for archival, has not bee
n |
| 293 » » // terminated, and is within its completeness delay. This task w
ill not | 296 » » // terminated, and is within its completeness delay. This task s
hould not |
| 294 » » // have been dispatched by our archive cron, but let's assert th
at it | 297 » » // have been dispatched by our expired archive cron, but let's a
ssert that |
| 295 » » // behaves correctly regardless. | 298 » » // it behaves correctly regardless. |
| 296 » » Convey(`Will succeed if the log stream had no entries and no ter
minal index.`, func() { | 299 » » Convey(`Will refuse to archive a complete steram with no termina
l index.`, func() { |
| 297 » » » So(ar.Archive(c, &task), ShouldBeNil) | 300 » » » err := ar.Archive(c, &task, 0) |
| 298 | 301 » » » So(err, ShouldErrLike, "cannot archive complete stream w
ith no terminal index") |
| 299 » » » So(hasStreams(true, true, false), ShouldBeTrue) | 302 » » » So(errors.IsTransient(err), ShouldBeTrue) |
| 300 » » » So(archiveRequest, ShouldResemble, &logdog.ArchiveStream
Request{ | |
| 301 » » » » Path: task.Path, | |
| 302 » » » » Complete: true, | |
| 303 » » » » TerminalIndex: -1, | |
| 304 | |
| 305 » » » » StreamUrl: gsURL("logstream.entries"), | |
| 306 » » » » IndexUrl: gsURL("logstream.index"), | |
| 307 » » » » DataUrl: gsURL("data.bin"), | |
| 308 » » » }) | |
| 309 }) | 303 }) |
| 310 | 304 |
| 311 Convey(`With terminal index "3"`, func() { | 305 Convey(`With terminal index "3"`, func() { |
| 312 stream.State.TerminalIndex = 3 | 306 stream.State.TerminalIndex = 3 |
| 313 | 307 |
| 314 » » » Convey(`Will fail if the log stream had a terminal index
and no entries.`, func() { | 308 » » » Convey(`Will fail transiently if the log stream had no e
ntries.`, func() { |
| 315 » » » » So(ar.Archive(c, &task), ShouldErrLike, "stream
finished short of terminal index") | 309 » » » » err := ar.Archive(c, &task, 0) |
| 310 » » » » So(err, ShouldErrLike, "stream has missing entri
es") |
| 311 » » » » So(errors.IsTransient(err), ShouldBeTrue) |
| 316 }) | 312 }) |
| 317 | 313 |
| 318 Convey(`Will fail to archive {0, 1, 2, 4} (incomplete).`
, func() { | 314 Convey(`Will fail to archive {0, 1, 2, 4} (incomplete).`
, func() { |
| 319 addTestEntry(0, 1, 2, 4) | 315 addTestEntry(0, 1, 2, 4) |
| 320 » » » » So(ar.Archive(c, &task), ShouldErrLike, "non-con
tiguous log stream") | 316 » » » » So(ar.Archive(c, &task, 0), ShouldErrLike, "stre
am has missing entries") |
| 321 }) | 317 }) |
| 322 | 318 |
| 323 Convey(`Will successfully archive {0, 1, 2, 3, 4}, stopp
ing at the terminal index.`, func() { | 319 Convey(`Will successfully archive {0, 1, 2, 3, 4}, stopp
ing at the terminal index.`, func() { |
| 324 addTestEntry(0, 1, 2, 3, 4) | 320 addTestEntry(0, 1, 2, 3, 4) |
| 325 » » » » So(ar.Archive(c, &task), ShouldBeNil) | 321 » » » » So(ar.Archive(c, &task, 0), ShouldBeNil) |
| 326 | 322 |
| 327 So(hasStreams(true, true, true), ShouldBeTrue) | 323 So(hasStreams(true, true, true), ShouldBeTrue) |
| 328 So(archiveRequest, ShouldResemble, &logdog.Archi
veStreamRequest{ | 324 So(archiveRequest, ShouldResemble, &logdog.Archi
veStreamRequest{ |
| 329 Path: task.Path, | 325 Path: task.Path, |
| 330 » » » » » Complete: true, | 326 » » » » » LogEntryCount: 4, |
| 331 TerminalIndex: 3, | 327 TerminalIndex: 3, |
| 332 | 328 |
| 333 StreamUrl: gsURL("logstream.entries"), | 329 StreamUrl: gsURL("logstream.entries"), |
| 334 IndexUrl: gsURL("logstream.index"), | 330 IndexUrl: gsURL("logstream.index"), |
| 335 DataUrl: gsURL("data.bin"), | 331 DataUrl: gsURL("data.bin"), |
| 336 }) | 332 }) |
| 337 }) | 333 }) |
| 334 |
| 335 Convey(`When a transient archival error occurs, will ret
urn it.`, func() { |
| 336 gsc.newWriterErr = func(*testGSWriter) error { r
eturn errors.WrapTransient(errors.New("test error")) } |
| 337 |
| 338 err := ar.Archive(c, &task, 0) |
| 339 So(err, ShouldErrLike, "test error") |
| 340 So(errors.IsTransient(err), ShouldBeTrue) |
| 341 }) |
| 342 |
| 343 Convey(`When a non-transient archival error occurs, will
report it`, func() { |
| 344 gsc.newWriterErr = func(*testGSWriter) error { r
eturn errors.New("test error") } |
| 345 |
| 346 Convey(`If remote returns an error, forwards a t
ransient error.`, func() { |
| 347 archiveStreamErr = errors.New("test erro
r") |
| 348 |
| 349 err := ar.Archive(c, &task, 0) |
| 350 So(err, ShouldErrLike, "failed to report
archive state") |
| 351 So(errors.IsTransient(err), ShouldBeTrue
) |
| 352 |
| 353 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 354 Path: task.Path, |
| 355 Error: true, |
| 356 TerminalIndex: -1, |
| 357 }) |
| 358 }) |
| 359 |
| 360 Convey(`If remote returns success, returns nil.`
, func() { |
| 361 So(ar.Archive(c, &task, 0), ShouldBeNil) |
| 362 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 363 Path: task.Path, |
| 364 Error: true, |
| 365 TerminalIndex: -1, |
| 366 }) |
| 367 }) |
| 368 }) |
| 338 }) | 369 }) |
| 339 | 370 |
| 340 Convey(`When not enforcing stream completeness`, func() { | 371 Convey(`When not enforcing stream completeness`, func() { |
| 341 task.Complete = false | |
| 342 | 372 |
| 343 Convey(`With no terminal index`, func() { | 373 Convey(`With no terminal index`, func() { |
| 344 Convey(`Will successfully archive if there are n
o entries.`, func() { | 374 Convey(`Will successfully archive if there are n
o entries.`, func() { |
| 345 » » » » » So(ar.Archive(c, &task), ShouldBeNil) | 375 » » » » » So(ar.Archive(c, &task, expired), Should
BeNil) |
| 346 | 376 |
| 347 So(hasStreams(true, true, false), Should
BeTrue) | 377 So(hasStreams(true, true, false), Should
BeTrue) |
| 348 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ | 378 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 349 Path: task.Path, | 379 Path: task.Path, |
| 350 » » » » » » Complete: true, | 380 » » » » » » LogEntryCount: 0, |
| 351 TerminalIndex: -1, | 381 TerminalIndex: -1, |
| 352 | 382 |
| 353 StreamUrl: gsURL("logstream.entr
ies"), | 383 StreamUrl: gsURL("logstream.entr
ies"), |
| 354 IndexUrl: gsURL("logstream.inde
x"), | 384 IndexUrl: gsURL("logstream.inde
x"), |
| 355 DataUrl: gsURL("data.bin"), | 385 DataUrl: gsURL("data.bin"), |
| 356 }) | 386 }) |
| 357 }) | 387 }) |
| 358 | 388 |
| 359 Convey(`With {0, 1, 2, 4} (incomplete) will arch
ive the stream and update its terminal index.`, func() { | 389 Convey(`With {0, 1, 2, 4} (incomplete) will arch
ive the stream and update its terminal index.`, func() { |
| 360 addTestEntry(0, 1, 2, 4) | 390 addTestEntry(0, 1, 2, 4) |
| 361 » » » » » So(ar.Archive(c, &task), ShouldBeNil) | 391 » » » » » So(ar.Archive(c, &task, expired), Should
BeNil) |
| 362 | 392 |
| 363 So(hasStreams(true, true, true), ShouldB
eTrue) | 393 So(hasStreams(true, true, true), ShouldB
eTrue) |
| 364 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ | 394 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 365 Path: task.Path, | 395 Path: task.Path, |
| 366 » » » » » » Complete: false, | 396 » » » » » » LogEntryCount: 4, |
| 367 TerminalIndex: 4, | 397 TerminalIndex: 4, |
| 368 | 398 |
| 369 StreamUrl: gsURL("logstream.entr
ies"), | 399 StreamUrl: gsURL("logstream.entr
ies"), |
| 370 IndexUrl: gsURL("logstream.inde
x"), | 400 IndexUrl: gsURL("logstream.inde
x"), |
| 371 DataUrl: gsURL("data.bin"), | 401 DataUrl: gsURL("data.bin"), |
| 372 }) | 402 }) |
| 373 }) | 403 }) |
| 374 }) | 404 }) |
| 375 | 405 |
| 376 Convey(`With terminal index 3`, func() { | 406 Convey(`With terminal index 3`, func() { |
| 377 stream.State.TerminalIndex = 3 | 407 stream.State.TerminalIndex = 3 |
| 378 | 408 |
| 379 Convey(`Will successfully archive if there are n
o entries.`, func() { | 409 Convey(`Will successfully archive if there are n
o entries.`, func() { |
| 380 » » » » » So(ar.Archive(c, &task), ShouldBeNil) | 410 » » » » » So(ar.Archive(c, &task, expired), Should
BeNil) |
| 381 | 411 |
| 382 So(hasStreams(true, true, false), Should
BeTrue) | 412 So(hasStreams(true, true, false), Should
BeTrue) |
| 383 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ | 413 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 384 Path: task.Path, | 414 Path: task.Path, |
| 385 » » » » » » Complete: true, | 415 » » » » » » LogEntryCount: 0, |
| 386 TerminalIndex: -1, | 416 TerminalIndex: -1, |
| 387 | 417 |
| 388 StreamUrl: gsURL("logstream.entr
ies"), | 418 StreamUrl: gsURL("logstream.entr
ies"), |
| 389 IndexUrl: gsURL("logstream.inde
x"), | 419 IndexUrl: gsURL("logstream.inde
x"), |
| 390 DataUrl: gsURL("data.bin"), | 420 DataUrl: gsURL("data.bin"), |
| 391 }) | 421 }) |
| 392 }) | 422 }) |
| 393 | 423 |
| 394 Convey(`With {0, 1, 2, 4} (incomplete) will arch
ive the stream and update its terminal index to 2.`, func() { | 424 Convey(`With {0, 1, 2, 4} (incomplete) will arch
ive the stream and update its terminal index to 2.`, func() { |
| 395 addTestEntry(0, 1, 2, 4) | 425 addTestEntry(0, 1, 2, 4) |
| 396 » » » » » So(ar.Archive(c, &task), ShouldBeNil) | 426 » » » » » So(ar.Archive(c, &task, expired), Should
BeNil) |
| 397 | 427 |
| 398 So(hasStreams(true, true, true), ShouldB
eTrue) | 428 So(hasStreams(true, true, true), ShouldB
eTrue) |
| 399 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ | 429 So(archiveRequest, ShouldResemble, &logd
og.ArchiveStreamRequest{ |
| 400 Path: task.Path, | 430 Path: task.Path, |
| 401 » » » » » » Complete: false, | 431 » » » » » » LogEntryCount: 3, |
| 402 TerminalIndex: 2, | 432 TerminalIndex: 2, |
| 403 | 433 |
| 404 StreamUrl: gsURL("logstream.entr
ies"), | 434 StreamUrl: gsURL("logstream.entr
ies"), |
| 405 IndexUrl: gsURL("logstream.inde
x"), | 435 IndexUrl: gsURL("logstream.inde
x"), |
| 406 DataUrl: gsURL("data.bin"), | 436 DataUrl: gsURL("data.bin"), |
| 407 }) | 437 }) |
| 408 }) | 438 }) |
| 409 }) | 439 }) |
| 410 }) | 440 }) |
| 411 | 441 |
| 412 // Simulate failures during the various stream generation operat
ions. | 442 // Simulate failures during the various stream generation operat
ions. |
| 413 Convey(`Stream generation failures`, func() { | 443 Convey(`Stream generation failures`, func() { |
| 414 stream.State.TerminalIndex = 3 | 444 stream.State.TerminalIndex = 3 |
| 415 addTestEntry(0, 1, 2, 3) | 445 addTestEntry(0, 1, 2, 3) |
| 416 | 446 |
| 417 for _, failName := range []string{"/logstream.entries",
"/logstream.index", "/data.bin"} { | 447 for _, failName := range []string{"/logstream.entries",
"/logstream.index", "/data.bin"} { |
| 418 for _, testCase := range []struct { | 448 for _, testCase := range []struct { |
| 419 » » » » » name string | 449 » » » » » name string |
| 420 » » » » » setup func() | 450 » » » » » transient bool |
| 451 » » » » » setup func() |
| 421 }{ | 452 }{ |
| 422 » » » » » {"delete failure", func() { | 453 » » » » » {"delete failure", false, func() { |
| 423 gsc.deleteErr = func(b, p string
) error { | 454 gsc.deleteErr = func(b, p string
) error { |
| 424 if strings.HasSuffix(p,
failName) { | 455 if strings.HasSuffix(p,
failName) { |
| 425 return errors.Ne
w("test error") | 456 return errors.Ne
w("test error") |
| 426 } | 457 } |
| 427 return nil | 458 return nil |
| 428 } | 459 } |
| 429 }}, | 460 }}, |
| 430 | 461 |
| 431 » » » » » {"writer create failure", func() { | 462 » » » » » {"writer create failure", false, func()
{ |
| 432 gsc.newWriterErr = func(w *testG
SWriter) error { | 463 gsc.newWriterErr = func(w *testG
SWriter) error { |
| 433 if strings.HasSuffix(w.p
ath, failName) { | 464 if strings.HasSuffix(w.p
ath, failName) { |
| 434 return errors.Ne
w("test error") | 465 return errors.Ne
w("test error") |
| 435 } | 466 } |
| 436 return nil | 467 return nil |
| 437 } | 468 } |
| 438 }}, | 469 }}, |
| 439 | 470 |
| 440 » » » » » {"write failure", func() { | 471 » » » » » {"write failure", false, func() { |
| 441 gsc.newWriterErr = func(w *testG
SWriter) error { | 472 gsc.newWriterErr = func(w *testG
SWriter) error { |
| 442 if strings.HasSuffix(w.p
ath, failName) { | 473 if strings.HasSuffix(w.p
ath, failName) { |
| 443 w.writeErr = err
ors.New("test error") | 474 w.writeErr = err
ors.New("test error") |
| 444 } | 475 } |
| 445 return nil | 476 return nil |
| 446 } | 477 } |
| 447 }}, | 478 }}, |
| 448 | 479 |
| 449 » » » » » {"close failure", func() { | 480 » » » » » {"close failure", false, func() { |
| 450 gsc.newWriterErr = func(w *testG
SWriter) error { | 481 gsc.newWriterErr = func(w *testG
SWriter) error { |
| 451 if strings.HasSuffix(w.p
ath, failName) { | 482 if strings.HasSuffix(w.p
ath, failName) { |
| 452 w.closeErr = err
ors.New("test error") | 483 w.closeErr = err
ors.New("test error") |
| 453 } | 484 } |
| 454 return nil | 485 return nil |
| 455 } | 486 } |
| 456 }}, | 487 }}, |
| 457 | 488 |
| 458 » » » » » {"delete on fail failure (double-failure
)", func() { | 489 » » » » » {"delete on fail failure (double-failure
)", false, func() { |
| 459 failed := false | 490 failed := false |
| 460 | 491 |
| 461 // Simulate a write failure. Thi
s is the error that will actually | 492 // Simulate a write failure. Thi
s is the error that will actually |
| 462 // be returned. | 493 // be returned. |
| 463 gsc.newWriterErr = func(w *testG
SWriter) error { | 494 gsc.newWriterErr = func(w *testG
SWriter) error { |
| 464 if strings.HasSuffix(w.p
ath, failName) { | 495 if strings.HasSuffix(w.p
ath, failName) { |
| 465 w.writeErr = err
ors.New("test error") | 496 w.writeErr = err
ors.New("test error") |
| 466 } | 497 } |
| 467 return nil | 498 return nil |
| 468 } | 499 } |
| 469 | 500 |
| 470 // This will trigger twice per s
tream, once on create and once on | 501 // This will trigger twice per s
tream, once on create and once on |
| 471 // cleanup after the write fails
. We want to return an error in | 502 // cleanup after the write fails
. We want to return an error in |
| 472 // the latter case. | 503 // the latter case. |
| 473 gsc.deleteErr = func(b, p string
) error { | 504 gsc.deleteErr = func(b, p string
) error { |
| 474 if strings.HasSuffix(p,
failName) { | 505 if strings.HasSuffix(p,
failName) { |
| 475 if failed { | 506 if failed { |
| 476 return e
rrors.New("other error") | 507 return e
rrors.New("other error") |
| 477 } | 508 } |
| 478 | 509 |
| 479 // First delete
(on create). | 510 // First delete
(on create). |
| 480 failed = true | 511 failed = true |
| 481 } | 512 } |
| 482 return nil | 513 return nil |
| 483 } | 514 } |
| 484 }}, | 515 }}, |
| 485 } { | 516 } { |
| 486 » » » » » Convey(fmt.Sprintf(`Can handle %s for %s
`, testCase.name, failName), func() { | 517 » » » » » Convey(fmt.Sprintf(`Can handle %s for %s
, and will not archive.`, testCase.name, failName), func() { |
| 487 testCase.setup() | 518 testCase.setup() |
| 488 » » » » » » So(ar.Archive(c, &task), ShouldE
rrLike, "test error") | 519 |
| 520 » » » » » » err := ar.Archive(c, &task, 0) |
| 521 » » » » » » if testCase.transient { |
| 522 » » » » » » » // Transient errors are
directly returned without consulting the |
| 523 » » » » » » » // Coordinator service. |
| 524 » » » » » » » So(err, ShouldErrLike, "
test error") |
| 525 » » » » » » » So(errors.IsTransient(er
r), ShouldBeTrue) |
| 526 » » » » » » » So(archiveRequest, Shoul
dBeNil) |
| 527 » » » » » » } else { |
| 528 » » » » » » » So(err, ShouldBeNil) |
| 529 » » » » » » » So(archiveRequest, Shoul
dNotBeNil) |
| 530 » » » » » » » So(archiveRequest.Error,
ShouldBeTrue) |
| 531 » » » » » » } |
| 489 }) | 532 }) |
| 490 } | 533 } |
| 491 } | 534 } |
| 492 }) | 535 }) |
| 493 }) | 536 }) |
| 494 } | 537 } |
| OLD | NEW |