Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1364)

Unified Diff: go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go

Issue 1222903002: Refactor current GAE abstraction library to be free of the SDK* (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: more fixes Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go
diff --git a/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go b/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go
deleted file mode 100644
index 2d23b1d803ee3959a480d823cbd86364a53be40a..0000000000000000000000000000000000000000
--- a/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go
+++ /dev/null
@@ -1,265 +0,0 @@
-// Copyright 2015 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-package memory
-
-import (
- "errors"
- "fmt"
- "infra/gae/libs/wrapper"
- "net/http"
- "sync"
- "sync/atomic"
-
- "appengine/datastore"
- "appengine/taskqueue"
- pb "appengine_internal/taskqueue"
- "golang.org/x/net/context"
- "infra/libs/clock"
-)
-
-var (
- currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
- defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespace")
-)
-
-//////////////////////////////// taskQueueData /////////////////////////////////
-
-type taskQueueData struct {
- sync.Mutex
- wrapper.BrokenFeatures
-
- named wrapper.QueueData
- archived wrapper.QueueData
-}
-
-var (
- _ = memContextObj((*taskQueueData)(nil))
- _ = wrapper.TQTestable((*taskQueueData)(nil))
-)
-
-func newTaskQueueData() memContextObj {
- return &taskQueueData{
- BrokenFeatures: wrapper.BrokenFeatures{
- DefaultError: newTQError(pb.TaskQueueServiceError_TRANSIENT_ERROR)},
- named: wrapper.QueueData{"default": {}},
- archived: wrapper.QueueData{"default": {}},
- }
-}
-
-func (t *taskQueueData) canApplyTxn(obj memContextObj) bool { return true }
-func (t *taskQueueData) endTxn() {}
-func (t *taskQueueData) applyTxn(c context.Context, obj memContextObj) {
- txn := obj.(*txnTaskQueueData)
- for qn, tasks := range txn.anony {
- for _, tsk := range tasks {
- tsk.Name = mkName(c, tsk.Name, t.named[qn])
- t.named[qn][tsk.Name] = tsk
- }
- }
- txn.anony = nil
-}
-func (t *taskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) {
- return &txnTaskQueueData{
- BrokenFeatures: &t.BrokenFeatures,
- parent: t,
- anony: wrapper.AnonymousQueueData{},
- }, nil
-}
-
-func (t *taskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData {
- return nil
-}
-
-func (t *taskQueueData) CreateQueue(queueName string) {
- t.Lock()
- defer t.Unlock()
-
- if _, ok := t.named[queueName]; ok {
- panic(fmt.Errorf("memory/taskqueue: cannot add the same queue twice! %q", queueName))
- }
- t.named[queueName] = map[string]*taskqueue.Task{}
- t.archived[queueName] = map[string]*taskqueue.Task{}
-}
-
-func (t *taskQueueData) GetScheduledTasks() wrapper.QueueData {
- t.Lock()
- defer t.Unlock()
-
- return dupQueue(t.named)
-}
-
-func (t *taskQueueData) GetTombstonedTasks() wrapper.QueueData {
- t.Lock()
- defer t.Unlock()
-
- return dupQueue(t.archived)
-}
-
-func (t *taskQueueData) resetTasksWithLock() {
- for queuename := range t.named {
- t.named[queuename] = map[string]*taskqueue.Task{}
- t.archived[queuename] = map[string]*taskqueue.Task{}
- }
-}
-
-func (t *taskQueueData) ResetTasks() {
- t.Lock()
- defer t.Unlock()
-
- t.resetTasksWithLock()
-}
-
-func (t *taskQueueData) getQueueName(queueName string) (string, error) {
- if queueName == "" {
- queueName = "default"
- }
- if _, ok := t.named[queueName]; !ok {
- return "", newTQError(pb.TaskQueueServiceError_UNKNOWN_QUEUE)
- }
- return queueName, nil
-}
-
-func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.Task, queueName string) (
- *taskqueue.Task, string, error) {
- queueName, err := t.getQueueName(queueName)
- if err != nil {
- return nil, "", err
- }
-
- toSched := dupTask(task)
-
- if toSched.Path == "" {
- return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_URL)
- }
-
- if toSched.ETA.IsZero() {
- toSched.ETA = clock.Now(c).Add(toSched.Delay)
- } else if toSched.Delay != 0 {
- panic("taskqueue: both Delay and ETA are set")
- }
- toSched.Delay = 0
-
- if toSched.Method == "" {
- toSched.Method = "POST"
- }
- if _, ok := pb.TaskQueueAddRequest_RequestMethod_value[toSched.Method]; !ok {
- return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.Method)
- }
- if toSched.Method != "POST" && toSched.Method != "PUT" {
- toSched.Payload = nil
- }
-
- if _, ok := toSched.Header[currentNamespace]; !ok {
- if ns != "" {
- if toSched.Header == nil {
- toSched.Header = http.Header{}
- }
- toSched.Header[currentNamespace] = []string{ns}
- }
- }
- // TODO(riannucci): implement DefaultNamespace
-
- if toSched.Name == "" {
- toSched.Name = mkName(c, "", t.named[queueName])
- } else {
- if !validTaskName.MatchString(toSched.Name) {
- return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_TASK_NAME)
- }
- }
-
- return toSched, queueName, nil
-}
-
-/////////////////////////////// txnTaskQueueData ///////////////////////////////
-
-type txnTaskQueueData struct {
- *wrapper.BrokenFeatures
-
- lock sync.Mutex
-
- // boolean 0 or 1, use atomic.*Int32 to access.
- closed int32
- anony wrapper.AnonymousQueueData
- parent *taskQueueData
-}
-
-var (
- _ = memContextObj((*txnTaskQueueData)(nil))
- _ = wrapper.TQTestable((*txnTaskQueueData)(nil))
-)
-
-func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false }
-
-func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) {
- panic(errors.New("txnTaskQueueData.applyTxn is not implemented"))
-}
-
-func (t *txnTaskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) {
- return nil, errors.New("txnTaskQueueData.mkTxn is not implemented")
-}
-
-func (t *txnTaskQueueData) endTxn() {
- if atomic.LoadInt32(&t.closed) == 1 {
- panic("cannot end transaction twice")
- }
- atomic.StoreInt32(&t.closed, 1)
-}
-
-func (t *txnTaskQueueData) IsBroken() error {
- // Slightly different from the SDK... datastore and taskqueue each implement
- // this here, where in the SDK only datastore.transaction.Call does.
- if atomic.LoadInt32(&t.closed) == 1 {
- return fmt.Errorf("taskqueue: transaction context has expired")
- }
- return t.parent.IsBroken()
-}
-
-func (t *txnTaskQueueData) ResetTasks() {
- t.Lock()
- defer t.Unlock()
-
- for queuename := range t.anony {
- t.anony[queuename] = nil
- }
- t.parent.resetTasksWithLock()
-}
-
-func (t *txnTaskQueueData) Lock() {
- t.lock.Lock()
- t.parent.Lock()
-}
-func (t *txnTaskQueueData) Unlock() {
- t.parent.Unlock()
- t.lock.Unlock()
-}
-
-func (t *txnTaskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData {
- t.Lock()
- defer t.Unlock()
-
- ret := make(wrapper.AnonymousQueueData, len(t.anony))
- for k, vs := range t.anony {
- ret[k] = make([]*taskqueue.Task, len(vs))
- for i, v := range vs {
- tsk := dupTask(v)
- tsk.Name = ""
- ret[k][i] = tsk
- }
- }
-
- return ret
-}
-
-func (t *txnTaskQueueData) GetTombstonedTasks() wrapper.QueueData {
- return t.parent.GetTombstonedTasks()
-}
-
-func (t *txnTaskQueueData) GetScheduledTasks() wrapper.QueueData {
- return t.parent.GetScheduledTasks()
-}
-
-func (t *txnTaskQueueData) CreateQueue(queueName string) {
- t.parent.CreateQueue(queueName)
-}

Powered by Google App Engine
This is Rietveld 408576698