Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package streamclient | |
| 6 | |
| 7 import ( | |
| 8 "encoding/json" | |
| 9 "fmt" | |
| 10 "io" | |
| 11 "strings" | |
| 12 "sync" | |
| 13 | |
| 14 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" | |
| 15 ) | |
| 16 | |
| 17 type clientFactory func(string) (Client, error) | |
| 18 | |
| 19 var ( | |
| 20 // ProtocolFrameHeaderMagic is the number at the beginning of streams th at | |
| 21 // identifies the stream handshake version. | |
| 22 ProtocolFrameHeaderMagic = []byte("BTLR1\x1E") | |
|
estaab
2015/11/06 22:09:44
If and when will this change, and what would it ch
dnj (Google)
2015/11/07 16:47:41
No idea - some other byte string that doesn't begi
| |
| 23 | |
| 24 // defaultRegistry is the default protocol registry. | |
| 25 defaultRegistry = &protocolRegistry{} | |
| 26 ) | |
| 27 | |
| 28 type protocolRegistry struct { | |
| 29 sync.Mutex | |
| 30 | |
| 31 // protocols is the set of registered protocols. Each client should regi ster | |
| 32 // via registerProtocol in its init() method. | |
| 33 protocols map[string]clientFactory | |
| 34 } | |
| 35 | |
| 36 func (r *protocolRegistry) register(name string, f clientFactory) { | |
| 37 r.Lock() | |
| 38 defer r.Unlock() | |
| 39 | |
| 40 if _, ok := r.protocols[name]; ok { | |
| 41 panic(fmt.Errorf("streamclient: protocol already registered for [%s]", name)) | |
| 42 } | |
| 43 if r.protocols == nil { | |
| 44 r.protocols = make(map[string]clientFactory) | |
| 45 } | |
| 46 r.protocols[name] = f | |
| 47 } | |
| 48 | |
| 49 func (r *protocolRegistry) newClient(path string) (Client, error) { | |
|
estaab
2015/11/06 22:09:44
It's not clear from just the name and type what th
| |
| 50 parts := strings.SplitN(path, ":", 2) | |
| 51 params := "" | |
| 52 if len(parts) == 2 { | |
| 53 params = parts[1] | |
| 54 } | |
| 55 | |
| 56 if f, ok := r.protocols[parts[0]]; ok { | |
|
estaab
2015/11/06 22:09:44
Do we not need to hold a reader lock when reading
dnj (Google)
2015/11/07 16:47:41
As long as nothing's writing, we don't need to hol
| |
| 57 return f(params) | |
| 58 } | |
| 59 return nil, fmt.Errorf("streamclient: no protocol registered for [%s]", parts[0]) | |
| 60 } | |
| 61 | |
| 62 func registerProtocol(name string, f clientFactory) { | |
| 63 defaultRegistry.register(name, f) | |
| 64 } | |
| 65 | |
| 66 // Client is a client to a LogDog Butler StreamServer. A Client will connect | |
| 67 // to a StreamServer, negotiate a stream configuration, and return an active | |
| 68 // stream object that can be written to. | |
| 69 type Client interface { | |
|
estaab
2015/11/06 22:09:44
Are there any style guidelines around ordering of
dnj (Google)
2015/11/07 16:47:41
Guidelines are for readability. I split the regist
| |
| 70 // NewStream creates a new stream with the supplied stream properties. | |
| 71 NewStream(f streamproto.Flags) (Stream, error) | |
| 72 } | |
| 73 | |
| 74 // streamFactory is a factory method to generate an io.WriteCloser stream for | |
| 75 // the current clientImpl. | |
| 76 type streamFactory func() (io.WriteCloser, error) | |
| 77 | |
| 78 // clientImpl is an implementation of the Client interface using a net.Conn | |
| 79 // factory to generate an individual stream. | |
| 80 type clientImpl struct { | |
| 81 // network is the connection path to the stream server. | |
| 82 factory streamFactory | |
| 83 } | |
| 84 | |
| 85 // New instantiates a new Client instance. This type of instance will be parsed | |
| 86 // from the supplied path string, which takes the form: | |
| 87 // <protocol>:<protocol-specific-spec> | |
| 88 // | |
| 89 // Supported protocols and their respective specs are: | |
| 90 // - `tcp4:[port]` (The stream server is listening on localhost at the specifi ed | |
| 91 // port.) | |
| 92 func New(path string) (Client, error) { | |
| 93 return defaultRegistry.newClient(path) | |
| 94 } | |
| 95 | |
| 96 func (c *clientImpl) NewStream(f streamproto.Flags) (Stream, error) { | |
| 97 p := f.Properties() | |
| 98 if err := p.Validate(); err != nil { | |
| 99 return nil, fmt.Errorf("streamclient: invalid stream properties: %s", err) | |
| 100 } | |
| 101 | |
| 102 client, err := c.factory() | |
| 103 if err != nil { | |
| 104 return nil, err | |
| 105 } | |
| 106 passing := false | |
| 107 defer func() { | |
| 108 // If we haven't written out the connection, close. | |
| 109 if !passing { | |
| 110 client.Close() | |
| 111 } | |
| 112 }() | |
| 113 | |
| 114 data, err := json.Marshal(f) | |
| 115 if err != nil { | |
| 116 return nil, fmt.Errorf("failed to marshal properties JSON: %s", err) | |
| 117 } | |
| 118 | |
| 119 // Perform the handshake: magic + size(data) + data. | |
| 120 s := &streamImpl{ | |
| 121 Properties: p, | |
| 122 WriteCloser: client, | |
| 123 } | |
| 124 if _, err := s.writeRaw(ProtocolFrameHeaderMagic); err != nil { | |
| 125 return nil, fmt.Errorf("failed to write magic number: %s", err) | |
| 126 } | |
| 127 if err := s.writeRecord(data); err != nil { | |
| 128 return nil, fmt.Errorf("failed to write properties: %s", err) | |
| 129 } | |
| 130 | |
| 131 passing = true | |
| 132 return s, nil | |
| 133 } | |
| OLD | NEW |