Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(893)

Side by Side Diff: appengine/tq/tq_test.go

Issue 2986373002: [tq] Enable task deletion. (Closed)
Patch Set: comments Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/tq/tq.go ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2017 The LUCI Authors. 1 // Copyright 2017 The LUCI Authors.
2 // 2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License. 4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at 5 // You may obtain a copy of the License at
6 // 6 //
7 // http://www.apache.org/licenses/LICENSE-2.0 7 // http://www.apache.org/licenses/LICENSE-2.0
8 // 8 //
9 // Unless required by applicable law or agreed to in writing, software 9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, 10 // distributed under the License is distributed on an "AS IS" BASIS,
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
50 50
51 d := Dispatcher{} 51 d := Dispatcher{}
52 r := router.New() 52 r := router.New()
53 53
54 installRoutes := func() { 54 installRoutes := func() {
55 d.InstallRoutes(r, router.NewMiddlewareChain(func(c *rou ter.Context, next router.Handler) { 55 d.InstallRoutes(r, router.NewMiddlewareChain(func(c *rou ter.Context, next router.Handler) {
56 c.Context = ctx 56 c.Context = ctx
57 next(c) 57 next(c)
58 })) 58 }))
59 } 59 }
60 runTasks := func(ctx context.Context) []int {
61 var codes []int
62 for _, tasks := range taskqueue.GetTestable(ctx).GetSche duledTasks() {
63 for _, task := range tasks {
64 // Execute the task.
65 req := httptest.NewRequest("POST", "http ://example.com"+task.Path, bytes.NewReader(task.Payload))
66 rw := httptest.NewRecorder()
67 r.ServeHTTP(rw, req)
68 codes = append(codes, rw.Code)
69 }
70 }
71 return codes
72 }
60 73
61 Convey("Single task", func() { 74 Convey("Single task", func() {
62 var calls []proto.Message 75 var calls []proto.Message
63 handler := func(c context.Context, payload proto.Message , execCount int) error { 76 handler := func(c context.Context, payload proto.Message , execCount int) error {
64 calls = append(calls, payload) 77 calls = append(calls, payload)
65 return nil 78 return nil
66 } 79 }
67 80
68 // Abuse some well-known proto type to simplify the test . It's doesn't 81 // Abuse some well-known proto type to simplify the test . It's doesn't
69 // matter what proto type we use here as long as it is r egistered in 82 // matter what proto type we use here as long as it is r egistered in
70 // protobuf type registry. 83 // protobuf type registry.
71 d.RegisterTask(&duration.Duration{}, handler, "", nil) 84 d.RegisterTask(&duration.Duration{}, handler, "", nil)
72 installRoutes() 85 installRoutes()
73 86
74 err := d.AddTask(ctx, &Task{ 87 err := d.AddTask(ctx, &Task{
75 Payload: &duration.Duration{Seconds: 12 3}, 88 Payload: &duration.Duration{Seconds: 12 3},
76 DeduplicationKey: "abc", 89 DeduplicationKey: "abc",
90 NamePrefix: "prefix",
77 Title: "abc-def", 91 Title: "abc-def",
78 Delay: 30 * time.Second, 92 Delay: 30 * time.Second,
79 }) 93 })
80 So(err, ShouldBeNil) 94 So(err, ShouldBeNil)
81 95
82 // Added the task. 96 // Added the task.
83 expectedPath := "/internal/tasks/default/abc-def" 97 expectedPath := "/internal/tasks/default/abc-def"
84 » » » expectedName := "afc6f8271b8598ee04e359916e6c584a9bc3c52 0a11dd5244e3399346ac0d3a7" 98 » » » expectedName := "prefix-afc6f8271b8598ee04e359916e6c584a 9bc3c520a11dd5244e3399346ac0d3a7"
85 expectedBody := []byte(`{"type":"google.protobuf.Duratio n","body":"123.000s"}`) 99 expectedBody := []byte(`{"type":"google.protobuf.Duratio n","body":"123.000s"}`)
86 tasks := taskqueue.GetTestable(ctx).GetScheduledTasks() 100 tasks := taskqueue.GetTestable(ctx).GetScheduledTasks()
87 So(tasks, ShouldResemble, taskqueue.QueueData{ 101 So(tasks, ShouldResemble, taskqueue.QueueData{
88 "default": map[string]*taskqueue.Task{ 102 "default": map[string]*taskqueue.Task{
89 expectedName: { 103 expectedName: {
90 Path: expectedPath, 104 Path: expectedPath,
91 Payload: expectedBody, 105 Payload: expectedBody,
92 Name: expectedName, 106 Name: expectedName,
93 Method: "POST", 107 Method: "POST",
94 ETA: epoch.Add(30 * time.Sec ond), 108 ETA: epoch.Add(30 * time.Sec ond),
95 }, 109 },
96 }, 110 },
97 "another-q": {}, 111 "another-q": {},
98 }) 112 })
99 113
100 // Readd a task with same dedup key. Should be silently ignored. 114 // Readd a task with same dedup key. Should be silently ignored.
101 err = d.AddTask(ctx, &Task{ 115 err = d.AddTask(ctx, &Task{
102 Payload: &duration.Duration{Seconds: 12 3}, 116 Payload: &duration.Duration{Seconds: 12 3},
103 DeduplicationKey: "abc", 117 DeduplicationKey: "abc",
118 NamePrefix: "prefix",
104 }) 119 })
105 So(err, ShouldBeNil) 120 So(err, ShouldBeNil)
106 121
107 // No new tasks. 122 // No new tasks.
108 tasks = taskqueue.GetTestable(ctx).GetScheduledTasks() 123 tasks = taskqueue.GetTestable(ctx).GetScheduledTasks()
109 So(len(tasks["default"]), ShouldResemble, 1) 124 So(len(tasks["default"]), ShouldResemble, 1)
110 125
111 » » » // Execute the task. 126 » » » Convey("Executed", func() {
112 » » » req := httptest.NewRequest("POST", "http://example.com"+ expectedPath, bytes.NewReader(expectedBody)) 127 » » » » // Execute the task.
113 » » » rw := httptest.NewRecorder() 128 » » » » So(runTasks(ctx), ShouldResemble, []int{200})
114 » » » r.ServeHTTP(rw, req) 129 » » » » So(calls, ShouldResemble, []proto.Message{
130 » » » » » &duration.Duration{Seconds: 123},
131 » » » » })
132 » » » })
115 133
116 » » » // Executed. 134 » » » Convey("Deleted", func() {
117 » » » So(calls, ShouldResemble, []proto.Message{ 135 » » » » So(d.DeleteTask(ctx, &Task{
118 » » » » &duration.Duration{Seconds: 123}, 136 » » » » » Payload: &duration.Duration{Sec onds: 123},
137 » » » » » DeduplicationKey: "abc",
138 » » » » » NamePrefix: "prefix",
139 » » » » }), ShouldBeNil)
140
141 » » » » // Did not execute any tasks.
142 » » » » So(runTasks(ctx), ShouldHaveLength, 0)
143 » » » » So(calls, ShouldHaveLength, 0)
119 }) 144 })
120 » » » So(rw.Code, ShouldEqual, 200) 145 » » })
146
147 » » Convey("Deleting unknown task returns nil", func() {
148 » » » handler := func(c context.Context, payload proto.Message , execCount int) error { return nil }
149 » » » d.RegisterTask(&duration.Duration{}, handler, "default", nil)
150
151 » » » So(d.DeleteTask(ctx, &Task{
152 » » » » Payload: &duration.Duration{Seconds: 12 3},
153 » » » » DeduplicationKey: "something",
154 » » » }), ShouldBeNil)
121 }) 155 })
122 156
123 Convey("Many tasks", func() { 157 Convey("Many tasks", func() {
124 handler := func(c context.Context, payload proto.Message , execCount int) error { return nil } 158 handler := func(c context.Context, payload proto.Message , execCount int) error { return nil }
125 d.RegisterTask(&duration.Duration{}, handler, "default", nil) 159 d.RegisterTask(&duration.Duration{}, handler, "default", nil)
126 d.RegisterTask(&empty.Empty{}, handler, "another-q", nil ) 160 d.RegisterTask(&empty.Empty{}, handler, "another-q", nil )
127 installRoutes() 161 installRoutes()
128 162
129 t := []*Task{} 163 t := []*Task{}
130 for i := 0; i < 200; i++ { 164 for i := 0; i < 200; i++ {
(...skipping 28 matching lines...) Expand all
159 delaysDefault := map[time.Duration]struct{}{} 193 delaysDefault := map[time.Duration]struct{}{}
160 for _, task := range allTasks["default"] { 194 for _, task := range allTasks["default"] {
161 delaysDefault[task.ETA.Sub(epoch)/time.Second] = struct{}{} 195 delaysDefault[task.ETA.Sub(epoch)/time.Second] = struct{}{}
162 } 196 }
163 delaysAnotherQ := map[time.Duration]struct{}{} 197 delaysAnotherQ := map[time.Duration]struct{}{}
164 for _, task := range allTasks["another-q"] { 198 for _, task := range allTasks["another-q"] {
165 delaysAnotherQ[task.ETA.Sub(epoch)/time.Second] = struct{}{} 199 delaysAnotherQ[task.ETA.Sub(epoch)/time.Second] = struct{}{}
166 } 200 }
167 So(len(delaysDefault), ShouldEqual, 100) 201 So(len(delaysDefault), ShouldEqual, 100)
168 So(len(delaysAnotherQ), ShouldEqual, 100) 202 So(len(delaysAnotherQ), ShouldEqual, 100)
203
204 // Delete the tasks.
205 So(d.DeleteTask(ctx, t...), ShouldBeNil)
206 So(runTasks(ctx), ShouldHaveLength, 0)
169 }) 207 })
170 208
171 Convey("Execution errors", func() { 209 Convey("Execution errors", func() {
172 var returnErr error 210 var returnErr error
173 panicNow := false 211 panicNow := false
174 handler := func(c context.Context, payload proto.Message , execCount int) error { 212 handler := func(c context.Context, payload proto.Message , execCount int) error {
175 if panicNow { 213 if panicNow {
176 panic("must not be called") 214 panic("must not be called")
177 } 215 }
178 return returnErr 216 return returnErr
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
225 rw = execute(`{"type":"google.protobuf.Duration","body": "blah"}`) 263 rw = execute(`{"type":"google.protobuf.Duration","body": "blah"}`)
226 So(rw.Code, ShouldEqual, 202) // no retry! 264 So(rw.Code, ShouldEqual, 202) // no retry!
227 So(rw.Body.String(), ShouldStartWith, "Bad payload, can' t deserialize") 265 So(rw.Body.String(), ShouldStartWith, "Bad payload, can' t deserialize")
228 266
229 rw = execute(`{"type":"unknown.proto.type","body":"{}"}` ) 267 rw = execute(`{"type":"unknown.proto.type","body":"{}"}` )
230 So(rw.Code, ShouldEqual, 202) // no retry! 268 So(rw.Code, ShouldEqual, 202) // no retry!
231 So(rw.Body.String(), ShouldStartWith, "Bad payload, can' t deserialize") 269 So(rw.Body.String(), ShouldStartWith, "Bad payload, can' t deserialize")
232 }) 270 })
233 }) 271 })
234 } 272 }
OLDNEW
« no previous file with comments | « appengine/tq/tq.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698