Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(152)

Side by Side Diff: logdog/client/butlerlib/streamclient/stream.go

Issue 2737603003: Butler stream servers can generate client address. (Closed)
Patch Set: better comment Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package streamclient 5 package streamclient
6 6
7 import ( 7 import (
8 "errors" 8 "errors"
9 "io" 9 "io"
10 10
11 "github.com/luci/luci-go/common/data/recordio" 11 "github.com/luci/luci-go/common/data/recordio"
12 "github.com/luci/luci-go/logdog/api/logpb" 12 "github.com/luci/luci-go/logdog/api/logpb"
13 "github.com/luci/luci-go/logdog/client/butlerlib/streamproto" 13 "github.com/luci/luci-go/logdog/client/butlerlib/streamproto"
14 ) 14 )
15 15
16 // Stream is an individual LogDog Butler stream. 16 // Stream is an individual LogDog Butler stream.
17 type Stream interface { 17 type Stream interface {
18 io.WriteCloser 18 io.WriteCloser
19 19
20 // WriteDatagram writes a LogDog Butler streaming datagram to the underl ying 20 // WriteDatagram writes a LogDog Butler streaming datagram to the underl ying
21 // Writer. 21 // Writer.
22 WriteDatagram([]byte) error 22 WriteDatagram([]byte) error
23 23
24 // Properties returns a copy of this Stream's properties. 24 // Properties returns a copy of this Stream's properties.
25 Properties() *streamproto.Properties 25 Properties() *streamproto.Properties
26 } 26 }
27 27
28 // streamImpl is the standard implementation of the Stream interface. 28 // BaseStream is the standard implementation of the Stream interface.
29 type streamImpl struct { 29 type BaseStream struct {
30 » // WriteCloser (required) is the stream's underlying io.WriteCloser.
30 io.WriteCloser 31 io.WriteCloser
31 32
32 » // props is this stream's properties. 33 » // P (required) is this stream's properties.
33 » props *streamproto.Properties 34 » P *streamproto.Properties
34 35
35 // rioW is a recordio.Writer bound to the WriteCloser. This will be 36 // rioW is a recordio.Writer bound to the WriteCloser. This will be
36 // initialized on the first writeRecord invocation. 37 // initialized on the first writeRecord invocation.
37 rioW recordio.Writer 38 rioW recordio.Writer
38 } 39 }
39 40
40 var _ Stream = (*streamImpl)(nil) 41 var _ Stream = (*BaseStream)(nil)
41 42
42 func (s *streamImpl) WriteDatagram(dg []byte) error { 43 // WriteDatagram implements StreamClient.
44 func (s *BaseStream) WriteDatagram(dg []byte) error {
43 if !s.isDatagramStream() { 45 if !s.isDatagramStream() {
44 return errors.New("not a datagram stream") 46 return errors.New("not a datagram stream")
45 } 47 }
46 48
47 return s.writeRecord(dg) 49 return s.writeRecord(dg)
48 } 50 }
49 51
50 func (s *streamImpl) Write(data []byte) (int, error) { 52 // Write implements StreamClient.
53 func (s *BaseStream) Write(data []byte) (int, error) {
51 if s.isDatagramStream() { 54 if s.isDatagramStream() {
52 return 0, errors.New("cannot use Write with datagram stream") 55 return 0, errors.New("cannot use Write with datagram stream")
53 } 56 }
54 57
55 return s.writeRaw(data) 58 return s.writeRaw(data)
56 } 59 }
57 60
58 func (s *streamImpl) writeRaw(data []byte) (int, error) { 61 func (s *BaseStream) writeRaw(data []byte) (int, error) {
59 return s.WriteCloser.Write(data) 62 return s.WriteCloser.Write(data)
60 } 63 }
61 64
62 func (s *streamImpl) writeRecord(r []byte) error { 65 func (s *BaseStream) writeRecord(r []byte) error {
63 if s.rioW == nil { 66 if s.rioW == nil {
64 s.rioW = recordio.NewWriter(s.WriteCloser) 67 s.rioW = recordio.NewWriter(s.WriteCloser)
65 } 68 }
66 if _, err := s.rioW.Write(r); err != nil { 69 if _, err := s.rioW.Write(r); err != nil {
67 return err 70 return err
68 } 71 }
69 return s.rioW.Flush() 72 return s.rioW.Flush()
70 } 73 }
71 74
72 func (s *streamImpl) isDatagramStream() bool { 75 func (s *BaseStream) isDatagramStream() bool {
73 » return s.props.StreamType == logpb.StreamType_DATAGRAM 76 » return s.P.StreamType == logpb.StreamType_DATAGRAM
74 } 77 }
75 78
76 func (s *streamImpl) Properties() *streamproto.Properties { return s.props.Clone () } 79 // Properties implements StreamClient.
80 func (s *BaseStream) Properties() *streamproto.Properties { return s.P.Clone() }
OLDNEW
« no previous file with comments | « logdog/client/butlerlib/streamclient/local.go ('k') | logdog/client/butlerlib/streamclient/stream_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698