Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(70)

Side by Side Diff: client/internal/logdog/butler/streamserver/handshake.go

Issue 1429993002: LogDog: Add Butler stream server package. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-butlerproto
Patch Set: Bind POSIX test to POSIX domains. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | client/internal/logdog/butler/streamserver/handshake_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package streamserver
6
7 import (
8 "bytes"
9 "encoding/json"
10 "errors"
11 "fmt"
12 "io"
13
14 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
15 "github.com/luci/luci-go/common/clock"
16 "github.com/luci/luci-go/common/iotools"
17 "github.com/luci/luci-go/common/logdog/protocol"
18 log "github.com/luci/luci-go/common/logging"
19 "github.com/luci/luci-go/common/proto/google"
20 "github.com/luci/luci-go/common/recordio"
21 "golang.org/x/net/context"
22 )
23
24 // handshakeProtocol is an implementation of a Butler handshake protocol V1
25 // reader. It identifies with streamproto.ProtocolFrameHeaderMagic, and uses a
26 // JSON blob to describe the stream.
27 type handshakeProtocol struct {
28 forceVerbose bool // (Testing) force verbose code path.
29 }
30
31 const (
32 // The maximum size of the header (1MB).
33 maxHeaderSize = 1 * 1024 * 1024
34 )
35
36 func (p *handshakeProtocol) defaultFlags() *streamproto.Flags {
37 return &streamproto.Flags{
38 Type: streamproto.StreamType(protocol.LogStreamDescriptor_TEXT),
39 Tee: streamproto.TeeNone,
40 }
41 }
42
43 func (p *handshakeProtocol) Handshake(ctx context.Context, r io.Reader) (*stream proto.Properties, error) {
44 // Read the frame header magic number (version)
45 magic := make([]byte, len(streamproto.ProtocolFrameHeaderMagic))
46 if n, err := io.ReadFull(r, magic); (n != len(magic)) || (err != nil) {
47 log.Fields{
48 log.ErrorKey: err,
49 }.Errorf(ctx, "Failed to read frame header magic number.")
50 return nil, errors.New("handshake: failed to read frame header m agic number")
51 }
52
53 // Check the magic number/version
54 if !bytes.Equal(magic, streamproto.ProtocolFrameHeaderMagic) {
55 log.Fields{
56 "magic": fmt.Sprintf("%#X", magic),
57 }.Errorf(ctx, "Unrecognized frame header magic number.")
58 return nil, errors.New("handshake: Unknown protocol magic in fra me header")
59 }
60
61 // Load the JSON into our descriptor field.
62 flags, err := p.loadFlags(ctx, r)
63 if err != nil {
64 return nil, err
65 }
66
67 props := flags.Properties()
68 if props.Timestamp == nil {
69 props.Timestamp = google.NewTimestamp(clock.Now(ctx))
70 }
71 if err := props.Validate(); err != nil {
72 return nil, err
73 }
74
75 return props, nil
76 }
77
78 func (p *handshakeProtocol) loadFlags(ctx context.Context, r io.Reader) (*stream proto.Flags, error) {
79 fr := recordio.NewReader(r, maxHeaderSize)
80
81 // Read the header frame.
82 frameSize, hr, err := fr.ReadFrame()
83 if err != nil {
84 log.Errorf(log.SetError(ctx, err), "Failed to read header frame. ")
85 return nil, err
86 }
87
88 // When tracing, buffer the JSON data locally so we can emit it via log.
89 headerBuf := bytes.Buffer{}
90 captureHeader := log.IsLogging(ctx, log.Debug) || p.forceVerbose
91 if captureHeader {
92 hr = io.TeeReader(hr, &headerBuf)
93 }
94
95 // When we hand the header reader to the "json" library, we want to coun t how
96 // many bytes it reads from it. We will assert that it has read the full set
97 // of bytes.
98 chr := &iotools.CountingReader{Reader: hr}
99
100 // Decode into our protocol description structure. Note that extra field s
101 // are ignored (no error) and missing fields retain their zero value.
102 f := p.defaultFlags()
103 err = json.NewDecoder(chr).Decode(f)
104 if captureHeader {
105 log.Fields{
106 "frameSize": frameSize,
107 "decodeSize": headerBuf.Len(),
108 }.Debugf(ctx, "Read JSON header:\n%s", headerBuf.String())
109 }
110 if err != nil {
111 log.Fields{
112 log.ErrorKey: err,
113 }.Errorf(ctx, "Failed to decode stream description data JSON.")
114 return nil, err
115 }
116
117 // Make sure that this consumed the full JSON size that was specified.
118 //
119 // We use a countReader because the 'json' library doesn't give us a way to
120 // know how many bytes it consumed when it decoded.
121 if chr.Count() != frameSize {
122 log.Fields{
123 "blockSize": chr.Count(),
124 "frameSize": frameSize,
125 }.Errorf(ctx, "Stream description block was not fully consumed." )
126 return nil, errors.New("handshake: stream description block was not fully consumed")
127 }
128
129 return f, nil
130 }
OLDNEW
« no previous file with comments | « no previous file | client/internal/logdog/butler/streamserver/handshake_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698