Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(113)

Side by Side Diff: impl/prod/taskqueue.go

Issue 1270063003: Make the rest of the services have a similar raw/user interface structure. (Closed) Base URL: https://github.com/luci/gae.git@add_datastore
Patch Set: Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package prod 5 package prod
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "reflect" 9 "reflect"
10 10
11 tq "github.com/luci/gae/service/taskqueue" 11 tq "github.com/luci/gae/service/taskqueue"
12 "github.com/luci/luci-go/common/errors"
13 "golang.org/x/net/context" 12 "golang.org/x/net/context"
13 "google.golang.org/appengine"
14 "google.golang.org/appengine/taskqueue" 14 "google.golang.org/appengine/taskqueue"
15 ) 15 )
16 16
17 // useTQ adds a gae.TaskQueue implementation to context, accessible 17 // useTQ adds a gae.TaskQueue implementation to context, accessible
18 // by gae.GetTQ(c) 18 // by gae.GetTQ(c)
19 func useTQ(c context.Context) context.Context { 19 func useTQ(c context.Context) context.Context {
20 » return tq.SetFactory(c, func(ci context.Context) tq.Interface { 20 » return tq.SetRawFactory(c, func(ci context.Context) tq.RawInterface {
21 return tqImpl{ci} 21 return tqImpl{ci}
22 }) 22 })
23 } 23 }
24 24
25 type tqImpl struct { 25 type tqImpl struct {
26 context.Context 26 context.Context
27 } 27 }
28 28
29 func init() { 29 func init() {
30 const taskExpectedFields = 9 30 const taskExpectedFields = 9
31 // Runtime-assert that the number of fields in the Task structs is 9, to 31 // Runtime-assert that the number of fields in the Task structs is 9, to
32 // avoid missing additional fields if they're added later. 32 // avoid missing additional fields if they're added later.
33 // all other type assertions are statically enforced by o2n() and tqF2R( ) 33 // all other type assertions are statically enforced by o2n() and tqF2R( )
34 34
35 oldType := reflect.TypeOf((*taskqueue.Task)(nil)) 35 oldType := reflect.TypeOf((*taskqueue.Task)(nil))
36 newType := reflect.TypeOf((*tq.Task)(nil)) 36 newType := reflect.TypeOf((*tq.Task)(nil))
37 37
38 if oldType.NumField() != newType.NumField() || 38 if oldType.NumField() != newType.NumField() ||
39 oldType.NumField() != taskExpectedFields { 39 oldType.NumField() != taskExpectedFields {
40 panic(fmt.Errorf( 40 panic(fmt.Errorf(
41 "prod/taskqueue:init() field count differs: %v, %v", 41 "prod/taskqueue:init() field count differs: %v, %v",
42 oldType, newType)) 42 oldType, newType))
43 } 43 }
44 } 44 }
45 45
46 // tqR2FErr (TQ real-to-fake w/ error) converts a *taskqueue.Task to a 46 // tqR2F (TQ real-to-fake) converts a *taskqueue.Task to a *tq.Task.
47 // *tq.Task, and passes through an error. 47 func tqR2F(o *taskqueue.Task) *tq.Task {
48 func tqR2FErr(o *taskqueue.Task, err error) (*tq.Task, error) { 48 » if o == nil {
49 » if err != nil { 49 » » return nil
50 » » return nil, err
51 } 50 }
52 n := tq.Task{} 51 n := tq.Task{}
53 n.Path = o.Path 52 n.Path = o.Path
54 n.Payload = o.Payload 53 n.Payload = o.Payload
55 n.Header = o.Header 54 n.Header = o.Header
56 n.Method = o.Method 55 n.Method = o.Method
57 n.Name = o.Name 56 n.Name = o.Name
58 n.Delay = o.Delay 57 n.Delay = o.Delay
59 n.ETA = o.ETA 58 n.ETA = o.ETA
60 n.RetryCount = o.RetryCount 59 n.RetryCount = o.RetryCount
61 n.RetryOptions = (*tq.RetryOptions)(o.RetryOptions) 60 n.RetryOptions = (*tq.RetryOptions)(o.RetryOptions)
62 » return &n, nil 61 » return &n
63 } 62 }
64 63
65 // tqF2R (TQ fake-to-real) converts a *tq.Task to a *taskqueue.Task. 64 // tqF2R (TQ fake-to-real) converts a *tq.Task to a *taskqueue.Task.
66 func tqF2R(n *tq.Task) *taskqueue.Task { 65 func tqF2R(n *tq.Task) *taskqueue.Task {
67 o := taskqueue.Task{} 66 o := taskqueue.Task{}
68 o.Path = n.Path 67 o.Path = n.Path
69 o.Payload = n.Payload 68 o.Payload = n.Payload
70 o.Header = n.Header 69 o.Header = n.Header
71 o.Method = n.Method 70 o.Method = n.Method
72 o.Name = n.Name 71 o.Name = n.Name
73 o.Delay = n.Delay 72 o.Delay = n.Delay
74 o.ETA = n.ETA 73 o.ETA = n.ETA
75 o.RetryCount = n.RetryCount 74 o.RetryCount = n.RetryCount
76 o.RetryOptions = (*taskqueue.RetryOptions)(n.RetryOptions) 75 o.RetryOptions = (*taskqueue.RetryOptions)(n.RetryOptions)
77 return &o 76 return &o
78 } 77 }
79 78
80 // tqMR2FErr (TQ multi-real-to-fake w/ error) converts a slice of
81 // *taskqueue.Task to a slice of *tq.Task
82 func tqMR2FErr(os []*taskqueue.Task, err error) ([]*tq.Task, error) {
83 if err != nil {
84 return nil, errors.Fix(err)
85 }
86 ret := make([]*tq.Task, len(os))
87 for i, t := range os {
88 ret[i], _ = tqR2FErr(t, nil)
89 }
90 return ret, nil
91 }
92
93 // tqMF2R (TQ multi-fake-to-real) converts []*tq.Task to []*taskqueue.Task. 79 // tqMF2R (TQ multi-fake-to-real) converts []*tq.Task to []*taskqueue.Task.
94 func tqMF2R(ns []*tq.Task) []*taskqueue.Task { 80 func tqMF2R(ns []*tq.Task) []*taskqueue.Task {
95 ret := make([]*taskqueue.Task, len(ns)) 81 ret := make([]*taskqueue.Task, len(ns))
96 for i, t := range ns { 82 for i, t := range ns {
97 ret[i] = tqF2R(t) 83 ret[i] = tqF2R(t)
98 } 84 }
99 return ret 85 return ret
100 } 86 }
101 87
102 //////// TQSingleReadWriter 88 func (t tqImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) er ror {
103 89 » realTasks, err := taskqueue.AddMulti(t.Context, tqMF2R(tasks), queueName )
104 func (t tqImpl) Add(task *tq.Task, queueName string) (*tq.Task, error) { 90 » if err != nil {
105 » return tqR2FErr(taskqueue.Add(t.Context, tqF2R(task), queueName)) 91 » » if me, ok := err.(appengine.MultiError); ok {
106 } 92 » » » for i, err := range me {
107 func (t tqImpl) Delete(task *tq.Task, queueName string) error { 93 » » » » tsk := (*taskqueue.Task)(nil)
108 » return taskqueue.Delete(t.Context, tqF2R(task), queueName) 94 » » » » if realTasks != nil {
95 » » » » » tsk = realTasks[i]
96 » » » » }
97 » » » » cb(tqR2F(tsk), err)
98 » » » }
99 » » » err = nil
100 » » }
101 » } else {
102 » » for _, tsk := range realTasks {
103 » » » cb(tqR2F(tsk), nil)
104 » » }
105 » }
106 » return err
109 } 107 }
110 108
111 //////// TQMultiReadWriter 109 func (t tqImpl) DeleteMulti(tasks []*tq.Task, queueName string, cb tq.RawCB) err or {
112 110 » err := taskqueue.DeleteMulti(t.Context, tqMF2R(tasks), queueName)
113 func (t tqImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task, error) { 111 » if me, ok := err.(appengine.MultiError); ok {
114 » return tqMR2FErr(taskqueue.AddMulti(t.Context, tqMF2R(tasks), queueName) ) 112 » » for _, err := range me {
113 » » » cb(err)
114 » » }
115 » » err = nil
116 » }
117 » return err
115 } 118 }
116 func (t tqImpl) DeleteMulti(tasks []*tq.Task, queueName string) error {
117 return errors.Fix(taskqueue.DeleteMulti(t.Context, tqMF2R(tasks), queueN ame))
118 }
119
120 //////// TQLeaser
121
122 func (t tqImpl) Lease(maxTasks int, queueName string, leaseTime int) ([]*tq.Task , error) {
123 return tqMR2FErr(taskqueue.Lease(t.Context, maxTasks, queueName, leaseTi me))
124 }
125 func (t tqImpl) LeaseByTag(maxTasks int, queueName string, leaseTime int, tag st ring) ([]*tq.Task, error) {
126 return tqMR2FErr(taskqueue.LeaseByTag(t.Context, maxTasks, queueName, le aseTime, tag))
127 }
128 func (t tqImpl) ModifyLease(task *tq.Task, queueName string, leaseTime int) erro r {
129 return taskqueue.ModifyLease(t.Context, tqF2R(task), queueName, leaseTim e)
130 }
131
132 //////// TQPurger
133 119
134 func (t tqImpl) Purge(queueName string) error { 120 func (t tqImpl) Purge(queueName string) error {
135 return taskqueue.Purge(t.Context, queueName) 121 return taskqueue.Purge(t.Context, queueName)
136 } 122 }
137 123
138 //////// TQStatter 124 func (t tqImpl) Stats(queueNames []string, cb tq.RawStatsCB) error {
139
140 func (t tqImpl) QueueStats(queueNames []string) ([]tq.Statistics, error) {
141 stats, err := taskqueue.QueueStats(t.Context, queueNames) 125 stats, err := taskqueue.QueueStats(t.Context, queueNames)
142 if err != nil { 126 if err != nil {
143 » » return nil, err 127 » » return err
144 } 128 }
145 » ret := make([]tq.Statistics, len(stats)) 129 » for _, s := range stats {
146 » for i, s := range stats { 130 » » cb((*tq.Statistics)(&s), nil)
147 » » ret[i] = tq.Statistics(s)
148 } 131 }
149 » return ret, nil 132 » return nil
150 } 133 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698