Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package logs | 5 package logs |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "time" | 8 "time" |
| 9 | 9 |
| 10 ds "github.com/luci/gae/service/datastore" | |
| 11 "github.com/luci/luci-go/common/config" | 10 "github.com/luci/luci-go/common/config" |
| 11 "github.com/luci/luci-go/common/errors" | |
| 12 log "github.com/luci/luci-go/common/logging" | 12 log "github.com/luci/luci-go/common/logging" |
| 13 "github.com/luci/luci-go/common/proto/google" | |
| 13 "github.com/luci/luci-go/common/retry" | 14 "github.com/luci/luci-go/common/retry" |
| 14 "github.com/luci/luci-go/grpc/grpcutil" | 15 "github.com/luci/luci-go/grpc/grpcutil" |
| 15 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" | 16 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" |
| 16 "github.com/luci/luci-go/logdog/api/logpb" | 17 "github.com/luci/luci-go/logdog/api/logpb" |
| 17 "github.com/luci/luci-go/logdog/appengine/coordinator" | 18 "github.com/luci/luci-go/logdog/appengine/coordinator" |
| 18 "github.com/luci/luci-go/logdog/common/storage" | 19 "github.com/luci/luci-go/logdog/common/storage" |
| 19 "github.com/luci/luci-go/logdog/common/storage/archive" | |
| 20 "github.com/luci/luci-go/logdog/common/types" | 20 "github.com/luci/luci-go/logdog/common/types" |
| 21 | 21 |
| 22 ds "github.com/luci/gae/service/datastore" | |
| 23 | |
| 22 "golang.org/x/net/context" | 24 "golang.org/x/net/context" |
| 23 "google.golang.org/grpc/codes" | 25 "google.golang.org/grpc/codes" |
| 24 ) | 26 ) |
| 25 | 27 |
| 26 const ( | 28 const ( |
| 27 // getInitialArraySize is the initial amount of log slots to allocate for a | 29 // getInitialArraySize is the initial amount of log slots to allocate for a |
| 28 // Get request. | 30 // Get request. |
| 29 getInitialArraySize = 256 | 31 getInitialArraySize = 256 |
| 30 | 32 |
| 31 // getBytesLimit is the maximum amount of data that we are willing to query. | 33 // getBytesLimit is the maximum amount of data that we are willing to query. |
| (...skipping 20 matching lines...) Expand all Loading... | |
| 52 } | 54 } |
| 53 return s.getImpl(c, &r, true) | 55 return s.getImpl(c, &r, true) |
| 54 } | 56 } |
| 55 | 57 |
| 56 // getImpl is common code shared between Get and Tail endpoints. | 58 // getImpl is common code shared between Get and Tail endpoints. |
| 57 func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) ( *logdog.GetResponse, error) { | 59 func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) ( *logdog.GetResponse, error) { |
| 58 log.Fields{ | 60 log.Fields{ |
| 59 "project": req.Project, | 61 "project": req.Project, |
| 60 "path": req.Path, | 62 "path": req.Path, |
| 61 "index": req.Index, | 63 "index": req.Index, |
| 64 "sign": req.SignEntryUrlLifetime.Duration(), | |
| 62 "tail": tail, | 65 "tail": tail, |
| 63 }.Debugf(c, "Received get request.") | 66 }.Debugf(c, "Received get request.") |
| 64 | 67 |
| 65 path := types.StreamPath(req.Path) | 68 path := types.StreamPath(req.Path) |
| 66 if err := path.Validate(); err != nil { | 69 if err := path.Validate(); err != nil { |
| 67 log.WithError(err).Errorf(c, "Invalid path supplied.") | 70 log.WithError(err).Errorf(c, "Invalid path supplied.") |
| 68 return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path v alue") | 71 return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path v alue") |
| 69 } | 72 } |
| 70 | 73 |
| 71 ls := &coordinator.LogStream{ID: coordinator.LogStreamID(path)} | 74 ls := &coordinator.LogStream{ID: coordinator.LogStreamID(path)} |
| (...skipping 15 matching lines...) Expand all Loading... | |
| 87 // If this log entry is Purged and we're not admin, pretend it doesn't e xist. | 90 // If this log entry is Purged and we're not admin, pretend it doesn't e xist. |
| 88 if ls.Purged { | 91 if ls.Purged { |
| 89 if authErr := coordinator.IsAdminUser(c); authErr != nil { | 92 if authErr := coordinator.IsAdminUser(c); authErr != nil { |
| 90 log.Fields{ | 93 log.Fields{ |
| 91 log.ErrorKey: authErr, | 94 log.ErrorKey: authErr, |
| 92 }.Warningf(c, "Non-superuser requested purged log.") | 95 }.Warningf(c, "Non-superuser requested purged log.") |
| 93 return nil, grpcutil.Errf(codes.NotFound, "path not foun d") | 96 return nil, grpcutil.Errf(codes.NotFound, "path not foun d") |
| 94 } | 97 } |
| 95 } | 98 } |
| 96 | 99 |
| 97 // If nothing was requested, return nothing. | |
| 98 resp := logdog.GetResponse{} | 100 resp := logdog.GetResponse{} |
| 99 if !(req.State || tail) && req.LogCount < 0 { | |
| 100 return &resp, nil | |
| 101 } | |
| 102 | |
| 103 if req.State { | 101 if req.State { |
| 104 resp.State = buildLogStreamState(ls, lst) | 102 resp.State = buildLogStreamState(ls, lst) |
| 105 | 103 |
| 106 var err error | 104 var err error |
| 107 resp.Desc, err = ls.DescriptorValue() | 105 resp.Desc, err = ls.DescriptorValue() |
| 108 if err != nil { | 106 if err != nil { |
| 109 log.WithError(err).Errorf(c, "Failed to deserialize desc riptor protobuf.") | 107 log.WithError(err).Errorf(c, "Failed to deserialize desc riptor protobuf.") |
| 110 return nil, grpcutil.Internal | 108 return nil, grpcutil.Internal |
| 111 } | 109 } |
| 112 } | 110 } |
| 113 | 111 |
| 114 // Retrieve requested logs from storage, if requested. | 112 // Retrieve requested logs from storage, if requested. |
| 115 » if tail || req.LogCount >= 0 { | 113 » if err := s.getLogs(c, req, &resp, tail, ls, lst); err != nil { |
| 116 » » var err error | 114 » » log.WithError(err).Errorf(c, "Failed to get logs.") |
| 117 » » resp.Logs, err = s.getLogs(c, req, tail, ls, lst) | 115 » » return nil, grpcutil.Internal |
| 118 » » if err != nil { | |
| 119 » » » log.WithError(err).Errorf(c, "Failed to get logs.") | |
| 120 » » » return nil, grpcutil.Internal | |
| 121 » » } | |
| 122 } | 116 } |
| 123 | 117 |
| 124 log.Fields{ | 118 log.Fields{ |
| 125 "logCount": len(resp.Logs), | 119 "logCount": len(resp.Logs), |
| 126 }.Debugf(c, "Get request completed successfully.") | 120 }.Debugf(c, "Get request completed successfully.") |
| 127 return &resp, nil | 121 return &resp, nil |
| 128 } | 122 } |
| 129 | 123 |
| 130 func (s *server) getLogs(c context.Context, req *logdog.GetRequest, tail bool, l s *coordinator.LogStream, | 124 func (s *server) getLogs(c context.Context, req *logdog.GetRequest, resp *logdog .GetResponse, |
| 131 » lst *coordinator.LogStreamState) ([]*logpb.LogEntry, error) { | 125 » tail bool, ls *coordinator.LogStream, lst *coordinator.LogStreamState) e rror { |
| 132 » byteLimit := int(req.ByteCount) | 126 |
| 133 » if byteLimit <= 0 || byteLimit > getBytesLimit { | 127 » signURLLifetime := req.SignEntryUrlLifetime.Duration() |
| 134 » » byteLimit = getBytesLimit | 128 » if !tail && req.LogCount < 0 && signURLLifetime <= 0 { |
| 129 » » // No log operations are actually needed, so don't bother instantiating our | |
| 130 » » // Storage instance only to do nothing. | |
| 131 » » return nil | |
| 135 } | 132 } |
| 136 | 133 |
| 137 svc := coordinator.GetServices(c) | 134 svc := coordinator.GetServices(c) |
| 138 » var st storage.Storage | 135 » st, err := svc.StorageForStream(c, lst) |
| 139 » if !lst.ArchivalState().Archived() { | 136 » if err != nil { |
| 140 » » log.Debugf(c, "Log is not archived. Fetching from intermediate s torage.") | 137 » » return errors.Annotate(err).InternalReason("failed to create sto rage instance").Err() |
| 141 | |
| 142 » » // Logs are not archived. Fetch from intermediate storage. | |
| 143 » » var err error | |
| 144 » » st, err = svc.IntermediateStorage(c) | |
| 145 » » if err != nil { | |
| 146 » » » return nil, err | |
| 147 » » } | |
| 148 » } else { | |
| 149 » » log.Fields{ | |
| 150 » » » "indexURL": lst.ArchiveIndexURL, | |
| 151 » » » "streamURL": lst.ArchiveStreamURL, | |
| 152 » » » "archiveTime": lst.ArchivedTime, | |
| 153 » » }.Debugf(c, "Log is archived. Fetching from archive storage.") | |
| 154 | |
| 155 » » var err error | |
| 156 » » gs, err := svc.GSClient(c) | |
| 157 » » if err != nil { | |
| 158 » » » log.WithError(err).Errorf(c, "Failed to create Google St orage client.") | |
| 159 » » » return nil, err | |
| 160 » » } | |
| 161 » » defer func() { | |
| 162 » » » if err := gs.Close(); err != nil { | |
| 163 » » » » log.WithError(err).Warningf(c, "Failed to close Google Storage client.") | |
| 164 » » » } | |
| 165 » » }() | |
| 166 | |
| 167 » » st, err = archive.New(c, archive.Options{ | |
| 168 » » » IndexURL: lst.ArchiveIndexURL, | |
| 169 » » » StreamURL: lst.ArchiveStreamURL, | |
| 170 » » » Client: gs, | |
| 171 » » » Cache: svc.StorageCache(), | |
| 172 » » }) | |
| 173 » » if err != nil { | |
| 174 » » » log.WithError(err).Errorf(c, "Failed to create Google St orage storage instance.") | |
| 175 » » » return nil, err | |
| 176 » » } | |
| 177 } | 138 } |
| 178 defer st.Close() | 139 defer st.Close() |
| 179 | 140 |
| 180 project, path := coordinator.Project(c), ls.Path() | 141 project, path := coordinator.Project(c), ls.Path() |
| 181 | 142 |
| 182 var fetchedLogs []*logpb.LogEntry | |
| 183 var err error | |
| 184 if tail { | 143 if tail { |
| 185 » » fetchedLogs, err = getTail(c, st, project, path) | 144 » » resp.Logs, err = getTail(c, st, project, path) |
| 186 » } else { | 145 » } else if req.LogCount >= 0 { |
| 187 » » fetchedLogs, err = getHead(c, req, st, project, path, byteLimit) | 146 » » byteLimit := int(req.ByteCount) |
| 147 » » if byteLimit <= 0 || byteLimit > getBytesLimit { | |
| 148 » » » byteLimit = getBytesLimit | |
| 149 » » } | |
| 150 | |
| 151 » » resp.Logs, err = getHead(c, req, st, project, path, byteLimit) | |
| 188 } | 152 } |
| 189 if err != nil { | 153 if err != nil { |
| 190 log.WithError(err).Errorf(c, "Failed to fetch log records.") | 154 log.WithError(err).Errorf(c, "Failed to fetch log records.") |
| 191 » » return nil, err | 155 » » return err |
| 192 } | 156 } |
| 193 | 157 |
| 194 » return fetchedLogs, nil | 158 » // If we're requesting a signed URL, try and get that too. |
| 159 » if signURLLifetime > 0 { | |
|
Vadim Sh.
2016/11/30 21:03:52
is a request with 'tail == true' and 'signURLLifet
dnj
2016/12/01 17:39:30
Nope, b/c that parameter is not part of the TailRe
| |
| 160 » » value, expire, err := st.SignStreamURL(c, signURLLifetime) | |
| 161 » » switch err { | |
| 162 » » case nil: | |
| 163 » » » resp.SignedEntryUrl = &logdog.GetResponse_SignedEntryUrl { | |
| 164 » » » » Value: value, | |
| 165 » » » » Expiration: google.NewTimestamp(expire), | |
| 166 » » » } | |
| 167 | |
| 168 » » case coordinator.ErrSigningNotSupported: | |
| 169 » » » log.Debugf(c, "Signed URL was requested, but is not supported by storage.") | |
| 170 » » » break | |
| 171 | |
| 172 » » default: | |
| 173 » » » return errors.Annotate(err).InternalReason("failed to generate signed URL").Err() | |
| 174 » » } | |
| 175 » } | |
| 176 | |
| 177 » return nil | |
| 195 } | 178 } |
| 196 | 179 |
| 197 func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, proj ect config.ProjectName, | 180 func getHead(c context.Context, req *logdog.GetRequest, st coordinator.Storage, project config.ProjectName, |
| 198 path types.StreamPath, byteLimit int) ([]*logpb.LogEntry, error) { | 181 path types.StreamPath, byteLimit int) ([]*logpb.LogEntry, error) { |
| 199 log.Fields{ | 182 log.Fields{ |
| 200 "project": project, | 183 "project": project, |
| 201 "path": path, | 184 "path": path, |
| 202 "index": req.Index, | 185 "index": req.Index, |
| 203 "count": req.LogCount, | 186 "count": req.LogCount, |
| 204 "bytes": req.ByteCount, | 187 "bytes": req.ByteCount, |
| 205 "noncontiguous": req.NonContiguous, | 188 "noncontiguous": req.NonContiguous, |
| 206 }.Debugf(c, "Issuing Get request.") | 189 }.Debugf(c, "Issuing Get request.") |
| 207 | 190 |
| (...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 271 log.Fields{ | 254 log.Fields{ |
| 272 log.ErrorKey: err, | 255 log.ErrorKey: err, |
| 273 "initialIndex": req.Index, | 256 "initialIndex": req.Index, |
| 274 "nextIndex": sreq.Index, | 257 "nextIndex": sreq.Index, |
| 275 "count": len(logs), | 258 "count": len(logs), |
| 276 }.Errorf(c, "Failed to execute range request.") | 259 }.Errorf(c, "Failed to execute range request.") |
| 277 return nil, err | 260 return nil, err |
| 278 } | 261 } |
| 279 } | 262 } |
| 280 | 263 |
| 281 func getTail(c context.Context, st storage.Storage, project config.ProjectName, path types.StreamPath) ( | 264 func getTail(c context.Context, st coordinator.Storage, project config.ProjectNa me, path types.StreamPath) ( |
| 282 []*logpb.LogEntry, error) { | 265 []*logpb.LogEntry, error) { |
| 283 log.Fields{ | 266 log.Fields{ |
| 284 "project": project, | 267 "project": project, |
| 285 "path": path, | 268 "path": path, |
| 286 }.Debugf(c, "Issuing Tail request.") | 269 }.Debugf(c, "Issuing Tail request.") |
| 287 | 270 |
| 288 var e *storage.Entry | 271 var e *storage.Entry |
| 289 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err er ror) { | 272 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err er ror) { |
| 290 e, err = st.Tail(project, path) | 273 e, err = st.Tail(project, path) |
| 291 return | 274 return |
| (...skipping 13 matching lines...) Expand all Loading... | |
| 305 return []*logpb.LogEntry{le}, nil | 288 return []*logpb.LogEntry{le}, nil |
| 306 | 289 |
| 307 case storage.ErrDoesNotExist: | 290 case storage.ErrDoesNotExist: |
| 308 return nil, nil | 291 return nil, nil |
| 309 | 292 |
| 310 default: | 293 default: |
| 311 log.WithError(err).Errorf(c, "Failed to fetch tail log.") | 294 log.WithError(err).Errorf(c, "Failed to fetch tail log.") |
| 312 return nil, err | 295 return nil, err |
| 313 } | 296 } |
| 314 } | 297 } |
| OLD | NEW |