Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(13)

Unified Diff: client/logdog/butlerproto/proto.go

Issue 1321273002: LogDog: Add butler protocol reader/write library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-frame
Patch Set: Updates. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « client/logdog/butlerproto/doc.go ('k') | client/logdog/butlerproto/proto_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: client/logdog/butlerproto/proto.go
diff --git a/client/logdog/butlerproto/proto.go b/client/logdog/butlerproto/proto.go
new file mode 100644
index 0000000000000000000000000000000000000000..d5c0cf1cbc5a78bd7b6f236c97b0a2c3943ff0ad
--- /dev/null
+++ b/client/logdog/butlerproto/proto.go
@@ -0,0 +1,247 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package butlerproto
+
+import (
+ "bytes"
+ "compress/zlib"
+ "errors"
+ "fmt"
+ "io"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/luci/luci-go/common/logdog/protocol"
+ "github.com/luci/luci-go/common/logdog/types"
+ "github.com/luci/luci-go/common/recordio"
+)
+
+const (
+ // DefaultCompressThreshold is the byte size threshold for compressing message
+ // data. Messages whose byte count is less than or equal to this threshold
+ // will not be compressed.
+ //
+ // This is the value used by Akamai for its compression threshold:
+ // "The reasons 860 bytes is the minimum size for compression is twofold:
+ // (1) The overhead of compressing an object under 860 bytes outweighs
+ // performance gain. (2) Objects under 860 bytes can be transmitted via a
+ // single packet anyway, so there isn't a compelling reason to compress them."
+ DefaultCompressThreshold = 860
+)
+
+// protoBase is the base type of protocol reader/writer objects.
+type protoBase struct {
+ // maxSize is the maximum Butler protocol data size. By default, it is
+ // types.MaxButlerLogBundleSize. However, it can be overridden for testing
+ // here.
+ maxSize int64
+}
+
+func (p *protoBase) getMaxSize() int64 {
+ if p.maxSize == 0 {
+ return types.MaxButlerLogBundleSize
+ }
+ return p.maxSize
+}
+
+// Reader is a protocol reader instance.
+type Reader struct {
+ protoBase
+
+ // Metadata is the unpacked ButlerMetadata. It is populated when the
+ // metadata has been read.
+ Metadata *protocol.ButlerMetadata
+
+ // Bundle is the unpacked ButlerLogBundle. It is populated when the
+ // protocol data has been read and the Metadata indicates a ButlerLogBundle
+ // type.
+ Bundle *protocol.ButlerLogBundle
+}
+
+// ReadMetadata reads the metadata header frame.
+func (r *Reader) readMetadata(fr recordio.Reader) error {
+ data, err := fr.ReadFrameAll()
+ if err != nil {
+ return err
+ }
+
+ md := protocol.ButlerMetadata{}
+ if err := proto.Unmarshal(data, &md); err != nil {
+ return fmt.Errorf("butlerproto: failed to unmarshal Metadata frame: %s", err)
+ }
+ r.Metadata = &md
+ return nil
+}
+
+func (r *Reader) readData(fr recordio.Reader) ([]byte, error) {
+ size, br, err := fr.ReadFrame()
+ if err != nil {
+ return nil, fmt.Errorf("failed to read bundle frame: %s", err)
+ }
+
+ // Read the frame through a zlib reader.
+ switch r.Metadata.Compression {
+ case protocol.ButlerMetadata_NONE:
+ break
+
+ case protocol.ButlerMetadata_ZLIB:
+ br, err = zlib.NewReader(br)
+ if err != nil {
+ return nil, fmt.Errorf("failed to initialize zlib reader: %s", err)
+ }
+
+ default:
+ return nil, fmt.Errorf("unknown compression type: %v", r.Metadata.Compression)
+ }
+
+ // Wrap our reader in a limitErrorReader so we don't pull data beyond our
+ // soft maximum.
+ br = &limitErrorReader{
+ Reader: br,
+ limit: r.getMaxSize(),
+ }
+
+ buf := bytes.Buffer{}
+ buf.Grow(int(size))
+ _, err = buf.ReadFrom(br)
+ if err != nil {
+ return nil, fmt.Errorf("butlerproto: failed to buffer bundle frame: %s", err)
+ }
+ return buf.Bytes(), nil
+}
+
+func (r *Reader) Read(ir io.Reader) error {
+ fr := recordio.NewReader(ir, r.getMaxSize())
+
+ // Ensure that we have our Metadata.
+ if err := r.readMetadata(fr); err != nil {
+ return err
+ }
+
+ switch r.Metadata.Type {
+ case protocol.ButlerMetadata_ButlerLogBundle:
+ data, err := r.readData(fr)
+ if err != nil {
+ return fmt.Errorf("butlerproto: failed to read Bundle data: %s", err)
+ }
+
+ bundle := protocol.ButlerLogBundle{}
+ if err := proto.Unmarshal(data, &bundle); err != nil {
+ return fmt.Errorf("butlerproto: failed to unmarshal Bundle frame: %s", err)
+ }
+ r.Bundle = &bundle
+ return nil
+
+ default:
+ return fmt.Errorf("butlerproto: unknown data type: %s", r.Metadata.Type)
+ }
+}
+
+// limitErrorReader is similar to io.LimitReader, except that it returns
+// a custom error instead of io.EOF.
+//
+// This is important, as it allows us to distinguish between the end of
+// the compressed reader's data and a limit being hit.
+type limitErrorReader struct {
+ io.Reader // underlying reader
+ limit int64 // max bytes remaining
+}
+
+func (r *limitErrorReader) Read(p []byte) (int, error) {
+ if r.limit <= 0 {
+ return 0, errors.New("limit exceeded")
+ }
+ if int64(len(p)) > r.limit {
estaab 2015/11/04 12:33:24 Isn't this delaying hitting the limit to the next
dnj 2015/11/04 18:19:28 This should be fine b/c I reduce "r.limit" each pa
+ p = p[0:r.limit]
+ }
+ n, err := r.Reader.Read(p)
+ r.limit -= int64(n)
+ return n, err
+}
+
+// Writer writes Butler messages that the Reader can read.
+type Writer struct {
+ protoBase
+
+ // Compress, if true, allows the Writer to choose to compress data when
+ // applicable.
+ Compress bool
+
+ // CompressThreshold is the minimum size that data must be in order to
+ CompressThreshold int
+
+ compressBuf bytes.Buffer
+ compressWriter *zlib.Writer
+}
+
+func (w *Writer) writeData(fw recordio.Writer, t protocol.ButlerMetadata_ContentType, data []byte) error {
+ if int64(len(data)) > w.getMaxSize() {
+ return fmt.Errorf("butlerproto: serialized size exceeds soft cap (%d > %d)", len(data), w.getMaxSize())
+ }
+
+ md := protocol.ButlerMetadata{
+ Type: t,
+ }
+
+ // If we're configured to compress and the data is below our threshold,
+ // compress.
+ if w.Compress && len(data) >= w.CompressThreshold {
+ w.compressBuf.Reset()
+ if w.compressWriter == nil {
+ w.compressWriter = zlib.NewWriter(&w.compressBuf)
+ } else {
+ w.compressWriter.Reset(&w.compressBuf)
+ }
+ if _, err := w.compressWriter.Write(data); err != nil {
+ return err
+ }
+ if err := w.compressWriter.Close(); err != nil {
+ return err
+ }
+
+ compressed := true
+ if compressed {
+ md.Compression = protocol.ButlerMetadata_ZLIB
+ }
+ data = w.compressBuf.Bytes()
+ }
+
+ // Write metadata frame.
+ mdData, err := proto.Marshal(&md)
+ if err != nil {
+ return fmt.Errorf("butlerproto: failed to marshal Metadata: %s", err)
+ }
+ _, err = fw.Write(mdData)
+ if err != nil {
+ return fmt.Errorf("butlerproto: failed to write Metadata frame: %s", err)
+ }
+ if err := fw.Flush(); err != nil {
+ return fmt.Errorf("butlerproto: failed to flush Metadata frame: %s", err)
+ }
+
+ // Write data frame.
+ _, err = fw.Write(data)
+ if err != nil {
+ return fmt.Errorf("butlerproto: failed to write data frame: %s", err)
+ }
+ if err := fw.Flush(); err != nil {
+ return fmt.Errorf("butlerproto: failed to flush data frame: %s", err)
+ }
+ return nil
+}
+
+// WriteWith writes a ButlerLogBundle to the supplied Writer.
+func (w *Writer) Write(iw io.Writer, b *protocol.ButlerLogBundle) error {
+ return w.WriteWith(recordio.NewWriter(iw), b)
+}
+
+// WriteWith writes a ButlerLogBundle to the supplied recordio.Writer.
+func (w *Writer) WriteWith(fw recordio.Writer, b *protocol.ButlerLogBundle) error {
+ data, err := proto.Marshal(b)
+ if err != nil {
+ return fmt.Errorf("butlerproto: failed to marshal Bundle: %s", b)
+ }
+
+ return w.writeData(fw, protocol.ButlerMetadata_ButlerLogBundle, data)
+}
« no previous file with comments | « client/logdog/butlerproto/doc.go ('k') | client/logdog/butlerproto/proto_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698