Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(752)

Side by Side Diff: common/logdog/butlerproto/proto.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package butlerproto 5 package butlerproto
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "compress/zlib" 9 "compress/zlib"
10 "errors" 10 "errors"
(...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after
119 return err 119 return err
120 } 120 }
121 121
122 switch r.Metadata.Type { 122 switch r.Metadata.Type {
123 case protocol.ButlerMetadata_ButlerLogBundle: 123 case protocol.ButlerMetadata_ButlerLogBundle:
124 data, err := r.readData(fr) 124 data, err := r.readData(fr)
125 if err != nil { 125 if err != nil {
126 return fmt.Errorf("butlerproto: failed to read Bundle da ta: %s", err) 126 return fmt.Errorf("butlerproto: failed to read Bundle da ta: %s", err)
127 } 127 }
128 128
129 » » if r.Metadata.ProtoVersion != protocol.Version { 129 » » if r.Metadata.ProtoVersion == protocol.Version {
dnj (Google) 2016/01/21 04:36:24 Instead of rejecting messages without a known vers
130 » » » return fmt.Errorf("butlerproto: unknown protobuf version (%q != %q)", 130 » » » bundle := protocol.ButlerLogBundle{}
131 » » » » r.Metadata.ProtoVersion, protocol.Version) 131 » » » if err := proto.Unmarshal(data, &bundle); err != nil {
132 » » » » return fmt.Errorf("butlerproto: failed to unmars hal Bundle frame: %s", err)
133 » » » }
134 » » » r.Bundle = &bundle
132 } 135 }
133
134 bundle := protocol.ButlerLogBundle{}
135 if err := proto.Unmarshal(data, &bundle); err != nil {
136 return fmt.Errorf("butlerproto: failed to unmarshal Bund le frame: %s", err)
137 }
138 r.Bundle = &bundle
139 return nil 136 return nil
140 137
141 default: 138 default:
142 return fmt.Errorf("butlerproto: unknown data type: %s", r.Metada ta.Type) 139 return fmt.Errorf("butlerproto: unknown data type: %s", r.Metada ta.Type)
143 } 140 }
144 } 141 }
145 142
146 // limitErrorReader is similar to io.LimitReader, except that it returns 143 // limitErrorReader is similar to io.LimitReader, except that it returns
147 // a custom error instead of io.EOF. 144 // a custom error instead of io.EOF.
148 // 145 //
(...skipping 13 matching lines...) Expand all
162 } 159 }
163 n, err := r.Reader.Read(p) 160 n, err := r.Reader.Read(p)
164 r.limit -= int64(n) 161 r.limit -= int64(n)
165 return n, err 162 return n, err
166 } 163 }
167 164
168 // Writer writes Butler messages that the Reader can read. 165 // Writer writes Butler messages that the Reader can read.
169 type Writer struct { 166 type Writer struct {
170 protoBase 167 protoBase
171 168
169 // ProtoVersion is the protocol version string to use. If empty, the cur rent
dnj (Google) 2016/01/21 04:36:24 Useful for testing: "hey, pretend that you wrote t
170 // ProtoVersion will be used.
171 ProtoVersion string
172
172 // Compress, if true, allows the Writer to choose to compress data when 173 // Compress, if true, allows the Writer to choose to compress data when
173 // applicable. 174 // applicable.
174 Compress bool 175 Compress bool
175 176
176 // CompressThreshold is the minimum size that data must be in order to 177 // CompressThreshold is the minimum size that data must be in order to
177 CompressThreshold int 178 CompressThreshold int
178 179
179 compressBuf bytes.Buffer 180 compressBuf bytes.Buffer
180 compressWriter *zlib.Writer 181 compressWriter *zlib.Writer
181 } 182 }
182 183
183 func (w *Writer) writeData(fw recordio.Writer, t protocol.ButlerMetadata_Content Type, data []byte) error { 184 func (w *Writer) writeData(fw recordio.Writer, t protocol.ButlerMetadata_Content Type, data []byte) error {
184 if int64(len(data)) > w.getMaxSize() { 185 if int64(len(data)) > w.getMaxSize() {
185 return fmt.Errorf("butlerproto: serialized size exceeds soft cap (%d > %d)", len(data), w.getMaxSize()) 186 return fmt.Errorf("butlerproto: serialized size exceeds soft cap (%d > %d)", len(data), w.getMaxSize())
186 } 187 }
187 188
189 pv := w.ProtoVersion
190 if pv == "" {
191 pv = protocol.Version
192 }
188 md := protocol.ButlerMetadata{ 193 md := protocol.ButlerMetadata{
189 Type: t, 194 Type: t,
190 » » ProtoVersion: protocol.Version, 195 » » ProtoVersion: pv,
191 } 196 }
192 197
193 // If we're configured to compress and the data is below our threshold, 198 // If we're configured to compress and the data is below our threshold,
194 // compress. 199 // compress.
195 if w.Compress && len(data) >= w.CompressThreshold { 200 if w.Compress && len(data) >= w.CompressThreshold {
196 w.compressBuf.Reset() 201 w.compressBuf.Reset()
197 if w.compressWriter == nil { 202 if w.compressWriter == nil {
198 w.compressWriter = zlib.NewWriter(&w.compressBuf) 203 w.compressWriter = zlib.NewWriter(&w.compressBuf)
199 } else { 204 } else {
200 w.compressWriter.Reset(&w.compressBuf) 205 w.compressWriter.Reset(&w.compressBuf)
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
244 249
245 // WriteWith writes a ButlerLogBundle to the supplied recordio.Writer. 250 // WriteWith writes a ButlerLogBundle to the supplied recordio.Writer.
246 func (w *Writer) WriteWith(fw recordio.Writer, b *protocol.ButlerLogBundle) erro r { 251 func (w *Writer) WriteWith(fw recordio.Writer, b *protocol.ButlerLogBundle) erro r {
247 data, err := proto.Marshal(b) 252 data, err := proto.Marshal(b)
248 if err != nil { 253 if err != nil {
249 return fmt.Errorf("butlerproto: failed to marshal Bundle: %s", b ) 254 return fmt.Errorf("butlerproto: failed to marshal Bundle: %s", b )
250 } 255 }
251 256
252 return w.writeData(fw, protocol.ButlerMetadata_ButlerLogBundle, data) 257 return w.writeData(fw, protocol.ButlerMetadata_ButlerLogBundle, data)
253 } 258 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698