| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package butler | 5 package butler |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "fmt" | 10 "fmt" |
| (...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 122 err error | 122 err error |
| 123 } | 123 } |
| 124 | 124 |
| 125 type testStream struct { | 125 type testStream struct { |
| 126 inC chan *testStreamData | 126 inC chan *testStreamData |
| 127 closedC chan struct{} | 127 closedC chan struct{} |
| 128 | 128 |
| 129 properties *streamproto.Properties | 129 properties *streamproto.Properties |
| 130 } | 130 } |
| 131 | 131 |
| 132 func newTestStream(p streamproto.Properties) *testStream { | |
| 133 return &testStream{ | |
| 134 inC: make(chan *testStreamData, 16), | |
| 135 closedC: make(chan struct{}), | |
| 136 properties: &p, | |
| 137 } | |
| 138 } | |
| 139 | |
| 140 func (ts *testStream) data(d []byte, err error) { | 132 func (ts *testStream) data(d []byte, err error) { |
| 141 ts.inC <- &testStreamData{ | 133 ts.inC <- &testStreamData{ |
| 142 data: d, | 134 data: d, |
| 143 err: err, | 135 err: err, |
| 144 } | 136 } |
| 145 if err == io.EOF { | 137 if err == io.EOF { |
| 146 // If EOF is hit, continue reporting EOF. | 138 // If EOF is hit, continue reporting EOF. |
| 147 close(ts.inC) | 139 close(ts.inC) |
| 148 } | 140 } |
| 149 } | 141 } |
| (...skipping 160 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 310 }) | 302 }) |
| 311 | 303 |
| 312 Convey(`Will retain the first error and ignore duplicate shutdow
ns.`, func() { | 304 Convey(`Will retain the first error and ignore duplicate shutdow
ns.`, func() { |
| 313 b := mkb(c, conf) | 305 b := mkb(c, conf) |
| 314 b.shutdown(errors.New("first error")) | 306 b.shutdown(errors.New("first error")) |
| 315 b.shutdown(errors.New("second error")) | 307 b.shutdown(errors.New("second error")) |
| 316 So(b.Wait(), ShouldErrLike, "first error") | 308 So(b.Wait(), ShouldErrLike, "first error") |
| 317 }) | 309 }) |
| 318 | 310 |
| 319 Convey(`Using a generic stream Properties`, func() { | 311 Convey(`Using a generic stream Properties`, func() { |
| 320 » » » props := streamproto.Properties{ | 312 » » » newTestStream := func(setup func(p *streamproto.Properti
es)) *testStream { |
| 321 » » » » LogStreamDescriptor: logpb.LogStreamDescriptor{ | 313 » » » » props := streamproto.Properties{ |
| 322 » » » » » Name: "test", | 314 » » » » » LogStreamDescriptor: &logpb.LogStreamDes
criptor{ |
| 323 » » » » » StreamType: logpb.StreamType_TEXT, | 315 » » » » » » Name: "test", |
| 324 » » » » » ContentType: string(types.ContentTypeTex
t), | 316 » » » » » » StreamType: logpb.StreamType_TE
XT, |
| 325 » » » » }, | 317 » » » » » » ContentType: string(types.Conten
tTypeText), |
| 318 » » » » » }, |
| 319 » » » » } |
| 320 » » » » if setup != nil { |
| 321 » » » » » setup(&props) |
| 322 » » » » } |
| 323 |
| 324 » » » » return &testStream{ |
| 325 » » » » » inC: make(chan *testStreamData, 1
6), |
| 326 » » » » » closedC: make(chan struct{}), |
| 327 » » » » » properties: &props, |
| 328 » » » » } |
| 326 } | 329 } |
| 327 | 330 |
| 328 Convey(`Will not add a stream with an invalid configurat
ion.`, func() { | 331 Convey(`Will not add a stream with an invalid configurat
ion.`, func() { |
| 329 // No content type. | 332 // No content type. |
| 330 » » » » props.ContentType = "" | 333 » » » » s := newTestStream(func(p *streamproto.Propertie
s) { |
| 331 | 334 » » » » » p.ContentType = "" |
| 332 » » » » s := newTestStream(props) | 335 » » » » }) |
| 333 b := mkb(c, conf) | 336 b := mkb(c, conf) |
| 334 » » » » So(b.AddStream(s, *s.properties), ShouldNotBeNil
) | 337 » » » » So(b.AddStream(s, s.properties), ShouldNotBeNil) |
| 335 }) | 338 }) |
| 336 | 339 |
| 337 Convey(`Will not add a stream with a duplicate stream na
me.`, func() { | 340 Convey(`Will not add a stream with a duplicate stream na
me.`, func() { |
| 338 b := mkb(c, conf) | 341 b := mkb(c, conf) |
| 339 | 342 |
| 340 » » » » s0 := newTestStream(props) | 343 » » » » s0 := newTestStream(nil) |
| 341 » » » » So(b.AddStream(s0, *s0.properties), ShouldBeNil) | 344 » » » » So(b.AddStream(s0, s0.properties), ShouldBeNil) |
| 342 | 345 |
| 343 » » » » s1 := newTestStream(props) | 346 » » » » s1 := newTestStream(nil) |
| 344 » » » » So(b.AddStream(s1, *s1.properties), ShouldErrLik
e, "a stream has already been registered") | 347 » » » » So(b.AddStream(s1, s1.properties), ShouldErrLike
, "a stream has already been registered") |
| 345 }) | 348 }) |
| 346 | 349 |
| 347 Convey(`Will not accept invalid tee configuration`, func
() { | 350 Convey(`Will not accept invalid tee configuration`, func
() { |
| 348 conf.TeeStdout = nil | 351 conf.TeeStdout = nil |
| 349 conf.TeeStderr = nil | 352 conf.TeeStderr = nil |
| 350 | 353 |
| 351 for _, tc := range []struct { | 354 for _, tc := range []struct { |
| 352 tee streamproto.TeeType | 355 tee streamproto.TeeType |
| 353 err string | 356 err string |
| 354 }{ | 357 }{ |
| 355 {streamproto.TeeStdout, "no STDOUT is co
nfigured"}, | 358 {streamproto.TeeStdout, "no STDOUT is co
nfigured"}, |
| 356 {streamproto.TeeStderr, "no STDERR is co
nfigured"}, | 359 {streamproto.TeeStderr, "no STDERR is co
nfigured"}, |
| 357 {streamproto.TeeType(0xFFFFFFFF), "inval
id tee value"}, | 360 {streamproto.TeeType(0xFFFFFFFF), "inval
id tee value"}, |
| 358 } { | 361 } { |
| 359 Convey(fmt.Sprintf(`Rejects stream with
TeeType [%v], when no tee outputs are configured.`, tc.tee), func() { | 362 Convey(fmt.Sprintf(`Rejects stream with
TeeType [%v], when no tee outputs are configured.`, tc.tee), func() { |
| 360 props.Tee = tc.tee | |
| 361 | |
| 362 b := mkb(c, conf) | 363 b := mkb(c, conf) |
| 363 » » » » » » s := newTestStream(props) | 364 » » » » » » s := newTestStream(func(p *strea
mproto.Properties) { |
| 364 » » » » » » So(b.AddStream(s, *s.properties)
, ShouldErrLike, tc.err) | 365 » » » » » » » p.Tee = tc.tee |
| 366 » » » » » » }) |
| 367 » » » » » » So(b.AddStream(s, s.properties),
ShouldErrLike, tc.err) |
| 365 }) | 368 }) |
| 366 } | 369 } |
| 367 }) | 370 }) |
| 368 | 371 |
| 369 Convey(`When adding a stream configured to tee through S
TDOUT/STDERR, tees.`, func() { | 372 Convey(`When adding a stream configured to tee through S
TDOUT/STDERR, tees.`, func() { |
| 370 » » » » props.Name = "stdout" | 373 » » » » stdout := newTestStream(func(p *streamproto.Prop
erties) { |
| 371 » » » » props.Tee = streamproto.TeeStdout | 374 » » » » » p.Name = "stdout" |
| 372 » » » » stdout := newTestStream(props) | 375 » » » » » p.Tee = streamproto.TeeStdout |
| 376 » » » » }) |
| 373 | 377 |
| 374 » » » » props.Name = "stderr" | 378 » » » » stderr := newTestStream(func(p *streamproto.Prop
erties) { |
| 375 » » » » props.Tee = streamproto.TeeStderr | 379 » » » » » p.Name = "stderr" |
| 376 » » » » stderr := newTestStream(props) | 380 » » » » » p.Tee = streamproto.TeeStderr |
| 381 » » » » }) |
| 377 | 382 |
| 378 b := mkb(c, conf) | 383 b := mkb(c, conf) |
| 379 » » » » So(b.AddStream(stdout, *stdout.properties), Shou
ldBeNil) | 384 » » » » So(b.AddStream(stdout, stdout.properties), Shoul
dBeNil) |
| 380 » » » » So(b.AddStream(stderr, *stderr.properties), Shou
ldBeNil) | 385 » » » » So(b.AddStream(stderr, stderr.properties), Shoul
dBeNil) |
| 381 | 386 |
| 382 stdout.data([]byte("Hello, STDOUT"), io.EOF) | 387 stdout.data([]byte("Hello, STDOUT"), io.EOF) |
| 383 stderr.data([]byte("Hello, STDERR"), io.EOF) | 388 stderr.data([]byte("Hello, STDERR"), io.EOF) |
| 384 | 389 |
| 385 b.Activate() | 390 b.Activate() |
| 386 So(b.Wait(), ShouldBeNil) | 391 So(b.Wait(), ShouldBeNil) |
| 387 | 392 |
| 388 So(teeStdout.String(), ShouldEqual, "Hello, STDO
UT") | 393 So(teeStdout.String(), ShouldEqual, "Hello, STDO
UT") |
| 389 So(to.logs("stdout"), shouldHaveTextLogs, "Hello
, STDOUT") | 394 So(to.logs("stdout"), shouldHaveTextLogs, "Hello
, STDOUT") |
| 390 So(to.isTerminal("stdout"), ShouldBeTrue) | 395 So(to.isTerminal("stdout"), ShouldBeTrue) |
| 391 | 396 |
| 392 So(teeStderr.String(), ShouldEqual, "Hello, STDE
RR") | 397 So(teeStderr.String(), ShouldEqual, "Hello, STDE
RR") |
| 393 So(to.logs("stderr"), shouldHaveTextLogs, "Hello
, STDERR") | 398 So(to.logs("stderr"), shouldHaveTextLogs, "Hello
, STDERR") |
| 394 So(to.isTerminal("stderr"), ShouldBeTrue) | 399 So(to.isTerminal("stderr"), ShouldBeTrue) |
| 395 }) | 400 }) |
| 396 | 401 |
| 397 Convey(`Run with 256 streams, stream{0..256} will deplet
e and finish.`, func() { | 402 Convey(`Run with 256 streams, stream{0..256} will deplet
e and finish.`, func() { |
| 398 b := mkb(c, conf) | 403 b := mkb(c, conf) |
| 399 streams := make([]*testStream, 256) | 404 streams := make([]*testStream, 256) |
| 400 for i := range streams { | 405 for i := range streams { |
| 401 » » » » » props.Name = fmt.Sprintf("stream%d", i) | 406 » » » » » streams[i] = newTestStream(func(p *strea
mproto.Properties) { |
| 402 » » » » » streams[i] = newTestStream(props) | 407 » » » » » » p.Name = fmt.Sprintf("stream%d",
i) |
| 408 » » » » » }) |
| 403 } | 409 } |
| 404 | 410 |
| 405 for _, s := range streams { | 411 for _, s := range streams { |
| 406 » » » » » So(b.AddStream(s, *s.properties), Should
BeNil) | 412 » » » » » So(b.AddStream(s, s.properties), ShouldB
eNil) |
| 407 s.data([]byte("stream data 0!\n"), nil) | 413 s.data([]byte("stream data 0!\n"), nil) |
| 408 s.data([]byte("stream data 1!\n"), nil) | 414 s.data([]byte("stream data 1!\n"), nil) |
| 409 } | 415 } |
| 410 | 416 |
| 411 // Add data to the streams after shutdown. | 417 // Add data to the streams after shutdown. |
| 412 for _, s := range streams { | 418 for _, s := range streams { |
| 413 s.data([]byte("stream data 2!\n"), io.EO
F) | 419 s.data([]byte("stream data 2!\n"), io.EO
F) |
| 414 } | 420 } |
| 415 | 421 |
| 416 b.Activate() | 422 b.Activate() |
| 417 So(b.Wait(), ShouldBeNil) | 423 So(b.Wait(), ShouldBeNil) |
| 418 | 424 |
| 419 for _, s := range streams { | 425 for _, s := range streams { |
| 420 name := string(s.properties.Name) | 426 name := string(s.properties.Name) |
| 421 | 427 |
| 422 So(to.logs(name), shouldHaveTextLogs, "s
tream data 0!", "stream data 1!", "stream data 2!") | 428 So(to.logs(name), shouldHaveTextLogs, "s
tream data 0!", "stream data 1!", "stream data 2!") |
| 423 So(to.isTerminal(name), ShouldBeTrue) | 429 So(to.isTerminal(name), ShouldBeTrue) |
| 424 } | 430 } |
| 425 }) | 431 }) |
| 426 | 432 |
| 427 Convey(`Shutdown with 256 in-progress streams, stream{0.
.256} will terminate if they emitted logs.`, func() { | 433 Convey(`Shutdown with 256 in-progress streams, stream{0.
.256} will terminate if they emitted logs.`, func() { |
| 428 b := mkb(c, conf) | 434 b := mkb(c, conf) |
| 429 streams := make([]*testStream, 256) | 435 streams := make([]*testStream, 256) |
| 430 for i := range streams { | 436 for i := range streams { |
| 431 » » » » » props.Name = fmt.Sprintf("stream%d", i) | 437 » » » » » streams[i] = newTestStream(func(p *strea
mproto.Properties) { |
| 432 » » » » » streams[i] = newTestStream(props) | 438 » » » » » » p.Name = fmt.Sprintf("stream%d",
i) |
| 439 » » » » » }) |
| 433 } | 440 } |
| 434 | 441 |
| 435 for _, s := range streams { | 442 for _, s := range streams { |
| 436 » » » » » So(b.AddStream(s, *s.properties), Should
BeNil) | 443 » » » » » So(b.AddStream(s, s.properties), ShouldB
eNil) |
| 437 s.data([]byte("stream data!\n"), nil) | 444 s.data([]byte("stream data!\n"), nil) |
| 438 } | 445 } |
| 439 | 446 |
| 440 b.shutdown(errors.New("test shutdown")) | 447 b.shutdown(errors.New("test shutdown")) |
| 441 So(b.Wait(), ShouldErrLike, "test shutdown") | 448 So(b.Wait(), ShouldErrLike, "test shutdown") |
| 442 | 449 |
| 443 for _, s := range streams { | 450 for _, s := range streams { |
| 444 if len(to.logs(s.properties.Name)) > 0 { | 451 if len(to.logs(s.properties.Name)) > 0 { |
| 445 So(to.isTerminal(string(s.proper
ties.Name)), ShouldBeTrue) | 452 So(to.isTerminal(string(s.proper
ties.Name)), ShouldBeTrue) |
| 446 } else { | 453 } else { |
| 447 So(to.isTerminal(string(s.proper
ties.Name)), ShouldBeFalse) | 454 So(to.isTerminal(string(s.proper
ties.Name)), ShouldBeFalse) |
| 448 } | 455 } |
| 449 } | 456 } |
| 450 }) | 457 }) |
| 451 | 458 |
| 452 Convey(`Using ten test stream servers`, func() { | 459 Convey(`Using ten test stream servers`, func() { |
| 453 servers := make([]*testStreamServer, 10) | 460 servers := make([]*testStreamServer, 10) |
| 454 for i := range servers { | 461 for i := range servers { |
| 455 servers[i] = newTestStreamServer() | 462 servers[i] = newTestStreamServer() |
| 456 } | 463 } |
| 457 streams := []*testStream(nil) | 464 streams := []*testStream(nil) |
| 458 | 465 |
| 459 Convey(`Can register both before Run and will re
tain streams.`, func() { | 466 Convey(`Can register both before Run and will re
tain streams.`, func() { |
| 460 b := mkb(c, conf) | 467 b := mkb(c, conf) |
| 461 for i, tss := range servers { | 468 for i, tss := range servers { |
| 462 b.AddStreamServer(tss) | 469 b.AddStreamServer(tss) |
| 463 | 470 |
| 464 » » » » » » props.Name = fmt.Sprintf("stream
%d", i) | 471 » » » » » » s := newTestStream(func(p *strea
mproto.Properties) { |
| 465 » » » » » » s := newTestStream(props) | 472 » » » » » » » p.Name = fmt.Sprintf("st
ream%d", i) |
| 473 » » » » » » }) |
| 466 streams = append(streams, s) | 474 streams = append(streams, s) |
| 467 s.data([]byte("test data"), io.E
OF) | 475 s.data([]byte("test data"), io.E
OF) |
| 468 tss.enqueue(s) | 476 tss.enqueue(s) |
| 469 } | 477 } |
| 470 | 478 |
| 471 b.Activate() | 479 b.Activate() |
| 472 So(b.Wait(), ShouldBeNil) | 480 So(b.Wait(), ShouldBeNil) |
| 473 | 481 |
| 474 for _, s := range streams { | 482 for _, s := range streams { |
| 475 So(to.logs(s.properties.Name), s
houldHaveTextLogs, "test data") | 483 So(to.logs(s.properties.Name), s
houldHaveTextLogs, "test data") |
| 476 So(to.isTerminal(s.properties.Na
me), ShouldBeTrue) | 484 So(to.isTerminal(s.properties.Na
me), ShouldBeTrue) |
| 477 } | 485 } |
| 478 }) | 486 }) |
| 479 | 487 |
| 480 Convey(`Can register both during Run and will re
tain streams.`, func() { | 488 Convey(`Can register both during Run and will re
tain streams.`, func() { |
| 481 b := mkb(c, conf) | 489 b := mkb(c, conf) |
| 482 for i, tss := range servers { | 490 for i, tss := range servers { |
| 483 b.AddStreamServer(tss) | 491 b.AddStreamServer(tss) |
| 484 | 492 |
| 485 » » » » » » props.Name = fmt.Sprintf("stream
%d", i) | 493 » » » » » » s := newTestStream(func(p *strea
mproto.Properties) { |
| 486 » » » » » » s := newTestStream(props) | 494 » » » » » » » p.Name = fmt.Sprintf("st
ream%d", i) |
| 495 » » » » » » }) |
| 487 streams = append(streams, s) | 496 streams = append(streams, s) |
| 488 s.data([]byte("test data"), io.E
OF) | 497 s.data([]byte("test data"), io.E
OF) |
| 489 tss.enqueue(s) | 498 tss.enqueue(s) |
| 490 } | 499 } |
| 491 | 500 |
| 492 b.Activate() | 501 b.Activate() |
| 493 So(b.Wait(), ShouldBeNil) | 502 So(b.Wait(), ShouldBeNil) |
| 494 | 503 |
| 495 for _, s := range streams { | 504 for _, s := range streams { |
| 496 So(to.logs(s.properties.Name), s
houldHaveTextLogs, "test data") | 505 So(to.logs(s.properties.Name), s
houldHaveTextLogs, "test data") |
| 497 So(to.isTerminal(s.properties.Na
me), ShouldBeTrue) | 506 So(to.isTerminal(s.properties.Na
me), ShouldBeTrue) |
| 498 } | 507 } |
| 499 }) | 508 }) |
| 500 }) | 509 }) |
| 501 | 510 |
| 502 Convey(`Will ignore stream registration errors, allowing
re-registration.`, func() { | 511 Convey(`Will ignore stream registration errors, allowing
re-registration.`, func() { |
| 503 tss := newTestStreamServer() | 512 tss := newTestStreamServer() |
| 504 | 513 |
| 505 // Generate an invalid stream for "tss" to regis
ter. | 514 // Generate an invalid stream for "tss" to regis
ter. |
| 506 » » » » sGood := newTestStream(props) | 515 » » » » sGood := newTestStream(nil) |
| 507 sGood.data([]byte("good test data"), io.EOF) | 516 sGood.data([]byte("good test data"), io.EOF) |
| 508 | 517 |
| 509 » » » » props.ContentType = "" | 518 » » » » sBad := newTestStream(func(p *streamproto.Proper
ties) { |
| 510 » » » » sBad := newTestStream(props) | 519 » » » » » p.ContentType = "" |
| 520 » » » » }) |
| 511 sBad.data([]byte("bad test data"), io.EOF) | 521 sBad.data([]byte("bad test data"), io.EOF) |
| 512 | 522 |
| 513 b := mkb(c, conf) | 523 b := mkb(c, conf) |
| 514 b.AddStreamServer(tss) | 524 b.AddStreamServer(tss) |
| 515 tss.enqueue(sBad) | 525 tss.enqueue(sBad) |
| 516 tss.enqueue(sGood) | 526 tss.enqueue(sGood) |
| 517 b.Activate() | 527 b.Activate() |
| 518 So(b.Wait(), ShouldBeNil) | 528 So(b.Wait(), ShouldBeNil) |
| 519 | 529 |
| 520 So(sBad.isClosed(), ShouldBeTrue) | 530 So(sBad.isClosed(), ShouldBeTrue) |
| 521 So(sGood.isClosed(), ShouldBeTrue) | 531 So(sGood.isClosed(), ShouldBeTrue) |
| 522 » » » » So(to.logs(props.Name), shouldHaveTextLogs, "goo
d test data") | 532 » » » » So(to.logs("test"), shouldHaveTextLogs, "good te
st data") |
| 523 » » » » So(to.isTerminal(props.Name), ShouldBeTrue) | 533 » » » » So(to.isTerminal("test"), ShouldBeTrue) |
| 524 }) | 534 }) |
| 525 }) | 535 }) |
| 526 | 536 |
| 527 Convey(`Will terminate if the stream server panics.`, func() { | 537 Convey(`Will terminate if the stream server panics.`, func() { |
| 528 tss := newTestStreamServer() | 538 tss := newTestStreamServer() |
| 529 tss.onNext = func() { | 539 tss.onNext = func() { |
| 530 panic("test panic") | 540 panic("test panic") |
| 531 } | 541 } |
| 532 | 542 |
| 533 b := mkb(c, conf) | 543 b := mkb(c, conf) |
| 534 b.AddStreamServer(tss) | 544 b.AddStreamServer(tss) |
| 535 So(b.Wait(), ShouldErrLike, "test panic") | 545 So(b.Wait(), ShouldErrLike, "test panic") |
| 536 }) | 546 }) |
| 537 }) | 547 }) |
| 538 } | 548 } |
| OLD | NEW |