Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package butlerproto | |
| 6 | |
| 7 import ( | |
| 8 "bytes" | |
| 9 "compress/zlib" | |
| 10 "errors" | |
| 11 "fmt" | |
| 12 "io" | |
| 13 | |
| 14 "github.com/golang/protobuf/proto" | |
| 15 "github.com/luci/luci-go/common/logdog/protocol" | |
| 16 "github.com/luci/luci-go/common/logdog/types" | |
| 17 "github.com/luci/luci-go/common/recordio" | |
| 18 ) | |
| 19 | |
| 20 const ( | |
| 21 // DefaultCompressThreshold is the byte size threshold for compressing m essage | |
| 22 // data. Messages whose byte count is less than or equal to this thresho ld | |
| 23 // will not be compressed. | |
| 24 // | |
| 25 // This is the value used by Akamai for its compression threshold: | |
| 26 // "The reasons 860 bytes is the minimum size for compression is twofold : | |
| 27 // (1) The overhead of compressing an object under 860 bytes outweighs | |
| 28 // performance gain. (2) Objects under 860 bytes can be transmitted via a | |
| 29 // single packet anyway, so there isn't a compelling reason to compress them." | |
| 30 DefaultCompressThreshold = 860 | |
| 31 ) | |
| 32 | |
| 33 // protoBase is the base type of protocol reader/writer objects. | |
| 34 type protoBase struct { | |
| 35 // maxSize is the maximum Butler protocol data size. By default, it is | |
| 36 // types.MaxButlerLogBundleSize. However, it can be overridden for testi ng | |
| 37 // here. | |
| 38 maxSize int64 | |
| 39 } | |
| 40 | |
| 41 func (p *protoBase) getMaxSize() int64 { | |
| 42 if p.maxSize == 0 { | |
| 43 return types.MaxButlerLogBundleSize | |
| 44 } | |
| 45 return p.maxSize | |
| 46 } | |
| 47 | |
| 48 // Reader is a protocol reader instance. | |
| 49 type Reader struct { | |
| 50 protoBase | |
| 51 | |
| 52 // Metadata is the unpacked ButlerMetadata. It is populated when the | |
| 53 // metadata has been read. | |
| 54 Metadata *protocol.ButlerMetadata | |
| 55 | |
| 56 // Bundle is the unpacked ButlerLogBundle. It is populated when the | |
| 57 // protocol data has been read and the Metadata indicates a ButlerLogBun dle | |
| 58 // type. | |
| 59 Bundle *protocol.ButlerLogBundle | |
| 60 } | |
| 61 | |
| 62 // ReadMetadata reads the metadata header frame. | |
| 63 func (r *Reader) readMetadata(fr recordio.Reader) error { | |
| 64 data, err := fr.ReadFrameAll() | |
| 65 if err != nil { | |
| 66 return err | |
| 67 } | |
| 68 | |
| 69 md := protocol.ButlerMetadata{} | |
| 70 if err := proto.Unmarshal(data, &md); err != nil { | |
| 71 return fmt.Errorf("butlerproto: failed to unmarshal Metadata fra me: %s", err) | |
| 72 } | |
| 73 r.Metadata = &md | |
| 74 return nil | |
| 75 } | |
| 76 | |
| 77 func (r *Reader) readData(fr recordio.Reader) ([]byte, error) { | |
| 78 size, br, err := fr.ReadFrame() | |
| 79 if err != nil { | |
| 80 return nil, fmt.Errorf("failed to read bundle frame: %s", err) | |
| 81 } | |
| 82 | |
| 83 // Read the frame through a zlib reader. | |
| 84 switch r.Metadata.Compression { | |
| 85 case protocol.ButlerMetadata_NONE: | |
| 86 break | |
| 87 | |
| 88 case protocol.ButlerMetadata_ZLIB: | |
| 89 br, err = zlib.NewReader(br) | |
| 90 if err != nil { | |
| 91 return nil, fmt.Errorf("failed to initialize zlib reader : %s", err) | |
| 92 } | |
| 93 | |
| 94 default: | |
| 95 return nil, fmt.Errorf("unknown compression type: %v", r.Metadat a.Compression) | |
| 96 } | |
| 97 | |
| 98 // Wrap our reader in a limitErrorReader so we don't pull data beyond ou r | |
| 99 // soft maximum. | |
| 100 br = &limitErrorReader{ | |
| 101 Reader: br, | |
| 102 limit: r.getMaxSize(), | |
| 103 } | |
| 104 | |
| 105 buf := bytes.Buffer{} | |
| 106 buf.Grow(int(size)) | |
| 107 _, err = buf.ReadFrom(br) | |
| 108 if err != nil { | |
| 109 return nil, fmt.Errorf("butlerproto: failed to buffer bundle fra me: %s", err) | |
| 110 } | |
| 111 return buf.Bytes(), nil | |
| 112 } | |
| 113 | |
| 114 func (r *Reader) Read(ir io.Reader) error { | |
| 115 fr := recordio.NewReader(ir, r.getMaxSize()) | |
| 116 | |
| 117 // Ensure that we have our Metadata. | |
| 118 if err := r.readMetadata(fr); err != nil { | |
| 119 return err | |
| 120 } | |
| 121 | |
| 122 switch r.Metadata.Type { | |
| 123 case protocol.ButlerMetadata_ButlerLogBundle: | |
| 124 data, err := r.readData(fr) | |
| 125 if err != nil { | |
| 126 return fmt.Errorf("butlerproto: failed to read Bundle da ta: %s", err) | |
| 127 } | |
| 128 | |
| 129 bundle := protocol.ButlerLogBundle{} | |
| 130 if err := proto.Unmarshal(data, &bundle); err != nil { | |
| 131 return fmt.Errorf("butlerproto: failed to unmarshal Bund le frame: %s", err) | |
| 132 } | |
| 133 r.Bundle = &bundle | |
| 134 return nil | |
| 135 | |
| 136 default: | |
| 137 return fmt.Errorf("butlerproto: unknown data type: %s", r.Metada ta.Type) | |
| 138 } | |
| 139 } | |
| 140 | |
| 141 // limitErrorReader is similar to io.LimitReader, except that it returns | |
| 142 // a custom error instead of io.EOF. | |
| 143 // | |
| 144 // This is important, as it allows us to distinguish between the end of | |
| 145 // the compressed reader's data and a limit being hit. | |
| 146 type limitErrorReader struct { | |
| 147 io.Reader // underlying reader | |
| 148 limit int64 // max bytes remaining | |
| 149 } | |
| 150 | |
| 151 func (r *limitErrorReader) Read(p []byte) (int, error) { | |
| 152 if r.limit <= 0 { | |
| 153 return 0, errors.New("limit exceeded") | |
| 154 } | |
| 155 if int64(len(p)) > r.limit { | |
|
estaab
2015/11/04 12:33:24
Isn't this delaying hitting the limit to the next
dnj
2015/11/04 18:19:28
This should be fine b/c I reduce "r.limit" each pa
| |
| 156 p = p[0:r.limit] | |
| 157 } | |
| 158 n, err := r.Reader.Read(p) | |
| 159 r.limit -= int64(n) | |
| 160 return n, err | |
| 161 } | |
| 162 | |
| 163 // Writer writes Butler messages that the Reader can read. | |
| 164 type Writer struct { | |
| 165 protoBase | |
| 166 | |
| 167 // Compress, if true, allows the Writer to choose to compress data when | |
| 168 // applicable. | |
| 169 Compress bool | |
| 170 | |
| 171 // CompressThreshold is the minimum size that data must be in order to | |
| 172 CompressThreshold int | |
| 173 | |
| 174 compressBuf bytes.Buffer | |
| 175 compressWriter *zlib.Writer | |
| 176 } | |
| 177 | |
| 178 func (w *Writer) writeData(fw recordio.Writer, t protocol.ButlerMetadata_Content Type, data []byte) error { | |
| 179 if int64(len(data)) > w.getMaxSize() { | |
| 180 return fmt.Errorf("butlerproto: serialized size exceeds soft cap (%d > %d)", len(data), w.getMaxSize()) | |
| 181 } | |
| 182 | |
| 183 md := protocol.ButlerMetadata{ | |
| 184 Type: t, | |
| 185 } | |
| 186 | |
| 187 // If we're configured to compress and the data is below our threshold, | |
| 188 // compress. | |
| 189 if w.Compress && len(data) >= w.CompressThreshold { | |
| 190 w.compressBuf.Reset() | |
| 191 if w.compressWriter == nil { | |
| 192 w.compressWriter = zlib.NewWriter(&w.compressBuf) | |
| 193 } else { | |
| 194 w.compressWriter.Reset(&w.compressBuf) | |
| 195 } | |
| 196 if _, err := w.compressWriter.Write(data); err != nil { | |
| 197 return err | |
| 198 } | |
| 199 if err := w.compressWriter.Close(); err != nil { | |
| 200 return err | |
| 201 } | |
| 202 | |
| 203 compressed := true | |
| 204 if compressed { | |
| 205 md.Compression = protocol.ButlerMetadata_ZLIB | |
| 206 } | |
| 207 data = w.compressBuf.Bytes() | |
| 208 } | |
| 209 | |
| 210 // Write metadata frame. | |
| 211 mdData, err := proto.Marshal(&md) | |
| 212 if err != nil { | |
| 213 return fmt.Errorf("butlerproto: failed to marshal Metadata: %s", err) | |
| 214 } | |
| 215 _, err = fw.Write(mdData) | |
| 216 if err != nil { | |
| 217 return fmt.Errorf("butlerproto: failed to write Metadata frame: %s", err) | |
| 218 } | |
| 219 if err := fw.Flush(); err != nil { | |
| 220 return fmt.Errorf("butlerproto: failed to flush Metadata frame: %s", err) | |
| 221 } | |
| 222 | |
| 223 // Write data frame. | |
| 224 _, err = fw.Write(data) | |
| 225 if err != nil { | |
| 226 return fmt.Errorf("butlerproto: failed to write data frame: %s", err) | |
| 227 } | |
| 228 if err := fw.Flush(); err != nil { | |
| 229 return fmt.Errorf("butlerproto: failed to flush data frame: %s", err) | |
| 230 } | |
| 231 return nil | |
| 232 } | |
| 233 | |
| 234 // WriteWith writes a ButlerLogBundle to the supplied Writer. | |
| 235 func (w *Writer) Write(iw io.Writer, b *protocol.ButlerLogBundle) error { | |
| 236 return w.WriteWith(recordio.NewWriter(iw), b) | |
| 237 } | |
| 238 | |
| 239 // WriteWith writes a ButlerLogBundle to the supplied recordio.Writer. | |
| 240 func (w *Writer) WriteWith(fw recordio.Writer, b *protocol.ButlerLogBundle) erro r { | |
| 241 data, err := proto.Marshal(b) | |
| 242 if err != nil { | |
| 243 return fmt.Errorf("butlerproto: failed to marshal Bundle: %s", b ) | |
| 244 } | |
| 245 | |
| 246 return w.writeData(fw, protocol.ButlerMetadata_ButlerLogBundle, data) | |
| 247 } | |
| OLD | NEW |