| OLD | NEW |
| (Empty) |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package prod | |
| 6 | |
| 7 import ( | |
| 8 "fmt" | |
| 9 "reflect" | |
| 10 | |
| 11 "github.com/luci/gae" | |
| 12 "golang.org/x/net/context" | |
| 13 "google.golang.org/appengine/taskqueue" | |
| 14 ) | |
| 15 | |
| 16 // useTQ adds a gae.TaskQueue implementation to context, accessible | |
| 17 // by gae.GetTQ(c) | |
| 18 func useTQ(c context.Context) context.Context { | |
| 19 return gae.SetTQFactory(c, func(ci context.Context) gae.TaskQueue { | |
| 20 return tqImpl{ci} | |
| 21 }) | |
| 22 } | |
| 23 | |
| 24 type tqImpl struct { | |
| 25 context.Context | |
| 26 } | |
| 27 | |
| 28 func init() { | |
| 29 const TASK_EXPECTED_FIELDS = 9 | |
| 30 // Runtime-assert that the number of fields in the Task structs is 9, to | |
| 31 // avoid missing additional fields if they're added later. | |
| 32 // all other type assertions are statically enforced by o2n() and tqF2R(
) | |
| 33 | |
| 34 oldType := reflect.TypeOf((*taskqueue.Task)(nil)) | |
| 35 newType := reflect.TypeOf((*gae.TQTask)(nil)) | |
| 36 | |
| 37 if oldType.NumField() != newType.NumField() || | |
| 38 oldType.NumField() != TASK_EXPECTED_FIELDS { | |
| 39 panic(fmt.Errorf( | |
| 40 "prod/taskqueue:init() field count differs: %v, %v", | |
| 41 oldType, newType)) | |
| 42 } | |
| 43 } | |
| 44 | |
| 45 // tqR2FErr (TQ real-to-fake w/ error) converts a *taskqueue.Task to a | |
| 46 // *gae.TQTask, and passes through an error. | |
| 47 func tqR2FErr(o *taskqueue.Task, err error) (*gae.TQTask, error) { | |
| 48 if err != nil { | |
| 49 return nil, err | |
| 50 } | |
| 51 n := gae.TQTask{} | |
| 52 n.Path = o.Path | |
| 53 n.Payload = o.Payload | |
| 54 n.Header = o.Header | |
| 55 n.Method = o.Method | |
| 56 n.Name = o.Name | |
| 57 n.Delay = o.Delay | |
| 58 n.ETA = o.ETA | |
| 59 n.RetryCount = o.RetryCount | |
| 60 n.RetryOptions = (*gae.TQRetryOptions)(o.RetryOptions) | |
| 61 return &n, nil | |
| 62 } | |
| 63 | |
| 64 // tqF2R (TQ fake-to-real) converts a *gae.TQTask to a *taskqueue.Task. | |
| 65 func tqF2R(n *gae.TQTask) *taskqueue.Task { | |
| 66 o := taskqueue.Task{} | |
| 67 o.Path = n.Path | |
| 68 o.Payload = n.Payload | |
| 69 o.Header = n.Header | |
| 70 o.Method = n.Method | |
| 71 o.Name = n.Name | |
| 72 o.Delay = n.Delay | |
| 73 o.ETA = n.ETA | |
| 74 o.RetryCount = n.RetryCount | |
| 75 o.RetryOptions = (*taskqueue.RetryOptions)(n.RetryOptions) | |
| 76 return &o | |
| 77 } | |
| 78 | |
| 79 // tqMR2FErr (TQ multi-real-to-fake w/ error) converts a slice of | |
| 80 // *taskqueue.Task to a slice of *gae.TQTask | |
| 81 func tqMR2FErr(os []*taskqueue.Task, err error) ([]*gae.TQTask, error) { | |
| 82 if err != nil { | |
| 83 return nil, gae.FixError(err) | |
| 84 } | |
| 85 ret := make([]*gae.TQTask, len(os)) | |
| 86 for i, t := range os { | |
| 87 ret[i], _ = tqR2FErr(t, nil) | |
| 88 } | |
| 89 return ret, nil | |
| 90 } | |
| 91 | |
| 92 // tqMF2R (TQ multi-fake-to-real) converts []*gae.TQTask to []*taskqueue.Task. | |
| 93 func tqMF2R(ns []*gae.TQTask) []*taskqueue.Task { | |
| 94 ret := make([]*taskqueue.Task, len(ns)) | |
| 95 for i, t := range ns { | |
| 96 ret[i] = tqF2R(t) | |
| 97 } | |
| 98 return ret | |
| 99 } | |
| 100 | |
| 101 //////// TQSingleReadWriter | |
| 102 | |
| 103 func (t tqImpl) Add(task *gae.TQTask, queueName string) (*gae.TQTask, error) { | |
| 104 return tqR2FErr(taskqueue.Add(t.Context, tqF2R(task), queueName)) | |
| 105 } | |
| 106 func (t tqImpl) Delete(task *gae.TQTask, queueName string) error { | |
| 107 return taskqueue.Delete(t.Context, tqF2R(task), queueName) | |
| 108 } | |
| 109 | |
| 110 //////// TQMultiReadWriter | |
| 111 | |
| 112 func (t tqImpl) AddMulti(tasks []*gae.TQTask, queueName string) ([]*gae.TQTask,
error) { | |
| 113 return tqMR2FErr(taskqueue.AddMulti(t.Context, tqMF2R(tasks), queueName)
) | |
| 114 } | |
| 115 func (t tqImpl) DeleteMulti(tasks []*gae.TQTask, queueName string) error { | |
| 116 return gae.FixError(taskqueue.DeleteMulti(t.Context, tqMF2R(tasks), queu
eName)) | |
| 117 } | |
| 118 | |
| 119 //////// TQLeaser | |
| 120 | |
| 121 func (t tqImpl) Lease(maxTasks int, queueName string, leaseTime int) ([]*gae.TQT
ask, error) { | |
| 122 return tqMR2FErr(taskqueue.Lease(t.Context, maxTasks, queueName, leaseTi
me)) | |
| 123 } | |
| 124 func (t tqImpl) LeaseByTag(maxTasks int, queueName string, leaseTime int, tag st
ring) ([]*gae.TQTask, error) { | |
| 125 return tqMR2FErr(taskqueue.LeaseByTag(t.Context, maxTasks, queueName, le
aseTime, tag)) | |
| 126 } | |
| 127 func (t tqImpl) ModifyLease(task *gae.TQTask, queueName string, leaseTime int) e
rror { | |
| 128 return taskqueue.ModifyLease(t.Context, tqF2R(task), queueName, leaseTim
e) | |
| 129 } | |
| 130 | |
| 131 //////// TQPurger | |
| 132 | |
| 133 func (t tqImpl) Purge(queueName string) error { | |
| 134 return taskqueue.Purge(t.Context, queueName) | |
| 135 } | |
| 136 | |
| 137 //////// TQStatter | |
| 138 | |
| 139 func (t tqImpl) QueueStats(queueNames []string) ([]gae.TQStatistics, error) { | |
| 140 stats, err := taskqueue.QueueStats(t.Context, queueNames) | |
| 141 if err != nil { | |
| 142 return nil, err | |
| 143 } | |
| 144 ret := make([]gae.TQStatistics, len(stats)) | |
| 145 for i, s := range stats { | |
| 146 ret[i] = gae.TQStatistics(s) | |
| 147 } | |
| 148 return ret, nil | |
| 149 } | |
| OLD | NEW |