Chromium Code Reviews| Index: client/logdog/butlerlib/streamclient/client.go |
| diff --git a/client/logdog/butlerlib/streamclient/client.go b/client/logdog/butlerlib/streamclient/client.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..65d1f55179141756e7758832edbe7b08cf2eb1f1 |
| --- /dev/null |
| +++ b/client/logdog/butlerlib/streamclient/client.go |
| @@ -0,0 +1,133 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package streamclient |
| + |
| +import ( |
| + "encoding/json" |
| + "fmt" |
| + "io" |
| + "strings" |
| + "sync" |
| + |
| + "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" |
| +) |
| + |
| +type clientFactory func(string) (Client, error) |
| + |
| +var ( |
| + // ProtocolFrameHeaderMagic is the number at the beginning of streams that |
| + // identifies the stream handshake version. |
| + ProtocolFrameHeaderMagic = []byte("BTLR1\x1E") |
|
estaab
2015/11/06 22:09:44
If and when will this change, and what would it ch
dnj (Google)
2015/11/07 16:47:41
No idea - some other byte string that doesn't begi
|
| + |
| + // defaultRegistry is the default protocol registry. |
| + defaultRegistry = &protocolRegistry{} |
| +) |
| + |
| +type protocolRegistry struct { |
| + sync.Mutex |
| + |
| + // protocols is the set of registered protocols. Each client should register |
| + // via registerProtocol in its init() method. |
| + protocols map[string]clientFactory |
| +} |
| + |
| +func (r *protocolRegistry) register(name string, f clientFactory) { |
| + r.Lock() |
| + defer r.Unlock() |
| + |
| + if _, ok := r.protocols[name]; ok { |
| + panic(fmt.Errorf("streamclient: protocol already registered for [%s]", name)) |
| + } |
| + if r.protocols == nil { |
| + r.protocols = make(map[string]clientFactory) |
| + } |
| + r.protocols[name] = f |
| +} |
| + |
| +func (r *protocolRegistry) newClient(path string) (Client, error) { |
|
estaab
2015/11/06 22:09:44
It's not clear from just the name and type what th
|
| + parts := strings.SplitN(path, ":", 2) |
| + params := "" |
| + if len(parts) == 2 { |
| + params = parts[1] |
| + } |
| + |
| + if f, ok := r.protocols[parts[0]]; ok { |
|
estaab
2015/11/06 22:09:44
Do we not need to hold a reader lock when reading
dnj (Google)
2015/11/07 16:47:41
As long as nothing's writing, we don't need to hol
|
| + return f(params) |
| + } |
| + return nil, fmt.Errorf("streamclient: no protocol registered for [%s]", parts[0]) |
| +} |
| + |
| +func registerProtocol(name string, f clientFactory) { |
| + defaultRegistry.register(name, f) |
| +} |
| + |
| +// Client is a client to a LogDog Butler StreamServer. A Client will connect |
| +// to a StreamServer, negotiate a stream configuration, and return an active |
| +// stream object that can be written to. |
| +type Client interface { |
|
estaab
2015/11/06 22:09:44
Are there any style guidelines around ordering of
dnj (Google)
2015/11/07 16:47:41
Guidelines are for readability. I split the regist
|
| + // NewStream creates a new stream with the supplied stream properties. |
| + NewStream(f streamproto.Flags) (Stream, error) |
| +} |
| + |
| +// streamFactory is a factory method to generate an io.WriteCloser stream for |
| +// the current clientImpl. |
| +type streamFactory func() (io.WriteCloser, error) |
| + |
| +// clientImpl is an implementation of the Client interface using a net.Conn |
| +// factory to generate an individual stream. |
| +type clientImpl struct { |
| + // network is the connection path to the stream server. |
| + factory streamFactory |
| +} |
| + |
| +// New instantiates a new Client instance. This type of instance will be parsed |
| +// from the supplied path string, which takes the form: |
| +// <protocol>:<protocol-specific-spec> |
| +// |
| +// Supported protocols and their respective specs are: |
| +// - `tcp4:[port]` (The stream server is listening on localhost at the specified |
| +// port.) |
| +func New(path string) (Client, error) { |
| + return defaultRegistry.newClient(path) |
| +} |
| + |
| +func (c *clientImpl) NewStream(f streamproto.Flags) (Stream, error) { |
| + p := f.Properties() |
| + if err := p.Validate(); err != nil { |
| + return nil, fmt.Errorf("streamclient: invalid stream properties: %s", err) |
| + } |
| + |
| + client, err := c.factory() |
| + if err != nil { |
| + return nil, err |
| + } |
| + passing := false |
| + defer func() { |
| + // If we haven't written out the connection, close. |
| + if !passing { |
| + client.Close() |
| + } |
| + }() |
| + |
| + data, err := json.Marshal(f) |
| + if err != nil { |
| + return nil, fmt.Errorf("failed to marshal properties JSON: %s", err) |
| + } |
| + |
| + // Perform the handshake: magic + size(data) + data. |
| + s := &streamImpl{ |
| + Properties: p, |
| + WriteCloser: client, |
| + } |
| + if _, err := s.writeRaw(ProtocolFrameHeaderMagic); err != nil { |
| + return nil, fmt.Errorf("failed to write magic number: %s", err) |
| + } |
| + if err := s.writeRecord(data); err != nil { |
| + return nil, fmt.Errorf("failed to write properties: %s", err) |
| + } |
| + |
| + passing = true |
| + return s, nil |
| +} |