| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | 8 "errors" |
| 9 "fmt" | 9 "fmt" |
| 10 "testing" | 10 "testing" |
| (...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 94 "foo": "bar", | 94 "foo": "bar", |
| 95 "baz": "qux", | 95 "baz": "qux", |
| 96 }, | 96 }, |
| 97 ContentType: "application/text", | 97 ContentType: "application/text", |
| 98 Before: now, | 98 Before: now, |
| 99 After: now, | 99 After: now, |
| 100 Purged: Both, | 100 Purged: Both, |
| 101 State: true, | 101 State: true, |
| 102 } | 102 } |
| 103 | 103 |
| 104 st := logdog.LogStreamState{ |
| 105 Created: google.NewTimestamp(now), |
| 106 } |
| 107 |
| 104 var results []*LogStream | 108 var results []*LogStream |
| 105 accumulate := func(s *LogStream) bool { | 109 accumulate := func(s *LogStream) bool { |
| 106 results = append(results, s) | 110 results = append(results, s) |
| 107 return true | 111 return true |
| 108 } | 112 } |
| 109 | 113 |
| 110 Convey(`Can accumulate results across queries.`, func()
{ | 114 Convey(`Can accumulate results across queries.`, func()
{ |
| 111 // This handler will return a single query per r
equest, as well as a | 115 // This handler will return a single query per r
equest, as well as a |
| 112 // non-empty Next pointer for the next query ele
ment. It progresses | 116 // non-empty Next pointer for the next query ele
ment. It progresses |
| 113 // "a" => "b" => "final" => "". | 117 // "a" => "b" => "final" => "". |
| 114 svc.H = func(req *logdog.QueryRequest) (*logdog.
QueryResponse, error) { | 118 svc.H = func(req *logdog.QueryRequest) (*logdog.
QueryResponse, error) { |
| 115 r := logdog.QueryResponse{ | 119 r := logdog.QueryResponse{ |
| 116 Project: string(project), | 120 Project: string(project), |
| 117 } | 121 } |
| 122 |
| 118 switch req.Next { | 123 switch req.Next { |
| 119 case "": | 124 case "": |
| 120 » » » » » » r.Streams = append(r.Streams, ge
n("a", nil)) | 125 » » » » » » r.Streams = append(r.Streams, ge
n("a", &st)) |
| 121 r.Next = "b" | 126 r.Next = "b" |
| 122 case "b": | 127 case "b": |
| 123 » » » » » » r.Streams = append(r.Streams, ge
n("b", nil)) | 128 » » » » » » r.Streams = append(r.Streams, ge
n("b", &st)) |
| 124 r.Next = "final" | 129 r.Next = "final" |
| 125 case "final": | 130 case "final": |
| 126 » » » » » » r.Streams = append(r.Streams, ge
n("final", nil)) | 131 » » » » » » r.Streams = append(r.Streams, ge
n("final", &st)) |
| 127 default: | 132 default: |
| 128 return nil, errors.New("invalid
cursor") | 133 return nil, errors.New("invalid
cursor") |
| 129 } | 134 } |
| 130 return &r, nil | 135 return &r, nil |
| 131 } | 136 } |
| 132 | 137 |
| 133 So(client.Query(c, project, path, q, accumulate)
, ShouldBeNil) | 138 So(client.Query(c, project, path, q, accumulate)
, ShouldBeNil) |
| 134 So(results, shouldHaveLogStreams, "test/+/a", "t
est/+/b", "test/+/final") | 139 So(results, shouldHaveLogStreams, "test/+/a", "t
est/+/b", "test/+/final") |
| 135 }) | 140 }) |
| 136 | 141 |
| 137 Convey(`Will stop invoking the callback if it returns fa
lse.`, func() { | 142 Convey(`Will stop invoking the callback if it returns fa
lse.`, func() { |
| 138 // This handler will return three query results,
"a", "b", and "c". | 143 // This handler will return three query results,
"a", "b", and "c". |
| 139 svc.H = func(*logdog.QueryRequest) (*logdog.Quer
yResponse, error) { | 144 svc.H = func(*logdog.QueryRequest) (*logdog.Quer
yResponse, error) { |
| 140 return &logdog.QueryResponse{ | 145 return &logdog.QueryResponse{ |
| 141 Streams: []*logdog.QueryResponse
_Stream{ | 146 Streams: []*logdog.QueryResponse
_Stream{ |
| 142 » » » » » » » gen("a", nil), | 147 » » » » » » » gen("a", &st), |
| 143 » » » » » » » gen("b", nil), | 148 » » » » » » » gen("b", &st), |
| 144 » » » » » » » gen("c", nil), | 149 » » » » » » » gen("c", &st), |
| 145 }, | 150 }, |
| 146 Next: "infiniteloop", | 151 Next: "infiniteloop", |
| 147 }, nil | 152 }, nil |
| 148 } | 153 } |
| 149 | 154 |
| 150 accumulate = func(s *LogStream) bool { | 155 accumulate = func(s *LogStream) bool { |
| 151 results = append(results, s) | 156 results = append(results, s) |
| 152 return len(results) < 3 | 157 return len(results) < 3 |
| 153 } | 158 } |
| 154 So(client.Query(c, project, path, q, accumulate)
, ShouldBeNil) | 159 So(client.Query(c, project, path, q, accumulate)
, ShouldBeNil) |
| 155 So(results, shouldHaveLogStreams, "test/+/a", "t
est/+/b", "test/+/c") | 160 So(results, shouldHaveLogStreams, "test/+/a", "t
est/+/b", "test/+/c") |
| 156 }) | 161 }) |
| 157 | 162 |
| 158 Convey(`Will properly handle state and protobuf deserial
ization.`, func() { | 163 Convey(`Will properly handle state and protobuf deserial
ization.`, func() { |
| 159 svc.H = func(*logdog.QueryRequest) (*logdog.Quer
yResponse, error) { | 164 svc.H = func(*logdog.QueryRequest) (*logdog.Quer
yResponse, error) { |
| 160 return &logdog.QueryResponse{ | 165 return &logdog.QueryResponse{ |
| 161 Streams: []*logdog.QueryResponse
_Stream{ | 166 Streams: []*logdog.QueryResponse
_Stream{ |
| 162 gen("a", &logdog.LogStre
amState{ | 167 gen("a", &logdog.LogStre
amState{ |
| 163 Created: google.
NewTimestamp(now), | 168 Created: google.
NewTimestamp(now), |
| 164 }), | 169 }), |
| 165 }, | 170 }, |
| 166 }, nil | 171 }, nil |
| 167 } | 172 } |
| 168 | 173 |
| 169 So(client.Query(c, project, path, q, accumulate)
, ShouldBeNil) | 174 So(client.Query(c, project, path, q, accumulate)
, ShouldBeNil) |
| 170 So(results, shouldHaveLogStreams, "test/+/a") | 175 So(results, shouldHaveLogStreams, "test/+/a") |
| 171 So(results[0], ShouldResemble, &LogStream{ | 176 So(results[0], ShouldResemble, &LogStream{ |
| 172 Path: "test/+/a", | 177 Path: "test/+/a", |
| 173 » » » » » Desc: &logpb.LogStreamDescriptor{Prefix:
"test", Name: "a"}, | 178 » » » » » Desc: logpb.LogStreamDescriptor{Prefix:
"test", Name: "a"}, |
| 174 » » » » » State: &StreamState{ | 179 » » » » » State: StreamState{ |
| 175 Created: now.UTC(), | 180 Created: now.UTC(), |
| 176 }, | 181 }, |
| 177 }) | 182 }) |
| 178 }) | 183 }) |
| 179 | 184 |
| 180 Convey(`Can query for stream types`, func() { | 185 Convey(`Can query for stream types`, func() { |
| 181 svc.H = func(*logdog.QueryRequest) (*logdog.Quer
yResponse, error) { | 186 svc.H = func(*logdog.QueryRequest) (*logdog.Quer
yResponse, error) { |
| 182 return &logdog.QueryResponse{}, nil | 187 return &logdog.QueryResponse{}, nil |
| 183 } | 188 } |
| 184 | 189 |
| (...skipping 27 matching lines...) Expand all Loading... |
| 212 Convey(`Will return ErrNoAccess if permission denied.`,
func() { | 217 Convey(`Will return ErrNoAccess if permission denied.`,
func() { |
| 213 svc.H = func(*logdog.QueryRequest) (*logdog.Quer
yResponse, error) { | 218 svc.H = func(*logdog.QueryRequest) (*logdog.Quer
yResponse, error) { |
| 214 return nil, grpcutil.Unauthenticated | 219 return nil, grpcutil.Unauthenticated |
| 215 } | 220 } |
| 216 | 221 |
| 217 So(client.Query(c, project, path, q, accumulate)
, ShouldEqual, ErrNoAccess) | 222 So(client.Query(c, project, path, q, accumulate)
, ShouldEqual, ErrNoAccess) |
| 218 }) | 223 }) |
| 219 }) | 224 }) |
| 220 }) | 225 }) |
| 221 } | 226 } |
| OLD | NEW |