Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(25)

Side by Side Diff: impl/prod/taskqueue.go

Issue 1243323002: Refactor a bit. (Closed) Base URL: https://github.com/luci/gae.git@master
Patch Set: fix golint Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « impl/prod/raw_datastore_type_converter.go ('k') | mathrand.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package prod 5 package prod
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "reflect" 9 "reflect"
10 10
11 "github.com/luci/gae" 11 "github.com/luci/gae"
12 tq "github.com/luci/gae/service/taskqueue"
12 "golang.org/x/net/context" 13 "golang.org/x/net/context"
13 "google.golang.org/appengine/taskqueue" 14 "google.golang.org/appengine/taskqueue"
14 ) 15 )
15 16
16 // useTQ adds a gae.TaskQueue implementation to context, accessible 17 // useTQ adds a gae.TaskQueue implementation to context, accessible
17 // by gae.GetTQ(c) 18 // by gae.GetTQ(c)
18 func useTQ(c context.Context) context.Context { 19 func useTQ(c context.Context) context.Context {
19 » return gae.SetTQFactory(c, func(ci context.Context) gae.TaskQueue { 20 » return tq.SetFactory(c, func(ci context.Context) tq.Interface {
20 return tqImpl{ci} 21 return tqImpl{ci}
21 }) 22 })
22 } 23 }
23 24
24 type tqImpl struct { 25 type tqImpl struct {
25 context.Context 26 context.Context
26 } 27 }
27 28
28 func init() { 29 func init() {
29 » const TASK_EXPECTED_FIELDS = 9 30 » const taskExpectedFields = 9
30 // Runtime-assert that the number of fields in the Task structs is 9, to 31 // Runtime-assert that the number of fields in the Task structs is 9, to
31 // avoid missing additional fields if they're added later. 32 // avoid missing additional fields if they're added later.
32 // all other type assertions are statically enforced by o2n() and tqF2R( ) 33 // all other type assertions are statically enforced by o2n() and tqF2R( )
33 34
34 oldType := reflect.TypeOf((*taskqueue.Task)(nil)) 35 oldType := reflect.TypeOf((*taskqueue.Task)(nil))
35 » newType := reflect.TypeOf((*gae.TQTask)(nil)) 36 » newType := reflect.TypeOf((*tq.Task)(nil))
36 37
37 if oldType.NumField() != newType.NumField() || 38 if oldType.NumField() != newType.NumField() ||
38 » » oldType.NumField() != TASK_EXPECTED_FIELDS { 39 » » oldType.NumField() != taskExpectedFields {
39 panic(fmt.Errorf( 40 panic(fmt.Errorf(
40 "prod/taskqueue:init() field count differs: %v, %v", 41 "prod/taskqueue:init() field count differs: %v, %v",
41 oldType, newType)) 42 oldType, newType))
42 } 43 }
43 } 44 }
44 45
45 // tqR2FErr (TQ real-to-fake w/ error) converts a *taskqueue.Task to a 46 // tqR2FErr (TQ real-to-fake w/ error) converts a *taskqueue.Task to a
46 // *gae.TQTask, and passes through an error. 47 // *tq.Task, and passes through an error.
47 func tqR2FErr(o *taskqueue.Task, err error) (*gae.TQTask, error) { 48 func tqR2FErr(o *taskqueue.Task, err error) (*tq.Task, error) {
48 if err != nil { 49 if err != nil {
49 return nil, err 50 return nil, err
50 } 51 }
51 » n := gae.TQTask{} 52 » n := tq.Task{}
52 n.Path = o.Path 53 n.Path = o.Path
53 n.Payload = o.Payload 54 n.Payload = o.Payload
54 n.Header = o.Header 55 n.Header = o.Header
55 n.Method = o.Method 56 n.Method = o.Method
56 n.Name = o.Name 57 n.Name = o.Name
57 n.Delay = o.Delay 58 n.Delay = o.Delay
58 n.ETA = o.ETA 59 n.ETA = o.ETA
59 n.RetryCount = o.RetryCount 60 n.RetryCount = o.RetryCount
60 » n.RetryOptions = (*gae.TQRetryOptions)(o.RetryOptions) 61 » n.RetryOptions = (*tq.RetryOptions)(o.RetryOptions)
61 return &n, nil 62 return &n, nil
62 } 63 }
63 64
64 // tqF2R (TQ fake-to-real) converts a *gae.TQTask to a *taskqueue.Task. 65 // tqF2R (TQ fake-to-real) converts a *tq.Task to a *taskqueue.Task.
65 func tqF2R(n *gae.TQTask) *taskqueue.Task { 66 func tqF2R(n *tq.Task) *taskqueue.Task {
66 o := taskqueue.Task{} 67 o := taskqueue.Task{}
67 o.Path = n.Path 68 o.Path = n.Path
68 o.Payload = n.Payload 69 o.Payload = n.Payload
69 o.Header = n.Header 70 o.Header = n.Header
70 o.Method = n.Method 71 o.Method = n.Method
71 o.Name = n.Name 72 o.Name = n.Name
72 o.Delay = n.Delay 73 o.Delay = n.Delay
73 o.ETA = n.ETA 74 o.ETA = n.ETA
74 o.RetryCount = n.RetryCount 75 o.RetryCount = n.RetryCount
75 o.RetryOptions = (*taskqueue.RetryOptions)(n.RetryOptions) 76 o.RetryOptions = (*taskqueue.RetryOptions)(n.RetryOptions)
76 return &o 77 return &o
77 } 78 }
78 79
79 // tqMR2FErr (TQ multi-real-to-fake w/ error) converts a slice of 80 // tqMR2FErr (TQ multi-real-to-fake w/ error) converts a slice of
80 // *taskqueue.Task to a slice of *gae.TQTask 81 // *taskqueue.Task to a slice of *tq.Task
81 func tqMR2FErr(os []*taskqueue.Task, err error) ([]*gae.TQTask, error) { 82 func tqMR2FErr(os []*taskqueue.Task, err error) ([]*tq.Task, error) {
82 if err != nil { 83 if err != nil {
83 return nil, gae.FixError(err) 84 return nil, gae.FixError(err)
84 } 85 }
85 » ret := make([]*gae.TQTask, len(os)) 86 » ret := make([]*tq.Task, len(os))
86 for i, t := range os { 87 for i, t := range os {
87 ret[i], _ = tqR2FErr(t, nil) 88 ret[i], _ = tqR2FErr(t, nil)
88 } 89 }
89 return ret, nil 90 return ret, nil
90 } 91 }
91 92
92 // tqMF2R (TQ multi-fake-to-real) converts []*gae.TQTask to []*taskqueue.Task. 93 // tqMF2R (TQ multi-fake-to-real) converts []*tq.Task to []*taskqueue.Task.
93 func tqMF2R(ns []*gae.TQTask) []*taskqueue.Task { 94 func tqMF2R(ns []*tq.Task) []*taskqueue.Task {
94 ret := make([]*taskqueue.Task, len(ns)) 95 ret := make([]*taskqueue.Task, len(ns))
95 for i, t := range ns { 96 for i, t := range ns {
96 ret[i] = tqF2R(t) 97 ret[i] = tqF2R(t)
97 } 98 }
98 return ret 99 return ret
99 } 100 }
100 101
101 //////// TQSingleReadWriter 102 //////// TQSingleReadWriter
102 103
103 func (t tqImpl) Add(task *gae.TQTask, queueName string) (*gae.TQTask, error) { 104 func (t tqImpl) Add(task *tq.Task, queueName string) (*tq.Task, error) {
104 return tqR2FErr(taskqueue.Add(t.Context, tqF2R(task), queueName)) 105 return tqR2FErr(taskqueue.Add(t.Context, tqF2R(task), queueName))
105 } 106 }
106 func (t tqImpl) Delete(task *gae.TQTask, queueName string) error { 107 func (t tqImpl) Delete(task *tq.Task, queueName string) error {
107 return taskqueue.Delete(t.Context, tqF2R(task), queueName) 108 return taskqueue.Delete(t.Context, tqF2R(task), queueName)
108 } 109 }
109 110
110 //////// TQMultiReadWriter 111 //////// TQMultiReadWriter
111 112
112 func (t tqImpl) AddMulti(tasks []*gae.TQTask, queueName string) ([]*gae.TQTask, error) { 113 func (t tqImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task, error) {
113 return tqMR2FErr(taskqueue.AddMulti(t.Context, tqMF2R(tasks), queueName) ) 114 return tqMR2FErr(taskqueue.AddMulti(t.Context, tqMF2R(tasks), queueName) )
114 } 115 }
115 func (t tqImpl) DeleteMulti(tasks []*gae.TQTask, queueName string) error { 116 func (t tqImpl) DeleteMulti(tasks []*tq.Task, queueName string) error {
116 return gae.FixError(taskqueue.DeleteMulti(t.Context, tqMF2R(tasks), queu eName)) 117 return gae.FixError(taskqueue.DeleteMulti(t.Context, tqMF2R(tasks), queu eName))
117 } 118 }
118 119
119 //////// TQLeaser 120 //////// TQLeaser
120 121
121 func (t tqImpl) Lease(maxTasks int, queueName string, leaseTime int) ([]*gae.TQT ask, error) { 122 func (t tqImpl) Lease(maxTasks int, queueName string, leaseTime int) ([]*tq.Task , error) {
122 return tqMR2FErr(taskqueue.Lease(t.Context, maxTasks, queueName, leaseTi me)) 123 return tqMR2FErr(taskqueue.Lease(t.Context, maxTasks, queueName, leaseTi me))
123 } 124 }
124 func (t tqImpl) LeaseByTag(maxTasks int, queueName string, leaseTime int, tag st ring) ([]*gae.TQTask, error) { 125 func (t tqImpl) LeaseByTag(maxTasks int, queueName string, leaseTime int, tag st ring) ([]*tq.Task, error) {
125 return tqMR2FErr(taskqueue.LeaseByTag(t.Context, maxTasks, queueName, le aseTime, tag)) 126 return tqMR2FErr(taskqueue.LeaseByTag(t.Context, maxTasks, queueName, le aseTime, tag))
126 } 127 }
127 func (t tqImpl) ModifyLease(task *gae.TQTask, queueName string, leaseTime int) e rror { 128 func (t tqImpl) ModifyLease(task *tq.Task, queueName string, leaseTime int) erro r {
128 return taskqueue.ModifyLease(t.Context, tqF2R(task), queueName, leaseTim e) 129 return taskqueue.ModifyLease(t.Context, tqF2R(task), queueName, leaseTim e)
129 } 130 }
130 131
131 //////// TQPurger 132 //////// TQPurger
132 133
133 func (t tqImpl) Purge(queueName string) error { 134 func (t tqImpl) Purge(queueName string) error {
134 return taskqueue.Purge(t.Context, queueName) 135 return taskqueue.Purge(t.Context, queueName)
135 } 136 }
136 137
137 //////// TQStatter 138 //////// TQStatter
138 139
139 func (t tqImpl) QueueStats(queueNames []string) ([]gae.TQStatistics, error) { 140 func (t tqImpl) QueueStats(queueNames []string) ([]tq.Statistics, error) {
140 stats, err := taskqueue.QueueStats(t.Context, queueNames) 141 stats, err := taskqueue.QueueStats(t.Context, queueNames)
141 if err != nil { 142 if err != nil {
142 return nil, err 143 return nil, err
143 } 144 }
144 » ret := make([]gae.TQStatistics, len(stats)) 145 » ret := make([]tq.Statistics, len(stats))
145 for i, s := range stats { 146 for i, s := range stats {
146 » » ret[i] = gae.TQStatistics(s) 147 » » ret[i] = tq.Statistics(s)
147 } 148 }
148 return ret, nil 149 return ret, nil
149 } 150 }
OLDNEW
« no previous file with comments | « impl/prod/raw_datastore_type_converter.go ('k') | mathrand.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698