OLD | NEW |
(Empty) | |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. |
| 4 |
| 5 // Package distributor contains all the adaptors for the various supported |
| 6 // distributor protocols. At a high level, it works like this: |
| 7 // * Quests specify a distributor configuration by name as part of their |
| 8 // identity. |
| 9 // * When an Execution for that Quest NeedsExecution, DM reads configuration |
| 10 // (distributor.proto) from luci-config. This configuration is stored |
| 11 // as part of the Execution so that for the duration of a given Exectuion, |
| 12 // DM always interacts with the same distributor in the same way (barring |
| 13 // code changes in DM's adapter logic itself). |
| 14 // * DM uses the selected distributor implementation to start a task and |
| 15 // record its Token. Additionally, the distributor MUST subscribe to publish |
| 16 // on DM's pubsub topic for updates. When publishing updates, the |
| 17 // distributor MUST include 2 attributes (execution_id, pubsub_key), which |
| 18 // are provided as part of TaskDescription. |
| 19 // * When DM gets a hit on pubsub, it will load the Execution, load its cached |
| 20 // distributor configuration, and then call HandleNotification for the |
| 21 // adapter to parse the notification body and return the state of the task. |
| 22 // |
| 23 // Adding a new distributor requires: |
| 24 // * Add a new subdir of protos with the configuration proto for the new |
| 25 // distributor. Each distributor implementation must have its own unique |
| 26 // Config message. |
| 27 // * Add a matching subdir of this package for the implementation of the |
| 28 // distributor. |
| 29 // * In the implementation, add a Register method that registers the |
| 30 // implementation with this package appropriately. |
| 31 // * In the DM frontend, import your new package implementation and run its |
| 32 // Register method. |
| 33 package distributor |
| 34 |
| 35 import ( |
| 36 "net/http" |
| 37 "time" |
| 38 |
| 39 dm "github.com/luci/luci-go/common/api/dm/service/v1" |
| 40 |
| 41 "golang.org/x/net/context" |
| 42 ) |
| 43 |
| 44 // Token is an opaque token that a distributor should use to |
| 45 // uniquely identify a single DM execution. |
| 46 type Token string |
| 47 |
| 48 // The PersistentState token for the job. For a given Attempt, this will be |
| 49 // retrieved from Finished executions and then passed to new Executions. |
| 50 type PersistentState []byte |
| 51 |
| 52 // TaskResult is the conclusion of the task. One of the two fields may be |
| 53 // populated. |
| 54 type TaskResult struct { |
| 55 // PersistentState if AbnormalFinish is nil. This indicates that the Exe
cution |
| 56 // is counted as FINISHED, with this value as its PersistentState. |
| 57 PersistentState PersistentState |
| 58 // If this is !nil, the Execution is counted as ABNORMAL_FINISHED, with
this |
| 59 // value as the result. |
| 60 AbnormalFinish *dm.AbnormalFinish |
| 61 } |
| 62 |
| 63 // Notification represents a notification from the distributor to DM that |
| 64 // a particular execution has a status update. Data and Attrs are interpreted |
| 65 // purely by the distributor implementation. |
| 66 type Notification struct { |
| 67 ID *dm.Execution_ID |
| 68 Data []byte |
| 69 Attrs map[string]string |
| 70 } |
| 71 |
| 72 // D is the interface for all distributor implementations. |
| 73 // |
| 74 // Retries |
| 75 // |
| 76 // Unless otherwise noted, DM will retry methods here if they return an error |
| 77 // marked as Transient, up to some internal limit. If they return |
| 78 // a non-Transient error (or nil) DM will make a best effort not to duplicate |
| 79 // calls, but it can't guarantee that. |
| 80 type D interface { |
| 81 // Run prepares and runs a new Task from the given TaskDescription. |
| 82 // |
| 83 // Scheduling the same TaskDescription multiple times SHOULD return the
same |
| 84 // Token. It's OK if this doesn't happen, but only one of the scheduled
tasks |
| 85 // will be able to invoke ActivateExecution; the other one(s) will |
| 86 // early-abort and/or timeout. |
| 87 // |
| 88 // If this returns a non-Transient error, the Execution will be marked a
s |
| 89 // Rejected with the returned error message as the 'Reason'. |
| 90 // |
| 91 // The various time durations, if non-zero, will be used verbatim for DM
to |
| 92 // timeout that phase of the task's execution. If the task's execution t
imes |
| 93 // out in the 'STOPPING' phase, DM will poll the distributor's GetStatus |
| 94 // method up to 3 times with a 30-second gap to attempt to retrieve the
final |
| 95 // information. After more than 3 times, DM will give up and mark the ta
sk as |
| 96 // expired. |
| 97 // |
| 98 // If the distributor doesn't intend to use Pubsub for notifying DM abou
t the |
| 99 // final status of the job, set timeToStop to the amount of time you wan
t DM |
| 100 // to wait before polling GetStatus. e.g. if after calling FinishAttempt
or |
| 101 // EnsureGraphData your distributor needs 10 seconds before it can corre
ctly |
| 102 // respond to a GetStatus request, you should set timeToStop to >= 10s. |
| 103 // Otherwise timeToStop should be set fairly high (e.g. 12 hours) as a h
edge |
| 104 // against a broken pubsub notification pipeline. |
| 105 // |
| 106 // If you have the choice between pubsub or not, prefer to use pubsub as
it |
| 107 // allows DM to more proactively update the graph state (and unblock wai
ting |
| 108 // Attempts, etc.) |
| 109 Run(*TaskDescription) (tok Token, timeToStart, timeToRun, timeToStop tim
e.Duration, err error) |
| 110 |
| 111 // Cancel attempts to cancel a running task. If a task is canceled more
than |
| 112 // once, this should return nil. |
| 113 Cancel(Token) error |
| 114 |
| 115 // GetStatus retrieves the current state of the task from the distributo
r. |
| 116 // |
| 117 // If this returns a non-Transient error more than 30 seconds after the
task |
| 118 // was Run(), the execution will be marked Missing with the returned err
or |
| 119 // message as the 'Reason'. If it returns a non-Transient error within 3
0 |
| 120 // seconds of being run, DM will automatically treat that as Transient. |
| 121 GetStatus(Token) (*TaskResult, error) |
| 122 |
| 123 // InfoURL calculates a user-presentable information url for the task |
| 124 // identified by Token. This should be a local operation, so it is not t
he |
| 125 // implementation's responsibility to validate the token in this method
(e.g. |
| 126 // it could point to a non-existant job, etc.) |
| 127 InfoURL(Token) string |
| 128 |
| 129 // HandleNotification is called whenever DM receives a PubSub message se
nt to |
| 130 // a topic created with TaskDescription.PrepareTopic. The Attrs map will
omit |
| 131 // the 'auth_token' field. |
| 132 // |
| 133 // Returning (nil, nil) will indicate that DM should ignore this notific
ation. |
| 134 // |
| 135 // DM will convert pubsub Messages to a delayed GetStatus if a pubsub me
ssage |
| 136 // is delivered which refers to an Attempt whose status is NeedsExecutio
n, |
| 137 // which could happen in the event of a not-fully-settled transacion. |
| 138 // |
| 139 // DM will ignore any notifications for executions which it doesn't know |
| 140 // about. |
| 141 HandleNotification(notification *Notification) (*TaskResult, error) |
| 142 |
| 143 // HandleTaskQueueTask is called if the distributor used Config.EnqueueT
ask. |
| 144 // |
| 145 // It may return zero or more Notifications for DM about arbitrary Execu
tions. |
| 146 // These notifications will be handled 'later' by the HandleNotification |
| 147 // implementation. |
| 148 HandleTaskQueueTask(r *http.Request) ([]*Notification, error) |
| 149 |
| 150 // Validate should return a non-nil error if the given payload is not |
| 151 // appropriate for this Distributor. Payload is guaranteed to be a valid |
| 152 // JSON object. This should validate that the content of that JSON objec
t is |
| 153 // what the distributor expects. |
| 154 Validate(payload string) error |
| 155 } |
| 156 |
| 157 // Factory is a function which produces new distributor instance with the |
| 158 // provided configuration proto. |
| 159 // |
| 160 // c is guaranteed to be non-transactional. |
| 161 type Factory func(c context.Context, dist *Config) (D, error) |
OLD | NEW |