Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(141)

Side by Side Diff: go/src/infra/gae/libs/gae/prod/taskqueue.go

Issue 1240573002: Reland: Refactor current GAE abstraction library to be free of the SDK* (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: expand coverage range to fit 32bit test expectations Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package prod
6
7 import (
8 "fmt"
9 "golang.org/x/net/context"
10 "reflect"
11
12 "infra/gae/libs/gae"
13
14 "google.golang.org/appengine/taskqueue"
15 )
16
17 // useTQ adds a gae.TaskQueue implementation to context, accessible
18 // by gae.GetTQ(c)
19 func useTQ(c context.Context) context.Context {
20 return gae.SetTQFactory(c, func(ci context.Context) gae.TaskQueue {
21 return tqImpl{ci}
22 })
23 }
24
25 type tqImpl struct {
26 context.Context
27 }
28
29 func init() {
30 const TASK_EXPECTED_FIELDS = 9
31 // Runtime-assert that the number of fields in the Task structs is 9, to
32 // avoid missing additional fields if they're added later.
33 // all other type assertions are statically enforced by o2n() and tqF2R( )
34
35 oldType := reflect.TypeOf((*taskqueue.Task)(nil))
36 newType := reflect.TypeOf((*gae.TQTask)(nil))
37
38 if oldType.NumField() != newType.NumField() ||
39 oldType.NumField() != TASK_EXPECTED_FIELDS {
40 panic(fmt.Errorf(
41 "prod/taskqueue:init() field count differs: %v, %v",
42 oldType, newType))
43 }
44 }
45
46 // tqR2FErr (TQ real-to-fake w/ error) converts a *taskqueue.Task to a
47 // *gae.TQTask, and passes through an error.
48 func tqR2FErr(o *taskqueue.Task, err error) (*gae.TQTask, error) {
49 if err != nil {
50 return nil, err
51 }
52 n := gae.TQTask{}
53 n.Path = o.Path
54 n.Payload = o.Payload
55 n.Header = o.Header
56 n.Method = o.Method
57 n.Name = o.Name
58 n.Delay = o.Delay
59 n.ETA = o.ETA
60 n.RetryCount = o.RetryCount
61 n.RetryOptions = (*gae.TQRetryOptions)(o.RetryOptions)
62 return &n, nil
63 }
64
65 // tqF2R (TQ fake-to-real) converts a *gae.TQTask to a *taskqueue.Task.
66 func tqF2R(n *gae.TQTask) *taskqueue.Task {
67 o := taskqueue.Task{}
68 o.Path = n.Path
69 o.Payload = n.Payload
70 o.Header = n.Header
71 o.Method = n.Method
72 o.Name = n.Name
73 o.Delay = n.Delay
74 o.ETA = n.ETA
75 o.RetryCount = n.RetryCount
76 o.RetryOptions = (*taskqueue.RetryOptions)(n.RetryOptions)
77 return &o
78 }
79
80 // tqMR2FErr (TQ multi-real-to-fake w/ error) converts a slice of
81 // *taskqueue.Task to a slice of *gae.TQTask
82 func tqMR2FErr(os []*taskqueue.Task, err error) ([]*gae.TQTask, error) {
83 if err != nil {
84 return nil, gae.FixError(err)
85 }
86 ret := make([]*gae.TQTask, len(os))
87 for i, t := range os {
88 ret[i], _ = tqR2FErr(t, nil)
89 }
90 return ret, nil
91 }
92
93 // tqMF2R (TQ multi-fake-to-real) converts []*gae.TQTask to []*taskqueue.Task.
94 func tqMF2R(ns []*gae.TQTask) []*taskqueue.Task {
95 ret := make([]*taskqueue.Task, len(ns))
96 for i, t := range ns {
97 ret[i] = tqF2R(t)
98 }
99 return ret
100 }
101
102 //////// TQSingleReadWriter
103
104 func (t tqImpl) Add(task *gae.TQTask, queueName string) (*gae.TQTask, error) {
105 return tqR2FErr(taskqueue.Add(t.Context, tqF2R(task), queueName))
106 }
107 func (t tqImpl) Delete(task *gae.TQTask, queueName string) error {
108 return taskqueue.Delete(t.Context, tqF2R(task), queueName)
109 }
110
111 //////// TQMultiReadWriter
112
113 func (t tqImpl) AddMulti(tasks []*gae.TQTask, queueName string) ([]*gae.TQTask, error) {
114 return tqMR2FErr(taskqueue.AddMulti(t.Context, tqMF2R(tasks), queueName) )
115 }
116 func (t tqImpl) DeleteMulti(tasks []*gae.TQTask, queueName string) error {
117 return gae.FixError(taskqueue.DeleteMulti(t.Context, tqMF2R(tasks), queu eName))
118 }
119
120 //////// TQLeaser
121
122 func (t tqImpl) Lease(maxTasks int, queueName string, leaseTime int) ([]*gae.TQT ask, error) {
123 return tqMR2FErr(taskqueue.Lease(t.Context, maxTasks, queueName, leaseTi me))
124 }
125 func (t tqImpl) LeaseByTag(maxTasks int, queueName string, leaseTime int, tag st ring) ([]*gae.TQTask, error) {
126 return tqMR2FErr(taskqueue.LeaseByTag(t.Context, maxTasks, queueName, le aseTime, tag))
127 }
128 func (t tqImpl) ModifyLease(task *gae.TQTask, queueName string, leaseTime int) e rror {
129 return taskqueue.ModifyLease(t.Context, tqF2R(task), queueName, leaseTim e)
130 }
131
132 //////// TQPurger
133
134 func (t tqImpl) Purge(queueName string) error {
135 return taskqueue.Purge(t.Context, queueName)
136 }
137
138 //////// TQStatter
139
140 func (t tqImpl) QueueStats(queueNames []string) ([]gae.TQStatistics, error) {
141 stats, err := taskqueue.QueueStats(t.Context, queueNames)
142 if err != nil {
143 return nil, err
144 }
145 ret := make([]gae.TQStatistics, len(stats))
146 for i, s := range stats {
147 ret[i] = gae.TQStatistics(s)
148 }
149 return ret, nil
150 }
OLDNEW
« no previous file with comments | « go/src/infra/gae/libs/gae/prod/raw_datastore_type_converter.go ('k') | go/src/infra/gae/libs/gae/properties.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698