| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package logs | 5 package logs |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "time" | 8 "time" |
| 9 | 9 |
| 10 ds "github.com/luci/gae/service/datastore" | |
| 11 "github.com/luci/luci-go/common/config" | 10 "github.com/luci/luci-go/common/config" |
| 11 "github.com/luci/luci-go/common/errors" |
| 12 log "github.com/luci/luci-go/common/logging" | 12 log "github.com/luci/luci-go/common/logging" |
| 13 "github.com/luci/luci-go/common/proto/google" |
| 13 "github.com/luci/luci-go/common/retry" | 14 "github.com/luci/luci-go/common/retry" |
| 14 "github.com/luci/luci-go/grpc/grpcutil" | 15 "github.com/luci/luci-go/grpc/grpcutil" |
| 15 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" | 16 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" |
| 16 "github.com/luci/luci-go/logdog/api/logpb" | 17 "github.com/luci/luci-go/logdog/api/logpb" |
| 17 "github.com/luci/luci-go/logdog/appengine/coordinator" | 18 "github.com/luci/luci-go/logdog/appengine/coordinator" |
| 18 "github.com/luci/luci-go/logdog/common/storage" | 19 "github.com/luci/luci-go/logdog/common/storage" |
| 19 "github.com/luci/luci-go/logdog/common/storage/archive" | |
| 20 "github.com/luci/luci-go/logdog/common/types" | 20 "github.com/luci/luci-go/logdog/common/types" |
| 21 | 21 |
| 22 ds "github.com/luci/gae/service/datastore" |
| 23 |
| 22 "golang.org/x/net/context" | 24 "golang.org/x/net/context" |
| 23 "google.golang.org/grpc/codes" | 25 "google.golang.org/grpc/codes" |
| 24 ) | 26 ) |
| 25 | 27 |
| 26 const ( | 28 const ( |
| 27 // getInitialArraySize is the initial amount of log slots to allocate fo
r a | 29 // getInitialArraySize is the initial amount of log slots to allocate fo
r a |
| 28 // Get request. | 30 // Get request. |
| 29 getInitialArraySize = 256 | 31 getInitialArraySize = 256 |
| 30 | 32 |
| 31 // getBytesLimit is the maximum amount of data that we are willing to qu
ery. | 33 // getBytesLimit is the maximum amount of data that we are willing to qu
ery. |
| (...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 87 // If this log entry is Purged and we're not admin, pretend it doesn't e
xist. | 89 // If this log entry is Purged and we're not admin, pretend it doesn't e
xist. |
| 88 if ls.Purged { | 90 if ls.Purged { |
| 89 if authErr := coordinator.IsAdminUser(c); authErr != nil { | 91 if authErr := coordinator.IsAdminUser(c); authErr != nil { |
| 90 log.Fields{ | 92 log.Fields{ |
| 91 log.ErrorKey: authErr, | 93 log.ErrorKey: authErr, |
| 92 }.Warningf(c, "Non-superuser requested purged log.") | 94 }.Warningf(c, "Non-superuser requested purged log.") |
| 93 return nil, grpcutil.Errf(codes.NotFound, "path not foun
d") | 95 return nil, grpcutil.Errf(codes.NotFound, "path not foun
d") |
| 94 } | 96 } |
| 95 } | 97 } |
| 96 | 98 |
| 97 // If nothing was requested, return nothing. | |
| 98 resp := logdog.GetResponse{} | 99 resp := logdog.GetResponse{} |
| 99 if !(req.State || tail) && req.LogCount < 0 { | |
| 100 return &resp, nil | |
| 101 } | |
| 102 | |
| 103 if req.State { | 100 if req.State { |
| 104 resp.State = buildLogStreamState(ls, lst) | 101 resp.State = buildLogStreamState(ls, lst) |
| 105 | 102 |
| 106 var err error | 103 var err error |
| 107 resp.Desc, err = ls.DescriptorValue() | 104 resp.Desc, err = ls.DescriptorValue() |
| 108 if err != nil { | 105 if err != nil { |
| 109 log.WithError(err).Errorf(c, "Failed to deserialize desc
riptor protobuf.") | 106 log.WithError(err).Errorf(c, "Failed to deserialize desc
riptor protobuf.") |
| 110 return nil, grpcutil.Internal | 107 return nil, grpcutil.Internal |
| 111 } | 108 } |
| 112 } | 109 } |
| 113 | 110 |
| 114 // Retrieve requested logs from storage, if requested. | 111 // Retrieve requested logs from storage, if requested. |
| 115 » if tail || req.LogCount >= 0 { | 112 » if err := s.getLogs(c, req, &resp, tail, ls, lst); err != nil { |
| 116 » » var err error | 113 » » log.WithError(err).Errorf(c, "Failed to get logs.") |
| 117 » » resp.Logs, err = s.getLogs(c, req, tail, ls, lst) | 114 » » return nil, grpcutil.Internal |
| 118 » » if err != nil { | |
| 119 » » » log.WithError(err).Errorf(c, "Failed to get logs.") | |
| 120 » » » return nil, grpcutil.Internal | |
| 121 » » } | |
| 122 } | 115 } |
| 123 | 116 |
| 124 log.Fields{ | 117 log.Fields{ |
| 125 "logCount": len(resp.Logs), | 118 "logCount": len(resp.Logs), |
| 126 }.Debugf(c, "Get request completed successfully.") | 119 }.Debugf(c, "Get request completed successfully.") |
| 127 return &resp, nil | 120 return &resp, nil |
| 128 } | 121 } |
| 129 | 122 |
| 130 func (s *server) getLogs(c context.Context, req *logdog.GetRequest, tail bool, l
s *coordinator.LogStream, | 123 func (s *server) getLogs(c context.Context, req *logdog.GetRequest, resp *logdog
.GetResponse, |
| 131 » lst *coordinator.LogStreamState) ([]*logpb.LogEntry, error) { | 124 » tail bool, ls *coordinator.LogStream, lst *coordinator.LogStreamState) e
rror { |
| 132 » byteLimit := int(req.ByteCount) | 125 |
| 133 » if byteLimit <= 0 || byteLimit > getBytesLimit { | 126 » // Identify our URL signing parameters. |
| 134 » » byteLimit = getBytesLimit | 127 » var signingRequest coordinator.URLSigningRequest |
| 128 » if sr := req.GetSignedUrls; sr != nil { |
| 129 » » signingRequest.Lifetime = sr.Lifetime.Duration() |
| 130 » » signingRequest.Stream = sr.Stream |
| 131 » » signingRequest.Index = sr.Index |
| 132 » } |
| 133 » if !tail && req.LogCount < 0 && !signingRequest.HasWork() { |
| 134 » » // No log operations are acutally needed, so don't bother instan
ting our |
| 135 » » // Storage instance only to do nothing. |
| 136 » » return nil |
| 135 } | 137 } |
| 136 | 138 |
| 137 svc := coordinator.GetServices(c) | 139 svc := coordinator.GetServices(c) |
| 138 » var st storage.Storage | 140 » st, err := svc.StorageForStream(c, lst) |
| 139 » if !lst.ArchivalState().Archived() { | 141 » if err != nil { |
| 140 » » log.Debugf(c, "Log is not archived. Fetching from intermediate s
torage.") | 142 » » return errors.Annotate(err).InternalReason("failed to create sto
rage instance").Err() |
| 141 | |
| 142 » » // Logs are not archived. Fetch from intermediate storage. | |
| 143 » » var err error | |
| 144 » » st, err = svc.IntermediateStorage(c) | |
| 145 » » if err != nil { | |
| 146 » » » return nil, err | |
| 147 » » } | |
| 148 » } else { | |
| 149 » » log.Fields{ | |
| 150 » » » "indexURL": lst.ArchiveIndexURL, | |
| 151 » » » "streamURL": lst.ArchiveStreamURL, | |
| 152 » » » "archiveTime": lst.ArchivedTime, | |
| 153 » » }.Debugf(c, "Log is archived. Fetching from archive storage.") | |
| 154 | |
| 155 » » var err error | |
| 156 » » gs, err := svc.GSClient(c) | |
| 157 » » if err != nil { | |
| 158 » » » log.WithError(err).Errorf(c, "Failed to create Google St
orage client.") | |
| 159 » » » return nil, err | |
| 160 » » } | |
| 161 » » defer func() { | |
| 162 » » » if err := gs.Close(); err != nil { | |
| 163 » » » » log.WithError(err).Warningf(c, "Failed to close
Google Storage client.") | |
| 164 » » » } | |
| 165 » » }() | |
| 166 | |
| 167 » » st, err = archive.New(c, archive.Options{ | |
| 168 » » » IndexURL: lst.ArchiveIndexURL, | |
| 169 » » » StreamURL: lst.ArchiveStreamURL, | |
| 170 » » » Client: gs, | |
| 171 » » » Cache: svc.StorageCache(), | |
| 172 » » }) | |
| 173 » » if err != nil { | |
| 174 » » » log.WithError(err).Errorf(c, "Failed to create Google St
orage storage instance.") | |
| 175 » » » return nil, err | |
| 176 » » } | |
| 177 } | 143 } |
| 178 defer st.Close() | 144 defer st.Close() |
| 179 | 145 |
| 180 project, path := coordinator.Project(c), ls.Path() | 146 project, path := coordinator.Project(c), ls.Path() |
| 181 | 147 |
| 182 var fetchedLogs []*logpb.LogEntry | |
| 183 var err error | |
| 184 if tail { | 148 if tail { |
| 185 » » fetchedLogs, err = getTail(c, st, project, path) | 149 » » resp.Logs, err = getTail(c, st, project, path) |
| 186 » } else { | 150 » } else if req.LogCount >= 0 { |
| 187 » » fetchedLogs, err = getHead(c, req, st, project, path, byteLimit) | 151 » » byteLimit := int(req.ByteCount) |
| 152 » » if byteLimit <= 0 || byteLimit > getBytesLimit { |
| 153 » » » byteLimit = getBytesLimit |
| 154 » » } |
| 155 |
| 156 » » resp.Logs, err = getHead(c, req, st, project, path, byteLimit) |
| 188 } | 157 } |
| 189 if err != nil { | 158 if err != nil { |
| 190 log.WithError(err).Errorf(c, "Failed to fetch log records.") | 159 log.WithError(err).Errorf(c, "Failed to fetch log records.") |
| 191 » » return nil, err | 160 » » return err |
| 192 } | 161 } |
| 193 | 162 |
| 194 » return fetchedLogs, nil | 163 » // If we're requesting a signedl URL, try and get that too. |
| 164 » if signingRequest.HasWork() { |
| 165 » » signedURLs, err := st.GetSignedURLs(c, &signingRequest) |
| 166 » » switch { |
| 167 » » case err != nil: |
| 168 » » » return errors.Annotate(err).InternalReason("failed to ge
nerate signed URL").Err() |
| 169 |
| 170 » » case signedURLs == nil: |
| 171 » » » log.Debugf(c, "Signed URL was requested, but is not supp
orted by storage.") |
| 172 |
| 173 » » default: |
| 174 » » » resp.SignedUrls = &logdog.GetResponse_SignedUrls{ |
| 175 » » » » Expiration: google.NewTimestamp(signedURLs.Expir
ation), |
| 176 » » » » Stream: signedURLs.Stream, |
| 177 » » » » Index: signedURLs.Index, |
| 178 » » » } |
| 179 » » } |
| 180 » } |
| 181 |
| 182 » return nil |
| 195 } | 183 } |
| 196 | 184 |
| 197 func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, proj
ect config.ProjectName, | 185 func getHead(c context.Context, req *logdog.GetRequest, st coordinator.Storage,
project config.ProjectName, |
| 198 path types.StreamPath, byteLimit int) ([]*logpb.LogEntry, error) { | 186 path types.StreamPath, byteLimit int) ([]*logpb.LogEntry, error) { |
| 199 log.Fields{ | 187 log.Fields{ |
| 200 "project": project, | 188 "project": project, |
| 201 "path": path, | 189 "path": path, |
| 202 "index": req.Index, | 190 "index": req.Index, |
| 203 "count": req.LogCount, | 191 "count": req.LogCount, |
| 204 "bytes": req.ByteCount, | 192 "bytes": req.ByteCount, |
| 205 "noncontiguous": req.NonContiguous, | 193 "noncontiguous": req.NonContiguous, |
| 206 }.Debugf(c, "Issuing Get request.") | 194 }.Debugf(c, "Issuing Get request.") |
| 207 | 195 |
| (...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 271 log.Fields{ | 259 log.Fields{ |
| 272 log.ErrorKey: err, | 260 log.ErrorKey: err, |
| 273 "initialIndex": req.Index, | 261 "initialIndex": req.Index, |
| 274 "nextIndex": sreq.Index, | 262 "nextIndex": sreq.Index, |
| 275 "count": len(logs), | 263 "count": len(logs), |
| 276 }.Errorf(c, "Failed to execute range request.") | 264 }.Errorf(c, "Failed to execute range request.") |
| 277 return nil, err | 265 return nil, err |
| 278 } | 266 } |
| 279 } | 267 } |
| 280 | 268 |
| 281 func getTail(c context.Context, st storage.Storage, project config.ProjectName,
path types.StreamPath) ( | 269 func getTail(c context.Context, st coordinator.Storage, project config.ProjectNa
me, path types.StreamPath) ( |
| 282 []*logpb.LogEntry, error) { | 270 []*logpb.LogEntry, error) { |
| 283 log.Fields{ | 271 log.Fields{ |
| 284 "project": project, | 272 "project": project, |
| 285 "path": path, | 273 "path": path, |
| 286 }.Debugf(c, "Issuing Tail request.") | 274 }.Debugf(c, "Issuing Tail request.") |
| 287 | 275 |
| 288 var e *storage.Entry | 276 var e *storage.Entry |
| 289 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err er
ror) { | 277 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err er
ror) { |
| 290 e, err = st.Tail(project, path) | 278 e, err = st.Tail(project, path) |
| 291 return | 279 return |
| (...skipping 13 matching lines...) Expand all Loading... |
| 305 return []*logpb.LogEntry{le}, nil | 293 return []*logpb.LogEntry{le}, nil |
| 306 | 294 |
| 307 case storage.ErrDoesNotExist: | 295 case storage.ErrDoesNotExist: |
| 308 return nil, nil | 296 return nil, nil |
| 309 | 297 |
| 310 default: | 298 default: |
| 311 log.WithError(err).Errorf(c, "Failed to fetch tail log.") | 299 log.WithError(err).Errorf(c, "Failed to fetch tail log.") |
| 312 return nil, err | 300 return nil, err |
| 313 } | 301 } |
| 314 } | 302 } |
| OLD | NEW |