OLD | NEW |
(Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package streamserver |
| 6 |
| 7 import ( |
| 8 "bytes" |
| 9 "encoding/json" |
| 10 "errors" |
| 11 "fmt" |
| 12 "io" |
| 13 |
| 14 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" |
| 15 "github.com/luci/luci-go/common/clock" |
| 16 "github.com/luci/luci-go/common/iotools" |
| 17 "github.com/luci/luci-go/common/logdog/protocol" |
| 18 log "github.com/luci/luci-go/common/logging" |
| 19 "github.com/luci/luci-go/common/proto/google" |
| 20 "github.com/luci/luci-go/common/recordio" |
| 21 "golang.org/x/net/context" |
| 22 ) |
| 23 |
| 24 // handshakeProtocol is an implementation of a Butler handshake protocol V1 |
| 25 // reader. It identifies with streamproto.ProtocolFrameHeaderMagic, and uses a |
| 26 // JSON blob to describe the stream. |
| 27 type handshakeProtocol struct { |
| 28 forceVerbose bool // (Testing) force verbose code path. |
| 29 } |
| 30 |
| 31 const ( |
| 32 // The maximum size of the header (1MB). |
| 33 maxHeaderSize = 1 * 1024 * 1024 |
| 34 ) |
| 35 |
| 36 func (p *handshakeProtocol) defaultFlags() *streamproto.Flags { |
| 37 return &streamproto.Flags{ |
| 38 Type: streamproto.StreamType(protocol.LogStreamDescriptor_TEXT), |
| 39 Tee: streamproto.TeeNone, |
| 40 } |
| 41 } |
| 42 |
| 43 func (p *handshakeProtocol) Handshake(ctx context.Context, r io.Reader) (*stream
proto.Properties, error) { |
| 44 // Read the frame header magic number (version) |
| 45 magic := make([]byte, len(streamproto.ProtocolFrameHeaderMagic)) |
| 46 if n, err := io.ReadFull(r, magic); (n != len(magic)) || (err != nil) { |
| 47 log.Fields{ |
| 48 log.ErrorKey: err, |
| 49 }.Errorf(ctx, "Failed to read frame header magic number.") |
| 50 return nil, errors.New("handshake: failed to read frame header m
agic number") |
| 51 } |
| 52 |
| 53 // Check the magic number/version |
| 54 if !bytes.Equal(magic, streamproto.ProtocolFrameHeaderMagic) { |
| 55 log.Fields{ |
| 56 "magic": fmt.Sprintf("%#X", magic), |
| 57 }.Errorf(ctx, "Unrecognized frame header magic number.") |
| 58 return nil, errors.New("handshake: Unknown protocol magic in fra
me header") |
| 59 } |
| 60 |
| 61 // Load the JSON into our descriptor field. |
| 62 flags, err := p.loadFlags(ctx, r) |
| 63 if err != nil { |
| 64 return nil, err |
| 65 } |
| 66 |
| 67 props := flags.Properties() |
| 68 if props.Timestamp == nil { |
| 69 props.Timestamp = google.NewTimestamp(clock.Now(ctx)) |
| 70 } |
| 71 if err := props.Validate(); err != nil { |
| 72 return nil, err |
| 73 } |
| 74 |
| 75 return props, nil |
| 76 } |
| 77 |
| 78 func (p *handshakeProtocol) loadFlags(ctx context.Context, r io.Reader) (*stream
proto.Flags, error) { |
| 79 fr := recordio.NewReader(r, maxHeaderSize) |
| 80 |
| 81 // Read the header frame. |
| 82 frameSize, hr, err := fr.ReadFrame() |
| 83 if err != nil { |
| 84 log.Errorf(log.SetError(ctx, err), "Failed to read header frame.
") |
| 85 return nil, err |
| 86 } |
| 87 |
| 88 // When tracing, buffer the JSON data locally so we can emit it via log. |
| 89 headerBuf := bytes.Buffer{} |
| 90 captureHeader := log.IsLogging(ctx, log.Debug) || p.forceVerbose |
| 91 if captureHeader { |
| 92 hr = io.TeeReader(hr, &headerBuf) |
| 93 } |
| 94 |
| 95 // When we hand the header reader to the "json" library, we want to coun
t how |
| 96 // many bytes it reads from it. We will assert that it has read the full
set |
| 97 // of bytes. |
| 98 chr := &iotools.CountingReader{Reader: hr} |
| 99 |
| 100 // Decode into our protocol description structure. Note that extra field
s |
| 101 // are ignored (no error) and missing fields retain their zero value. |
| 102 f := p.defaultFlags() |
| 103 err = json.NewDecoder(chr).Decode(f) |
| 104 if captureHeader { |
| 105 log.Fields{ |
| 106 "frameSize": frameSize, |
| 107 "decodeSize": headerBuf.Len(), |
| 108 }.Debugf(ctx, "Read JSON header:\n%s", headerBuf.String()) |
| 109 } |
| 110 if err != nil { |
| 111 log.Fields{ |
| 112 log.ErrorKey: err, |
| 113 }.Errorf(ctx, "Failed to decode stream description data JSON.") |
| 114 return nil, err |
| 115 } |
| 116 |
| 117 // Make sure that this consumed the full JSON size that was specified. |
| 118 // |
| 119 // We use a countReader because the 'json' library doesn't give us a way
to |
| 120 // know how many bytes it consumed when it decoded. |
| 121 if chr.Count() != frameSize { |
| 122 log.Fields{ |
| 123 "blockSize": chr.Count(), |
| 124 "frameSize": frameSize, |
| 125 }.Errorf(ctx, "Stream description block was not fully consumed."
) |
| 126 return nil, errors.New("handshake: stream description block was
not fully consumed") |
| 127 } |
| 128 |
| 129 return f, nil |
| 130 } |
OLD | NEW |