Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(354)

Side by Side Diff: client/logdog/butlerlib/streamclient/client.go

Issue 1429993002: LogDog: Add Butler stream server package. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-butlerproto
Patch Set: Fixed datagram check, now with unit tests! Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package streamclient
6
7 import (
8 "encoding/json"
9 "fmt"
10 "io"
11 "strings"
12 "sync"
13
14 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
15 )
16
17 type clientFactory func(string) (Client, error)
18
19 var (
20 // ProtocolFrameHeaderMagic is the number at the beginning of streams th at
21 // identifies the stream handshake version.
22 ProtocolFrameHeaderMagic = []byte("BTLR1\x1E")
estaab 2015/11/06 22:09:44 If and when will this change, and what would it ch
dnj (Google) 2015/11/07 16:47:41 No idea - some other byte string that doesn't begi
23
24 // defaultRegistry is the default protocol registry.
25 defaultRegistry = &protocolRegistry{}
26 )
27
28 type protocolRegistry struct {
29 sync.Mutex
30
31 // protocols is the set of registered protocols. Each client should regi ster
32 // via registerProtocol in its init() method.
33 protocols map[string]clientFactory
34 }
35
36 func (r *protocolRegistry) register(name string, f clientFactory) {
37 r.Lock()
38 defer r.Unlock()
39
40 if _, ok := r.protocols[name]; ok {
41 panic(fmt.Errorf("streamclient: protocol already registered for [%s]", name))
42 }
43 if r.protocols == nil {
44 r.protocols = make(map[string]clientFactory)
45 }
46 r.protocols[name] = f
47 }
48
49 func (r *protocolRegistry) newClient(path string) (Client, error) {
estaab 2015/11/06 22:09:44 It's not clear from just the name and type what th
50 parts := strings.SplitN(path, ":", 2)
51 params := ""
52 if len(parts) == 2 {
53 params = parts[1]
54 }
55
56 if f, ok := r.protocols[parts[0]]; ok {
estaab 2015/11/06 22:09:44 Do we not need to hold a reader lock when reading
dnj (Google) 2015/11/07 16:47:41 As long as nothing's writing, we don't need to hol
57 return f(params)
58 }
59 return nil, fmt.Errorf("streamclient: no protocol registered for [%s]", parts[0])
60 }
61
62 func registerProtocol(name string, f clientFactory) {
63 defaultRegistry.register(name, f)
64 }
65
66 // Client is a client to a LogDog Butler StreamServer. A Client will connect
67 // to a StreamServer, negotiate a stream configuration, and return an active
68 // stream object that can be written to.
69 type Client interface {
estaab 2015/11/06 22:09:44 Are there any style guidelines around ordering of
dnj (Google) 2015/11/07 16:47:41 Guidelines are for readability. I split the regist
70 // NewStream creates a new stream with the supplied stream properties.
71 NewStream(f streamproto.Flags) (Stream, error)
72 }
73
74 // streamFactory is a factory method to generate an io.WriteCloser stream for
75 // the current clientImpl.
76 type streamFactory func() (io.WriteCloser, error)
77
78 // clientImpl is an implementation of the Client interface using a net.Conn
79 // factory to generate an individual stream.
80 type clientImpl struct {
81 // network is the connection path to the stream server.
82 factory streamFactory
83 }
84
85 // New instantiates a new Client instance. This type of instance will be parsed
86 // from the supplied path string, which takes the form:
87 // <protocol>:<protocol-specific-spec>
88 //
89 // Supported protocols and their respective specs are:
90 // - `tcp4:[port]` (The stream server is listening on localhost at the specifi ed
91 // port.)
92 func New(path string) (Client, error) {
93 return defaultRegistry.newClient(path)
94 }
95
96 func (c *clientImpl) NewStream(f streamproto.Flags) (Stream, error) {
97 p := f.Properties()
98 if err := p.Validate(); err != nil {
99 return nil, fmt.Errorf("streamclient: invalid stream properties: %s", err)
100 }
101
102 client, err := c.factory()
103 if err != nil {
104 return nil, err
105 }
106 passing := false
107 defer func() {
108 // If we haven't written out the connection, close.
109 if !passing {
110 client.Close()
111 }
112 }()
113
114 data, err := json.Marshal(f)
115 if err != nil {
116 return nil, fmt.Errorf("failed to marshal properties JSON: %s", err)
117 }
118
119 // Perform the handshake: magic + size(data) + data.
120 s := &streamImpl{
121 Properties: p,
122 WriteCloser: client,
123 }
124 if _, err := s.writeRaw(ProtocolFrameHeaderMagic); err != nil {
125 return nil, fmt.Errorf("failed to write magic number: %s", err)
126 }
127 if err := s.writeRecord(data); err != nil {
128 return nil, fmt.Errorf("failed to write properties: %s", err)
129 }
130
131 passing = true
132 return s, nil
133 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698