Index: appengine/cmd/dm/distributor/distributor.go |
diff --git a/appengine/cmd/dm/distributor/distributor.go b/appengine/cmd/dm/distributor/distributor.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..b6d2c8f140e0020a22ef0b7e6f9bb8dbcb05c3dd |
--- /dev/null |
+++ b/appengine/cmd/dm/distributor/distributor.go |
@@ -0,0 +1,147 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+// Package distributor contains all the adaptors for the various supported |
+// distributor protocols. At a high level, it works like this: |
+// * Quests specify a distributor configuration by name as part of their |
+// identity. |
+// * When an Execution for that Quest NeedsExecution, DM reads configuration |
martiniss
2016/01/19 23:02:40
reads *its* configuration.
|
+// (distributor.proto) from luci-config. This configuration is stored |
+// as part of the Execution so that for the duration of a given Exectuion, |
+// DM always interacts with the same distributor in the same way (barring |
+// code changes in DM's adapter logic itself). |
+// * DM uses the selected distributor implementation to start a task and |
+// record its Token. Additionally, the distributor MUST subscribe to publish |
martiniss
2016/01/19 23:02:40
Split into another bullet point?
Also I don't re
|
+// on DM's pubsub topic for updates. When publishing updates, the |
+// distributor MUST include 2 attributes (execution_id, pubsub_key), which |
+// are provided as part of TaskDescription. |
+// * When DM gets a hit on pubsub, it will load the Execution, load its cached |
martiniss
2016/01/19 23:02:40
gets a message?
|
+// distributor configuration, and then call HandleNotification for the |
+// adapter to parse the notification body and return the state of the task. |
+// |
+// Adding a new distributor requires: |
+// * Add a new subdir of protos with the configuration proto for the new |
martiniss
2016/01/19 23:02:40
Maybe add the directory itself you need, relative
|
+// distributor. Each distributor implementation must have its own unique |
+// Config message. |
+// * Add a matching subdir of this package for the implementation of the |
+// distributor. |
+// * In the implementation, add a Register method that registers the |
+// implementation with this package appropriately. |
+// * In the DM frontend, import your new package implementation and run its |
+// Register method. |
+package distributor |
+ |
+import ( |
+ "net/http" |
+ |
+ "github.com/luci/luci-go/appengine/cmd/dm/enums/attempt" |
+ "github.com/luci/luci-go/appengine/cmd/dm/enums/execution" |
+ "github.com/luci/luci-go/appengine/cmd/dm/types" |
nodir
2016/01/20 21:07:32
I thought we keep the luci-go import section last
|
+ |
+ "golang.org/x/net/context" |
+ "google.golang.org/api/pubsub/v1" |
+) |
+ |
+// Token is an opaque token that a distributor should use to |
+// uniquely identify a single DM execution. |
+type Token string |
+ |
+// The PersistentState token for the job. For a given Attempt, this will be |
+// retrieved from Finished executions and then passed to new Executions. |
+type PersistentState string |
+ |
+// TaskState is DM's view of the state for a distributor task. It contains one |
+// of the execution.State values, and if the execution.State is Finished, |
+// PersistentState should be set, if the task had persisted state. |
+type TaskState struct { |
+ ExecutionState execution.State |
+ PersistentState PersistentState |
+} |
+ |
+// Dep represents the pertinent information of a single dependency. |
nodir
2016/01/20 21:07:32
persistent
|
+type Dep struct { |
+ AttemptID types.AttemptID |
+ State attempt.State |
+ Data []byte |
+} |
+ |
+// TaskDescription is the interface for the object that will be consumed by |
+// PrepareTask. |
+type TaskDescription interface { |
+ Payload() []byte |
+ |
+ AttemptID() *types.AttemptID |
+ ExecutionID() types.UInt32 |
+ ExecutionKey() []byte |
+ |
+ PreviousState() PersistentState |
+ |
+ // PrepareTopic creates a PubSub topic for notifications related to the task |
+ // and adds given publisher to its ACL. |
+ // |
+ // It returns full name of the topic and a token that will be used to route |
+ // PubSub messages back to the Manager. Topic name and its configuration are |
+ // controlled by the Engine. The publisher to the topic must be instructed to |
+ // put the token into 'auth_token' attribute of PubSub messages. DM will know |
+ // how to route such messages to Interface.HandleNotification. |
+ PrepareTopic(publisher string) (topic, token string, err error) |
+} |
+ |
+// D is the interface for all distributor implementations. |
+// |
+// Retries |
+// |
+// Unless otherwise noted, DM will retry methods here if they return an error |
+// marked as Transient, up to some internal limit. If they return |
+// a non-Transient error (or nil) DM will make a best effort not to duplicate |
+// calls, but it can't guarantee that. |
+type D interface { |
+ // Run prepares and runs a new Task from the given TaskDescription. |
+ // |
+ // Scheduling the same TaskDescription multiple times SHOULD return the same |
+ // Token. |
+ // |
+ // If this returns a non-Transient error, the Execution will be marked as |
+ // Rejected with the returned error message as the 'Reason'. |
nodir
2016/01/20 21:07:32
What is Reason? Please reference it so a reader ca
|
+ Run(TaskDescription) (Token, error) |
+ |
+ // Cancel attempts to cancel a running task. If a task is canceled more than |
+ // once, this should return nil. |
+ Cancel(Token) error |
+ |
+ // GetStatus retrieves the current state of the task from the distributor. |
+ // |
+ // If this returns a non-Transient error more than 30 seconds after the task |
+ // was Run(), the execution will be marked Missing with the returned error |
+ // message as the 'Reason'. If it returns a non-Transient error within 30 |
+ // seconds of being run, DM will automatically treat that as Transient. |
+ GetStatus(Token) (*TaskState, error) |
nodir
2016/01/20 21:07:32
rename to "State"?
|
+ |
+ // InfoURL calculates a user-presentable information url for the task |
+ // identified by Token. This should be a local operation, so it is not the |
+ // implementation's responsibility to validate the token in this method (e.g. |
+ // it could point to a non-existant job, etc.) |
+ InfoURL(Token) string |
nodir
2016/01/20 21:07:32
could return *url.URL
|
+ |
+ // HandleNotification is called whenever DM receives a PubSub message sent to |
+ // a topic created with TaskDescription.PrepareTopic. |
+ // |
+ // Returning (nil, nil) will indicate that DM should poll the Distributor for |
+ // the TaskState. |
+ // |
+ // DM will convert pubsub Messages to a delayed GetStatus if a pubsub message |
+ // is delivered which refers to an Attempt whose status is NeedsExecution, |
+ // which could happen in the event of a not-fully-settled Tumble transacion. |
+ HandleNotification(*pubsub.PubsubMessage) (*TaskState, error) |
+ |
+ // HandleTaskQueueTask is called if a request appears at the tqHandlerURL |
+ // provided to the Factory. |
+ HandleTaskQueueTask(c context.Context, r *http.Request) error |
nodir
2016/01/20 21:07:32
HandlePushTask?
|
+} |
+ |
+// Factory is a function which produces new distributor instance with the |
nodir
2016/01/20 21:07:32
nit: a new distributor
|
+// provided configuration proto. |
+// |
+// c is guaranteed to be non-transactional. |
+type Factory func(c context.Context, dist *Config) (D, error) |