Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(33)

Unified Diff: client/logdog/butlerproto/proto.go

Issue 1211053004: LogDog: Add Butler Output package. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Relocate butlerproto to common, document. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « client/logdog/butlerproto/doc.go ('k') | client/logdog/butlerproto/proto_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: client/logdog/butlerproto/proto.go
diff --git a/client/logdog/butlerproto/proto.go b/client/logdog/butlerproto/proto.go
deleted file mode 100644
index d5c0cf1cbc5a78bd7b6f236c97b0a2c3943ff0ad..0000000000000000000000000000000000000000
--- a/client/logdog/butlerproto/proto.go
+++ /dev/null
@@ -1,247 +0,0 @@
-// Copyright 2015 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-package butlerproto
-
-import (
- "bytes"
- "compress/zlib"
- "errors"
- "fmt"
- "io"
-
- "github.com/golang/protobuf/proto"
- "github.com/luci/luci-go/common/logdog/protocol"
- "github.com/luci/luci-go/common/logdog/types"
- "github.com/luci/luci-go/common/recordio"
-)
-
-const (
- // DefaultCompressThreshold is the byte size threshold for compressing message
- // data. Messages whose byte count is less than or equal to this threshold
- // will not be compressed.
- //
- // This is the value used by Akamai for its compression threshold:
- // "The reasons 860 bytes is the minimum size for compression is twofold:
- // (1) The overhead of compressing an object under 860 bytes outweighs
- // performance gain. (2) Objects under 860 bytes can be transmitted via a
- // single packet anyway, so there isn't a compelling reason to compress them."
- DefaultCompressThreshold = 860
-)
-
-// protoBase is the base type of protocol reader/writer objects.
-type protoBase struct {
- // maxSize is the maximum Butler protocol data size. By default, it is
- // types.MaxButlerLogBundleSize. However, it can be overridden for testing
- // here.
- maxSize int64
-}
-
-func (p *protoBase) getMaxSize() int64 {
- if p.maxSize == 0 {
- return types.MaxButlerLogBundleSize
- }
- return p.maxSize
-}
-
-// Reader is a protocol reader instance.
-type Reader struct {
- protoBase
-
- // Metadata is the unpacked ButlerMetadata. It is populated when the
- // metadata has been read.
- Metadata *protocol.ButlerMetadata
-
- // Bundle is the unpacked ButlerLogBundle. It is populated when the
- // protocol data has been read and the Metadata indicates a ButlerLogBundle
- // type.
- Bundle *protocol.ButlerLogBundle
-}
-
-// ReadMetadata reads the metadata header frame.
-func (r *Reader) readMetadata(fr recordio.Reader) error {
- data, err := fr.ReadFrameAll()
- if err != nil {
- return err
- }
-
- md := protocol.ButlerMetadata{}
- if err := proto.Unmarshal(data, &md); err != nil {
- return fmt.Errorf("butlerproto: failed to unmarshal Metadata frame: %s", err)
- }
- r.Metadata = &md
- return nil
-}
-
-func (r *Reader) readData(fr recordio.Reader) ([]byte, error) {
- size, br, err := fr.ReadFrame()
- if err != nil {
- return nil, fmt.Errorf("failed to read bundle frame: %s", err)
- }
-
- // Read the frame through a zlib reader.
- switch r.Metadata.Compression {
- case protocol.ButlerMetadata_NONE:
- break
-
- case protocol.ButlerMetadata_ZLIB:
- br, err = zlib.NewReader(br)
- if err != nil {
- return nil, fmt.Errorf("failed to initialize zlib reader: %s", err)
- }
-
- default:
- return nil, fmt.Errorf("unknown compression type: %v", r.Metadata.Compression)
- }
-
- // Wrap our reader in a limitErrorReader so we don't pull data beyond our
- // soft maximum.
- br = &limitErrorReader{
- Reader: br,
- limit: r.getMaxSize(),
- }
-
- buf := bytes.Buffer{}
- buf.Grow(int(size))
- _, err = buf.ReadFrom(br)
- if err != nil {
- return nil, fmt.Errorf("butlerproto: failed to buffer bundle frame: %s", err)
- }
- return buf.Bytes(), nil
-}
-
-func (r *Reader) Read(ir io.Reader) error {
- fr := recordio.NewReader(ir, r.getMaxSize())
-
- // Ensure that we have our Metadata.
- if err := r.readMetadata(fr); err != nil {
- return err
- }
-
- switch r.Metadata.Type {
- case protocol.ButlerMetadata_ButlerLogBundle:
- data, err := r.readData(fr)
- if err != nil {
- return fmt.Errorf("butlerproto: failed to read Bundle data: %s", err)
- }
-
- bundle := protocol.ButlerLogBundle{}
- if err := proto.Unmarshal(data, &bundle); err != nil {
- return fmt.Errorf("butlerproto: failed to unmarshal Bundle frame: %s", err)
- }
- r.Bundle = &bundle
- return nil
-
- default:
- return fmt.Errorf("butlerproto: unknown data type: %s", r.Metadata.Type)
- }
-}
-
-// limitErrorReader is similar to io.LimitReader, except that it returns
-// a custom error instead of io.EOF.
-//
-// This is important, as it allows us to distinguish between the end of
-// the compressed reader's data and a limit being hit.
-type limitErrorReader struct {
- io.Reader // underlying reader
- limit int64 // max bytes remaining
-}
-
-func (r *limitErrorReader) Read(p []byte) (int, error) {
- if r.limit <= 0 {
- return 0, errors.New("limit exceeded")
- }
- if int64(len(p)) > r.limit {
- p = p[0:r.limit]
- }
- n, err := r.Reader.Read(p)
- r.limit -= int64(n)
- return n, err
-}
-
-// Writer writes Butler messages that the Reader can read.
-type Writer struct {
- protoBase
-
- // Compress, if true, allows the Writer to choose to compress data when
- // applicable.
- Compress bool
-
- // CompressThreshold is the minimum size that data must be in order to
- CompressThreshold int
-
- compressBuf bytes.Buffer
- compressWriter *zlib.Writer
-}
-
-func (w *Writer) writeData(fw recordio.Writer, t protocol.ButlerMetadata_ContentType, data []byte) error {
- if int64(len(data)) > w.getMaxSize() {
- return fmt.Errorf("butlerproto: serialized size exceeds soft cap (%d > %d)", len(data), w.getMaxSize())
- }
-
- md := protocol.ButlerMetadata{
- Type: t,
- }
-
- // If we're configured to compress and the data is below our threshold,
- // compress.
- if w.Compress && len(data) >= w.CompressThreshold {
- w.compressBuf.Reset()
- if w.compressWriter == nil {
- w.compressWriter = zlib.NewWriter(&w.compressBuf)
- } else {
- w.compressWriter.Reset(&w.compressBuf)
- }
- if _, err := w.compressWriter.Write(data); err != nil {
- return err
- }
- if err := w.compressWriter.Close(); err != nil {
- return err
- }
-
- compressed := true
- if compressed {
- md.Compression = protocol.ButlerMetadata_ZLIB
- }
- data = w.compressBuf.Bytes()
- }
-
- // Write metadata frame.
- mdData, err := proto.Marshal(&md)
- if err != nil {
- return fmt.Errorf("butlerproto: failed to marshal Metadata: %s", err)
- }
- _, err = fw.Write(mdData)
- if err != nil {
- return fmt.Errorf("butlerproto: failed to write Metadata frame: %s", err)
- }
- if err := fw.Flush(); err != nil {
- return fmt.Errorf("butlerproto: failed to flush Metadata frame: %s", err)
- }
-
- // Write data frame.
- _, err = fw.Write(data)
- if err != nil {
- return fmt.Errorf("butlerproto: failed to write data frame: %s", err)
- }
- if err := fw.Flush(); err != nil {
- return fmt.Errorf("butlerproto: failed to flush data frame: %s", err)
- }
- return nil
-}
-
-// WriteWith writes a ButlerLogBundle to the supplied Writer.
-func (w *Writer) Write(iw io.Writer, b *protocol.ButlerLogBundle) error {
- return w.WriteWith(recordio.NewWriter(iw), b)
-}
-
-// WriteWith writes a ButlerLogBundle to the supplied recordio.Writer.
-func (w *Writer) WriteWith(fw recordio.Writer, b *protocol.ButlerLogBundle) error {
- data, err := proto.Marshal(b)
- if err != nil {
- return fmt.Errorf("butlerproto: failed to marshal Bundle: %s", b)
- }
-
- return w.writeData(fw, protocol.ButlerMetadata_ButlerLogBundle, data)
-}
« no previous file with comments | « client/logdog/butlerproto/doc.go ('k') | client/logdog/butlerproto/proto_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698