Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(152)

Side by Side Diff: logdog/appengine/coordinator/coordinatorTest/taskqueue.go

Issue 2989333002: [logdog] Replace Tumble with push queues. (Closed)
Patch Set: comments Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The LUCI Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package coordinatorTest
16
17 import (
18 "bytes"
19 "fmt"
20 "io/ioutil"
21 "net/http"
22 "net/url"
23
24 "github.com/luci/luci-go/common/clock"
25 "github.com/luci/luci-go/common/logging"
26
27 "github.com/luci/gae/service/info"
28 tq "github.com/luci/gae/service/taskqueue"
29
30 "golang.org/x/net/context"
31 )
32
33 func drainTaskQueues(c context.Context, h http.Handler) {
34 now := clock.Now(c)
35
36 logging.Debugf(c, "Running task(s) for namespace %q...", info.GetNamespa ce(c))
37 tst := tq.GetTestable(c)
38 for queue, tasks := range tst.GetScheduledTasks() {
39 for taskName, task := range tasks {
40 if task.ETA.After(now) {
41 continue
42 }
43
44 logging.Debugf(c, "Running queue %q, task %q...", queue, taskName)
45 fakeTaskQueueHTTP(c, h, task)
46 if err := tq.Delete(c, queue, task); err != nil {
47 panic(fmt.Errorf("could not delete task %q: %s", taskName, err))
48 }
49 }
50 }
51 }
52
53 func fakeTaskQueueHTTP(c context.Context, h http.Handler, task *tq.Task) {
54 var rw fakeResponseWriter
55 req := http.Request{
56 Method: task.Method,
57 URL: &url.URL{
58 Scheme: "fake",
59 Host: "localhost",
60 Path: task.Path,
61 },
62 Body: ioutil.NopCloser(bytes.NewReader(task.Payload)),
63 }
64
65 h.ServeHTTP(&rw, &req)
66 }
67
68 type fakeResponseWriter struct {
69 code int
70 h http.Header
71 buf bytes.Buffer
72 }
73
74 func (rw *fakeResponseWriter) Header() http.Header {
75 if rw.h == nil {
76 rw.h = http.Header{}
77 }
78 return rw.h
79 }
80
81 func (rw *fakeResponseWriter) Write(p []byte) (int, error) { return rw.buf.Write (p) }
82 func (rw *fakeResponseWriter) WriteHeader(code int) { rw.code = code }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698