| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package streamserver |
| 6 |
| 7 import ( |
| 8 "bytes" |
| 9 "fmt" |
| 10 "io" |
| 11 "io/ioutil" |
| 12 "net" |
| 13 "testing" |
| 14 "time" |
| 15 |
| 16 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" |
| 17 . "github.com/smartystreets/goconvey/convey" |
| 18 "golang.org/x/net/context" |
| 19 ) |
| 20 |
| 21 // Testing interface for OS-level test routine abstraction. |
| 22 type namedPipeTester interface { |
| 23 // Decorator to create and destroy a NamedPipeServer instance. |
| 24 withServer(func(i *namedPipeTestInstance)) func() |
| 25 } |
| 26 |
| 27 // Base class for a named pipe test instance, bound to a single server. |
| 28 type namedPipeTestInstance struct { |
| 29 S *namedPipeServer |
| 30 connect func() io.WriteCloser |
| 31 } |
| 32 |
| 33 type testAddr string |
| 34 |
| 35 func (a testAddr) Network() string { return string(a) } |
| 36 func (a testAddr) String() string { return fmt.Sprintf("test(%s)", a.Network())
} |
| 37 |
| 38 type testConn struct { |
| 39 bytes.Buffer |
| 40 |
| 41 readDeadline time.Time |
| 42 writeDeadline time.Time |
| 43 } |
| 44 |
| 45 func (c *testConn) Close() error { return nil } |
| 46 func (c *testConn) LocalAddr() net.Addr { return testAddr("local") } |
| 47 func (c *testConn) RemoteAddr() net.Addr { return testAddr("remote") } |
| 48 |
| 49 func (c *testConn) SetReadDeadline(t time.Time) error { |
| 50 c.readDeadline = t |
| 51 return nil |
| 52 } |
| 53 |
| 54 func (c *testConn) SetWriteDeadline(t time.Time) error { |
| 55 c.writeDeadline = t |
| 56 return nil |
| 57 } |
| 58 |
| 59 func (c *testConn) SetDeadline(t time.Time) error { |
| 60 c.readDeadline = t |
| 61 c.writeDeadline = t |
| 62 return nil |
| 63 } |
| 64 |
| 65 func testNamedPipeServer(t *testing.T, npt namedPipeTester) { |
| 66 Convey(`In a testing environment`, t, func() { |
| 67 c := context.Background() |
| 68 hb := handshakeBuilder{ |
| 69 magic: streamproto.ProtocolFrameHeaderMagic, |
| 70 } |
| 71 |
| 72 Convey(`A named pipe server will panic if closed without listeni
ng.`, func() { |
| 73 s := &namedPipeServer{} |
| 74 So(func() { s.Close() }, ShouldPanic) |
| 75 }) |
| 76 |
| 77 Convey(`A listening named pipe server will panic if double-close
d.`, |
| 78 npt.withServer(func(i *namedPipeTestInstance) { |
| 79 i.S.Close() |
| 80 So(func() { i.S.Close() }, ShouldPanic) |
| 81 })) |
| 82 |
| 83 Convey(`A test client connection`, func() { |
| 84 tc := testConn{} |
| 85 npc := &namedPipeConn{ |
| 86 id: 1337, |
| 87 conn: &tc, |
| 88 name: "test", |
| 89 } |
| 90 |
| 91 Convey(`Will reject an invalid handshake magic.`, func()
{ |
| 92 hb.magic = []byte(`NOT A HANDSHAKE MAGIC`) |
| 93 hb.writeTo(&tc, "", nil) |
| 94 So(func() { |
| 95 npc.negotiate(c) |
| 96 }, ShouldPanic) |
| 97 }) |
| 98 |
| 99 Convey(`Will reject an invalid handshake.`, func() { |
| 100 hb.writeTo(&tc, "CLEARLY NOT JSON", nil) |
| 101 So(func() { |
| 102 npc.negotiate(c) |
| 103 }, ShouldPanic) |
| 104 }) |
| 105 }) |
| 106 |
| 107 Convey(`A listening named pipe server`, npt.withServer(func(i *n
amedPipeTestInstance) { |
| 108 // Setup a goroutine to pipe test data through a client
connection. |
| 109 readerC := make(chan io.Reader) |
| 110 defer close(readerC) |
| 111 |
| 112 doneC := make(chan struct{}) |
| 113 go func() { |
| 114 defer close(doneC) |
| 115 |
| 116 // Create a connection to our server instance. |
| 117 w := i.connect() |
| 118 if w == nil { |
| 119 return |
| 120 } |
| 121 defer w.Close() |
| 122 |
| 123 // Write supplied data to the client connection. |
| 124 r := <-readerC |
| 125 if r != nil { |
| 126 io.Copy(w, r) |
| 127 } |
| 128 }() |
| 129 |
| 130 Convey(`Can receive stream data.`, func() { |
| 131 // Write our handshake and data to the stream. |
| 132 handshake := `{"name": "test", "contentType": "a
pplication/octet-stream"}` |
| 133 content := bytes.Repeat([]byte("THIS IS A TEST S
TREAM "), 100) |
| 134 readerC <- hb.reader(handshake, content) |
| 135 |
| 136 // Retrieve the ensuing stream. |
| 137 stream, props := i.S.Next() |
| 138 So(stream, ShouldNotBeNil) |
| 139 defer stream.Close() |
| 140 So(props, ShouldNotBeNil) |
| 141 |
| 142 <-doneC |
| 143 |
| 144 // Consume all of the data in the stream. |
| 145 recvData, _ := ioutil.ReadAll(stream) |
| 146 So(recvData, ShouldResemble, content) |
| 147 }) |
| 148 |
| 149 Convey(`Will exit Next if closed.`, func() { |
| 150 type streamBundle struct { |
| 151 rc io.ReadCloser |
| 152 props *streamproto.Properties |
| 153 } |
| 154 streamC := make(chan *streamBundle) |
| 155 defer close(streamC) |
| 156 |
| 157 // Get the stream. |
| 158 go func() { |
| 159 rc, props := i.S.Next() |
| 160 streamC <- &streamBundle{rc, props} |
| 161 }() |
| 162 |
| 163 // Close the stream server. |
| 164 i.S.Close() |
| 165 |
| 166 // Next must exit with nil. |
| 167 bundle := <-streamC |
| 168 So(bundle.rc, ShouldBeNil) |
| 169 So(bundle.props, ShouldBeNil) |
| 170 }) |
| 171 })) |
| 172 }) |
| 173 } |
| OLD | NEW |