OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package swarming | 5 package swarming |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "fmt" | 9 "fmt" |
10 | 10 |
11 "github.com/golang/protobuf/proto" | 11 "github.com/golang/protobuf/proto" |
12 miloProto "github.com/luci/luci-go/common/proto/milo" | 12 miloProto "github.com/luci/luci-go/common/proto/milo" |
13 "github.com/luci/luci-go/logdog/client/butlerlib/streamclient" | 13 "github.com/luci/luci-go/logdog/client/butlerlib/streamclient" |
14 "github.com/luci/luci-go/logdog/client/butlerlib/streamproto" | 14 "github.com/luci/luci-go/logdog/client/butlerlib/streamproto" |
15 » "github.com/luci/luci-go/milo/appengine/logdog" | 15 » "github.com/luci/luci-go/milo/appengine/job_source/raw_presentation" |
16 ) | 16 ) |
17 | 17 |
18 // In-memory datastructure to hold a fake butler client. | 18 // In-memory datastructure to hold a fake butler client. |
19 type memoryStream struct { | 19 type memoryStream struct { |
20 props *streamproto.Properties | 20 props *streamproto.Properties |
21 | 21 |
22 closed bool | 22 closed bool |
23 buf bytes.Buffer | 23 buf bytes.Buffer |
24 isDatagram bool | 24 isDatagram bool |
25 } | 25 } |
26 | 26 |
27 func (s *memoryStream) ToLogDogStream() (*logdog.Stream, error) { | 27 func (s *memoryStream) ToLogDogStream() (*raw_presentation.Stream, error) { |
28 » result := &logdog.Stream{ | 28 » result := &raw_presentation.Stream{ |
29 Closed: s.closed, | 29 Closed: s.closed, |
30 IsDatagram: s.isDatagram, | 30 IsDatagram: s.isDatagram, |
31 Path: s.props.Name, | 31 Path: s.props.Name, |
32 Prefix: s.props.Prefix, | 32 Prefix: s.props.Prefix, |
33 } | 33 } |
34 | 34 |
35 if s.isDatagram { | 35 if s.isDatagram { |
36 result.Data = &miloProto.Step{} | 36 result.Data = &miloProto.Step{} |
37 // Assume this is a miloProto.Step. | 37 // Assume this is a miloProto.Step. |
38 if err := proto.Unmarshal(s.buf.Bytes(), result.Data); err != ni
l { | 38 if err := proto.Unmarshal(s.buf.Bytes(), result.Data); err != ni
l { |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
76 s := memoryStream{ | 76 s := memoryStream{ |
77 props: props, | 77 props: props, |
78 } | 78 } |
79 if c.stream == nil { | 79 if c.stream == nil { |
80 c.stream = map[string]*memoryStream{} | 80 c.stream = map[string]*memoryStream{} |
81 } | 81 } |
82 c.stream[s.props.Name] = &s | 82 c.stream[s.props.Name] = &s |
83 return &s, nil | 83 return &s, nil |
84 } | 84 } |
85 | 85 |
86 func (c *memoryClient) addLogDogTextStream(s *logdog.Streams, ls *miloProto.Logd
ogStream) error { | 86 func (c *memoryClient) addLogDogTextStream(s *raw_presentation.Streams, ls *milo
Proto.LogdogStream) error { |
87 var keys []string | 87 var keys []string |
88 for k := range c.stream { | 88 for k := range c.stream { |
89 keys = append(keys, k) | 89 keys = append(keys, k) |
90 } | 90 } |
91 ms, ok := c.stream[ls.Name] | 91 ms, ok := c.stream[ls.Name] |
92 if !ok { | 92 if !ok { |
93 return fmt.Errorf("Could not find text stream %q\n%s", ls.Name,
keys) | 93 return fmt.Errorf("Could not find text stream %q\n%s", ls.Name,
keys) |
94 } | 94 } |
95 lds, err := ms.ToLogDogStream() | 95 lds, err := ms.ToLogDogStream() |
96 if err != nil { | 96 if err != nil { |
97 return fmt.Errorf("Could not convert text stream %s\n%s\n%s", ls
.Name, err, keys) | 97 return fmt.Errorf("Could not convert text stream %s\n%s\n%s", ls
.Name, err, keys) |
98 } | 98 } |
99 if lds.IsDatagram { | 99 if lds.IsDatagram { |
100 return fmt.Errorf("Expected %s to be a text stream, got a data s
tream", ls.Name) | 100 return fmt.Errorf("Expected %s to be a text stream, got a data s
tream", ls.Name) |
101 } | 101 } |
102 s.Streams[ls.Name] = lds | 102 s.Streams[ls.Name] = lds |
103 return nil | 103 return nil |
104 } | 104 } |
105 | 105 |
106 // addToStreams adds the set of stream with a given base path to the logdog | 106 // addToStreams adds the set of stream with a given base path to the logdog |
107 // stream map. A base path is assumed to have a stream named "annotations". | 107 // stream map. A base path is assumed to have a stream named "annotations". |
108 func (c *memoryClient) addToStreams(s *logdog.Streams, anno *miloProto.Step) err
or { | 108 func (c *memoryClient) addToStreams(s *raw_presentation.Streams, anno *miloProto
.Step) error { |
109 if lds := anno.StdoutStream; lds != nil { | 109 if lds := anno.StdoutStream; lds != nil { |
110 if err := c.addLogDogTextStream(s, lds); err != nil { | 110 if err := c.addLogDogTextStream(s, lds); err != nil { |
111 return fmt.Errorf( | 111 return fmt.Errorf( |
112 "Encountered error while processing step streams
for STDOUT: %s", err) | 112 "Encountered error while processing step streams
for STDOUT: %s", err) |
113 } | 113 } |
114 } | 114 } |
115 if lds := anno.StderrStream; lds != nil { | 115 if lds := anno.StderrStream; lds != nil { |
116 if err := c.addLogDogTextStream(s, lds); err != nil { | 116 if err := c.addLogDogTextStream(s, lds); err != nil { |
117 return fmt.Errorf( | 117 return fmt.Errorf( |
118 "Encountered error while processing step streams
for STDERR: %s", err) | 118 "Encountered error while processing step streams
for STDERR: %s", err) |
(...skipping 22 matching lines...) Expand all Loading... |
141 } | 141 } |
142 | 142 |
143 if err := c.addToStreams(s, substep); err != nil { | 143 if err := c.addToStreams(s, substep); err != nil { |
144 return err | 144 return err |
145 } | 145 } |
146 } | 146 } |
147 | 147 |
148 return nil | 148 return nil |
149 } | 149 } |
150 | 150 |
151 func (c *memoryClient) ToLogDogStreams() (*logdog.Streams, error) { | 151 func (c *memoryClient) ToLogDogStreams() (*raw_presentation.Streams, error) { |
152 » result := &logdog.Streams{} | 152 » result := &raw_presentation.Streams{} |
153 » result.Streams = map[string]*logdog.Stream{} | 153 » result.Streams = map[string]*raw_presentation.Stream{} |
154 | 154 |
155 // Register annotation stream. | 155 // Register annotation stream. |
156 const annotationStreamName = "annotations" | 156 const annotationStreamName = "annotations" |
157 ms, ok := c.stream[annotationStreamName] | 157 ms, ok := c.stream[annotationStreamName] |
158 if !ok { | 158 if !ok { |
159 return nil, fmt.Errorf("Could not find stream %s", annotationStr
eamName) | 159 return nil, fmt.Errorf("Could not find stream %s", annotationStr
eamName) |
160 } | 160 } |
161 ls, err := ms.ToLogDogStream() | 161 ls, err := ms.ToLogDogStream() |
162 if err != nil { | 162 if err != nil { |
163 return nil, fmt.Errorf("Could not unmarshal stream %s\n%s", anno
tationStreamName, err) | 163 return nil, fmt.Errorf("Could not unmarshal stream %s\n%s", anno
tationStreamName, err) |
(...skipping 19 matching lines...) Expand all Loading... |
183 for k := range result.Streams { | 183 for k := range result.Streams { |
184 lk = append(lk, k) | 184 lk = append(lk, k) |
185 } | 185 } |
186 return nil, fmt.Errorf( | 186 return nil, fmt.Errorf( |
187 "Number of streams do not match %d vs %d\nMemory:%s\nRes
ult:%s", | 187 "Number of streams do not match %d vs %d\nMemory:%s\nRes
ult:%s", |
188 len(c.stream), len(result.Streams), mk, lk) | 188 len(c.stream), len(result.Streams), mk, lk) |
189 } | 189 } |
190 | 190 |
191 return result, nil | 191 return result, nil |
192 } | 192 } |
OLD | NEW |