| OLD | NEW |
| 1 // Copyright 2017 The LUCI Authors. | 1 // Copyright 2017 The LUCI Authors. |
| 2 // | 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
| 6 // | 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // | 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 50 | 50 |
| 51 d := Dispatcher{} | 51 d := Dispatcher{} |
| 52 r := router.New() | 52 r := router.New() |
| 53 | 53 |
| 54 installRoutes := func() { | 54 installRoutes := func() { |
| 55 d.InstallRoutes(r, router.NewMiddlewareChain(func(c *rou
ter.Context, next router.Handler) { | 55 d.InstallRoutes(r, router.NewMiddlewareChain(func(c *rou
ter.Context, next router.Handler) { |
| 56 c.Context = ctx | 56 c.Context = ctx |
| 57 next(c) | 57 next(c) |
| 58 })) | 58 })) |
| 59 } | 59 } |
| 60 runTasks := func(ctx context.Context) []int { |
| 61 var codes []int |
| 62 for _, tasks := range taskqueue.GetTestable(ctx).GetSche
duledTasks() { |
| 63 for _, task := range tasks { |
| 64 // Execute the task. |
| 65 req := httptest.NewRequest("POST", "http
://example.com"+task.Path, bytes.NewReader(task.Payload)) |
| 66 rw := httptest.NewRecorder() |
| 67 r.ServeHTTP(rw, req) |
| 68 codes = append(codes, rw.Code) |
| 69 } |
| 70 } |
| 71 return codes |
| 72 } |
| 60 | 73 |
| 61 Convey("Single task", func() { | 74 Convey("Single task", func() { |
| 62 var calls []proto.Message | 75 var calls []proto.Message |
| 63 handler := func(c context.Context, payload proto.Message
, execCount int) error { | 76 handler := func(c context.Context, payload proto.Message
, execCount int) error { |
| 64 calls = append(calls, payload) | 77 calls = append(calls, payload) |
| 65 return nil | 78 return nil |
| 66 } | 79 } |
| 67 | 80 |
| 68 // Abuse some well-known proto type to simplify the test
. It's doesn't | 81 // Abuse some well-known proto type to simplify the test
. It's doesn't |
| 69 // matter what proto type we use here as long as it is r
egistered in | 82 // matter what proto type we use here as long as it is r
egistered in |
| 70 // protobuf type registry. | 83 // protobuf type registry. |
| 71 d.RegisterTask(&duration.Duration{}, handler, "", nil) | 84 d.RegisterTask(&duration.Duration{}, handler, "", nil) |
| 72 installRoutes() | 85 installRoutes() |
| 73 | 86 |
| 74 err := d.AddTask(ctx, &Task{ | 87 err := d.AddTask(ctx, &Task{ |
| 75 Payload: &duration.Duration{Seconds: 12
3}, | 88 Payload: &duration.Duration{Seconds: 12
3}, |
| 76 DeduplicationKey: "abc", | 89 DeduplicationKey: "abc", |
| 90 NamePrefix: "prefix", |
| 77 Title: "abc-def", | 91 Title: "abc-def", |
| 78 Delay: 30 * time.Second, | 92 Delay: 30 * time.Second, |
| 79 }) | 93 }) |
| 80 So(err, ShouldBeNil) | 94 So(err, ShouldBeNil) |
| 81 | 95 |
| 82 // Added the task. | 96 // Added the task. |
| 83 expectedPath := "/internal/tasks/default/abc-def" | 97 expectedPath := "/internal/tasks/default/abc-def" |
| 84 » » » expectedName := "afc6f8271b8598ee04e359916e6c584a9bc3c52
0a11dd5244e3399346ac0d3a7" | 98 » » » expectedName := "prefix-afc6f8271b8598ee04e359916e6c584a
9bc3c520a11dd5244e3399346ac0d3a7" |
| 85 expectedBody := []byte(`{"type":"google.protobuf.Duratio
n","body":"123.000s"}`) | 99 expectedBody := []byte(`{"type":"google.protobuf.Duratio
n","body":"123.000s"}`) |
| 86 tasks := taskqueue.GetTestable(ctx).GetScheduledTasks() | 100 tasks := taskqueue.GetTestable(ctx).GetScheduledTasks() |
| 87 So(tasks, ShouldResemble, taskqueue.QueueData{ | 101 So(tasks, ShouldResemble, taskqueue.QueueData{ |
| 88 "default": map[string]*taskqueue.Task{ | 102 "default": map[string]*taskqueue.Task{ |
| 89 expectedName: { | 103 expectedName: { |
| 90 Path: expectedPath, | 104 Path: expectedPath, |
| 91 Payload: expectedBody, | 105 Payload: expectedBody, |
| 92 Name: expectedName, | 106 Name: expectedName, |
| 93 Method: "POST", | 107 Method: "POST", |
| 94 ETA: epoch.Add(30 * time.Sec
ond), | 108 ETA: epoch.Add(30 * time.Sec
ond), |
| 95 }, | 109 }, |
| 96 }, | 110 }, |
| 97 "another-q": {}, | 111 "another-q": {}, |
| 98 }) | 112 }) |
| 99 | 113 |
| 100 // Readd a task with same dedup key. Should be silently
ignored. | 114 // Readd a task with same dedup key. Should be silently
ignored. |
| 101 err = d.AddTask(ctx, &Task{ | 115 err = d.AddTask(ctx, &Task{ |
| 102 Payload: &duration.Duration{Seconds: 12
3}, | 116 Payload: &duration.Duration{Seconds: 12
3}, |
| 103 DeduplicationKey: "abc", | 117 DeduplicationKey: "abc", |
| 118 NamePrefix: "prefix", |
| 104 }) | 119 }) |
| 105 So(err, ShouldBeNil) | 120 So(err, ShouldBeNil) |
| 106 | 121 |
| 107 // No new tasks. | 122 // No new tasks. |
| 108 tasks = taskqueue.GetTestable(ctx).GetScheduledTasks() | 123 tasks = taskqueue.GetTestable(ctx).GetScheduledTasks() |
| 109 So(len(tasks["default"]), ShouldResemble, 1) | 124 So(len(tasks["default"]), ShouldResemble, 1) |
| 110 | 125 |
| 111 » » » // Execute the task. | 126 » » » Convey("Executed", func() { |
| 112 » » » req := httptest.NewRequest("POST", "http://example.com"+
expectedPath, bytes.NewReader(expectedBody)) | 127 » » » » // Execute the task. |
| 113 » » » rw := httptest.NewRecorder() | 128 » » » » So(runTasks(ctx), ShouldResemble, []int{200}) |
| 114 » » » r.ServeHTTP(rw, req) | 129 » » » » So(calls, ShouldResemble, []proto.Message{ |
| 130 » » » » » &duration.Duration{Seconds: 123}, |
| 131 » » » » }) |
| 132 » » » }) |
| 115 | 133 |
| 116 » » » // Executed. | 134 » » » Convey("Deleted", func() { |
| 117 » » » So(calls, ShouldResemble, []proto.Message{ | 135 » » » » So(d.DeleteTask(ctx, &Task{ |
| 118 » » » » &duration.Duration{Seconds: 123}, | 136 » » » » » Payload: &duration.Duration{Sec
onds: 123}, |
| 137 » » » » » DeduplicationKey: "abc", |
| 138 » » » » » NamePrefix: "prefix", |
| 139 » » » » }), ShouldBeNil) |
| 140 |
| 141 » » » » // Did not execute any tasks. |
| 142 » » » » So(runTasks(ctx), ShouldHaveLength, 0) |
| 143 » » » » So(calls, ShouldHaveLength, 0) |
| 119 }) | 144 }) |
| 120 » » » So(rw.Code, ShouldEqual, 200) | 145 » » }) |
| 146 |
| 147 » » Convey("Deleting unknown task returns nil", func() { |
| 148 » » » handler := func(c context.Context, payload proto.Message
, execCount int) error { return nil } |
| 149 » » » d.RegisterTask(&duration.Duration{}, handler, "default",
nil) |
| 150 |
| 151 » » » So(d.DeleteTask(ctx, &Task{ |
| 152 » » » » Payload: &duration.Duration{Seconds: 12
3}, |
| 153 » » » » DeduplicationKey: "something", |
| 154 » » » }), ShouldBeNil) |
| 121 }) | 155 }) |
| 122 | 156 |
| 123 Convey("Many tasks", func() { | 157 Convey("Many tasks", func() { |
| 124 handler := func(c context.Context, payload proto.Message
, execCount int) error { return nil } | 158 handler := func(c context.Context, payload proto.Message
, execCount int) error { return nil } |
| 125 d.RegisterTask(&duration.Duration{}, handler, "default",
nil) | 159 d.RegisterTask(&duration.Duration{}, handler, "default",
nil) |
| 126 d.RegisterTask(&empty.Empty{}, handler, "another-q", nil
) | 160 d.RegisterTask(&empty.Empty{}, handler, "another-q", nil
) |
| 127 installRoutes() | 161 installRoutes() |
| 128 | 162 |
| 129 t := []*Task{} | 163 t := []*Task{} |
| 130 for i := 0; i < 200; i++ { | 164 for i := 0; i < 200; i++ { |
| (...skipping 28 matching lines...) Expand all Loading... |
| 159 delaysDefault := map[time.Duration]struct{}{} | 193 delaysDefault := map[time.Duration]struct{}{} |
| 160 for _, task := range allTasks["default"] { | 194 for _, task := range allTasks["default"] { |
| 161 delaysDefault[task.ETA.Sub(epoch)/time.Second] =
struct{}{} | 195 delaysDefault[task.ETA.Sub(epoch)/time.Second] =
struct{}{} |
| 162 } | 196 } |
| 163 delaysAnotherQ := map[time.Duration]struct{}{} | 197 delaysAnotherQ := map[time.Duration]struct{}{} |
| 164 for _, task := range allTasks["another-q"] { | 198 for _, task := range allTasks["another-q"] { |
| 165 delaysAnotherQ[task.ETA.Sub(epoch)/time.Second]
= struct{}{} | 199 delaysAnotherQ[task.ETA.Sub(epoch)/time.Second]
= struct{}{} |
| 166 } | 200 } |
| 167 So(len(delaysDefault), ShouldEqual, 100) | 201 So(len(delaysDefault), ShouldEqual, 100) |
| 168 So(len(delaysAnotherQ), ShouldEqual, 100) | 202 So(len(delaysAnotherQ), ShouldEqual, 100) |
| 203 |
| 204 // Delete the tasks. |
| 205 So(d.DeleteTask(ctx, t...), ShouldBeNil) |
| 206 So(runTasks(ctx), ShouldHaveLength, 0) |
| 169 }) | 207 }) |
| 170 | 208 |
| 171 Convey("Execution errors", func() { | 209 Convey("Execution errors", func() { |
| 172 var returnErr error | 210 var returnErr error |
| 173 panicNow := false | 211 panicNow := false |
| 174 handler := func(c context.Context, payload proto.Message
, execCount int) error { | 212 handler := func(c context.Context, payload proto.Message
, execCount int) error { |
| 175 if panicNow { | 213 if panicNow { |
| 176 panic("must not be called") | 214 panic("must not be called") |
| 177 } | 215 } |
| 178 return returnErr | 216 return returnErr |
| (...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 225 rw = execute(`{"type":"google.protobuf.Duration","body":
"blah"}`) | 263 rw = execute(`{"type":"google.protobuf.Duration","body":
"blah"}`) |
| 226 So(rw.Code, ShouldEqual, 202) // no retry! | 264 So(rw.Code, ShouldEqual, 202) // no retry! |
| 227 So(rw.Body.String(), ShouldStartWith, "Bad payload, can'
t deserialize") | 265 So(rw.Body.String(), ShouldStartWith, "Bad payload, can'
t deserialize") |
| 228 | 266 |
| 229 rw = execute(`{"type":"unknown.proto.type","body":"{}"}`
) | 267 rw = execute(`{"type":"unknown.proto.type","body":"{}"}`
) |
| 230 So(rw.Code, ShouldEqual, 202) // no retry! | 268 So(rw.Code, ShouldEqual, 202) // no retry! |
| 231 So(rw.Body.String(), ShouldStartWith, "Bad payload, can'
t deserialize") | 269 So(rw.Body.String(), ShouldStartWith, "Bad payload, can'
t deserialize") |
| 232 }) | 270 }) |
| 233 }) | 271 }) |
| 234 } | 272 } |
| OLD | NEW |