| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package swarming | 5 package swarming |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "fmt" | 9 "fmt" |
| 10 | 10 |
| 11 "github.com/golang/protobuf/proto" | 11 "github.com/golang/protobuf/proto" |
| 12 miloProto "github.com/luci/luci-go/common/proto/milo" | 12 miloProto "github.com/luci/luci-go/common/proto/milo" |
| 13 "github.com/luci/luci-go/logdog/client/butlerlib/streamclient" | 13 "github.com/luci/luci-go/logdog/client/butlerlib/streamclient" |
| 14 "github.com/luci/luci-go/logdog/client/butlerlib/streamproto" | 14 "github.com/luci/luci-go/logdog/client/butlerlib/streamproto" |
| 15 "github.com/luci/luci-go/milo/appengine/logdog" | 15 "github.com/luci/luci-go/milo/appengine/logdog" |
| 16 ) | 16 ) |
| 17 | 17 |
| 18 // In-memory datastructure to hold a fake butler client. | 18 // In-memory datastructure to hold a fake butler client. |
| 19 type memoryStream struct { | 19 type memoryStream struct { |
| 20 » *streamproto.Properties | 20 » props *streamproto.Properties |
| 21 | 21 |
| 22 closed bool | 22 closed bool |
| 23 buf bytes.Buffer | 23 buf bytes.Buffer |
| 24 isDatagram bool | 24 isDatagram bool |
| 25 } | 25 } |
| 26 | 26 |
| 27 func (s *memoryStream) ToLogDogStream() (*logdog.Stream, error) { | 27 func (s *memoryStream) ToLogDogStream() (*logdog.Stream, error) { |
| 28 result := &logdog.Stream{ | 28 result := &logdog.Stream{ |
| 29 Closed: s.closed, | 29 Closed: s.closed, |
| 30 IsDatagram: s.isDatagram, | 30 IsDatagram: s.isDatagram, |
| 31 » » Path: s.Name, | 31 » » Path: s.props.Name, |
| 32 » » Prefix: s.Prefix, | 32 » » Prefix: s.props.Prefix, |
| 33 } | 33 } |
| 34 | 34 |
| 35 if s.isDatagram { | 35 if s.isDatagram { |
| 36 result.Data = &miloProto.Step{} | 36 result.Data = &miloProto.Step{} |
| 37 // Assume this is a miloProto.Step. | 37 // Assume this is a miloProto.Step. |
| 38 if err := proto.Unmarshal(s.buf.Bytes(), result.Data); err != ni
l { | 38 if err := proto.Unmarshal(s.buf.Bytes(), result.Data); err != ni
l { |
| 39 return nil, err | 39 return nil, err |
| 40 } | 40 } |
| 41 } else { | 41 } else { |
| 42 result.Text = s.buf.String() | 42 result.Text = s.buf.String() |
| (...skipping 12 matching lines...) Expand all Loading... |
| 55 } | 55 } |
| 56 | 56 |
| 57 func (s *memoryStream) WriteDatagram(b []byte) error { | 57 func (s *memoryStream) WriteDatagram(b []byte) error { |
| 58 s.isDatagram = true | 58 s.isDatagram = true |
| 59 | 59 |
| 60 s.buf.Reset() | 60 s.buf.Reset() |
| 61 _, err := s.buf.Write(b) | 61 _, err := s.buf.Write(b) |
| 62 return err | 62 return err |
| 63 } | 63 } |
| 64 | 64 |
| 65 func (s *memoryStream) Properties() *streamproto.Properties { return s.props.Clo
ne() } |
| 66 |
| 65 type memoryClient struct { | 67 type memoryClient struct { |
| 66 stream map[string]*memoryStream | 68 stream map[string]*memoryStream |
| 67 } | 69 } |
| 68 | 70 |
| 69 func (c *memoryClient) NewStream(f streamproto.Flags) (streamclient.Stream, erro
r) { | 71 func (c *memoryClient) NewStream(f streamproto.Flags) (streamclient.Stream, erro
r) { |
| 70 props := f.Properties() | 72 props := f.Properties() |
| 71 if _, ok := c.stream[props.Name]; ok { | 73 if _, ok := c.stream[props.Name]; ok { |
| 72 return nil, fmt.Errorf("duplicate stream, %q", props.Name) | 74 return nil, fmt.Errorf("duplicate stream, %q", props.Name) |
| 73 } | 75 } |
| 74 s := memoryStream{ | 76 s := memoryStream{ |
| 75 » » Properties: props, | 77 » » props: props, |
| 76 } | 78 } |
| 77 if c.stream == nil { | 79 if c.stream == nil { |
| 78 c.stream = map[string]*memoryStream{} | 80 c.stream = map[string]*memoryStream{} |
| 79 } | 81 } |
| 80 » c.stream[s.Name] = &s | 82 » c.stream[s.props.Name] = &s |
| 81 return &s, nil | 83 return &s, nil |
| 82 } | 84 } |
| 83 | 85 |
| 84 func (c *memoryClient) addLogDogTextStream(s *logdog.Streams, ls *miloProto.Logd
ogStream) error { | 86 func (c *memoryClient) addLogDogTextStream(s *logdog.Streams, ls *miloProto.Logd
ogStream) error { |
| 85 var keys []string | 87 var keys []string |
| 86 for k := range c.stream { | 88 for k := range c.stream { |
| 87 keys = append(keys, k) | 89 keys = append(keys, k) |
| 88 } | 90 } |
| 89 ms, ok := c.stream[ls.Name] | 91 ms, ok := c.stream[ls.Name] |
| 90 if !ok { | 92 if !ok { |
| (...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 181 for k := range result.Streams { | 183 for k := range result.Streams { |
| 182 lk = append(lk, k) | 184 lk = append(lk, k) |
| 183 } | 185 } |
| 184 return nil, fmt.Errorf( | 186 return nil, fmt.Errorf( |
| 185 "Number of streams do not match %d vs %d\nMemory:%s\nRes
ult:%s", | 187 "Number of streams do not match %d vs %d\nMemory:%s\nRes
ult:%s", |
| 186 len(c.stream), len(result.Streams), mk, lk) | 188 len(c.stream), len(result.Streams), mk, lk) |
| 187 } | 189 } |
| 188 | 190 |
| 189 return result, nil | 191 return result, nil |
| 190 } | 192 } |
| OLD | NEW |