| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package streamclient |
| 6 |
| 7 import ( |
| 8 "encoding/json" |
| 9 "fmt" |
| 10 "io" |
| 11 |
| 12 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" |
| 13 ) |
| 14 |
| 15 type clientFactory func(string) (Client, error) |
| 16 |
| 17 // Client is a client to a LogDog Butler StreamServer. A Client will connect |
| 18 // to a StreamServer, negotiate a stream configuration, and return an active |
| 19 // stream object that can be written to. |
| 20 type Client interface { |
| 21 // NewStream creates a new stream with the supplied stream properties. |
| 22 NewStream(f streamproto.Flags) (Stream, error) |
| 23 } |
| 24 |
| 25 // streamFactory is a factory method to generate an io.WriteCloser stream for |
| 26 // the current clientImpl. |
| 27 type streamFactory func() (io.WriteCloser, error) |
| 28 |
| 29 // clientImpl is an implementation of the Client interface using a net.Conn |
| 30 // factory to generate an individual stream. |
| 31 type clientImpl struct { |
| 32 // network is the connection path to the stream server. |
| 33 factory streamFactory |
| 34 } |
| 35 |
| 36 // New instantiates a new Client instance. This type of instance will be parsed |
| 37 // from the supplied path string, which takes the form: |
| 38 // <protocol>:<protocol-specific-spec> |
| 39 // |
| 40 // Supported protocols and their respective specs are: |
| 41 // - unix:/path/to/socket describes a stream server listening on UNIX domain |
| 42 // socket at "/path/to/socket". |
| 43 // |
| 44 // Windows-only: |
| 45 // - net.pipe:name describes a stream server listening on Windows named pipe |
| 46 // "\\.\pipe\name". |
| 47 func New(path string) (Client, error) { |
| 48 return defaultRegistry.newClient(path) |
| 49 } |
| 50 |
| 51 func (c *clientImpl) NewStream(f streamproto.Flags) (Stream, error) { |
| 52 p := f.Properties() |
| 53 if err := p.Validate(); err != nil { |
| 54 return nil, fmt.Errorf("streamclient: invalid stream properties:
%s", err) |
| 55 } |
| 56 |
| 57 client, err := c.factory() |
| 58 if err != nil { |
| 59 return nil, err |
| 60 } |
| 61 ownsClient := true |
| 62 defer func() { |
| 63 // If we haven't written out the connection, close. |
| 64 if ownsClient { |
| 65 client.Close() |
| 66 } |
| 67 }() |
| 68 |
| 69 data, err := json.Marshal(f) |
| 70 if err != nil { |
| 71 return nil, fmt.Errorf("failed to marshal properties JSON: %s",
err) |
| 72 } |
| 73 |
| 74 // Perform the handshake: magic + size(data) + data. |
| 75 s := &streamImpl{ |
| 76 Properties: p, |
| 77 WriteCloser: client, |
| 78 } |
| 79 if _, err := s.writeRaw(streamproto.ProtocolFrameHeaderMagic); err != ni
l { |
| 80 return nil, fmt.Errorf("failed to write magic number: %s", err) |
| 81 } |
| 82 if err := s.writeRecord(data); err != nil { |
| 83 return nil, fmt.Errorf("failed to write properties: %s", err) |
| 84 } |
| 85 |
| 86 ownsClient = false // Passing ownership to caller. |
| 87 return s, nil |
| 88 } |
| OLD | NEW |