Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(38)

Side by Side Diff: impl/prod/taskqueue.go

Issue 2512093002: Add support for Pull Queues to prod implementation. (Closed)
Patch Set: use time.Duration for leaseTime Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « impl/memory/taskqueue.go ('k') | service/taskqueue/interface.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package prod 5 package prod
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "reflect" 9 "reflect"
10 "time"
10 11
11 tq "github.com/luci/gae/service/taskqueue" 12 tq "github.com/luci/gae/service/taskqueue"
12 "golang.org/x/net/context" 13 "golang.org/x/net/context"
13 "google.golang.org/appengine" 14 "google.golang.org/appengine"
14 "google.golang.org/appengine/taskqueue" 15 "google.golang.org/appengine/taskqueue"
15 ) 16 )
16 17
17 // useTQ adds a gae.TaskQueue implementation to context, accessible 18 // useTQ adds a gae.TaskQueue implementation to context, accessible
18 // by gae.GetTQ(c) 19 // by gae.GetTQ(c)
19 func useTQ(c context.Context) context.Context { 20 func useTQ(c context.Context) context.Context {
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after
78 79
79 // tqMF2R (TQ multi-fake-to-real) converts []*tq.Task to []*taskqueue.Task. 80 // tqMF2R (TQ multi-fake-to-real) converts []*tq.Task to []*taskqueue.Task.
80 func tqMF2R(ns []*tq.Task) []*taskqueue.Task { 81 func tqMF2R(ns []*tq.Task) []*taskqueue.Task {
81 ret := make([]*taskqueue.Task, len(ns)) 82 ret := make([]*taskqueue.Task, len(ns))
82 for i, t := range ns { 83 for i, t := range ns {
83 ret[i] = tqF2R(t) 84 ret[i] = tqF2R(t)
84 } 85 }
85 return ret 86 return ret
86 } 87 }
87 88
89 // tqMR2F (TQ multi-real-to-fake) converts []*taskqueue.Task to []*tq.Task.
90 func tqMR2F(ns []*taskqueue.Task) []*tq.Task {
91 ret := make([]*tq.Task, len(ns))
92 for i, t := range ns {
93 ret[i] = tqR2F(t)
94 }
95 return ret
96 }
97
88 func (t tqImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) er ror { 98 func (t tqImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) er ror {
89 realTasks, err := taskqueue.AddMulti(t.aeCtx, tqMF2R(tasks), queueName) 99 realTasks, err := taskqueue.AddMulti(t.aeCtx, tqMF2R(tasks), queueName)
90 if err != nil { 100 if err != nil {
91 if me, ok := err.(appengine.MultiError); ok { 101 if me, ok := err.(appengine.MultiError); ok {
92 for i, err := range me { 102 for i, err := range me {
93 tsk := (*taskqueue.Task)(nil) 103 tsk := (*taskqueue.Task)(nil)
94 if realTasks != nil { 104 if realTasks != nil {
95 tsk = realTasks[i] 105 tsk = realTasks[i]
96 } 106 }
97 cb(tqR2F(tsk), err) 107 cb(tqR2F(tsk), err)
(...skipping 12 matching lines...) Expand all
110 err := taskqueue.DeleteMulti(t.aeCtx, tqMF2R(tasks), queueName) 120 err := taskqueue.DeleteMulti(t.aeCtx, tqMF2R(tasks), queueName)
111 if me, ok := err.(appengine.MultiError); ok { 121 if me, ok := err.(appengine.MultiError); ok {
112 for _, err := range me { 122 for _, err := range me {
113 cb(err) 123 cb(err)
114 } 124 }
115 err = nil 125 err = nil
116 } 126 }
117 return err 127 return err
118 } 128 }
119 129
130 func (t tqImpl) Lease(maxTasks int, queueName string, leaseTime time.Duration) ( []*tq.Task, error) {
131 tasks, err := taskqueue.Lease(t.aeCtx, maxTasks, queueName, int(leaseTim e/time.Second))
132 if err != nil {
133 return nil, err
134 }
135 return tqMR2F(tasks), nil
136 }
137
138 func (t tqImpl) LeaseByTag(maxTasks int, queueName string, leaseTime time.Durati on, tag string) ([]*tq.Task, error) {
139 tasks, err := taskqueue.LeaseByTag(t.aeCtx, maxTasks, queueName, int(lea seTime/time.Second), tag)
140 if err != nil {
141 return nil, err
142 }
143 return tqMR2F(tasks), nil
144 }
145
146 func (t tqImpl) ModifyLease(task *tq.Task, queueName string, leaseTime time.Dura tion) error {
147 return taskqueue.ModifyLease(t.aeCtx, tqF2R(task), queueName, int(leaseT ime/time.Second))
148 }
149
120 func (t tqImpl) Purge(queueName string) error { 150 func (t tqImpl) Purge(queueName string) error {
121 return taskqueue.Purge(t.aeCtx, queueName) 151 return taskqueue.Purge(t.aeCtx, queueName)
122 } 152 }
123 153
124 func (t tqImpl) Stats(queueNames []string, cb tq.RawStatsCB) error { 154 func (t tqImpl) Stats(queueNames []string, cb tq.RawStatsCB) error {
125 stats, err := taskqueue.QueueStats(t.aeCtx, queueNames) 155 stats, err := taskqueue.QueueStats(t.aeCtx, queueNames)
126 if err != nil { 156 if err != nil {
127 return err 157 return err
128 } 158 }
129 for _, s := range stats { 159 for _, s := range stats {
130 cb((*tq.Statistics)(&s), nil) 160 cb((*tq.Statistics)(&s), nil)
131 } 161 }
132 return nil 162 return nil
133 } 163 }
134 164
135 func (t tqImpl) GetTestable() tq.Testable { 165 func (t tqImpl) GetTestable() tq.Testable {
136 return nil 166 return nil
137 } 167 }
OLDNEW
« no previous file with comments | « impl/memory/taskqueue.go ('k') | service/taskqueue/interface.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698