| Index: client/internal/logdog/butler/streamserver/handshake.go
|
| diff --git a/client/internal/logdog/butler/streamserver/handshake.go b/client/internal/logdog/butler/streamserver/handshake.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..9e097458ceb3b0ef2f52f7dc5b475b2ed8299777
|
| --- /dev/null
|
| +++ b/client/internal/logdog/butler/streamserver/handshake.go
|
| @@ -0,0 +1,130 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package streamserver
|
| +
|
| +import (
|
| + "bytes"
|
| + "encoding/json"
|
| + "errors"
|
| + "fmt"
|
| + "io"
|
| +
|
| + "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
|
| + "github.com/luci/luci-go/common/clock"
|
| + "github.com/luci/luci-go/common/iotools"
|
| + "github.com/luci/luci-go/common/logdog/protocol"
|
| + log "github.com/luci/luci-go/common/logging"
|
| + "github.com/luci/luci-go/common/proto/google"
|
| + "github.com/luci/luci-go/common/recordio"
|
| + "golang.org/x/net/context"
|
| +)
|
| +
|
| +// handshakeProtocol is an implementation of a Butler handshake protocol V1
|
| +// reader. It identifies with streamproto.ProtocolFrameHeaderMagic, and uses a
|
| +// JSON blob to describe the stream.
|
| +type handshakeProtocol struct {
|
| + forceVerbose bool // (Testing) force verbose code path.
|
| +}
|
| +
|
| +const (
|
| + // The maximum size of the header (1MB).
|
| + maxHeaderSize = 1 * 1024 * 1024
|
| +)
|
| +
|
| +func (p *handshakeProtocol) defaultFlags() *streamproto.Flags {
|
| + return &streamproto.Flags{
|
| + Type: streamproto.StreamType(protocol.LogStreamDescriptor_TEXT),
|
| + Tee: streamproto.TeeNone,
|
| + }
|
| +}
|
| +
|
| +func (p *handshakeProtocol) Handshake(ctx context.Context, r io.Reader) (*streamproto.Properties, error) {
|
| + // Read the frame header magic number (version)
|
| + magic := make([]byte, len(streamproto.ProtocolFrameHeaderMagic))
|
| + if n, err := io.ReadFull(r, magic); (n != len(magic)) || (err != nil) {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + }.Errorf(ctx, "Failed to read frame header magic number.")
|
| + return nil, errors.New("handshake: failed to read frame header magic number")
|
| + }
|
| +
|
| + // Check the magic number/version
|
| + if !bytes.Equal(magic, streamproto.ProtocolFrameHeaderMagic) {
|
| + log.Fields{
|
| + "magic": fmt.Sprintf("%#X", magic),
|
| + }.Errorf(ctx, "Unrecognized frame header magic number.")
|
| + return nil, errors.New("handshake: Unknown protocol magic in frame header")
|
| + }
|
| +
|
| + // Load the JSON into our descriptor field.
|
| + flags, err := p.loadFlags(ctx, r)
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| +
|
| + props := flags.Properties()
|
| + if props.Timestamp == nil {
|
| + props.Timestamp = google.NewTimestamp(clock.Now(ctx))
|
| + }
|
| + if err := props.Validate(); err != nil {
|
| + return nil, err
|
| + }
|
| +
|
| + return props, nil
|
| +}
|
| +
|
| +func (p *handshakeProtocol) loadFlags(ctx context.Context, r io.Reader) (*streamproto.Flags, error) {
|
| + fr := recordio.NewReader(r, maxHeaderSize)
|
| +
|
| + // Read the header frame.
|
| + frameSize, hr, err := fr.ReadFrame()
|
| + if err != nil {
|
| + log.Errorf(log.SetError(ctx, err), "Failed to read header frame.")
|
| + return nil, err
|
| + }
|
| +
|
| + // When tracing, buffer the JSON data locally so we can emit it via log.
|
| + headerBuf := bytes.Buffer{}
|
| + captureHeader := log.IsLogging(ctx, log.Debug) || p.forceVerbose
|
| + if captureHeader {
|
| + hr = io.TeeReader(hr, &headerBuf)
|
| + }
|
| +
|
| + // When we hand the header reader to the "json" library, we want to count how
|
| + // many bytes it reads from it. We will assert that it has read the full set
|
| + // of bytes.
|
| + chr := &iotools.CountingReader{Reader: hr}
|
| +
|
| + // Decode into our protocol description structure. Note that extra fields
|
| + // are ignored (no error) and missing fields retain their zero value.
|
| + f := p.defaultFlags()
|
| + err = json.NewDecoder(chr).Decode(f)
|
| + if captureHeader {
|
| + log.Fields{
|
| + "frameSize": frameSize,
|
| + "decodeSize": headerBuf.Len(),
|
| + }.Debugf(ctx, "Read JSON header:\n%s", headerBuf.String())
|
| + }
|
| + if err != nil {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + }.Errorf(ctx, "Failed to decode stream description data JSON.")
|
| + return nil, err
|
| + }
|
| +
|
| + // Make sure that this consumed the full JSON size that was specified.
|
| + //
|
| + // We use a countReader because the 'json' library doesn't give us a way to
|
| + // know how many bytes it consumed when it decoded.
|
| + if chr.Count() != frameSize {
|
| + log.Fields{
|
| + "blockSize": chr.Count(),
|
| + "frameSize": frameSize,
|
| + }.Errorf(ctx, "Stream description block was not fully consumed.")
|
| + return nil, errors.New("handshake: stream description block was not fully consumed")
|
| + }
|
| +
|
| + return f, nil
|
| +}
|
|
|