Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(695)

Side by Side Diff: appengine/cmd/dm/distributor/distributor.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: work in progress Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // Package distributor contains all the adaptors for the various supported
6 // distributor protocols. At a high level, it works like this:
7 // * Quests specify a distributor configuration by name as part of their
8 // identity.
9 // * When an Execution for that Quest NeedsExecution, DM reads configuration
martiniss 2016/01/19 23:02:40 reads *its* configuration.
10 // (distributor.proto) from luci-config. This configuration is stored
11 // as part of the Execution so that for the duration of a given Exectuion,
12 // DM always interacts with the same distributor in the same way (barring
13 // code changes in DM's adapter logic itself).
14 // * DM uses the selected distributor implementation to start a task and
15 // record its Token. Additionally, the distributor MUST subscribe to publish
martiniss 2016/01/19 23:02:40 Split into another bullet point? Also I don't re
16 // on DM's pubsub topic for updates. When publishing updates, the
17 // distributor MUST include 2 attributes (execution_id, pubsub_key), which
18 // are provided as part of TaskDescription.
19 // * When DM gets a hit on pubsub, it will load the Execution, load its cached
martiniss 2016/01/19 23:02:40 gets a message?
20 // distributor configuration, and then call HandleNotification for the
21 // adapter to parse the notification body and return the state of the task.
22 //
23 // Adding a new distributor requires:
24 // * Add a new subdir of protos with the configuration proto for the new
martiniss 2016/01/19 23:02:40 Maybe add the directory itself you need, relative
25 // distributor. Each distributor implementation must have its own unique
26 // Config message.
27 // * Add a matching subdir of this package for the implementation of the
28 // distributor.
29 // * In the implementation, add a Register method that registers the
30 // implementation with this package appropriately.
31 // * In the DM frontend, import your new package implementation and run its
32 // Register method.
33 package distributor
34
35 import (
36 "net/http"
37
38 "github.com/luci/luci-go/appengine/cmd/dm/enums/attempt"
39 "github.com/luci/luci-go/appengine/cmd/dm/enums/execution"
40 "github.com/luci/luci-go/appengine/cmd/dm/types"
nodir 2016/01/20 21:07:32 I thought we keep the luci-go import section last
41
42 "golang.org/x/net/context"
43 "google.golang.org/api/pubsub/v1"
44 )
45
46 // Token is an opaque token that a distributor should use to
47 // uniquely identify a single DM execution.
48 type Token string
49
50 // The PersistentState token for the job. For a given Attempt, this will be
51 // retrieved from Finished executions and then passed to new Executions.
52 type PersistentState string
53
54 // TaskState is DM's view of the state for a distributor task. It contains one
55 // of the execution.State values, and if the execution.State is Finished,
56 // PersistentState should be set, if the task had persisted state.
57 type TaskState struct {
58 ExecutionState execution.State
59 PersistentState PersistentState
60 }
61
62 // Dep represents the pertinent information of a single dependency.
nodir 2016/01/20 21:07:32 persistent
63 type Dep struct {
64 AttemptID types.AttemptID
65 State attempt.State
66 Data []byte
67 }
68
69 // TaskDescription is the interface for the object that will be consumed by
70 // PrepareTask.
71 type TaskDescription interface {
72 Payload() []byte
73
74 AttemptID() *types.AttemptID
75 ExecutionID() types.UInt32
76 ExecutionKey() []byte
77
78 PreviousState() PersistentState
79
80 // PrepareTopic creates a PubSub topic for notifications related to the task
81 // and adds given publisher to its ACL.
82 //
83 // It returns full name of the topic and a token that will be used to ro ute
84 // PubSub messages back to the Manager. Topic name and its configuration are
85 // controlled by the Engine. The publisher to the topic must be instruct ed to
86 // put the token into 'auth_token' attribute of PubSub messages. DM will know
87 // how to route such messages to Interface.HandleNotification.
88 PrepareTopic(publisher string) (topic, token string, err error)
89 }
90
91 // D is the interface for all distributor implementations.
92 //
93 // Retries
94 //
95 // Unless otherwise noted, DM will retry methods here if they return an error
96 // marked as Transient, up to some internal limit. If they return
97 // a non-Transient error (or nil) DM will make a best effort not to duplicate
98 // calls, but it can't guarantee that.
99 type D interface {
100 // Run prepares and runs a new Task from the given TaskDescription.
101 //
102 // Scheduling the same TaskDescription multiple times SHOULD return the same
103 // Token.
104 //
105 // If this returns a non-Transient error, the Execution will be marked a s
106 // Rejected with the returned error message as the 'Reason'.
nodir 2016/01/20 21:07:32 What is Reason? Please reference it so a reader ca
107 Run(TaskDescription) (Token, error)
108
109 // Cancel attempts to cancel a running task. If a task is canceled more than
110 // once, this should return nil.
111 Cancel(Token) error
112
113 // GetStatus retrieves the current state of the task from the distributo r.
114 //
115 // If this returns a non-Transient error more than 30 seconds after the task
116 // was Run(), the execution will be marked Missing with the returned err or
117 // message as the 'Reason'. If it returns a non-Transient error within 3 0
118 // seconds of being run, DM will automatically treat that as Transient.
119 GetStatus(Token) (*TaskState, error)
nodir 2016/01/20 21:07:32 rename to "State"?
120
121 // InfoURL calculates a user-presentable information url for the task
122 // identified by Token. This should be a local operation, so it is not t he
123 // implementation's responsibility to validate the token in this method (e.g.
124 // it could point to a non-existant job, etc.)
125 InfoURL(Token) string
nodir 2016/01/20 21:07:32 could return *url.URL
126
127 // HandleNotification is called whenever DM receives a PubSub message se nt to
128 // a topic created with TaskDescription.PrepareTopic.
129 //
130 // Returning (nil, nil) will indicate that DM should poll the Distributo r for
131 // the TaskState.
132 //
133 // DM will convert pubsub Messages to a delayed GetStatus if a pubsub me ssage
134 // is delivered which refers to an Attempt whose status is NeedsExecutio n,
135 // which could happen in the event of a not-fully-settled Tumble transac ion.
136 HandleNotification(*pubsub.PubsubMessage) (*TaskState, error)
137
138 // HandleTaskQueueTask is called if a request appears at the tqHandlerUR L
139 // provided to the Factory.
140 HandleTaskQueueTask(c context.Context, r *http.Request) error
nodir 2016/01/20 21:07:32 HandlePushTask?
141 }
142
143 // Factory is a function which produces new distributor instance with the
nodir 2016/01/20 21:07:32 nit: a new distributor
144 // provided configuration proto.
145 //
146 // c is guaranteed to be non-transactional.
147 type Factory func(c context.Context, dist *Config) (D, error)
OLDNEW
« no previous file with comments | « appengine/cmd/dm/distributor/config.go ('k') | appengine/cmd/dm/distributor/impl/jobsim/distributor.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698