Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(262)

Unified Diff: client/logdog/butlerlib/streamclient/client.go

Issue 1429993002: LogDog: Add Butler stream server package. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-butlerproto
Patch Set: Fixed datagram check, now with unit tests! Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: client/logdog/butlerlib/streamclient/client.go
diff --git a/client/logdog/butlerlib/streamclient/client.go b/client/logdog/butlerlib/streamclient/client.go
new file mode 100644
index 0000000000000000000000000000000000000000..65d1f55179141756e7758832edbe7b08cf2eb1f1
--- /dev/null
+++ b/client/logdog/butlerlib/streamclient/client.go
@@ -0,0 +1,133 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package streamclient
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "strings"
+ "sync"
+
+ "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
+)
+
+type clientFactory func(string) (Client, error)
+
+var (
+ // ProtocolFrameHeaderMagic is the number at the beginning of streams that
+ // identifies the stream handshake version.
+ ProtocolFrameHeaderMagic = []byte("BTLR1\x1E")
estaab 2015/11/06 22:09:44 If and when will this change, and what would it ch
dnj (Google) 2015/11/07 16:47:41 No idea - some other byte string that doesn't begi
+
+ // defaultRegistry is the default protocol registry.
+ defaultRegistry = &protocolRegistry{}
+)
+
+type protocolRegistry struct {
+ sync.Mutex
+
+ // protocols is the set of registered protocols. Each client should register
+ // via registerProtocol in its init() method.
+ protocols map[string]clientFactory
+}
+
+func (r *protocolRegistry) register(name string, f clientFactory) {
+ r.Lock()
+ defer r.Unlock()
+
+ if _, ok := r.protocols[name]; ok {
+ panic(fmt.Errorf("streamclient: protocol already registered for [%s]", name))
+ }
+ if r.protocols == nil {
+ r.protocols = make(map[string]clientFactory)
+ }
+ r.protocols[name] = f
+}
+
+func (r *protocolRegistry) newClient(path string) (Client, error) {
estaab 2015/11/06 22:09:44 It's not clear from just the name and type what th
+ parts := strings.SplitN(path, ":", 2)
+ params := ""
+ if len(parts) == 2 {
+ params = parts[1]
+ }
+
+ if f, ok := r.protocols[parts[0]]; ok {
estaab 2015/11/06 22:09:44 Do we not need to hold a reader lock when reading
dnj (Google) 2015/11/07 16:47:41 As long as nothing's writing, we don't need to hol
+ return f(params)
+ }
+ return nil, fmt.Errorf("streamclient: no protocol registered for [%s]", parts[0])
+}
+
+func registerProtocol(name string, f clientFactory) {
+ defaultRegistry.register(name, f)
+}
+
+// Client is a client to a LogDog Butler StreamServer. A Client will connect
+// to a StreamServer, negotiate a stream configuration, and return an active
+// stream object that can be written to.
+type Client interface {
estaab 2015/11/06 22:09:44 Are there any style guidelines around ordering of
dnj (Google) 2015/11/07 16:47:41 Guidelines are for readability. I split the regist
+ // NewStream creates a new stream with the supplied stream properties.
+ NewStream(f streamproto.Flags) (Stream, error)
+}
+
+// streamFactory is a factory method to generate an io.WriteCloser stream for
+// the current clientImpl.
+type streamFactory func() (io.WriteCloser, error)
+
+// clientImpl is an implementation of the Client interface using a net.Conn
+// factory to generate an individual stream.
+type clientImpl struct {
+ // network is the connection path to the stream server.
+ factory streamFactory
+}
+
+// New instantiates a new Client instance. This type of instance will be parsed
+// from the supplied path string, which takes the form:
+// <protocol>:<protocol-specific-spec>
+//
+// Supported protocols and their respective specs are:
+// - `tcp4:[port]` (The stream server is listening on localhost at the specified
+// port.)
+func New(path string) (Client, error) {
+ return defaultRegistry.newClient(path)
+}
+
+func (c *clientImpl) NewStream(f streamproto.Flags) (Stream, error) {
+ p := f.Properties()
+ if err := p.Validate(); err != nil {
+ return nil, fmt.Errorf("streamclient: invalid stream properties: %s", err)
+ }
+
+ client, err := c.factory()
+ if err != nil {
+ return nil, err
+ }
+ passing := false
+ defer func() {
+ // If we haven't written out the connection, close.
+ if !passing {
+ client.Close()
+ }
+ }()
+
+ data, err := json.Marshal(f)
+ if err != nil {
+ return nil, fmt.Errorf("failed to marshal properties JSON: %s", err)
+ }
+
+ // Perform the handshake: magic + size(data) + data.
+ s := &streamImpl{
+ Properties: p,
+ WriteCloser: client,
+ }
+ if _, err := s.writeRaw(ProtocolFrameHeaderMagic); err != nil {
+ return nil, fmt.Errorf("failed to write magic number: %s", err)
+ }
+ if err := s.writeRecord(data); err != nil {
+ return nil, fmt.Errorf("failed to write properties: %s", err)
+ }
+
+ passing = true
+ return s, nil
+}

Powered by Google App Engine
This is Rietveld 408576698