Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(64)

Unified Diff: client/logdog/butlerlib/streamclient/stream.go

Issue 1429993002: LogDog: Add Butler stream server package. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-butlerproto
Patch Set: Fixed datagram check, now with unit tests! Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: client/logdog/butlerlib/streamclient/stream.go
diff --git a/client/logdog/butlerlib/streamclient/stream.go b/client/logdog/butlerlib/streamclient/stream.go
new file mode 100644
index 0000000000000000000000000000000000000000..d867a3e18feb84aa735aab10e68798040a6580a0
--- /dev/null
+++ b/client/logdog/butlerlib/streamclient/stream.go
@@ -0,0 +1,69 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package streamclient
+
+import (
+ "errors"
+ "io"
+
+ "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
+ "github.com/luci/luci-go/common/logdog/protocol"
+ "github.com/luci/luci-go/common/recordio"
+)
+
+// Stream is an individual LogDog Butler stream.
+type Stream interface {
estaab 2015/11/06 22:09:44 So the caller doesn't control when opening of the
dnj (Google) 2015/11/07 16:47:42 Hmm? The caller creates a new stream via "Client.
+ io.WriteCloser
+
+ // WriteDatagram writes a LogDog Butler streaming datagram to the underlying
+ // Writer.
+ WriteDatagram([]byte) error
+}
+
+// streamImpl is the standard implementation of the Stream interface.
+type streamImpl struct {
+ *streamproto.Properties
+ io.WriteCloser
+
+ // rioW is a recordio.Writer bound to the WriteCloser. This will be
+ // initialized on the first writeRecord invocation.
+ rioW recordio.Writer
+}
+
+var _ Stream = (*streamImpl)(nil)
estaab 2015/11/06 22:09:44 What does this do?
dnj (Google) 2015/11/07 16:47:42 This is a compile-time assert that the "streamImpl
+
+func (s *streamImpl) WriteDatagram(dg []byte) error {
+ if !s.isDatagramStream() {
+ return errors.New("not a datagram stream")
+ }
+
+ return s.writeRecord(dg)
+}
+
+func (s *streamImpl) Write(data []byte) (int, error) {
+ if s.isDatagramStream() {
+ return 0, errors.New("cannot use Write with datagram stream")
+ }
+
+ return s.writeRaw(data)
+}
+
+func (s *streamImpl) writeRaw(data []byte) (int, error) {
+ return s.WriteCloser.Write(data)
+}
+
+func (s *streamImpl) writeRecord(r []byte) error {
+ if s.rioW == nil {
+ s.rioW = recordio.NewWriter(s.WriteCloser)
estaab 2015/11/06 22:09:44 Can this fail?
dnj (Google) 2015/11/07 16:47:41 Nope, this is a data structure initialization, so
+ }
+ if _, err := s.rioW.Write(r); err != nil {
+ return err
+ }
+ return s.rioW.Flush()
+}
+
+func (s *streamImpl) isDatagramStream() bool {
+ return s.StreamType == protocol.LogStreamDescriptor_DATAGRAM
+}

Powered by Google App Engine
This is Rietveld 408576698