OLD | NEW |
| (Empty) |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | |
2 // Use of this source code is governed under the Apache License, Version 2.0 | |
3 // that can be found in the LICENSE file. | |
4 | |
5 package swarming | |
6 | |
7 import ( | |
8 "bytes" | |
9 "fmt" | |
10 | |
11 "github.com/golang/protobuf/proto" | |
12 miloProto "github.com/luci/luci-go/common/proto/milo" | |
13 "github.com/luci/luci-go/logdog/client/butlerlib/streamclient" | |
14 "github.com/luci/luci-go/logdog/client/butlerlib/streamproto" | |
15 "github.com/luci/luci-go/milo/appengine/logdog" | |
16 ) | |
17 | |
18 // In-memory datastructure to hold a fake butler client. | |
19 type memoryStream struct { | |
20 props *streamproto.Properties | |
21 | |
22 closed bool | |
23 buf bytes.Buffer | |
24 isDatagram bool | |
25 } | |
26 | |
27 func (s *memoryStream) ToLogDogStream() (*logdog.Stream, error) { | |
28 result := &logdog.Stream{ | |
29 Closed: s.closed, | |
30 IsDatagram: s.isDatagram, | |
31 Path: s.props.Name, | |
32 Prefix: s.props.Prefix, | |
33 } | |
34 | |
35 if s.isDatagram { | |
36 result.Data = &miloProto.Step{} | |
37 // Assume this is a miloProto.Step. | |
38 if err := proto.Unmarshal(s.buf.Bytes(), result.Data); err != ni
l { | |
39 return nil, err | |
40 } | |
41 } else { | |
42 result.Text = s.buf.String() | |
43 } | |
44 | |
45 return result, nil | |
46 } | |
47 | |
48 func (s *memoryStream) Write(b []byte) (int, error) { | |
49 return s.buf.Write(b) | |
50 } | |
51 | |
52 func (s *memoryStream) Close() error { | |
53 s.closed = true | |
54 return nil | |
55 } | |
56 | |
57 func (s *memoryStream) WriteDatagram(b []byte) error { | |
58 s.isDatagram = true | |
59 | |
60 s.buf.Reset() | |
61 _, err := s.buf.Write(b) | |
62 return err | |
63 } | |
64 | |
65 func (s *memoryStream) Properties() *streamproto.Properties { return s.props.Clo
ne() } | |
66 | |
67 type memoryClient struct { | |
68 stream map[string]*memoryStream | |
69 } | |
70 | |
71 func (c *memoryClient) NewStream(f streamproto.Flags) (streamclient.Stream, erro
r) { | |
72 props := f.Properties() | |
73 if _, ok := c.stream[props.Name]; ok { | |
74 return nil, fmt.Errorf("duplicate stream, %q", props.Name) | |
75 } | |
76 s := memoryStream{ | |
77 props: props, | |
78 } | |
79 if c.stream == nil { | |
80 c.stream = map[string]*memoryStream{} | |
81 } | |
82 c.stream[s.props.Name] = &s | |
83 return &s, nil | |
84 } | |
85 | |
86 func (c *memoryClient) addLogDogTextStream(s *logdog.Streams, ls *miloProto.Logd
ogStream) error { | |
87 var keys []string | |
88 for k := range c.stream { | |
89 keys = append(keys, k) | |
90 } | |
91 ms, ok := c.stream[ls.Name] | |
92 if !ok { | |
93 return fmt.Errorf("Could not find text stream %q\n%s", ls.Name,
keys) | |
94 } | |
95 lds, err := ms.ToLogDogStream() | |
96 if err != nil { | |
97 return fmt.Errorf("Could not convert text stream %s\n%s\n%s", ls
.Name, err, keys) | |
98 } | |
99 if lds.IsDatagram { | |
100 return fmt.Errorf("Expected %s to be a text stream, got a data s
tream", ls.Name) | |
101 } | |
102 s.Streams[ls.Name] = lds | |
103 return nil | |
104 } | |
105 | |
106 // addToStreams adds the set of stream with a given base path to the logdog | |
107 // stream map. A base path is assumed to have a stream named "annotations". | |
108 func (c *memoryClient) addToStreams(s *logdog.Streams, anno *miloProto.Step) err
or { | |
109 if lds := anno.StdoutStream; lds != nil { | |
110 if err := c.addLogDogTextStream(s, lds); err != nil { | |
111 return fmt.Errorf( | |
112 "Encountered error while processing step streams
for STDOUT: %s", err) | |
113 } | |
114 } | |
115 if lds := anno.StderrStream; lds != nil { | |
116 if err := c.addLogDogTextStream(s, lds); err != nil { | |
117 return fmt.Errorf( | |
118 "Encountered error while processing step streams
for STDERR: %s", err) | |
119 } | |
120 } | |
121 | |
122 // Look for substream. | |
123 for _, link := range anno.GetOtherLinks() { | |
124 lds := link.GetLogdogStream() | |
125 // Not a logdog stream. | |
126 if lds == nil { | |
127 continue | |
128 } | |
129 | |
130 if err := c.addLogDogTextStream(s, lds); err != nil { | |
131 return fmt.Errorf( | |
132 "Encountered error while processing step streams
for '%s'\n%s", lds.Name, err) | |
133 } | |
134 } | |
135 | |
136 // Now do substeps. | |
137 for _, subStepEntry := range anno.Substep { | |
138 substep := subStepEntry.GetStep() | |
139 if substep == nil { | |
140 continue | |
141 } | |
142 | |
143 if err := c.addToStreams(s, substep); err != nil { | |
144 return err | |
145 } | |
146 } | |
147 | |
148 return nil | |
149 } | |
150 | |
151 func (c *memoryClient) ToLogDogStreams() (*logdog.Streams, error) { | |
152 result := &logdog.Streams{} | |
153 result.Streams = map[string]*logdog.Stream{} | |
154 | |
155 // Register annotation stream. | |
156 const annotationStreamName = "annotations" | |
157 ms, ok := c.stream[annotationStreamName] | |
158 if !ok { | |
159 return nil, fmt.Errorf("Could not find stream %s", annotationStr
eamName) | |
160 } | |
161 ls, err := ms.ToLogDogStream() | |
162 if err != nil { | |
163 return nil, fmt.Errorf("Could not unmarshal stream %s\n%s", anno
tationStreamName, err) | |
164 } | |
165 if !ls.IsDatagram { | |
166 return nil, fmt.Errorf( | |
167 "Annotation stream %s is not a datagram\nText: %s", | |
168 annotationStreamName, ls.Text) | |
169 } | |
170 result.Streams[annotationStreamName] = ls | |
171 | |
172 // Register any referenced LogDog streams. | |
173 if err := c.addToStreams(result, ls.Data); err != nil { | |
174 return nil, err | |
175 } | |
176 result.MainStream = ls | |
177 | |
178 if len(c.stream) != len(result.Streams) { | |
179 var mk, lk []string | |
180 for k := range c.stream { | |
181 mk = append(mk, k) | |
182 } | |
183 for k := range result.Streams { | |
184 lk = append(lk, k) | |
185 } | |
186 return nil, fmt.Errorf( | |
187 "Number of streams do not match %d vs %d\nMemory:%s\nRes
ult:%s", | |
188 len(c.stream), len(result.Streams), mk, lk) | |
189 } | |
190 | |
191 return result, nil | |
192 } | |
OLD | NEW |