Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(285)

Unified Diff: client/internal/logdog/butler/streamserver/namedPipe_test.go

Issue 1429993002: LogDog: Add Butler stream server package. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-butlerproto
Patch Set: Bind POSIX test to POSIX domains. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: client/internal/logdog/butler/streamserver/namedPipe_test.go
diff --git a/client/internal/logdog/butler/streamserver/namedPipe_test.go b/client/internal/logdog/butler/streamserver/namedPipe_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..5ba9be1705136c7ad3412f9d114d1843e155d913
--- /dev/null
+++ b/client/internal/logdog/butler/streamserver/namedPipe_test.go
@@ -0,0 +1,173 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package streamserver
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net"
+ "testing"
+ "time"
+
+ "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
+ . "github.com/smartystreets/goconvey/convey"
+ "golang.org/x/net/context"
+)
+
+// Testing interface for OS-level test routine abstraction.
+type namedPipeTester interface {
+ // Decorator to create and destroy a NamedPipeServer instance.
+ withServer(func(i *namedPipeTestInstance)) func()
+}
+
+// Base class for a named pipe test instance, bound to a single server.
+type namedPipeTestInstance struct {
+ S *namedPipeServer
+ connect func() io.WriteCloser
+}
+
+type testAddr string
+
+func (a testAddr) Network() string { return string(a) }
+func (a testAddr) String() string { return fmt.Sprintf("test(%s)", a.Network()) }
+
+type testConn struct {
+ bytes.Buffer
+
+ readDeadline time.Time
+ writeDeadline time.Time
+}
+
+func (c *testConn) Close() error { return nil }
+func (c *testConn) LocalAddr() net.Addr { return testAddr("local") }
+func (c *testConn) RemoteAddr() net.Addr { return testAddr("remote") }
+
+func (c *testConn) SetReadDeadline(t time.Time) error {
+ c.readDeadline = t
+ return nil
+}
+
+func (c *testConn) SetWriteDeadline(t time.Time) error {
+ c.writeDeadline = t
+ return nil
+}
+
+func (c *testConn) SetDeadline(t time.Time) error {
+ c.readDeadline = t
+ c.writeDeadline = t
+ return nil
+}
+
+func testNamedPipeServer(t *testing.T, npt namedPipeTester) {
+ Convey(`In a testing environment`, t, func() {
+ c := context.Background()
+ hb := handshakeBuilder{
+ magic: streamproto.ProtocolFrameHeaderMagic,
+ }
+
+ Convey(`A named pipe server will panic if closed without listening.`, func() {
+ s := &namedPipeServer{}
+ So(func() { s.Close() }, ShouldPanic)
+ })
+
+ Convey(`A listening named pipe server will panic if double-closed.`,
+ npt.withServer(func(i *namedPipeTestInstance) {
+ i.S.Close()
+ So(func() { i.S.Close() }, ShouldPanic)
+ }))
+
+ Convey(`A test client connection`, func() {
+ tc := testConn{}
+ npc := &namedPipeConn{
+ id: 1337,
+ conn: &tc,
+ name: "test",
+ }
+
+ Convey(`Will reject an invalid handshake magic.`, func() {
+ hb.magic = []byte(`NOT A HANDSHAKE MAGIC`)
+ hb.writeTo(&tc, "", nil)
+ So(func() {
+ npc.negotiate(c)
+ }, ShouldPanic)
+ })
+
+ Convey(`Will reject an invalid handshake.`, func() {
+ hb.writeTo(&tc, "CLEARLY NOT JSON", nil)
+ So(func() {
+ npc.negotiate(c)
+ }, ShouldPanic)
+ })
+ })
+
+ Convey(`A listening named pipe server`, npt.withServer(func(i *namedPipeTestInstance) {
+ // Setup a goroutine to pipe test data through a client connection.
+ readerC := make(chan io.Reader)
+ defer close(readerC)
+
+ doneC := make(chan struct{})
+ go func() {
+ defer close(doneC)
+
+ // Create a connection to our server instance.
+ w := i.connect()
+ if w == nil {
+ return
+ }
+ defer w.Close()
+
+ // Write supplied data to the client connection.
+ r := <-readerC
+ if r != nil {
+ io.Copy(w, r)
+ }
+ }()
+
+ Convey(`Can receive stream data.`, func() {
+ // Write our handshake and data to the stream.
+ handshake := `{"name": "test", "contentType": "application/octet-stream"}`
+ content := bytes.Repeat([]byte("THIS IS A TEST STREAM "), 100)
+ readerC <- hb.reader(handshake, content)
+
+ // Retrieve the ensuing stream.
+ stream, props := i.S.Next()
+ So(stream, ShouldNotBeNil)
+ defer stream.Close()
+ So(props, ShouldNotBeNil)
+
+ <-doneC
+
+ // Consume all of the data in the stream.
+ recvData, _ := ioutil.ReadAll(stream)
+ So(recvData, ShouldResemble, content)
+ })
+
+ Convey(`Will exit Next if closed.`, func() {
+ type streamBundle struct {
+ rc io.ReadCloser
+ props *streamproto.Properties
+ }
+ streamC := make(chan *streamBundle)
+ defer close(streamC)
+
+ // Get the stream.
+ go func() {
+ rc, props := i.S.Next()
+ streamC <- &streamBundle{rc, props}
+ }()
+
+ // Close the stream server.
+ i.S.Close()
+
+ // Next must exit with nil.
+ bundle := <-streamC
+ So(bundle.rc, ShouldBeNil)
+ So(bundle.props, ShouldBeNil)
+ })
+ }))
+ })
+}

Powered by Google App Engine
This is Rietveld 408576698