Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(74)

Side by Side Diff: client/logdog/butlerproto/proto.go

Issue 1321273002: LogDog: Add butler protocol reader/write library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-frame
Patch Set: Updates. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « client/logdog/butlerproto/doc.go ('k') | client/logdog/butlerproto/proto_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package butlerproto
6
7 import (
8 "bytes"
9 "compress/zlib"
10 "errors"
11 "fmt"
12 "io"
13
14 "github.com/golang/protobuf/proto"
15 "github.com/luci/luci-go/common/logdog/protocol"
16 "github.com/luci/luci-go/common/logdog/types"
17 "github.com/luci/luci-go/common/recordio"
18 )
19
20 const (
21 // DefaultCompressThreshold is the byte size threshold for compressing m essage
22 // data. Messages whose byte count is less than or equal to this thresho ld
23 // will not be compressed.
24 //
25 // This is the value used by Akamai for its compression threshold:
26 // "The reasons 860 bytes is the minimum size for compression is twofold :
27 // (1) The overhead of compressing an object under 860 bytes outweighs
28 // performance gain. (2) Objects under 860 bytes can be transmitted via a
29 // single packet anyway, so there isn't a compelling reason to compress them."
30 DefaultCompressThreshold = 860
31 )
32
33 // protoBase is the base type of protocol reader/writer objects.
34 type protoBase struct {
35 // maxSize is the maximum Butler protocol data size. By default, it is
36 // types.MaxButlerLogBundleSize. However, it can be overridden for testi ng
37 // here.
38 maxSize int64
39 }
40
41 func (p *protoBase) getMaxSize() int64 {
42 if p.maxSize == 0 {
43 return types.MaxButlerLogBundleSize
44 }
45 return p.maxSize
46 }
47
48 // Reader is a protocol reader instance.
49 type Reader struct {
50 protoBase
51
52 // Metadata is the unpacked ButlerMetadata. It is populated when the
53 // metadata has been read.
54 Metadata *protocol.ButlerMetadata
55
56 // Bundle is the unpacked ButlerLogBundle. It is populated when the
57 // protocol data has been read and the Metadata indicates a ButlerLogBun dle
58 // type.
59 Bundle *protocol.ButlerLogBundle
60 }
61
62 // ReadMetadata reads the metadata header frame.
63 func (r *Reader) readMetadata(fr recordio.Reader) error {
64 data, err := fr.ReadFrameAll()
65 if err != nil {
66 return err
67 }
68
69 md := protocol.ButlerMetadata{}
70 if err := proto.Unmarshal(data, &md); err != nil {
71 return fmt.Errorf("butlerproto: failed to unmarshal Metadata fra me: %s", err)
72 }
73 r.Metadata = &md
74 return nil
75 }
76
77 func (r *Reader) readData(fr recordio.Reader) ([]byte, error) {
78 size, br, err := fr.ReadFrame()
79 if err != nil {
80 return nil, fmt.Errorf("failed to read bundle frame: %s", err)
81 }
82
83 // Read the frame through a zlib reader.
84 switch r.Metadata.Compression {
85 case protocol.ButlerMetadata_NONE:
86 break
87
88 case protocol.ButlerMetadata_ZLIB:
89 br, err = zlib.NewReader(br)
90 if err != nil {
91 return nil, fmt.Errorf("failed to initialize zlib reader : %s", err)
92 }
93
94 default:
95 return nil, fmt.Errorf("unknown compression type: %v", r.Metadat a.Compression)
96 }
97
98 // Wrap our reader in a limitErrorReader so we don't pull data beyond ou r
99 // soft maximum.
100 br = &limitErrorReader{
101 Reader: br,
102 limit: r.getMaxSize(),
103 }
104
105 buf := bytes.Buffer{}
106 buf.Grow(int(size))
107 _, err = buf.ReadFrom(br)
108 if err != nil {
109 return nil, fmt.Errorf("butlerproto: failed to buffer bundle fra me: %s", err)
110 }
111 return buf.Bytes(), nil
112 }
113
114 func (r *Reader) Read(ir io.Reader) error {
115 fr := recordio.NewReader(ir, r.getMaxSize())
116
117 // Ensure that we have our Metadata.
118 if err := r.readMetadata(fr); err != nil {
119 return err
120 }
121
122 switch r.Metadata.Type {
123 case protocol.ButlerMetadata_ButlerLogBundle:
124 data, err := r.readData(fr)
125 if err != nil {
126 return fmt.Errorf("butlerproto: failed to read Bundle da ta: %s", err)
127 }
128
129 bundle := protocol.ButlerLogBundle{}
130 if err := proto.Unmarshal(data, &bundle); err != nil {
131 return fmt.Errorf("butlerproto: failed to unmarshal Bund le frame: %s", err)
132 }
133 r.Bundle = &bundle
134 return nil
135
136 default:
137 return fmt.Errorf("butlerproto: unknown data type: %s", r.Metada ta.Type)
138 }
139 }
140
141 // limitErrorReader is similar to io.LimitReader, except that it returns
142 // a custom error instead of io.EOF.
143 //
144 // This is important, as it allows us to distinguish between the end of
145 // the compressed reader's data and a limit being hit.
146 type limitErrorReader struct {
147 io.Reader // underlying reader
148 limit int64 // max bytes remaining
149 }
150
151 func (r *limitErrorReader) Read(p []byte) (int, error) {
152 if r.limit <= 0 {
153 return 0, errors.New("limit exceeded")
154 }
155 if int64(len(p)) > r.limit {
estaab 2015/11/04 12:33:24 Isn't this delaying hitting the limit to the next
dnj 2015/11/04 18:19:28 This should be fine b/c I reduce "r.limit" each pa
156 p = p[0:r.limit]
157 }
158 n, err := r.Reader.Read(p)
159 r.limit -= int64(n)
160 return n, err
161 }
162
163 // Writer writes Butler messages that the Reader can read.
164 type Writer struct {
165 protoBase
166
167 // Compress, if true, allows the Writer to choose to compress data when
168 // applicable.
169 Compress bool
170
171 // CompressThreshold is the minimum size that data must be in order to
172 CompressThreshold int
173
174 compressBuf bytes.Buffer
175 compressWriter *zlib.Writer
176 }
177
178 func (w *Writer) writeData(fw recordio.Writer, t protocol.ButlerMetadata_Content Type, data []byte) error {
179 if int64(len(data)) > w.getMaxSize() {
180 return fmt.Errorf("butlerproto: serialized size exceeds soft cap (%d > %d)", len(data), w.getMaxSize())
181 }
182
183 md := protocol.ButlerMetadata{
184 Type: t,
185 }
186
187 // If we're configured to compress and the data is below our threshold,
188 // compress.
189 if w.Compress && len(data) >= w.CompressThreshold {
190 w.compressBuf.Reset()
191 if w.compressWriter == nil {
192 w.compressWriter = zlib.NewWriter(&w.compressBuf)
193 } else {
194 w.compressWriter.Reset(&w.compressBuf)
195 }
196 if _, err := w.compressWriter.Write(data); err != nil {
197 return err
198 }
199 if err := w.compressWriter.Close(); err != nil {
200 return err
201 }
202
203 compressed := true
204 if compressed {
205 md.Compression = protocol.ButlerMetadata_ZLIB
206 }
207 data = w.compressBuf.Bytes()
208 }
209
210 // Write metadata frame.
211 mdData, err := proto.Marshal(&md)
212 if err != nil {
213 return fmt.Errorf("butlerproto: failed to marshal Metadata: %s", err)
214 }
215 _, err = fw.Write(mdData)
216 if err != nil {
217 return fmt.Errorf("butlerproto: failed to write Metadata frame: %s", err)
218 }
219 if err := fw.Flush(); err != nil {
220 return fmt.Errorf("butlerproto: failed to flush Metadata frame: %s", err)
221 }
222
223 // Write data frame.
224 _, err = fw.Write(data)
225 if err != nil {
226 return fmt.Errorf("butlerproto: failed to write data frame: %s", err)
227 }
228 if err := fw.Flush(); err != nil {
229 return fmt.Errorf("butlerproto: failed to flush data frame: %s", err)
230 }
231 return nil
232 }
233
234 // WriteWith writes a ButlerLogBundle to the supplied Writer.
235 func (w *Writer) Write(iw io.Writer, b *protocol.ButlerLogBundle) error {
236 return w.WriteWith(recordio.NewWriter(iw), b)
237 }
238
239 // WriteWith writes a ButlerLogBundle to the supplied recordio.Writer.
240 func (w *Writer) WriteWith(fw recordio.Writer, b *protocol.ButlerLogBundle) erro r {
241 data, err := proto.Marshal(b)
242 if err != nil {
243 return fmt.Errorf("butlerproto: failed to marshal Bundle: %s", b )
244 }
245
246 return w.writeData(fw, protocol.ButlerMetadata_ButlerLogBundle, data)
247 }
OLDNEW
« no previous file with comments | « client/logdog/butlerproto/doc.go ('k') | client/logdog/butlerproto/proto_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698