Chromium Code Reviews| Index: appengine/cmd/dm/distributor/distributor.go |
| diff --git a/appengine/cmd/dm/distributor/distributor.go b/appengine/cmd/dm/distributor/distributor.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..b6d2c8f140e0020a22ef0b7e6f9bb8dbcb05c3dd |
| --- /dev/null |
| +++ b/appengine/cmd/dm/distributor/distributor.go |
| @@ -0,0 +1,147 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +// Package distributor contains all the adaptors for the various supported |
| +// distributor protocols. At a high level, it works like this: |
| +// * Quests specify a distributor configuration by name as part of their |
| +// identity. |
| +// * When an Execution for that Quest NeedsExecution, DM reads configuration |
|
martiniss
2016/01/19 23:02:40
reads *its* configuration.
|
| +// (distributor.proto) from luci-config. This configuration is stored |
| +// as part of the Execution so that for the duration of a given Exectuion, |
| +// DM always interacts with the same distributor in the same way (barring |
| +// code changes in DM's adapter logic itself). |
| +// * DM uses the selected distributor implementation to start a task and |
| +// record its Token. Additionally, the distributor MUST subscribe to publish |
|
martiniss
2016/01/19 23:02:40
Split into another bullet point?
Also I don't re
|
| +// on DM's pubsub topic for updates. When publishing updates, the |
| +// distributor MUST include 2 attributes (execution_id, pubsub_key), which |
| +// are provided as part of TaskDescription. |
| +// * When DM gets a hit on pubsub, it will load the Execution, load its cached |
|
martiniss
2016/01/19 23:02:40
gets a message?
|
| +// distributor configuration, and then call HandleNotification for the |
| +// adapter to parse the notification body and return the state of the task. |
| +// |
| +// Adding a new distributor requires: |
| +// * Add a new subdir of protos with the configuration proto for the new |
|
martiniss
2016/01/19 23:02:40
Maybe add the directory itself you need, relative
|
| +// distributor. Each distributor implementation must have its own unique |
| +// Config message. |
| +// * Add a matching subdir of this package for the implementation of the |
| +// distributor. |
| +// * In the implementation, add a Register method that registers the |
| +// implementation with this package appropriately. |
| +// * In the DM frontend, import your new package implementation and run its |
| +// Register method. |
| +package distributor |
| + |
| +import ( |
| + "net/http" |
| + |
| + "github.com/luci/luci-go/appengine/cmd/dm/enums/attempt" |
| + "github.com/luci/luci-go/appengine/cmd/dm/enums/execution" |
| + "github.com/luci/luci-go/appengine/cmd/dm/types" |
|
nodir
2016/01/20 21:07:32
I thought we keep the luci-go import section last
|
| + |
| + "golang.org/x/net/context" |
| + "google.golang.org/api/pubsub/v1" |
| +) |
| + |
| +// Token is an opaque token that a distributor should use to |
| +// uniquely identify a single DM execution. |
| +type Token string |
| + |
| +// The PersistentState token for the job. For a given Attempt, this will be |
| +// retrieved from Finished executions and then passed to new Executions. |
| +type PersistentState string |
| + |
| +// TaskState is DM's view of the state for a distributor task. It contains one |
| +// of the execution.State values, and if the execution.State is Finished, |
| +// PersistentState should be set, if the task had persisted state. |
| +type TaskState struct { |
| + ExecutionState execution.State |
| + PersistentState PersistentState |
| +} |
| + |
| +// Dep represents the pertinent information of a single dependency. |
|
nodir
2016/01/20 21:07:32
persistent
|
| +type Dep struct { |
| + AttemptID types.AttemptID |
| + State attempt.State |
| + Data []byte |
| +} |
| + |
| +// TaskDescription is the interface for the object that will be consumed by |
| +// PrepareTask. |
| +type TaskDescription interface { |
| + Payload() []byte |
| + |
| + AttemptID() *types.AttemptID |
| + ExecutionID() types.UInt32 |
| + ExecutionKey() []byte |
| + |
| + PreviousState() PersistentState |
| + |
| + // PrepareTopic creates a PubSub topic for notifications related to the task |
| + // and adds given publisher to its ACL. |
| + // |
| + // It returns full name of the topic and a token that will be used to route |
| + // PubSub messages back to the Manager. Topic name and its configuration are |
| + // controlled by the Engine. The publisher to the topic must be instructed to |
| + // put the token into 'auth_token' attribute of PubSub messages. DM will know |
| + // how to route such messages to Interface.HandleNotification. |
| + PrepareTopic(publisher string) (topic, token string, err error) |
| +} |
| + |
| +// D is the interface for all distributor implementations. |
| +// |
| +// Retries |
| +// |
| +// Unless otherwise noted, DM will retry methods here if they return an error |
| +// marked as Transient, up to some internal limit. If they return |
| +// a non-Transient error (or nil) DM will make a best effort not to duplicate |
| +// calls, but it can't guarantee that. |
| +type D interface { |
| + // Run prepares and runs a new Task from the given TaskDescription. |
| + // |
| + // Scheduling the same TaskDescription multiple times SHOULD return the same |
| + // Token. |
| + // |
| + // If this returns a non-Transient error, the Execution will be marked as |
| + // Rejected with the returned error message as the 'Reason'. |
|
nodir
2016/01/20 21:07:32
What is Reason? Please reference it so a reader ca
|
| + Run(TaskDescription) (Token, error) |
| + |
| + // Cancel attempts to cancel a running task. If a task is canceled more than |
| + // once, this should return nil. |
| + Cancel(Token) error |
| + |
| + // GetStatus retrieves the current state of the task from the distributor. |
| + // |
| + // If this returns a non-Transient error more than 30 seconds after the task |
| + // was Run(), the execution will be marked Missing with the returned error |
| + // message as the 'Reason'. If it returns a non-Transient error within 30 |
| + // seconds of being run, DM will automatically treat that as Transient. |
| + GetStatus(Token) (*TaskState, error) |
|
nodir
2016/01/20 21:07:32
rename to "State"?
|
| + |
| + // InfoURL calculates a user-presentable information url for the task |
| + // identified by Token. This should be a local operation, so it is not the |
| + // implementation's responsibility to validate the token in this method (e.g. |
| + // it could point to a non-existant job, etc.) |
| + InfoURL(Token) string |
|
nodir
2016/01/20 21:07:32
could return *url.URL
|
| + |
| + // HandleNotification is called whenever DM receives a PubSub message sent to |
| + // a topic created with TaskDescription.PrepareTopic. |
| + // |
| + // Returning (nil, nil) will indicate that DM should poll the Distributor for |
| + // the TaskState. |
| + // |
| + // DM will convert pubsub Messages to a delayed GetStatus if a pubsub message |
| + // is delivered which refers to an Attempt whose status is NeedsExecution, |
| + // which could happen in the event of a not-fully-settled Tumble transacion. |
| + HandleNotification(*pubsub.PubsubMessage) (*TaskState, error) |
| + |
| + // HandleTaskQueueTask is called if a request appears at the tqHandlerURL |
| + // provided to the Factory. |
| + HandleTaskQueueTask(c context.Context, r *http.Request) error |
|
nodir
2016/01/20 21:07:32
HandlePushTask?
|
| +} |
| + |
| +// Factory is a function which produces new distributor instance with the |
|
nodir
2016/01/20 21:07:32
nit: a new distributor
|
| +// provided configuration proto. |
| +// |
| +// c is guaranteed to be non-transactional. |
| +type Factory func(c context.Context, dist *Config) (D, error) |