Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2174)

Unified Diff: appengine/cmd/dm/distributor/distributor.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: work in progress Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « appengine/cmd/dm/distributor/config.go ('k') | appengine/cmd/dm/distributor/impl/jobsim/distributor.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: appengine/cmd/dm/distributor/distributor.go
diff --git a/appengine/cmd/dm/distributor/distributor.go b/appengine/cmd/dm/distributor/distributor.go
new file mode 100644
index 0000000000000000000000000000000000000000..b6d2c8f140e0020a22ef0b7e6f9bb8dbcb05c3dd
--- /dev/null
+++ b/appengine/cmd/dm/distributor/distributor.go
@@ -0,0 +1,147 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// Package distributor contains all the adaptors for the various supported
+// distributor protocols. At a high level, it works like this:
+// * Quests specify a distributor configuration by name as part of their
+// identity.
+// * When an Execution for that Quest NeedsExecution, DM reads configuration
martiniss 2016/01/19 23:02:40 reads *its* configuration.
+// (distributor.proto) from luci-config. This configuration is stored
+// as part of the Execution so that for the duration of a given Exectuion,
+// DM always interacts with the same distributor in the same way (barring
+// code changes in DM's adapter logic itself).
+// * DM uses the selected distributor implementation to start a task and
+// record its Token. Additionally, the distributor MUST subscribe to publish
martiniss 2016/01/19 23:02:40 Split into another bullet point? Also I don't re
+// on DM's pubsub topic for updates. When publishing updates, the
+// distributor MUST include 2 attributes (execution_id, pubsub_key), which
+// are provided as part of TaskDescription.
+// * When DM gets a hit on pubsub, it will load the Execution, load its cached
martiniss 2016/01/19 23:02:40 gets a message?
+// distributor configuration, and then call HandleNotification for the
+// adapter to parse the notification body and return the state of the task.
+//
+// Adding a new distributor requires:
+// * Add a new subdir of protos with the configuration proto for the new
martiniss 2016/01/19 23:02:40 Maybe add the directory itself you need, relative
+// distributor. Each distributor implementation must have its own unique
+// Config message.
+// * Add a matching subdir of this package for the implementation of the
+// distributor.
+// * In the implementation, add a Register method that registers the
+// implementation with this package appropriately.
+// * In the DM frontend, import your new package implementation and run its
+// Register method.
+package distributor
+
+import (
+ "net/http"
+
+ "github.com/luci/luci-go/appengine/cmd/dm/enums/attempt"
+ "github.com/luci/luci-go/appengine/cmd/dm/enums/execution"
+ "github.com/luci/luci-go/appengine/cmd/dm/types"
nodir 2016/01/20 21:07:32 I thought we keep the luci-go import section last
+
+ "golang.org/x/net/context"
+ "google.golang.org/api/pubsub/v1"
+)
+
+// Token is an opaque token that a distributor should use to
+// uniquely identify a single DM execution.
+type Token string
+
+// The PersistentState token for the job. For a given Attempt, this will be
+// retrieved from Finished executions and then passed to new Executions.
+type PersistentState string
+
+// TaskState is DM's view of the state for a distributor task. It contains one
+// of the execution.State values, and if the execution.State is Finished,
+// PersistentState should be set, if the task had persisted state.
+type TaskState struct {
+ ExecutionState execution.State
+ PersistentState PersistentState
+}
+
+// Dep represents the pertinent information of a single dependency.
nodir 2016/01/20 21:07:32 persistent
+type Dep struct {
+ AttemptID types.AttemptID
+ State attempt.State
+ Data []byte
+}
+
+// TaskDescription is the interface for the object that will be consumed by
+// PrepareTask.
+type TaskDescription interface {
+ Payload() []byte
+
+ AttemptID() *types.AttemptID
+ ExecutionID() types.UInt32
+ ExecutionKey() []byte
+
+ PreviousState() PersistentState
+
+ // PrepareTopic creates a PubSub topic for notifications related to the task
+ // and adds given publisher to its ACL.
+ //
+ // It returns full name of the topic and a token that will be used to route
+ // PubSub messages back to the Manager. Topic name and its configuration are
+ // controlled by the Engine. The publisher to the topic must be instructed to
+ // put the token into 'auth_token' attribute of PubSub messages. DM will know
+ // how to route such messages to Interface.HandleNotification.
+ PrepareTopic(publisher string) (topic, token string, err error)
+}
+
+// D is the interface for all distributor implementations.
+//
+// Retries
+//
+// Unless otherwise noted, DM will retry methods here if they return an error
+// marked as Transient, up to some internal limit. If they return
+// a non-Transient error (or nil) DM will make a best effort not to duplicate
+// calls, but it can't guarantee that.
+type D interface {
+ // Run prepares and runs a new Task from the given TaskDescription.
+ //
+ // Scheduling the same TaskDescription multiple times SHOULD return the same
+ // Token.
+ //
+ // If this returns a non-Transient error, the Execution will be marked as
+ // Rejected with the returned error message as the 'Reason'.
nodir 2016/01/20 21:07:32 What is Reason? Please reference it so a reader ca
+ Run(TaskDescription) (Token, error)
+
+ // Cancel attempts to cancel a running task. If a task is canceled more than
+ // once, this should return nil.
+ Cancel(Token) error
+
+ // GetStatus retrieves the current state of the task from the distributor.
+ //
+ // If this returns a non-Transient error more than 30 seconds after the task
+ // was Run(), the execution will be marked Missing with the returned error
+ // message as the 'Reason'. If it returns a non-Transient error within 30
+ // seconds of being run, DM will automatically treat that as Transient.
+ GetStatus(Token) (*TaskState, error)
nodir 2016/01/20 21:07:32 rename to "State"?
+
+ // InfoURL calculates a user-presentable information url for the task
+ // identified by Token. This should be a local operation, so it is not the
+ // implementation's responsibility to validate the token in this method (e.g.
+ // it could point to a non-existant job, etc.)
+ InfoURL(Token) string
nodir 2016/01/20 21:07:32 could return *url.URL
+
+ // HandleNotification is called whenever DM receives a PubSub message sent to
+ // a topic created with TaskDescription.PrepareTopic.
+ //
+ // Returning (nil, nil) will indicate that DM should poll the Distributor for
+ // the TaskState.
+ //
+ // DM will convert pubsub Messages to a delayed GetStatus if a pubsub message
+ // is delivered which refers to an Attempt whose status is NeedsExecution,
+ // which could happen in the event of a not-fully-settled Tumble transacion.
+ HandleNotification(*pubsub.PubsubMessage) (*TaskState, error)
+
+ // HandleTaskQueueTask is called if a request appears at the tqHandlerURL
+ // provided to the Factory.
+ HandleTaskQueueTask(c context.Context, r *http.Request) error
nodir 2016/01/20 21:07:32 HandlePushTask?
+}
+
+// Factory is a function which produces new distributor instance with the
nodir 2016/01/20 21:07:32 nit: a new distributor
+// provided configuration proto.
+//
+// c is guaranteed to be non-transactional.
+type Factory func(c context.Context, dist *Config) (D, error)
« no previous file with comments | « appengine/cmd/dm/distributor/config.go ('k') | appengine/cmd/dm/distributor/impl/jobsim/distributor.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698