OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package prod | 5 package prod |
6 | 6 |
7 import ( | 7 import ( |
8 "fmt" | 8 "fmt" |
9 "reflect" | 9 "reflect" |
10 | 10 |
11 "github.com/luci/gae" | |
12 tq "github.com/luci/gae/service/taskqueue" | 11 tq "github.com/luci/gae/service/taskqueue" |
| 12 "github.com/luci/luci-go/common/errors" |
13 "golang.org/x/net/context" | 13 "golang.org/x/net/context" |
14 "google.golang.org/appengine/taskqueue" | 14 "google.golang.org/appengine/taskqueue" |
15 ) | 15 ) |
16 | 16 |
17 // useTQ adds a gae.TaskQueue implementation to context, accessible | 17 // useTQ adds a gae.TaskQueue implementation to context, accessible |
18 // by gae.GetTQ(c) | 18 // by gae.GetTQ(c) |
19 func useTQ(c context.Context) context.Context { | 19 func useTQ(c context.Context) context.Context { |
20 return tq.SetFactory(c, func(ci context.Context) tq.Interface { | 20 return tq.SetFactory(c, func(ci context.Context) tq.Interface { |
21 return tqImpl{ci} | 21 return tqImpl{ci} |
22 }) | 22 }) |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
74 o.ETA = n.ETA | 74 o.ETA = n.ETA |
75 o.RetryCount = n.RetryCount | 75 o.RetryCount = n.RetryCount |
76 o.RetryOptions = (*taskqueue.RetryOptions)(n.RetryOptions) | 76 o.RetryOptions = (*taskqueue.RetryOptions)(n.RetryOptions) |
77 return &o | 77 return &o |
78 } | 78 } |
79 | 79 |
80 // tqMR2FErr (TQ multi-real-to-fake w/ error) converts a slice of | 80 // tqMR2FErr (TQ multi-real-to-fake w/ error) converts a slice of |
81 // *taskqueue.Task to a slice of *tq.Task | 81 // *taskqueue.Task to a slice of *tq.Task |
82 func tqMR2FErr(os []*taskqueue.Task, err error) ([]*tq.Task, error) { | 82 func tqMR2FErr(os []*taskqueue.Task, err error) ([]*tq.Task, error) { |
83 if err != nil { | 83 if err != nil { |
84 » » return nil, gae.FixError(err) | 84 » » return nil, errors.Fix(err) |
85 } | 85 } |
86 ret := make([]*tq.Task, len(os)) | 86 ret := make([]*tq.Task, len(os)) |
87 for i, t := range os { | 87 for i, t := range os { |
88 ret[i], _ = tqR2FErr(t, nil) | 88 ret[i], _ = tqR2FErr(t, nil) |
89 } | 89 } |
90 return ret, nil | 90 return ret, nil |
91 } | 91 } |
92 | 92 |
93 // tqMF2R (TQ multi-fake-to-real) converts []*tq.Task to []*taskqueue.Task. | 93 // tqMF2R (TQ multi-fake-to-real) converts []*tq.Task to []*taskqueue.Task. |
94 func tqMF2R(ns []*tq.Task) []*taskqueue.Task { | 94 func tqMF2R(ns []*tq.Task) []*taskqueue.Task { |
(...skipping 12 matching lines...) Expand all Loading... |
107 func (t tqImpl) Delete(task *tq.Task, queueName string) error { | 107 func (t tqImpl) Delete(task *tq.Task, queueName string) error { |
108 return taskqueue.Delete(t.Context, tqF2R(task), queueName) | 108 return taskqueue.Delete(t.Context, tqF2R(task), queueName) |
109 } | 109 } |
110 | 110 |
111 //////// TQMultiReadWriter | 111 //////// TQMultiReadWriter |
112 | 112 |
113 func (t tqImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task, error)
{ | 113 func (t tqImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task, error)
{ |
114 return tqMR2FErr(taskqueue.AddMulti(t.Context, tqMF2R(tasks), queueName)
) | 114 return tqMR2FErr(taskqueue.AddMulti(t.Context, tqMF2R(tasks), queueName)
) |
115 } | 115 } |
116 func (t tqImpl) DeleteMulti(tasks []*tq.Task, queueName string) error { | 116 func (t tqImpl) DeleteMulti(tasks []*tq.Task, queueName string) error { |
117 » return gae.FixError(taskqueue.DeleteMulti(t.Context, tqMF2R(tasks), queu
eName)) | 117 » return errors.Fix(taskqueue.DeleteMulti(t.Context, tqMF2R(tasks), queueN
ame)) |
118 } | 118 } |
119 | 119 |
120 //////// TQLeaser | 120 //////// TQLeaser |
121 | 121 |
122 func (t tqImpl) Lease(maxTasks int, queueName string, leaseTime int) ([]*tq.Task
, error) { | 122 func (t tqImpl) Lease(maxTasks int, queueName string, leaseTime int) ([]*tq.Task
, error) { |
123 return tqMR2FErr(taskqueue.Lease(t.Context, maxTasks, queueName, leaseTi
me)) | 123 return tqMR2FErr(taskqueue.Lease(t.Context, maxTasks, queueName, leaseTi
me)) |
124 } | 124 } |
125 func (t tqImpl) LeaseByTag(maxTasks int, queueName string, leaseTime int, tag st
ring) ([]*tq.Task, error) { | 125 func (t tqImpl) LeaseByTag(maxTasks int, queueName string, leaseTime int, tag st
ring) ([]*tq.Task, error) { |
126 return tqMR2FErr(taskqueue.LeaseByTag(t.Context, maxTasks, queueName, le
aseTime, tag)) | 126 return tqMR2FErr(taskqueue.LeaseByTag(t.Context, maxTasks, queueName, le
aseTime, tag)) |
127 } | 127 } |
(...skipping 13 matching lines...) Expand all Loading... |
141 stats, err := taskqueue.QueueStats(t.Context, queueNames) | 141 stats, err := taskqueue.QueueStats(t.Context, queueNames) |
142 if err != nil { | 142 if err != nil { |
143 return nil, err | 143 return nil, err |
144 } | 144 } |
145 ret := make([]tq.Statistics, len(stats)) | 145 ret := make([]tq.Statistics, len(stats)) |
146 for i, s := range stats { | 146 for i, s := range stats { |
147 ret[i] = tq.Statistics(s) | 147 ret[i] = tq.Statistics(s) |
148 } | 148 } |
149 return ret, nil | 149 return ret, nil |
150 } | 150 } |
OLD | NEW |