Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(78)

Side by Side Diff: milo/appengine/job_source/swarming/memoryClient.go

Issue 2944983003: [milo] {buildbucket,buildbot,swarming,logdog} -> backends/*. (Closed)
Patch Set: fix the tests Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package swarming 5 package swarming
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 10
11 "github.com/golang/protobuf/proto" 11 "github.com/golang/protobuf/proto"
12 miloProto "github.com/luci/luci-go/common/proto/milo" 12 miloProto "github.com/luci/luci-go/common/proto/milo"
13 "github.com/luci/luci-go/logdog/client/butlerlib/streamclient" 13 "github.com/luci/luci-go/logdog/client/butlerlib/streamclient"
14 "github.com/luci/luci-go/logdog/client/butlerlib/streamproto" 14 "github.com/luci/luci-go/logdog/client/butlerlib/streamproto"
15 » "github.com/luci/luci-go/milo/appengine/logdog" 15 » "github.com/luci/luci-go/milo/appengine/job_source/raw_presentation"
16 ) 16 )
17 17
18 // In-memory datastructure to hold a fake butler client. 18 // In-memory datastructure to hold a fake butler client.
19 type memoryStream struct { 19 type memoryStream struct {
20 props *streamproto.Properties 20 props *streamproto.Properties
21 21
22 closed bool 22 closed bool
23 buf bytes.Buffer 23 buf bytes.Buffer
24 isDatagram bool 24 isDatagram bool
25 } 25 }
26 26
27 func (s *memoryStream) ToLogDogStream() (*logdog.Stream, error) { 27 func (s *memoryStream) ToLogDogStream() (*raw_presentation.Stream, error) {
28 » result := &logdog.Stream{ 28 » result := &raw_presentation.Stream{
29 Closed: s.closed, 29 Closed: s.closed,
30 IsDatagram: s.isDatagram, 30 IsDatagram: s.isDatagram,
31 Path: s.props.Name, 31 Path: s.props.Name,
32 Prefix: s.props.Prefix, 32 Prefix: s.props.Prefix,
33 } 33 }
34 34
35 if s.isDatagram { 35 if s.isDatagram {
36 result.Data = &miloProto.Step{} 36 result.Data = &miloProto.Step{}
37 // Assume this is a miloProto.Step. 37 // Assume this is a miloProto.Step.
38 if err := proto.Unmarshal(s.buf.Bytes(), result.Data); err != ni l { 38 if err := proto.Unmarshal(s.buf.Bytes(), result.Data); err != ni l {
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
76 s := memoryStream{ 76 s := memoryStream{
77 props: props, 77 props: props,
78 } 78 }
79 if c.stream == nil { 79 if c.stream == nil {
80 c.stream = map[string]*memoryStream{} 80 c.stream = map[string]*memoryStream{}
81 } 81 }
82 c.stream[s.props.Name] = &s 82 c.stream[s.props.Name] = &s
83 return &s, nil 83 return &s, nil
84 } 84 }
85 85
86 func (c *memoryClient) addLogDogTextStream(s *logdog.Streams, ls *miloProto.Logd ogStream) error { 86 func (c *memoryClient) addLogDogTextStream(s *raw_presentation.Streams, ls *milo Proto.LogdogStream) error {
87 var keys []string 87 var keys []string
88 for k := range c.stream { 88 for k := range c.stream {
89 keys = append(keys, k) 89 keys = append(keys, k)
90 } 90 }
91 ms, ok := c.stream[ls.Name] 91 ms, ok := c.stream[ls.Name]
92 if !ok { 92 if !ok {
93 return fmt.Errorf("Could not find text stream %q\n%s", ls.Name, keys) 93 return fmt.Errorf("Could not find text stream %q\n%s", ls.Name, keys)
94 } 94 }
95 lds, err := ms.ToLogDogStream() 95 lds, err := ms.ToLogDogStream()
96 if err != nil { 96 if err != nil {
97 return fmt.Errorf("Could not convert text stream %s\n%s\n%s", ls .Name, err, keys) 97 return fmt.Errorf("Could not convert text stream %s\n%s\n%s", ls .Name, err, keys)
98 } 98 }
99 if lds.IsDatagram { 99 if lds.IsDatagram {
100 return fmt.Errorf("Expected %s to be a text stream, got a data s tream", ls.Name) 100 return fmt.Errorf("Expected %s to be a text stream, got a data s tream", ls.Name)
101 } 101 }
102 s.Streams[ls.Name] = lds 102 s.Streams[ls.Name] = lds
103 return nil 103 return nil
104 } 104 }
105 105
106 // addToStreams adds the set of stream with a given base path to the logdog 106 // addToStreams adds the set of stream with a given base path to the logdog
107 // stream map. A base path is assumed to have a stream named "annotations". 107 // stream map. A base path is assumed to have a stream named "annotations".
108 func (c *memoryClient) addToStreams(s *logdog.Streams, anno *miloProto.Step) err or { 108 func (c *memoryClient) addToStreams(s *raw_presentation.Streams, anno *miloProto .Step) error {
109 if lds := anno.StdoutStream; lds != nil { 109 if lds := anno.StdoutStream; lds != nil {
110 if err := c.addLogDogTextStream(s, lds); err != nil { 110 if err := c.addLogDogTextStream(s, lds); err != nil {
111 return fmt.Errorf( 111 return fmt.Errorf(
112 "Encountered error while processing step streams for STDOUT: %s", err) 112 "Encountered error while processing step streams for STDOUT: %s", err)
113 } 113 }
114 } 114 }
115 if lds := anno.StderrStream; lds != nil { 115 if lds := anno.StderrStream; lds != nil {
116 if err := c.addLogDogTextStream(s, lds); err != nil { 116 if err := c.addLogDogTextStream(s, lds); err != nil {
117 return fmt.Errorf( 117 return fmt.Errorf(
118 "Encountered error while processing step streams for STDERR: %s", err) 118 "Encountered error while processing step streams for STDERR: %s", err)
(...skipping 22 matching lines...) Expand all
141 } 141 }
142 142
143 if err := c.addToStreams(s, substep); err != nil { 143 if err := c.addToStreams(s, substep); err != nil {
144 return err 144 return err
145 } 145 }
146 } 146 }
147 147
148 return nil 148 return nil
149 } 149 }
150 150
151 func (c *memoryClient) ToLogDogStreams() (*logdog.Streams, error) { 151 func (c *memoryClient) ToLogDogStreams() (*raw_presentation.Streams, error) {
152 » result := &logdog.Streams{} 152 » result := &raw_presentation.Streams{}
153 » result.Streams = map[string]*logdog.Stream{} 153 » result.Streams = map[string]*raw_presentation.Stream{}
154 154
155 // Register annotation stream. 155 // Register annotation stream.
156 const annotationStreamName = "annotations" 156 const annotationStreamName = "annotations"
157 ms, ok := c.stream[annotationStreamName] 157 ms, ok := c.stream[annotationStreamName]
158 if !ok { 158 if !ok {
159 return nil, fmt.Errorf("Could not find stream %s", annotationStr eamName) 159 return nil, fmt.Errorf("Could not find stream %s", annotationStr eamName)
160 } 160 }
161 ls, err := ms.ToLogDogStream() 161 ls, err := ms.ToLogDogStream()
162 if err != nil { 162 if err != nil {
163 return nil, fmt.Errorf("Could not unmarshal stream %s\n%s", anno tationStreamName, err) 163 return nil, fmt.Errorf("Could not unmarshal stream %s\n%s", anno tationStreamName, err)
(...skipping 19 matching lines...) Expand all
183 for k := range result.Streams { 183 for k := range result.Streams {
184 lk = append(lk, k) 184 lk = append(lk, k)
185 } 185 }
186 return nil, fmt.Errorf( 186 return nil, fmt.Errorf(
187 "Number of streams do not match %d vs %d\nMemory:%s\nRes ult:%s", 187 "Number of streams do not match %d vs %d\nMemory:%s\nRes ult:%s",
188 len(c.stream), len(result.Streams), mk, lk) 188 len(c.stream), len(result.Streams), mk, lk)
189 } 189 }
190 190
191 return result, nil 191 return result, nil
192 } 192 }
OLDNEW
« no previous file with comments | « milo/appengine/job_source/swarming/html_test.go ('k') | milo/appengine/job_source/swarming/testdata/build-canceled » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698