| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package logdog | 5 package logdog |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | 8 "errors" |
| 9 "fmt" | 9 "fmt" |
| 10 "net/http" | 10 "net/http" |
| 11 "net/url" | 11 "net/url" |
| 12 "strings" | 12 "strings" |
| 13 "time" | 13 "time" |
| 14 | 14 |
| 15 "github.com/luci/luci-go/common/config" | 15 "github.com/luci/luci-go/common/config" |
| 16 log "github.com/luci/luci-go/common/logging" | 16 log "github.com/luci/luci-go/common/logging" |
| 17 "github.com/luci/luci-go/common/proto/google" | 17 "github.com/luci/luci-go/common/proto/google" |
| 18 miloProto "github.com/luci/luci-go/common/proto/milo" | 18 miloProto "github.com/luci/luci-go/common/proto/milo" |
| 19 "github.com/luci/luci-go/grpc/grpcutil" | 19 "github.com/luci/luci-go/grpc/grpcutil" |
| 20 "github.com/luci/luci-go/grpc/prpc" | |
| 21 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" | |
| 22 "github.com/luci/luci-go/logdog/api/logpb" | 20 "github.com/luci/luci-go/logdog/api/logpb" |
| 21 "github.com/luci/luci-go/logdog/client/coordinator" |
| 23 "github.com/luci/luci-go/logdog/common/types" | 22 "github.com/luci/luci-go/logdog/common/types" |
| 24 "github.com/luci/luci-go/milo/api/resp" | 23 "github.com/luci/luci-go/milo/api/resp" |
| 25 "github.com/luci/luci-go/milo/appengine/logdog/internal" | 24 "github.com/luci/luci-go/milo/appengine/logdog/internal" |
| 26 "github.com/luci/luci-go/milo/common/miloerror" | 25 "github.com/luci/luci-go/milo/common/miloerror" |
| 27 | 26 |
| 28 "github.com/golang/protobuf/proto" | 27 "github.com/golang/protobuf/proto" |
| 29 mc "github.com/luci/gae/service/memcache" | 28 mc "github.com/luci/gae/service/memcache" |
| 30 "golang.org/x/net/context" | 29 "golang.org/x/net/context" |
| 31 "google.golang.org/grpc/codes" | 30 "google.golang.org/grpc/codes" |
| 32 ) | 31 ) |
| (...skipping 12 matching lines...) Expand all Loading... |
| 45 type annotationStreamRequest struct { | 44 type annotationStreamRequest struct { |
| 46 *AnnotationStream | 45 *AnnotationStream |
| 47 | 46 |
| 48 // host is the name of the LogDog host. | 47 // host is the name of the LogDog host. |
| 49 host string | 48 host string |
| 50 | 49 |
| 51 project config.ProjectName | 50 project config.ProjectName |
| 52 path types.StreamPath | 51 path types.StreamPath |
| 53 | 52 |
| 54 // logDogClient is the HTTP client to use for LogDog communication. | 53 // logDogClient is the HTTP client to use for LogDog communication. |
| 55 » logDogClient http.Client | 54 » logDogClient *coordinator.Client |
| 56 | 55 |
| 57 // cs is the unmarshalled annotation stream Step and associated data. | 56 // cs is the unmarshalled annotation stream Step and associated data. |
| 58 cs internal.CachedStep | 57 cs internal.CachedStep |
| 59 } | 58 } |
| 60 | 59 |
| 61 func (as *annotationStreamRequest) normalize() error { | 60 func (as *annotationStreamRequest) normalize() error { |
| 62 if err := as.project.Validate(); err != nil { | 61 if err := as.project.Validate(); err != nil { |
| 63 return &miloerror.Error{ | 62 return &miloerror.Error{ |
| 64 Message: "Invalid project name", | 63 Message: "Invalid project name", |
| 65 Code: http.StatusBadRequest, | 64 Code: http.StatusBadRequest, |
| (...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 115 break | 114 break |
| 116 | 115 |
| 117 default: | 116 default: |
| 118 log.Fields{ | 117 log.Fields{ |
| 119 log.ErrorKey: err, | 118 log.ErrorKey: err, |
| 120 "memcacheKey": mcKey, | 119 "memcacheKey": mcKey, |
| 121 }.Errorf(c, "Failed to load annotation protobuf memcache cached
step.") | 120 }.Errorf(c, "Failed to load annotation protobuf memcache cached
step.") |
| 122 } | 121 } |
| 123 | 122 |
| 124 // Load from LogDog directly. | 123 // Load from LogDog directly. |
| 125 client := logdog.NewLogsPRPCClient(&prpc.Client{ | |
| 126 C: &as.logDogClient, | |
| 127 Host: as.host, | |
| 128 }) | |
| 129 | |
| 130 log.Fields{ | 124 log.Fields{ |
| 131 "project": as.project, | 125 "project": as.project, |
| 132 "path": as.path, | 126 "path": as.path, |
| 133 "host": as.host, | 127 "host": as.host, |
| 134 }.Infof(c, "Making tail request to LogDog to fetch annotation stream.") | 128 }.Infof(c, "Making tail request to LogDog to fetch annotation stream.") |
| 135 » resp, err := client.Tail(c, &logdog.TailRequest{ | 129 |
| 136 » » Project: string(as.project), | 130 » var ( |
| 137 » » Path: string(as.path), | 131 » » state coordinator.LogStream |
| 138 » » State: true, | 132 » » stream = as.logDogClient.Stream(as.project, as.path) |
| 139 » }) | 133 » ) |
| 134 » le, err := stream.Tail(c, coordinator.WithState(&state), coordinator.Com
plete()) |
| 140 switch code := grpcutil.Code(err); code { | 135 switch code := grpcutil.Code(err); code { |
| 141 case codes.OK: | 136 case codes.OK: |
| 142 break | 137 break |
| 143 | 138 |
| 144 case codes.NotFound: | 139 case codes.NotFound: |
| 145 return &miloerror.Error{ | 140 return &miloerror.Error{ |
| 146 Message: "Stream not found", | 141 Message: "Stream not found", |
| 147 Code: http.StatusNotFound, | 142 Code: http.StatusNotFound, |
| 148 } | 143 } |
| 149 | 144 |
| 150 default: | 145 default: |
| 151 // TODO: Once we switch to delegation tokens and are making the
request on | 146 // TODO: Once we switch to delegation tokens and are making the
request on |
| 152 // behalf of a user rather than the Milo service, handle Permiss
ionDenied. | 147 // behalf of a user rather than the Milo service, handle Permiss
ionDenied. |
| 153 log.Fields{ | 148 log.Fields{ |
| 154 log.ErrorKey: err, | 149 log.ErrorKey: err, |
| 155 "code": code, | 150 "code": code, |
| 156 }.Errorf(c, "Failed to load LogDog annotation stream.") | 151 }.Errorf(c, "Failed to load LogDog annotation stream.") |
| 157 return &miloerror.Error{ | 152 return &miloerror.Error{ |
| 158 Message: "Failed to load stream", | 153 Message: "Failed to load stream", |
| 159 Code: http.StatusInternalServerError, | 154 Code: http.StatusInternalServerError, |
| 160 } | 155 } |
| 161 } | 156 } |
| 162 | 157 |
| 163 // Make sure that this is an annotation stream. | 158 // Make sure that this is an annotation stream. |
| 164 switch { | 159 switch { |
| 165 » case resp.Desc.ContentType != miloProto.ContentTypeAnnotations: | 160 » case state.Desc.ContentType != miloProto.ContentTypeAnnotations: |
| 166 return &miloerror.Error{ | 161 return &miloerror.Error{ |
| 167 Message: "Requested stream is not a Milo annotation prot
obuf", | 162 Message: "Requested stream is not a Milo annotation prot
obuf", |
| 168 Code: http.StatusBadRequest, | 163 Code: http.StatusBadRequest, |
| 169 } | 164 } |
| 170 | 165 |
| 171 » case resp.Desc.StreamType != logpb.StreamType_DATAGRAM: | 166 » case state.Desc.StreamType != logpb.StreamType_DATAGRAM: |
| 172 return &miloerror.Error{ | 167 return &miloerror.Error{ |
| 173 Message: "Requested stream is not a datagram stream", | 168 Message: "Requested stream is not a datagram stream", |
| 174 Code: http.StatusBadRequest, | 169 Code: http.StatusBadRequest, |
| 175 } | 170 } |
| 176 | 171 |
| 177 » case len(resp.Logs) == 0: | 172 » case le == nil: |
| 178 // No annotation stream data, so render a minimal page. | 173 // No annotation stream data, so render a minimal page. |
| 179 return nil | 174 return nil |
| 180 } | 175 } |
| 181 | 176 |
| 182 // Get the last log entry in the stream. In reality, this will be index
0, | 177 // Get the last log entry in the stream. In reality, this will be index
0, |
| 183 // since the "Tail" call should only return one log entry. | 178 // since the "Tail" call should only return one log entry. |
| 184 » latestStream := resp.Logs[len(resp.Logs)-1] | 179 » // |
| 185 » dg := latestStream.GetDatagram() | 180 » // Because we supplied the "Complete" flag to Tail and suceeded, this da
tagram |
| 186 » switch { | 181 » // will be complete even if its source datagram(s) are fragments. |
| 187 » case dg == nil: | 182 » dg := le.GetDatagram() |
| 183 » if dg == nil { |
| 188 return &miloerror.Error{ | 184 return &miloerror.Error{ |
| 189 Message: "Datagram stream does not have datagram data", | 185 Message: "Datagram stream does not have datagram data", |
| 190 Code: http.StatusInternalServerError, | 186 Code: http.StatusInternalServerError, |
| 191 } | 187 } |
| 192 | |
| 193 case dg.Partial != nil && !(dg.Partial.Index == 0 && dg.Partial.Last): | |
| 194 // LogDog splits large datagrams into consecutive fragments. If
the | |
| 195 // annotation state is fragmented, a reconstruction algorithm wi
ll have to | |
| 196 // be employed here to build the full datagram before processing
. | |
| 197 // | |
| 198 // At the moment, no annotation streams are expected to be anywh
ere close to | |
| 199 // this large, so we're going to handle this case by erroring. A | |
| 200 // reconstruction algorithm would look like: | |
| 201 // 1) "Tail" to get the latest datagram, identify it as partial. | |
| 202 // 1a) Perform a bounds check on the total datagram size to ensu
re that it | |
| 203 // can be safely reconstructed. | |
| 204 // 2) Determine if it's the last partial index. If not, then the
latest | |
| 205 // datagram is incomplete. Determine our initial datagram's
stream index | |
| 206 // the by subtracting the partial index from this message's
stream index. | |
| 207 // 2a) If this datagram index is "0", the first datagram in the
stream is | |
| 208 // partial and all of the data isn't here, so treat this as
"no data". | |
| 209 // 2b) Otherwise, goto (1), using "Get" request on the datagram
index minus | |
| 210 // one to get the previous datagram. | |
| 211 // 3) Issue a "Get" request for our initial datagram index throu
gh the index | |
| 212 // preceding ours. | |
| 213 // 4) Reassemble the binary data from the full set of datagrams. | |
| 214 return &miloerror.Error{ | |
| 215 Message: "Partial datagram streams are not supported yet
", | |
| 216 Code: http.StatusNotImplemented, | |
| 217 } | |
| 218 } | 188 } |
| 219 | 189 |
| 220 // Attempt to decode the Step protobuf. | 190 // Attempt to decode the Step protobuf. |
| 221 var step miloProto.Step | 191 var step miloProto.Step |
| 222 if err := proto.Unmarshal(dg.Data, &step); err != nil { | 192 if err := proto.Unmarshal(dg.Data, &step); err != nil { |
| 223 return &miloerror.Error{ | 193 return &miloerror.Error{ |
| 224 Message: "Failed to unmarshal annotation protobuf", | 194 Message: "Failed to unmarshal annotation protobuf", |
| 225 Code: http.StatusInternalServerError, | 195 Code: http.StatusInternalServerError, |
| 226 } | 196 } |
| 227 } | 197 } |
| (...skipping 14 matching lines...) Expand all Loading... |
| 242 } | 212 } |
| 243 } | 213 } |
| 244 if latestEndedTime.IsZero() { | 214 if latestEndedTime.IsZero() { |
| 245 // No substep had an ended time :( | 215 // No substep had an ended time :( |
| 246 latestEndedTime = step.Started.Time() | 216 latestEndedTime = step.Started.Time() |
| 247 } | 217 } |
| 248 | 218 |
| 249 // Build our CachedStep. | 219 // Build our CachedStep. |
| 250 as.cs = internal.CachedStep{ | 220 as.cs = internal.CachedStep{ |
| 251 Step: &step, | 221 Step: &step, |
| 252 » » Finished: (resp.State.TerminalIndex >= 0 && latestStream.StreamI
ndex == uint64(resp.State.TerminalIndex)), | 222 » » Finished: (state.State.TerminalIndex >= 0 && le.StreamIndex == u
int64(state.State.TerminalIndex)), |
| 253 } | 223 } |
| 254 | 224 |
| 255 // Annotee is apparently not putting an ended time on some annotation pr
otos. | 225 // Annotee is apparently not putting an ended time on some annotation pr
otos. |
| 256 // This hack will ensure that a finished build will always have an ended
time. | 226 // This hack will ensure that a finished build will always have an ended
time. |
| 257 if as.cs.Finished && as.cs.Step.Ended == nil { | 227 if as.cs.Finished && as.cs.Step.Ended == nil { |
| 258 as.cs.Step.Ended = google.NewTimestamp(latestEndedTime) | 228 as.cs.Step.Ended = google.NewTimestamp(latestEndedTime) |
| 259 } | 229 } |
| 260 | 230 |
| 261 // Marshal and cache the step. If this is the final protobuf in the stre
am, | 231 // Marshal and cache the step. If this is the final protobuf in the stre
am, |
| 262 // cache it indefinitely; otherwise, cache it for intermediateCacheLifet
ime. | 232 // cache it indefinitely; otherwise, cache it for intermediateCacheLifet
ime. |
| (...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 354 if link.Label == "" { | 324 if link.Label == "" { |
| 355 link.Label = "unnamed" | 325 link.Label = "unnamed" |
| 356 } | 326 } |
| 357 return &link | 327 return &link |
| 358 | 328 |
| 359 default: | 329 default: |
| 360 // Don't know how to render. | 330 // Don't know how to render. |
| 361 return nil | 331 return nil |
| 362 } | 332 } |
| 363 } | 333 } |
| OLD | NEW |