| OLD | NEW | 
|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be | 
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. | 
| 4 | 4 | 
| 5 package memory | 5 package memory | 
| 6 | 6 | 
| 7 import ( | 7 import ( | 
| 8         "errors" | 8         "errors" | 
| 9         "fmt" | 9         "fmt" | 
| 10         "net/http" | 10         "net/http" | 
| (...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 117 } | 117 } | 
| 118 | 118 | 
| 119 var tqOkMethods = map[string]struct{}{ | 119 var tqOkMethods = map[string]struct{}{ | 
| 120         "GET":    {}, | 120         "GET":    {}, | 
| 121         "POST":   {}, | 121         "POST":   {}, | 
| 122         "HEAD":   {}, | 122         "HEAD":   {}, | 
| 123         "PUT":    {}, | 123         "PUT":    {}, | 
| 124         "DELETE": {}, | 124         "DELETE": {}, | 
| 125 } | 125 } | 
| 126 | 126 | 
| 127 func (t *taskQueueData) prepTask(c context.Context, ns string, task *tq.Task, qu
     eueName string) (*tq.Task, string, error) { | 127 func (t *taskQueueData) prepTask(c context.Context, ns string, task *tq.Task, qu
     eueName string) (*tq.Task, error) { | 
| 128 »       queueName, err := t.getQueueName(queueName) | 128 »       toSched := task.Duplicate() | 
| 129 »       if err != nil { |  | 
| 130 »       »       return nil, "", err |  | 
| 131 »       } |  | 
| 132 |  | 
| 133 »       toSched := dupTask(task) |  | 
| 134 | 129 | 
| 135         if toSched.Path == "" { | 130         if toSched.Path == "" { | 
| 136                 toSched.Path = "/_ah/queue/" + queueName | 131                 toSched.Path = "/_ah/queue/" + queueName | 
| 137         } | 132         } | 
| 138 | 133 | 
| 139         if toSched.ETA.IsZero() { | 134         if toSched.ETA.IsZero() { | 
| 140                 toSched.ETA = clock.Now(c).Add(toSched.Delay) | 135                 toSched.ETA = clock.Now(c).Add(toSched.Delay) | 
| 141         } else if toSched.Delay != 0 { | 136         } else if toSched.Delay != 0 { | 
| 142                 panic("taskqueue: both Delay and ETA are set") | 137                 panic("taskqueue: both Delay and ETA are set") | 
| 143         } | 138         } | 
| 144         toSched.Delay = 0 | 139         toSched.Delay = 0 | 
| 145 | 140 | 
| 146         if toSched.Method == "" { | 141         if toSched.Method == "" { | 
| 147                 toSched.Method = "POST" | 142                 toSched.Method = "POST" | 
| 148         } | 143         } | 
| 149         if _, ok := tqOkMethods[toSched.Method]; !ok { | 144         if _, ok := tqOkMethods[toSched.Method]; !ok { | 
| 150 »       »       return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.M
     ethod) | 145 »       »       return nil, fmt.Errorf("taskqueue: bad method %q", toSched.Metho
     d) | 
| 151         } | 146         } | 
| 152         if toSched.Method != "POST" && toSched.Method != "PUT" { | 147         if toSched.Method != "POST" && toSched.Method != "PUT" { | 
| 153                 toSched.Payload = nil | 148                 toSched.Payload = nil | 
| 154         } | 149         } | 
| 155 | 150 | 
| 156         if _, ok := toSched.Header[currentNamespace]; !ok { | 151         if _, ok := toSched.Header[currentNamespace]; !ok { | 
| 157                 if ns != "" { | 152                 if ns != "" { | 
| 158                         if toSched.Header == nil { | 153                         if toSched.Header == nil { | 
| 159                                 toSched.Header = http.Header{} | 154                                 toSched.Header = http.Header{} | 
| 160                         } | 155                         } | 
| 161                         toSched.Header[currentNamespace] = []string{ns} | 156                         toSched.Header[currentNamespace] = []string{ns} | 
| 162                 } | 157                 } | 
| 163         } | 158         } | 
| 164         // TODO(riannucci): implement DefaultNamespace | 159         // TODO(riannucci): implement DefaultNamespace | 
| 165 | 160 | 
| 166         if toSched.Name == "" { | 161         if toSched.Name == "" { | 
| 167                 toSched.Name = mkName(c, "", t.named[queueName]) | 162                 toSched.Name = mkName(c, "", t.named[queueName]) | 
| 168         } else { | 163         } else { | 
| 169                 if !validTaskName.MatchString(toSched.Name) { | 164                 if !validTaskName.MatchString(toSched.Name) { | 
| 170 »       »       »       return nil, "", errors.New("INVALID_TASK_NAME") | 165 »       »       »       return nil, errors.New("INVALID_TASK_NAME") | 
| 171                 } | 166                 } | 
| 172         } | 167         } | 
| 173 | 168 | 
| 174 »       return toSched, queueName, nil | 169 »       return toSched, nil | 
| 175 } | 170 } | 
| 176 | 171 | 
| 177 /////////////////////////////// txnTaskQueueData /////////////////////////////// | 172 /////////////////////////////// txnTaskQueueData /////////////////////////////// | 
| 178 | 173 | 
| 179 type txnTaskQueueData struct { | 174 type txnTaskQueueData struct { | 
| 180         lock sync.Mutex | 175         lock sync.Mutex | 
| 181 | 176 | 
| 182         // boolean 0 or 1, use atomic.*Int32 to access. | 177         // boolean 0 or 1, use atomic.*Int32 to access. | 
| 183         closed int32 | 178         closed int32 | 
| 184         anony  tq.AnonymousQueueData | 179         anony  tq.AnonymousQueueData | 
| 185         parent *taskQueueData | 180         parent *taskQueueData | 
| 186 } | 181 } | 
| 187 | 182 | 
| 188 var ( | 183 var ( | 
| 189         _ = memContextObj((*txnTaskQueueData)(nil)) | 184         _ = memContextObj((*txnTaskQueueData)(nil)) | 
| 190         _ = tq.Testable((*txnTaskQueueData)(nil)) | 185         _ = tq.Testable((*txnTaskQueueData)(nil)) | 
| 191 ) | 186 ) | 
| 192 | 187 | 
| 193 func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool         { return 
     false } | 188 func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool         { return 
     false } | 
