| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2017 The LUCI Authors. |
| 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at |
| 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 // See the License for the specific language governing permissions and |
| 13 // limitations under the License. |
| 14 |
| 15 package coordinatorTest |
| 16 |
| 17 import ( |
| 18 "bytes" |
| 19 "fmt" |
| 20 "io/ioutil" |
| 21 "net/http" |
| 22 "net/url" |
| 23 |
| 24 "github.com/luci/luci-go/common/clock" |
| 25 "github.com/luci/luci-go/common/logging" |
| 26 |
| 27 "github.com/luci/gae/service/info" |
| 28 tq "github.com/luci/gae/service/taskqueue" |
| 29 |
| 30 "golang.org/x/net/context" |
| 31 ) |
| 32 |
| 33 func drainTaskQueues(c context.Context, h http.Handler) { |
| 34 now := clock.Now(c) |
| 35 |
| 36 logging.Debugf(c, "Running task(s) for namespace %q...", info.GetNamespa
ce(c)) |
| 37 tst := tq.GetTestable(c) |
| 38 for queue, tasks := range tst.GetScheduledTasks() { |
| 39 for taskName, task := range tasks { |
| 40 if task.ETA.After(now) { |
| 41 continue |
| 42 } |
| 43 |
| 44 logging.Debugf(c, "Running queue %q, task %q...", queue,
taskName) |
| 45 fakeTaskQueueHTTP(c, h, task) |
| 46 if err := tq.Delete(c, queue, task); err != nil { |
| 47 panic(fmt.Errorf("could not delete task %q: %s",
taskName, err)) |
| 48 } |
| 49 } |
| 50 } |
| 51 } |
| 52 |
| 53 func fakeTaskQueueHTTP(c context.Context, h http.Handler, task *tq.Task) { |
| 54 var rw fakeResponseWriter |
| 55 req := http.Request{ |
| 56 Method: task.Method, |
| 57 URL: &url.URL{ |
| 58 Scheme: "fake", |
| 59 Host: "localhost", |
| 60 Path: task.Path, |
| 61 }, |
| 62 Body: ioutil.NopCloser(bytes.NewReader(task.Payload)), |
| 63 } |
| 64 |
| 65 h.ServeHTTP(&rw, &req) |
| 66 } |
| 67 |
| 68 type fakeResponseWriter struct { |
| 69 code int |
| 70 h http.Header |
| 71 buf bytes.Buffer |
| 72 } |
| 73 |
| 74 func (rw *fakeResponseWriter) Header() http.Header { |
| 75 if rw.h == nil { |
| 76 rw.h = http.Header{} |
| 77 } |
| 78 return rw.h |
| 79 } |
| 80 |
| 81 func (rw *fakeResponseWriter) Write(p []byte) (int, error) { return rw.buf.Write
(p) } |
| 82 func (rw *fakeResponseWriter) WriteHeader(code int) { rw.code = code } |
| OLD | NEW |