| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package streamserver | 5 package streamserver |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "fmt" | 10 "fmt" |
| 11 "io/ioutil" | 11 "io/ioutil" |
| 12 "net" | 12 "net" |
| 13 "testing" | 13 "testing" |
| 14 "time" | 14 "time" |
| 15 | 15 |
| 16 "github.com/luci/luci-go/common/clock/clockflag" |
| 17 "github.com/luci/luci-go/common/clock/testclock" |
| 18 "github.com/luci/luci-go/logdog/client/butlerlib/streamclient" |
| 16 "github.com/luci/luci-go/logdog/client/butlerlib/streamproto" | 19 "github.com/luci/luci-go/logdog/client/butlerlib/streamproto" |
| 20 |
| 21 "golang.org/x/net/context" |
| 22 |
| 17 . "github.com/smartystreets/goconvey/convey" | 23 . "github.com/smartystreets/goconvey/convey" |
| 18 "golang.org/x/net/context" | |
| 19 ) | 24 ) |
| 20 | 25 |
| 21 type testAddr string | 26 type testAddr string |
| 22 | 27 |
| 23 func (a testAddr) Network() string { return string(a) } | 28 func (a testAddr) Network() string { return string(a) } |
| 24 func (a testAddr) String() string { return fmt.Sprintf("test(%s)", a.Network())
} | 29 func (a testAddr) String() string { return fmt.Sprintf("test(%s)", a.Network())
} |
| 25 | 30 |
| 26 type testListener struct { | 31 type testListener struct { |
| 27 err error | 32 err error |
| 28 connC chan *testListenerConn | 33 connC chan *testListenerConn |
| (...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 98 t.Parallel() | 103 t.Parallel() |
| 99 | 104 |
| 100 Convey(`A stream server using a testing Listener`, t, func() { | 105 Convey(`A stream server using a testing Listener`, t, func() { |
| 101 hb := handshakeBuilder{ | 106 hb := handshakeBuilder{ |
| 102 magic: streamproto.ProtocolFrameHeaderMagic, | 107 magic: streamproto.ProtocolFrameHeaderMagic, |
| 103 } | 108 } |
| 104 | 109 |
| 105 var tl *testListener | 110 var tl *testListener |
| 106 s := &listenerStreamServer{ | 111 s := &listenerStreamServer{ |
| 107 Context: context.Background(), | 112 Context: context.Background(), |
| 108 » » » gen: func() (net.Listener, error) { | 113 » » » gen: func() (net.Listener, string, error) { |
| 109 if tl != nil { | 114 if tl != nil { |
| 110 panic("gen called more than once") | 115 panic("gen called more than once") |
| 111 } | 116 } |
| 112 tl = newTestListener() | 117 tl = newTestListener() |
| 113 » » » » return tl, nil | 118 » » » » return tl, "test", nil |
| 114 }, | 119 }, |
| 115 } | 120 } |
| 116 | 121 |
| 117 Convey(`Will panic if closed without listening.`, func() { | 122 Convey(`Will panic if closed without listening.`, func() { |
| 118 So(func() { s.Close() }, ShouldPanic) | 123 So(func() { s.Close() }, ShouldPanic) |
| 119 }) | 124 }) |
| 120 | 125 |
| 121 Convey(`Will fail to Listen if the Listener could not be created
.`, func() { | 126 Convey(`Will fail to Listen if the Listener could not be created
.`, func() { |
| 122 » » » s.gen = func() (net.Listener, error) { | 127 » » » s.gen = func() (net.Listener, string, error) { |
| 123 » » » » return nil, errors.New("test error") | 128 » » » » return nil, "", errors.New("test error") |
| 124 } | 129 } |
| 125 So(s.Listen(), ShouldNotBeNil) | 130 So(s.Listen(), ShouldNotBeNil) |
| 126 }) | 131 }) |
| 127 | 132 |
| 128 Convey(`Can Listen for connections.`, func() { | 133 Convey(`Can Listen for connections.`, func() { |
| 129 shouldClose := true | 134 shouldClose := true |
| 130 So(s.Listen(), ShouldBeNil) | 135 So(s.Listen(), ShouldBeNil) |
| 131 defer func() { | 136 defer func() { |
| 132 if shouldClose { | 137 if shouldClose { |
| 133 s.Close() | 138 s.Close() |
| (...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 221 tl.connect(tc) | 226 tl.connect(tc) |
| 222 s.Close() | 227 s.Close() |
| 223 shouldClose = false | 228 shouldClose = false |
| 224 | 229 |
| 225 So(<-s.discardC, ShouldNotBeNil) | 230 So(<-s.discardC, ShouldNotBeNil) |
| 226 }) | 231 }) |
| 227 | 232 |
| 228 }) | 233 }) |
| 229 }) | 234 }) |
| 230 } | 235 } |
| 236 |
| 237 // testClientServer tests to ensure that a client can create streams with a |
| 238 // server. |
| 239 // |
| 240 // svr must be in listening state when this is called. |
| 241 func testClientServer(t *testing.T, svr StreamServer, client streamclient.Client
) { |
| 242 flags := streamproto.Flags{ |
| 243 Name: "foo/bar", |
| 244 Timestamp: clockflag.Time(testclock.TestTimeLocal), |
| 245 } |
| 246 data := []byte("ohaithere") |
| 247 |
| 248 clientDoneC := make(chan error) |
| 249 go func() { |
| 250 var err error |
| 251 defer func() { |
| 252 clientDoneC <- err |
| 253 }() |
| 254 |
| 255 var stream streamclient.Stream |
| 256 if stream, err = client.NewStream(flags); err != nil { |
| 257 return |
| 258 } |
| 259 defer func() { |
| 260 if closeErr := stream.Close(); closeErr != nil && err ==
nil { |
| 261 err = closeErr |
| 262 } |
| 263 }() |
| 264 |
| 265 if _, err = stream.Write(data); err != nil { |
| 266 return |
| 267 } |
| 268 }() |
| 269 |
| 270 rc, props := svr.Next() |
| 271 defer rc.Close() |
| 272 |
| 273 So(props, ShouldResemble, flags.Properties()) |
| 274 |
| 275 var buf bytes.Buffer |
| 276 _, err := buf.ReadFrom(rc) |
| 277 So(err, ShouldBeNil) |
| 278 So(buf.Bytes(), ShouldResemble, data) |
| 279 |
| 280 So(<-clientDoneC, ShouldBeNil) |
| 281 } |
| OLD | NEW |