Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: impl/prod/taskqueue.go

Issue 2512093002: Add support for Pull Queues to prod implementation. (Closed)
Patch Set: Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package prod 5 package prod
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "reflect" 9 "reflect"
10 10
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after
78 78
79 // tqMF2R (TQ multi-fake-to-real) converts []*tq.Task to []*taskqueue.Task. 79 // tqMF2R (TQ multi-fake-to-real) converts []*tq.Task to []*taskqueue.Task.
80 func tqMF2R(ns []*tq.Task) []*taskqueue.Task { 80 func tqMF2R(ns []*tq.Task) []*taskqueue.Task {
81 ret := make([]*taskqueue.Task, len(ns)) 81 ret := make([]*taskqueue.Task, len(ns))
82 for i, t := range ns { 82 for i, t := range ns {
83 ret[i] = tqF2R(t) 83 ret[i] = tqF2R(t)
84 } 84 }
85 return ret 85 return ret
86 } 86 }
87 87
88 // tqMR2F (TQ multi-real-to-fake) converts []*taskqueue.Task to []*tq.Task.
89 func tqMR2F(ns []*taskqueue.Task) []*tq.Task {
90 ret := make([]*tq.Task, len(ns))
91 for i, t := range ns {
92 ret[i] = tqR2F(t)
93 }
94 return ret
95 }
96
88 func (t tqImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) er ror { 97 func (t tqImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) er ror {
89 realTasks, err := taskqueue.AddMulti(t.aeCtx, tqMF2R(tasks), queueName) 98 realTasks, err := taskqueue.AddMulti(t.aeCtx, tqMF2R(tasks), queueName)
90 if err != nil { 99 if err != nil {
91 if me, ok := err.(appengine.MultiError); ok { 100 if me, ok := err.(appengine.MultiError); ok {
92 for i, err := range me { 101 for i, err := range me {
93 tsk := (*taskqueue.Task)(nil) 102 tsk := (*taskqueue.Task)(nil)
94 if realTasks != nil { 103 if realTasks != nil {
95 tsk = realTasks[i] 104 tsk = realTasks[i]
96 } 105 }
97 cb(tqR2F(tsk), err) 106 cb(tqR2F(tsk), err)
(...skipping 12 matching lines...) Expand all
110 err := taskqueue.DeleteMulti(t.aeCtx, tqMF2R(tasks), queueName) 119 err := taskqueue.DeleteMulti(t.aeCtx, tqMF2R(tasks), queueName)
111 if me, ok := err.(appengine.MultiError); ok { 120 if me, ok := err.(appengine.MultiError); ok {
112 for _, err := range me { 121 for _, err := range me {
113 cb(err) 122 cb(err)
114 } 123 }
115 err = nil 124 err = nil
116 } 125 }
117 return err 126 return err
118 } 127 }
119 128
129 func (t tqImpl) Lease(maxTasks int, queueName string, leaseTime int) ([]*tq.Task , error) {
130 tasks, err := taskqueue.Lease(t.aeCtx, maxTasks, queueName, leaseTime)
131 if err != nil {
132 return nil, err
133 }
134 return tqMR2F(tasks), nil
135 }
136
137 func (t tqImpl) LeaseByTag(maxTasks int, queueName string, leaseTime int, tag st ring) ([]*tq.Task, error) {
138 tasks, err := taskqueue.LeaseByTag(t.aeCtx, maxTasks, queueName, leaseTi me, tag)
139 if err != nil {
140 return nil, err
141 }
142 return tqMR2F(tasks), nil
143 }
144
145 func (t tqImpl) ModifyLease(task *tq.Task, queueName string, leaseTime int) erro r {
146 return taskqueue.ModifyLease(t.aeCtx, tqF2R(task), queueName, leaseTime)
147 }
148
120 func (t tqImpl) Purge(queueName string) error { 149 func (t tqImpl) Purge(queueName string) error {
121 return taskqueue.Purge(t.aeCtx, queueName) 150 return taskqueue.Purge(t.aeCtx, queueName)
122 } 151 }
123 152
124 func (t tqImpl) Stats(queueNames []string, cb tq.RawStatsCB) error { 153 func (t tqImpl) Stats(queueNames []string, cb tq.RawStatsCB) error {
125 stats, err := taskqueue.QueueStats(t.aeCtx, queueNames) 154 stats, err := taskqueue.QueueStats(t.aeCtx, queueNames)
126 if err != nil { 155 if err != nil {
127 return err 156 return err
128 } 157 }
129 for _, s := range stats { 158 for _, s := range stats {
130 cb((*tq.Statistics)(&s), nil) 159 cb((*tq.Statistics)(&s), nil)
131 } 160 }
132 return nil 161 return nil
133 } 162 }
134 163
135 func (t tqImpl) GetTestable() tq.Testable { 164 func (t tqImpl) GetTestable() tq.Testable {
136 return nil 165 return nil
137 } 166 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698