| OLD | NEW |
| (Empty) |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | |
| 3 // that can be found in the LICENSE file. | |
| 4 | |
| 5 package swarming | |
| 6 | |
| 7 import ( | |
| 8 "bytes" | |
| 9 "fmt" | |
| 10 | |
| 11 "github.com/golang/protobuf/proto" | |
| 12 miloProto "github.com/luci/luci-go/common/proto/milo" | |
| 13 "github.com/luci/luci-go/logdog/client/butlerlib/streamclient" | |
| 14 "github.com/luci/luci-go/logdog/client/butlerlib/streamproto" | |
| 15 "github.com/luci/luci-go/milo/appengine/logdog" | |
| 16 ) | |
| 17 | |
| 18 // In-memory datastructure to hold a fake butler client. | |
| 19 type memoryStream struct { | |
| 20 props *streamproto.Properties | |
| 21 | |
| 22 closed bool | |
| 23 buf bytes.Buffer | |
| 24 isDatagram bool | |
| 25 } | |
| 26 | |
| 27 func (s *memoryStream) ToLogDogStream() (*logdog.Stream, error) { | |
| 28 result := &logdog.Stream{ | |
| 29 Closed: s.closed, | |
| 30 IsDatagram: s.isDatagram, | |
| 31 Path: s.props.Name, | |
| 32 Prefix: s.props.Prefix, | |
| 33 } | |
| 34 | |
| 35 if s.isDatagram { | |
| 36 result.Data = &miloProto.Step{} | |
| 37 // Assume this is a miloProto.Step. | |
| 38 if err := proto.Unmarshal(s.buf.Bytes(), result.Data); err != ni
l { | |
| 39 return nil, err | |
| 40 } | |
| 41 } else { | |
| 42 result.Text = s.buf.String() | |
| 43 } | |
| 44 | |
| 45 return result, nil | |
| 46 } | |
| 47 | |
| 48 func (s *memoryStream) Write(b []byte) (int, error) { | |
| 49 return s.buf.Write(b) | |
| 50 } | |
| 51 | |
| 52 func (s *memoryStream) Close() error { | |
| 53 s.closed = true | |
| 54 return nil | |
| 55 } | |
| 56 | |
| 57 func (s *memoryStream) WriteDatagram(b []byte) error { | |
| 58 s.isDatagram = true | |
| 59 | |
| 60 s.buf.Reset() | |
| 61 _, err := s.buf.Write(b) | |
| 62 return err | |
| 63 } | |
| 64 | |
| 65 func (s *memoryStream) Properties() *streamproto.Properties { return s.props.Clo
ne() } | |
| 66 | |
| 67 type memoryClient struct { | |
| 68 stream map[string]*memoryStream | |
| 69 } | |
| 70 | |
| 71 func (c *memoryClient) NewStream(f streamproto.Flags) (streamclient.Stream, erro
r) { | |
| 72 props := f.Properties() | |
| 73 if _, ok := c.stream[props.Name]; ok { | |
| 74 return nil, fmt.Errorf("duplicate stream, %q", props.Name) | |
| 75 } | |
| 76 s := memoryStream{ | |
| 77 props: props, | |
| 78 } | |
| 79 if c.stream == nil { | |
| 80 c.stream = map[string]*memoryStream{} | |
| 81 } | |
| 82 c.stream[s.props.Name] = &s | |
| 83 return &s, nil | |
| 84 } | |
| 85 | |
| 86 func (c *memoryClient) addLogDogTextStream(s *logdog.Streams, ls *miloProto.Logd
ogStream) error { | |
| 87 var keys []string | |
| 88 for k := range c.stream { | |
| 89 keys = append(keys, k) | |
| 90 } | |
| 91 ms, ok := c.stream[ls.Name] | |
| 92 if !ok { | |
| 93 return fmt.Errorf("Could not find text stream %q\n%s", ls.Name,
keys) | |
| 94 } | |
| 95 lds, err := ms.ToLogDogStream() | |
| 96 if err != nil { | |
| 97 return fmt.Errorf("Could not convert text stream %s\n%s\n%s", ls
.Name, err, keys) | |
| 98 } | |
| 99 if lds.IsDatagram { | |
| 100 return fmt.Errorf("Expected %s to be a text stream, got a data s
tream", ls.Name) | |
| 101 } | |
| 102 s.Streams[ls.Name] = lds | |
| 103 return nil | |
| 104 } | |
| 105 | |
| 106 // addToStreams adds the set of stream with a given base path to the logdog | |
| 107 // stream map. A base path is assumed to have a stream named "annotations". | |
| 108 func (c *memoryClient) addToStreams(s *logdog.Streams, anno *miloProto.Step) err
or { | |
| 109 if lds := anno.StdoutStream; lds != nil { | |
| 110 if err := c.addLogDogTextStream(s, lds); err != nil { | |
| 111 return fmt.Errorf( | |
| 112 "Encountered error while processing step streams
for STDOUT: %s", err) | |
| 113 } | |
| 114 } | |
| 115 if lds := anno.StderrStream; lds != nil { | |
| 116 if err := c.addLogDogTextStream(s, lds); err != nil { | |
| 117 return fmt.Errorf( | |
| 118 "Encountered error while processing step streams
for STDERR: %s", err) | |
| 119 } | |
| 120 } | |
| 121 | |
| 122 // Look for substream. | |
| 123 for _, link := range anno.GetOtherLinks() { | |
| 124 lds := link.GetLogdogStream() | |
| 125 // Not a logdog stream. | |
| 126 if lds == nil { | |
| 127 continue | |
| 128 } | |
| 129 | |
| 130 if err := c.addLogDogTextStream(s, lds); err != nil { | |
| 131 return fmt.Errorf( | |
| 132 "Encountered error while processing step streams
for '%s'\n%s", lds.Name, err) | |
| 133 } | |
| 134 } | |
| 135 | |
| 136 // Now do substeps. | |
| 137 for _, subStepEntry := range anno.Substep { | |
| 138 substep := subStepEntry.GetStep() | |
| 139 if substep == nil { | |
| 140 continue | |
| 141 } | |
| 142 | |
| 143 if err := c.addToStreams(s, substep); err != nil { | |
| 144 return err | |
| 145 } | |
| 146 } | |
| 147 | |
| 148 return nil | |
| 149 } | |
| 150 | |
| 151 func (c *memoryClient) ToLogDogStreams() (*logdog.Streams, error) { | |
| 152 result := &logdog.Streams{} | |
| 153 result.Streams = map[string]*logdog.Stream{} | |
| 154 | |
| 155 // Register annotation stream. | |
| 156 const annotationStreamName = "annotations" | |
| 157 ms, ok := c.stream[annotationStreamName] | |
| 158 if !ok { | |
| 159 return nil, fmt.Errorf("Could not find stream %s", annotationStr
eamName) | |
| 160 } | |
| 161 ls, err := ms.ToLogDogStream() | |
| 162 if err != nil { | |
| 163 return nil, fmt.Errorf("Could not unmarshal stream %s\n%s", anno
tationStreamName, err) | |
| 164 } | |
| 165 if !ls.IsDatagram { | |
| 166 return nil, fmt.Errorf( | |
| 167 "Annotation stream %s is not a datagram\nText: %s", | |
| 168 annotationStreamName, ls.Text) | |
| 169 } | |
| 170 result.Streams[annotationStreamName] = ls | |
| 171 | |
| 172 // Register any referenced LogDog streams. | |
| 173 if err := c.addToStreams(result, ls.Data); err != nil { | |
| 174 return nil, err | |
| 175 } | |
| 176 result.MainStream = ls | |
| 177 | |
| 178 if len(c.stream) != len(result.Streams) { | |
| 179 var mk, lk []string | |
| 180 for k := range c.stream { | |
| 181 mk = append(mk, k) | |
| 182 } | |
| 183 for k := range result.Streams { | |
| 184 lk = append(lk, k) | |
| 185 } | |
| 186 return nil, fmt.Errorf( | |
| 187 "Number of streams do not match %d vs %d\nMemory:%s\nRes
ult:%s", | |
| 188 len(c.stream), len(result.Streams), mk, lk) | |
| 189 } | |
| 190 | |
| 191 return result, nil | |
| 192 } | |
| OLD | NEW |