Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(537)

Side by Side Diff: logdog/client/butler/butler_test.go

Issue 2456953003: LogDog: Update client/bootstrap to generate URLs. (Closed)
Patch Set: Winders Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « logdog/client/butler/butler.go ('k') | logdog/client/butler/streamserver/handshake_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package butler 5 package butler
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "errors" 9 "errors"
10 "fmt" 10 "fmt"
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after
122 err error 122 err error
123 } 123 }
124 124
125 type testStream struct { 125 type testStream struct {
126 inC chan *testStreamData 126 inC chan *testStreamData
127 closedC chan struct{} 127 closedC chan struct{}
128 128
129 properties *streamproto.Properties 129 properties *streamproto.Properties
130 } 130 }
131 131
132 func newTestStream(p streamproto.Properties) *testStream {
133 return &testStream{
134 inC: make(chan *testStreamData, 16),
135 closedC: make(chan struct{}),
136 properties: &p,
137 }
138 }
139
140 func (ts *testStream) data(d []byte, err error) { 132 func (ts *testStream) data(d []byte, err error) {
141 ts.inC <- &testStreamData{ 133 ts.inC <- &testStreamData{
142 data: d, 134 data: d,
143 err: err, 135 err: err,
144 } 136 }
145 if err == io.EOF { 137 if err == io.EOF {
146 // If EOF is hit, continue reporting EOF. 138 // If EOF is hit, continue reporting EOF.
147 close(ts.inC) 139 close(ts.inC)
148 } 140 }
149 } 141 }
(...skipping 160 matching lines...) Expand 10 before | Expand all | Expand 10 after
310 }) 302 })
311 303
312 Convey(`Will retain the first error and ignore duplicate shutdow ns.`, func() { 304 Convey(`Will retain the first error and ignore duplicate shutdow ns.`, func() {
313 b := mkb(c, conf) 305 b := mkb(c, conf)
314 b.shutdown(errors.New("first error")) 306 b.shutdown(errors.New("first error"))
315 b.shutdown(errors.New("second error")) 307 b.shutdown(errors.New("second error"))
316 So(b.Wait(), ShouldErrLike, "first error") 308 So(b.Wait(), ShouldErrLike, "first error")
317 }) 309 })
318 310
319 Convey(`Using a generic stream Properties`, func() { 311 Convey(`Using a generic stream Properties`, func() {
320 » » » props := streamproto.Properties{ 312 » » » newTestStream := func(setup func(p *streamproto.Properti es)) *testStream {
321 » » » » LogStreamDescriptor: logpb.LogStreamDescriptor{ 313 » » » » props := streamproto.Properties{
322 » » » » » Name: "test", 314 » » » » » LogStreamDescriptor: &logpb.LogStreamDes criptor{
323 » » » » » StreamType: logpb.StreamType_TEXT, 315 » » » » » » Name: "test",
324 » » » » » ContentType: string(types.ContentTypeTex t), 316 » » » » » » StreamType: logpb.StreamType_TE XT,
325 » » » » }, 317 » » » » » » ContentType: string(types.Conten tTypeText),
318 » » » » » },
319 » » » » }
320 » » » » if setup != nil {
321 » » » » » setup(&props)
322 » » » » }
323
324 » » » » return &testStream{
325 » » » » » inC: make(chan *testStreamData, 1 6),
326 » » » » » closedC: make(chan struct{}),
327 » » » » » properties: &props,
328 » » » » }
326 } 329 }
327 330
328 Convey(`Will not add a stream with an invalid configurat ion.`, func() { 331 Convey(`Will not add a stream with an invalid configurat ion.`, func() {
329 // No content type. 332 // No content type.
330 » » » » props.ContentType = "" 333 » » » » s := newTestStream(func(p *streamproto.Propertie s) {
331 334 » » » » » p.ContentType = ""
332 » » » » s := newTestStream(props) 335 » » » » })
333 b := mkb(c, conf) 336 b := mkb(c, conf)
334 » » » » So(b.AddStream(s, *s.properties), ShouldNotBeNil ) 337 » » » » So(b.AddStream(s, s.properties), ShouldNotBeNil)
335 }) 338 })
336 339
337 Convey(`Will not add a stream with a duplicate stream na me.`, func() { 340 Convey(`Will not add a stream with a duplicate stream na me.`, func() {
338 b := mkb(c, conf) 341 b := mkb(c, conf)
339 342
340 » » » » s0 := newTestStream(props) 343 » » » » s0 := newTestStream(nil)
341 » » » » So(b.AddStream(s0, *s0.properties), ShouldBeNil) 344 » » » » So(b.AddStream(s0, s0.properties), ShouldBeNil)
342 345
343 » » » » s1 := newTestStream(props) 346 » » » » s1 := newTestStream(nil)
344 » » » » So(b.AddStream(s1, *s1.properties), ShouldErrLik e, "a stream has already been registered") 347 » » » » So(b.AddStream(s1, s1.properties), ShouldErrLike , "a stream has already been registered")
345 }) 348 })
346 349
347 Convey(`Will not accept invalid tee configuration`, func () { 350 Convey(`Will not accept invalid tee configuration`, func () {
348 conf.TeeStdout = nil 351 conf.TeeStdout = nil
349 conf.TeeStderr = nil 352 conf.TeeStderr = nil
350 353
351 for _, tc := range []struct { 354 for _, tc := range []struct {
352 tee streamproto.TeeType 355 tee streamproto.TeeType
353 err string 356 err string
354 }{ 357 }{
355 {streamproto.TeeStdout, "no STDOUT is co nfigured"}, 358 {streamproto.TeeStdout, "no STDOUT is co nfigured"},
356 {streamproto.TeeStderr, "no STDERR is co nfigured"}, 359 {streamproto.TeeStderr, "no STDERR is co nfigured"},
357 {streamproto.TeeType(0xFFFFFFFF), "inval id tee value"}, 360 {streamproto.TeeType(0xFFFFFFFF), "inval id tee value"},
358 } { 361 } {
359 Convey(fmt.Sprintf(`Rejects stream with TeeType [%v], when no tee outputs are configured.`, tc.tee), func() { 362 Convey(fmt.Sprintf(`Rejects stream with TeeType [%v], when no tee outputs are configured.`, tc.tee), func() {
360 props.Tee = tc.tee
361
362 b := mkb(c, conf) 363 b := mkb(c, conf)
363 » » » » » » s := newTestStream(props) 364 » » » » » » s := newTestStream(func(p *strea mproto.Properties) {
364 » » » » » » So(b.AddStream(s, *s.properties) , ShouldErrLike, tc.err) 365 » » » » » » » p.Tee = tc.tee
366 » » » » » » })
367 » » » » » » So(b.AddStream(s, s.properties), ShouldErrLike, tc.err)
365 }) 368 })
366 } 369 }
367 }) 370 })
368 371
369 Convey(`When adding a stream configured to tee through S TDOUT/STDERR, tees.`, func() { 372 Convey(`When adding a stream configured to tee through S TDOUT/STDERR, tees.`, func() {
370 » » » » props.Name = "stdout" 373 » » » » stdout := newTestStream(func(p *streamproto.Prop erties) {
371 » » » » props.Tee = streamproto.TeeStdout 374 » » » » » p.Name = "stdout"
372 » » » » stdout := newTestStream(props) 375 » » » » » p.Tee = streamproto.TeeStdout
376 » » » » })
373 377
374 » » » » props.Name = "stderr" 378 » » » » stderr := newTestStream(func(p *streamproto.Prop erties) {
375 » » » » props.Tee = streamproto.TeeStderr 379 » » » » » p.Name = "stderr"
376 » » » » stderr := newTestStream(props) 380 » » » » » p.Tee = streamproto.TeeStderr
381 » » » » })
377 382
378 b := mkb(c, conf) 383 b := mkb(c, conf)
379 » » » » So(b.AddStream(stdout, *stdout.properties), Shou ldBeNil) 384 » » » » So(b.AddStream(stdout, stdout.properties), Shoul dBeNil)
380 » » » » So(b.AddStream(stderr, *stderr.properties), Shou ldBeNil) 385 » » » » So(b.AddStream(stderr, stderr.properties), Shoul dBeNil)
381 386
382 stdout.data([]byte("Hello, STDOUT"), io.EOF) 387 stdout.data([]byte("Hello, STDOUT"), io.EOF)
383 stderr.data([]byte("Hello, STDERR"), io.EOF) 388 stderr.data([]byte("Hello, STDERR"), io.EOF)
384 389
385 b.Activate() 390 b.Activate()
386 So(b.Wait(), ShouldBeNil) 391 So(b.Wait(), ShouldBeNil)
387 392
388 So(teeStdout.String(), ShouldEqual, "Hello, STDO UT") 393 So(teeStdout.String(), ShouldEqual, "Hello, STDO UT")
389 So(to.logs("stdout"), shouldHaveTextLogs, "Hello , STDOUT") 394 So(to.logs("stdout"), shouldHaveTextLogs, "Hello , STDOUT")
390 So(to.isTerminal("stdout"), ShouldBeTrue) 395 So(to.isTerminal("stdout"), ShouldBeTrue)
391 396
392 So(teeStderr.String(), ShouldEqual, "Hello, STDE RR") 397 So(teeStderr.String(), ShouldEqual, "Hello, STDE RR")
393 So(to.logs("stderr"), shouldHaveTextLogs, "Hello , STDERR") 398 So(to.logs("stderr"), shouldHaveTextLogs, "Hello , STDERR")
394 So(to.isTerminal("stderr"), ShouldBeTrue) 399 So(to.isTerminal("stderr"), ShouldBeTrue)
395 }) 400 })
396 401
397 Convey(`Run with 256 streams, stream{0..256} will deplet e and finish.`, func() { 402 Convey(`Run with 256 streams, stream{0..256} will deplet e and finish.`, func() {
398 b := mkb(c, conf) 403 b := mkb(c, conf)
399 streams := make([]*testStream, 256) 404 streams := make([]*testStream, 256)
400 for i := range streams { 405 for i := range streams {
401 » » » » » props.Name = fmt.Sprintf("stream%d", i) 406 » » » » » streams[i] = newTestStream(func(p *strea mproto.Properties) {
402 » » » » » streams[i] = newTestStream(props) 407 » » » » » » p.Name = fmt.Sprintf("stream%d", i)
408 » » » » » })
403 } 409 }
404 410
405 for _, s := range streams { 411 for _, s := range streams {
406 » » » » » So(b.AddStream(s, *s.properties), Should BeNil) 412 » » » » » So(b.AddStream(s, s.properties), ShouldB eNil)
407 s.data([]byte("stream data 0!\n"), nil) 413 s.data([]byte("stream data 0!\n"), nil)
408 s.data([]byte("stream data 1!\n"), nil) 414 s.data([]byte("stream data 1!\n"), nil)
409 } 415 }
410 416
411 // Add data to the streams after shutdown. 417 // Add data to the streams after shutdown.
412 for _, s := range streams { 418 for _, s := range streams {
413 s.data([]byte("stream data 2!\n"), io.EO F) 419 s.data([]byte("stream data 2!\n"), io.EO F)
414 } 420 }
415 421
416 b.Activate() 422 b.Activate()
417 So(b.Wait(), ShouldBeNil) 423 So(b.Wait(), ShouldBeNil)
418 424
419 for _, s := range streams { 425 for _, s := range streams {
420 name := string(s.properties.Name) 426 name := string(s.properties.Name)
421 427
422 So(to.logs(name), shouldHaveTextLogs, "s tream data 0!", "stream data 1!", "stream data 2!") 428 So(to.logs(name), shouldHaveTextLogs, "s tream data 0!", "stream data 1!", "stream data 2!")
423 So(to.isTerminal(name), ShouldBeTrue) 429 So(to.isTerminal(name), ShouldBeTrue)
424 } 430 }
425 }) 431 })
426 432
427 Convey(`Shutdown with 256 in-progress streams, stream{0. .256} will terminate if they emitted logs.`, func() { 433 Convey(`Shutdown with 256 in-progress streams, stream{0. .256} will terminate if they emitted logs.`, func() {
428 b := mkb(c, conf) 434 b := mkb(c, conf)
429 streams := make([]*testStream, 256) 435 streams := make([]*testStream, 256)
430 for i := range streams { 436 for i := range streams {
431 » » » » » props.Name = fmt.Sprintf("stream%d", i) 437 » » » » » streams[i] = newTestStream(func(p *strea mproto.Properties) {
432 » » » » » streams[i] = newTestStream(props) 438 » » » » » » p.Name = fmt.Sprintf("stream%d", i)
439 » » » » » })
433 } 440 }
434 441
435 for _, s := range streams { 442 for _, s := range streams {
436 » » » » » So(b.AddStream(s, *s.properties), Should BeNil) 443 » » » » » So(b.AddStream(s, s.properties), ShouldB eNil)
437 s.data([]byte("stream data!\n"), nil) 444 s.data([]byte("stream data!\n"), nil)
438 } 445 }
439 446
440 b.shutdown(errors.New("test shutdown")) 447 b.shutdown(errors.New("test shutdown"))
441 So(b.Wait(), ShouldErrLike, "test shutdown") 448 So(b.Wait(), ShouldErrLike, "test shutdown")
442 449
443 for _, s := range streams { 450 for _, s := range streams {
444 if len(to.logs(s.properties.Name)) > 0 { 451 if len(to.logs(s.properties.Name)) > 0 {
445 So(to.isTerminal(string(s.proper ties.Name)), ShouldBeTrue) 452 So(to.isTerminal(string(s.proper ties.Name)), ShouldBeTrue)
446 } else { 453 } else {
447 So(to.isTerminal(string(s.proper ties.Name)), ShouldBeFalse) 454 So(to.isTerminal(string(s.proper ties.Name)), ShouldBeFalse)
448 } 455 }
449 } 456 }
450 }) 457 })
451 458
452 Convey(`Using ten test stream servers`, func() { 459 Convey(`Using ten test stream servers`, func() {
453 servers := make([]*testStreamServer, 10) 460 servers := make([]*testStreamServer, 10)
454 for i := range servers { 461 for i := range servers {
455 servers[i] = newTestStreamServer() 462 servers[i] = newTestStreamServer()
456 } 463 }
457 streams := []*testStream(nil) 464 streams := []*testStream(nil)
458 465
459 Convey(`Can register both before Run and will re tain streams.`, func() { 466 Convey(`Can register both before Run and will re tain streams.`, func() {
460 b := mkb(c, conf) 467 b := mkb(c, conf)
461 for i, tss := range servers { 468 for i, tss := range servers {
462 b.AddStreamServer(tss) 469 b.AddStreamServer(tss)
463 470
464 » » » » » » props.Name = fmt.Sprintf("stream %d", i) 471 » » » » » » s := newTestStream(func(p *strea mproto.Properties) {
465 » » » » » » s := newTestStream(props) 472 » » » » » » » p.Name = fmt.Sprintf("st ream%d", i)
473 » » » » » » })
466 streams = append(streams, s) 474 streams = append(streams, s)
467 s.data([]byte("test data"), io.E OF) 475 s.data([]byte("test data"), io.E OF)
468 tss.enqueue(s) 476 tss.enqueue(s)
469 } 477 }
470 478
471 b.Activate() 479 b.Activate()
472 So(b.Wait(), ShouldBeNil) 480 So(b.Wait(), ShouldBeNil)
473 481
474 for _, s := range streams { 482 for _, s := range streams {
475 So(to.logs(s.properties.Name), s houldHaveTextLogs, "test data") 483 So(to.logs(s.properties.Name), s houldHaveTextLogs, "test data")
476 So(to.isTerminal(s.properties.Na me), ShouldBeTrue) 484 So(to.isTerminal(s.properties.Na me), ShouldBeTrue)
477 } 485 }
478 }) 486 })
479 487
480 Convey(`Can register both during Run and will re tain streams.`, func() { 488 Convey(`Can register both during Run and will re tain streams.`, func() {
481 b := mkb(c, conf) 489 b := mkb(c, conf)
482 for i, tss := range servers { 490 for i, tss := range servers {
483 b.AddStreamServer(tss) 491 b.AddStreamServer(tss)
484 492
485 » » » » » » props.Name = fmt.Sprintf("stream %d", i) 493 » » » » » » s := newTestStream(func(p *strea mproto.Properties) {
486 » » » » » » s := newTestStream(props) 494 » » » » » » » p.Name = fmt.Sprintf("st ream%d", i)
495 » » » » » » })
487 streams = append(streams, s) 496 streams = append(streams, s)
488 s.data([]byte("test data"), io.E OF) 497 s.data([]byte("test data"), io.E OF)
489 tss.enqueue(s) 498 tss.enqueue(s)
490 } 499 }
491 500
492 b.Activate() 501 b.Activate()
493 So(b.Wait(), ShouldBeNil) 502 So(b.Wait(), ShouldBeNil)
494 503
495 for _, s := range streams { 504 for _, s := range streams {
496 So(to.logs(s.properties.Name), s houldHaveTextLogs, "test data") 505 So(to.logs(s.properties.Name), s houldHaveTextLogs, "test data")
497 So(to.isTerminal(s.properties.Na me), ShouldBeTrue) 506 So(to.isTerminal(s.properties.Na me), ShouldBeTrue)
498 } 507 }
499 }) 508 })
500 }) 509 })
501 510
502 Convey(`Will ignore stream registration errors, allowing re-registration.`, func() { 511 Convey(`Will ignore stream registration errors, allowing re-registration.`, func() {
503 tss := newTestStreamServer() 512 tss := newTestStreamServer()
504 513
505 // Generate an invalid stream for "tss" to regis ter. 514 // Generate an invalid stream for "tss" to regis ter.
506 » » » » sGood := newTestStream(props) 515 » » » » sGood := newTestStream(nil)
507 sGood.data([]byte("good test data"), io.EOF) 516 sGood.data([]byte("good test data"), io.EOF)
508 517
509 » » » » props.ContentType = "" 518 » » » » sBad := newTestStream(func(p *streamproto.Proper ties) {
510 » » » » sBad := newTestStream(props) 519 » » » » » p.ContentType = ""
520 » » » » })
511 sBad.data([]byte("bad test data"), io.EOF) 521 sBad.data([]byte("bad test data"), io.EOF)
512 522
513 b := mkb(c, conf) 523 b := mkb(c, conf)
514 b.AddStreamServer(tss) 524 b.AddStreamServer(tss)
515 tss.enqueue(sBad) 525 tss.enqueue(sBad)
516 tss.enqueue(sGood) 526 tss.enqueue(sGood)
517 b.Activate() 527 b.Activate()
518 So(b.Wait(), ShouldBeNil) 528 So(b.Wait(), ShouldBeNil)
519 529
520 So(sBad.isClosed(), ShouldBeTrue) 530 So(sBad.isClosed(), ShouldBeTrue)
521 So(sGood.isClosed(), ShouldBeTrue) 531 So(sGood.isClosed(), ShouldBeTrue)
522 » » » » So(to.logs(props.Name), shouldHaveTextLogs, "goo d test data") 532 » » » » So(to.logs("test"), shouldHaveTextLogs, "good te st data")
523 » » » » So(to.isTerminal(props.Name), ShouldBeTrue) 533 » » » » So(to.isTerminal("test"), ShouldBeTrue)
524 }) 534 })
525 }) 535 })
526 536
527 Convey(`Will terminate if the stream server panics.`, func() { 537 Convey(`Will terminate if the stream server panics.`, func() {
528 tss := newTestStreamServer() 538 tss := newTestStreamServer()
529 tss.onNext = func() { 539 tss.onNext = func() {
530 panic("test panic") 540 panic("test panic")
531 } 541 }
532 542
533 b := mkb(c, conf) 543 b := mkb(c, conf)
534 b.AddStreamServer(tss) 544 b.AddStreamServer(tss)
535 So(b.Wait(), ShouldErrLike, "test panic") 545 So(b.Wait(), ShouldErrLike, "test panic")
536 }) 546 })
537 }) 547 })
538 } 548 }
OLDNEW
« no previous file with comments | « logdog/client/butler/butler.go ('k') | logdog/client/butler/streamserver/handshake_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698