| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package swarming | 5 package swarming |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "fmt" | 9 "fmt" |
| 10 "net/http" | 10 "net/http" |
| (...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 110 | 110 |
| 111 func (svc *prodSwarmingService) getSwarmingRequest(c context.Context, taskID str
ing) (*swarming.SwarmingRpcsTaskRequest, error) { | 111 func (svc *prodSwarmingService) getSwarmingRequest(c context.Context, taskID str
ing) (*swarming.SwarmingRpcsTaskRequest, error) { |
| 112 return svc.client.Task.Request(taskID).Context(c).Do() | 112 return svc.client.Task.Request(taskID).Context(c).Do() |
| 113 } | 113 } |
| 114 | 114 |
| 115 type swarmingFetchParams struct { | 115 type swarmingFetchParams struct { |
| 116 fetchReq bool | 116 fetchReq bool |
| 117 fetchRes bool | 117 fetchRes bool |
| 118 fetchLog bool | 118 fetchLog bool |
| 119 | 119 |
| 120 » // taskTagCallback, if not nil, is a callback that will be invoked after | 120 » // taskResCallback, if not nil, is a callback that will be invoked after |
| 121 // fetching the result, if fetchRes is true. It will be passed a key/val
ue map | 121 // fetching the result, if fetchRes is true. It will be passed a key/val
ue map |
| 122 // of the Swarming result's tags. | 122 // of the Swarming result's tags. |
| 123 // | 123 // |
| 124 » // If taskTagCallback returns true, any pending log fetch will be cancel
led | 124 » // If taskResCallback returns true, any pending log fetch will be cancel
led |
| 125 // without error. | 125 // without error. |
| 126 » taskTagCallback func(map[string]string) bool | 126 » taskResCallback func(*swarming.SwarmingRpcsTaskResult) bool |
| 127 } | 127 } |
| 128 | 128 |
| 129 type swarmingFetchResult struct { | 129 type swarmingFetchResult struct { |
| 130 req *swarming.SwarmingRpcsTaskRequest | 130 req *swarming.SwarmingRpcsTaskRequest |
| 131 res *swarming.SwarmingRpcsTaskResult | 131 res *swarming.SwarmingRpcsTaskResult |
| 132 | 132 |
| 133 // log is the log data content. If no log data was fetched, this will em
pty. | 133 // log is the log data content. If no log data was fetched, this will em
pty. |
| 134 // If the log fetch was cancelled, this is undefined. | 134 // If the log fetch was cancelled, this is undefined. |
| 135 log string | 135 log string |
| 136 } | 136 } |
| 137 | 137 |
| 138 // swarmingFetch fetches (in parallel) the components that it is configured to | 138 // swarmingFetch fetches (in parallel) the components that it is configured to |
| 139 // fetch. | 139 // fetch. |
| 140 // | 140 // |
| 141 // After fetching, an ACL check is performed to confirm that the user is | 141 // After fetching, an ACL check is performed to confirm that the user is |
| 142 // permitted to view the resulting data. If this check fails, get returns | 142 // permitted to view the resulting data. If this check fails, get returns |
| 143 // errNotMiloJob. | 143 // errNotMiloJob. |
| 144 // | 144 // |
| 145 // TODO(hinoka): Make this ACL check something more specific than the | 145 // TODO(hinoka): Make this ACL check something more specific than the |
| 146 // resence of the "allow_milo" dimension. | 146 // resence of the "allow_milo" dimension. |
| 147 func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw
armingFetchParams) ( | 147 func swarmingFetch(c context.Context, svc swarmingService, taskID string, req sw
armingFetchParams) ( |
| 148 *swarmingFetchResult, error) { | 148 *swarmingFetchResult, error) { |
| 149 | 149 |
| 150 // logErr is managed separately from other fetch errors, since in some | 150 // logErr is managed separately from other fetch errors, since in some |
| 151 // situations it's acceptable to not have a log stream. | 151 // situations it's acceptable to not have a log stream. |
| 152 var logErr error | 152 var logErr error |
| 153 var fr swarmingFetchResult | 153 var fr swarmingFetchResult |
| 154 var resTags map[string]string | |
| 155 | 154 |
| 156 // Special Context to enable the cancellation of log fetching. | 155 // Special Context to enable the cancellation of log fetching. |
| 157 logsCancelled := false | 156 logsCancelled := false |
| 158 logCtx, cancelLogs := context.WithCancel(c) | 157 logCtx, cancelLogs := context.WithCancel(c) |
| 159 defer cancelLogs() | 158 defer cancelLogs() |
| 160 | 159 |
| 161 err := parallel.FanOutIn(func(workC chan<- func() error) { | 160 err := parallel.FanOutIn(func(workC chan<- func() error) { |
| 162 if req.fetchReq { | 161 if req.fetchReq { |
| 163 workC <- func() (err error) { | 162 workC <- func() (err error) { |
| 164 fr.req, err = svc.getSwarmingRequest(c, taskID) | 163 fr.req, err = svc.getSwarmingRequest(c, taskID) |
| 165 return | 164 return |
| 166 } | 165 } |
| 167 } | 166 } |
| 168 | 167 |
| 169 if req.fetchRes { | 168 if req.fetchRes { |
| 170 workC <- func() (err error) { | 169 workC <- func() (err error) { |
| 171 if fr.res, err = svc.getSwarmingResult(c, taskID
); err == nil { | 170 if fr.res, err = svc.getSwarmingResult(c, taskID
); err == nil { |
| 172 » » » » » resTags = swarmingTags(fr.res.Tags) | 171 » » » » » if req.taskResCallback != nil && req.tas
kResCallback(fr.res) { |
| 173 » » » » » if req.taskTagCallback != nil && req.tas
kTagCallback(resTags) { | |
| 174 logsCancelled = true | 172 logsCancelled = true |
| 175 cancelLogs() | 173 cancelLogs() |
| 176 } | 174 } |
| 177 } | 175 } |
| 178 return | 176 return |
| 179 } | 177 } |
| 180 } | 178 } |
| 181 | 179 |
| 182 if req.fetchLog { | 180 if req.fetchLog { |
| 183 workC <- func() error { | 181 workC <- func() error { |
| (...skipping 322 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 506 | 504 |
| 507 func (bl *buildLoader) swarmingBuildImpl(c context.Context, svc swarmingService,
linkBase, taskID string) (*resp.MiloBuild, error) { | 505 func (bl *buildLoader) swarmingBuildImpl(c context.Context, svc swarmingService,
linkBase, taskID string) (*resp.MiloBuild, error) { |
| 508 // Fetch the data from Swarming | 506 // Fetch the data from Swarming |
| 509 var logDogStreamAddr *types.StreamAddr | 507 var logDogStreamAddr *types.StreamAddr |
| 510 | 508 |
| 511 fetchParams := swarmingFetchParams{ | 509 fetchParams := swarmingFetchParams{ |
| 512 fetchRes: true, | 510 fetchRes: true, |
| 513 fetchLog: true, | 511 fetchLog: true, |
| 514 | 512 |
| 515 // Cancel if LogDog annotation stream parameters are present in
the tag set. | 513 // Cancel if LogDog annotation stream parameters are present in
the tag set. |
| 516 » » taskTagCallback: func(tags map[string]string) (cancelLogs bool)
{ | 514 » » taskResCallback: func(res *swarming.SwarmingRpcsTaskResult) (can
celLogs bool) { |
| 515 » » » tags := swarmingTags(res.Tags) |
| 516 |
| 517 var err error | 517 var err error |
| 518 » » » if logDogStreamAddr, err = resolveLogDogStreamAddrFromTa
gs(tags); err != nil { | 518 » » » if logDogStreamAddr, err = resolveLogDogStreamAddrFromTa
gs(tags, res.TaskId, res.TryNumber); err != nil { |
| 519 logging.WithError(err).Debugf(c, "Not using LogD
og annotation stream.") | 519 logging.WithError(err).Debugf(c, "Not using LogD
og annotation stream.") |
| 520 return false | 520 return false |
| 521 } | 521 } |
| 522 return true | 522 return true |
| 523 }, | 523 }, |
| 524 } | 524 } |
| 525 fr, err := swarmingFetch(c, svc, taskID, fetchParams) | 525 fr, err := swarmingFetch(c, svc, taskID, fetchParams) |
| 526 if err != nil { | 526 if err != nil { |
| 527 return nil, err | 527 return nil, err |
| 528 } | 528 } |
| (...skipping 136 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 665 for _, tag := range v { | 665 for _, tag := range v { |
| 666 var value string | 666 var value string |
| 667 parts := strings.SplitN(tag, ":", 2) | 667 parts := strings.SplitN(tag, ":", 2) |
| 668 if len(parts) == 2 { | 668 if len(parts) == 2 { |
| 669 value = parts[1] | 669 value = parts[1] |
| 670 } | 670 } |
| 671 res[parts[0]] = value | 671 res[parts[0]] = value |
| 672 } | 672 } |
| 673 return res | 673 return res |
| 674 } | 674 } |
| OLD | NEW |