| 194 func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj)    { panic("
     impossible") } | 189 func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj)    { panic("
     impossible") } | 
| 195 func (t *txnTaskQueueData) mkTxn(*ds.TransactionOptions) memContextObj { panic("
     impossible") } | 190 func (t *txnTaskQueueData) mkTxn(*ds.TransactionOptions) memContextObj { panic("
     impossible") } | 
| 196 | 191 | 
| 197 func (t *txnTaskQueueData) endTxn() { | 192 func (t *txnTaskQueueData) endTxn() { | 
| 198         if atomic.LoadInt32(&t.closed) == 1 { | 193         if atomic.LoadInt32(&t.closed) == 1 { | 
| 199                 panic("cannot end transaction twice") | 194                 panic("cannot end transaction twice") | 
| 200         } | 195         } | 
| 201         atomic.StoreInt32(&t.closed, 1) | 196         atomic.StoreInt32(&t.closed, 1) | 
| 202 } | 197 } | 
| 203 | 198 | 
| 204 func (t *txnTaskQueueData) run(f func() error) error { |  | 
| 205         // Slightly different from the SDK... datastore and taskqueue each imple
     ment |  | 
| 206         // this here, where in the SDK only datastore.transaction.Call does. |  | 
| 207         if atomic.LoadInt32(&t.closed) == 1 { |  | 
| 208                 return fmt.Errorf("taskqueue: transaction context has expired") |  | 
| 209         } |  | 
| 210         return f() |  | 
| 211 } |  | 
| 212 |  | 
| 213 func (t *txnTaskQueueData) ResetTasks() { | 199 func (t *txnTaskQueueData) ResetTasks() { | 
| 214         t.Lock() | 200         t.Lock() | 
| 215         defer t.Unlock() | 201         defer t.Unlock() | 
| 216 | 202 | 
| 217         for queuename := range t.anony { | 203         for queuename := range t.anony { | 
| 218                 t.anony[queuename] = nil | 204                 t.anony[queuename] = nil | 
| 219         } | 205         } | 
| 220         t.parent.resetTasksWithLock() | 206         t.parent.resetTasksWithLock() | 
| 221 } | 207 } | 
| 222 | 208 | 
| 223 func (t *txnTaskQueueData) Lock() { | 209 func (t *txnTaskQueueData) Lock() { | 
| 224         t.lock.Lock() | 210         t.lock.Lock() | 
| 225         t.parent.Lock() | 211         t.parent.Lock() | 
| 226 } | 212 } | 
| 227 func (t *txnTaskQueueData) Unlock() { | 213 func (t *txnTaskQueueData) Unlock() { | 
| 228         t.parent.Unlock() | 214         t.parent.Unlock() | 
| 229         t.lock.Unlock() | 215         t.lock.Unlock() | 
| 230 } | 216 } | 
| 231 | 217 | 
| 232 func (t *txnTaskQueueData) GetTransactionTasks() tq.AnonymousQueueData { | 218 func (t *txnTaskQueueData) GetTransactionTasks() tq.AnonymousQueueData { | 
| 233         t.Lock() | 219         t.Lock() | 
| 234         defer t.Unlock() | 220         defer t.Unlock() | 
| 235 | 221 | 
| 236         ret := make(tq.AnonymousQueueData, len(t.anony)) | 222         ret := make(tq.AnonymousQueueData, len(t.anony)) | 
| 237         for k, vs := range t.anony { | 223         for k, vs := range t.anony { | 
| 238                 ret[k] = make([]*tq.Task, len(vs)) | 224                 ret[k] = make([]*tq.Task, len(vs)) | 
| 239                 for i, v := range vs { | 225                 for i, v := range vs { | 
| 240 »       »       »       tsk := dupTask(v) | 226 »       »       »       tsk := v.Duplicate() | 
| 241                         tsk.Name = "" | 227                         tsk.Name = "" | 
| 242                         ret[k][i] = tsk | 228                         ret[k][i] = tsk | 
| 243                 } | 229                 } | 
| 244         } | 230         } | 
| 245 | 231 | 
| 246         return ret | 232         return ret | 
| 247 } | 233 } | 
| 248 | 234 | 
| 249 func (t *txnTaskQueueData) GetTombstonedTasks() tq.QueueData { | 235 func (t *txnTaskQueueData) GetTombstonedTasks() tq.QueueData { | 
| 250         return t.parent.GetTombstonedTasks() | 236         return t.parent.GetTombstonedTasks() | 
| 251 } | 237 } | 
| 252 | 238 | 
| 253 func (t *txnTaskQueueData) GetScheduledTasks() tq.QueueData { | 239 func (t *txnTaskQueueData) GetScheduledTasks() tq.QueueData { | 
| 254         return t.parent.GetScheduledTasks() | 240         return t.parent.GetScheduledTasks() | 
| 255 } | 241 } | 
| 256 | 242 | 
| 257 func (t *txnTaskQueueData) CreateQueue(queueName string) { | 243 func (t *txnTaskQueueData) CreateQueue(queueName string) { | 
| 258         t.parent.CreateQueue(queueName) | 244         t.parent.CreateQueue(queueName) | 
| 259 } | 245 } | 
| OLD | NEW | 
|---|