Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(78)

Side by Side Diff: logdog/appengine/coordinator/endpoints/logs/get.go

Issue 2538203002: LogDog: Add signed GS URL fetching. (Closed)
Patch Set: Allow index signing, use gaesigner. Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package logs 5 package logs
6 6
7 import ( 7 import (
8 "time" 8 "time"
9 9
10 ds "github.com/luci/gae/service/datastore"
11 "github.com/luci/luci-go/common/config" 10 "github.com/luci/luci-go/common/config"
11 "github.com/luci/luci-go/common/errors"
12 log "github.com/luci/luci-go/common/logging" 12 log "github.com/luci/luci-go/common/logging"
13 "github.com/luci/luci-go/common/proto/google"
13 "github.com/luci/luci-go/common/retry" 14 "github.com/luci/luci-go/common/retry"
14 "github.com/luci/luci-go/grpc/grpcutil" 15 "github.com/luci/luci-go/grpc/grpcutil"
15 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" 16 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1"
16 "github.com/luci/luci-go/logdog/api/logpb" 17 "github.com/luci/luci-go/logdog/api/logpb"
17 "github.com/luci/luci-go/logdog/appengine/coordinator" 18 "github.com/luci/luci-go/logdog/appengine/coordinator"
18 "github.com/luci/luci-go/logdog/common/storage" 19 "github.com/luci/luci-go/logdog/common/storage"
19 "github.com/luci/luci-go/logdog/common/storage/archive"
20 "github.com/luci/luci-go/logdog/common/types" 20 "github.com/luci/luci-go/logdog/common/types"
21 21
22 ds "github.com/luci/gae/service/datastore"
23
22 "golang.org/x/net/context" 24 "golang.org/x/net/context"
23 "google.golang.org/grpc/codes" 25 "google.golang.org/grpc/codes"
24 ) 26 )
25 27
26 const ( 28 const (
 27 // getInitialArraySize is the initial amount of log slots to allocate for a 29 // getInitialArraySize is the initial amount of log slots to allocate for a
28 // Get request. 30 // Get request.
29 getInitialArraySize = 256 31 getInitialArraySize = 256
30 32
 31 // getBytesLimit is the maximum amount of data that we are willing to query. 33 // getBytesLimit is the maximum amount of data that we are willing to query.
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
87 // If this log entry is Purged and we're not admin, pretend it doesn't e xist. 89 // If this log entry is Purged and we're not admin, pretend it doesn't e xist.
88 if ls.Purged { 90 if ls.Purged {
89 if authErr := coordinator.IsAdminUser(c); authErr != nil { 91 if authErr := coordinator.IsAdminUser(c); authErr != nil {
90 log.Fields{ 92 log.Fields{
91 log.ErrorKey: authErr, 93 log.ErrorKey: authErr,
92 }.Warningf(c, "Non-superuser requested purged log.") 94 }.Warningf(c, "Non-superuser requested purged log.")
93 return nil, grpcutil.Errf(codes.NotFound, "path not foun d") 95 return nil, grpcutil.Errf(codes.NotFound, "path not foun d")
94 } 96 }
95 } 97 }
96 98
97 // If nothing was requested, return nothing.
98 resp := logdog.GetResponse{} 99 resp := logdog.GetResponse{}
99 if !(req.State || tail) && req.LogCount < 0 {
100 return &resp, nil
101 }
102
103 if req.State { 100 if req.State {
104 resp.State = buildLogStreamState(ls, lst) 101 resp.State = buildLogStreamState(ls, lst)
105 102
106 var err error 103 var err error
107 resp.Desc, err = ls.DescriptorValue() 104 resp.Desc, err = ls.DescriptorValue()
108 if err != nil { 105 if err != nil {
109 log.WithError(err).Errorf(c, "Failed to deserialize desc riptor protobuf.") 106 log.WithError(err).Errorf(c, "Failed to deserialize desc riptor protobuf.")
110 return nil, grpcutil.Internal 107 return nil, grpcutil.Internal
111 } 108 }
112 } 109 }
113 110
114 // Retrieve requested logs from storage, if requested. 111 // Retrieve requested logs from storage, if requested.
115 » if tail || req.LogCount >= 0 { 112 » if err := s.getLogs(c, req, &resp, tail, ls, lst); err != nil {
116 » » var err error 113 » » log.WithError(err).Errorf(c, "Failed to get logs.")
117 » » resp.Logs, err = s.getLogs(c, req, tail, ls, lst) 114 » » return nil, grpcutil.Internal
118 » » if err != nil {
119 » » » log.WithError(err).Errorf(c, "Failed to get logs.")
120 » » » return nil, grpcutil.Internal
121 » » }
122 } 115 }
123 116
124 log.Fields{ 117 log.Fields{
125 "logCount": len(resp.Logs), 118 "logCount": len(resp.Logs),
126 }.Debugf(c, "Get request completed successfully.") 119 }.Debugf(c, "Get request completed successfully.")
127 return &resp, nil 120 return &resp, nil
128 } 121 }
129 122
130 func (s *server) getLogs(c context.Context, req *logdog.GetRequest, tail bool, l s *coordinator.LogStream, 123 func (s *server) getLogs(c context.Context, req *logdog.GetRequest, resp *logdog .GetResponse,
131 » lst *coordinator.LogStreamState) ([]*logpb.LogEntry, error) { 124 » tail bool, ls *coordinator.LogStream, lst *coordinator.LogStreamState) e rror {
132 » byteLimit := int(req.ByteCount) 125
133 » if byteLimit <= 0 || byteLimit > getBytesLimit { 126 » // Identify our URL signing parameters.
134 » » byteLimit = getBytesLimit 127 » var signingRequest coordinator.URLSigningRequest
128 » if sr := req.GetSignedUrls; sr != nil {
129 » » signingRequest.Lifetime = sr.Lifetime.Duration()
130 » » signingRequest.Stream = sr.Stream
131 » » signingRequest.Index = sr.Index
132 » }
133 » if !tail && req.LogCount < 0 && !signingRequest.HasWork() {
 134 » » // No log operations are actually needed, so don't bother instantiating our
135 » » // Storage instance only to do nothing.
136 » » return nil
135 } 137 }
136 138
137 svc := coordinator.GetServices(c) 139 svc := coordinator.GetServices(c)
138 » var st storage.Storage 140 » st, err := svc.StorageForStream(c, lst)
139 » if !lst.ArchivalState().Archived() { 141 » if err != nil {
140 » » log.Debugf(c, "Log is not archived. Fetching from intermediate s torage.") 142 » » return errors.Annotate(err).InternalReason("failed to create sto rage instance").Err()
141
142 » » // Logs are not archived. Fetch from intermediate storage.
143 » » var err error
144 » » st, err = svc.IntermediateStorage(c)
145 » » if err != nil {
146 » » » return nil, err
147 » » }
148 » } else {
149 » » log.Fields{
150 » » » "indexURL": lst.ArchiveIndexURL,
151 » » » "streamURL": lst.ArchiveStreamURL,
152 » » » "archiveTime": lst.ArchivedTime,
153 » » }.Debugf(c, "Log is archived. Fetching from archive storage.")
154
155 » » var err error
156 » » gs, err := svc.GSClient(c)
157 » » if err != nil {
158 » » » log.WithError(err).Errorf(c, "Failed to create Google St orage client.")
159 » » » return nil, err
160 » » }
161 » » defer func() {
162 » » » if err := gs.Close(); err != nil {
163 » » » » log.WithError(err).Warningf(c, "Failed to close Google Storage client.")
164 » » » }
165 » » }()
166
167 » » st, err = archive.New(c, archive.Options{
168 » » » IndexURL: lst.ArchiveIndexURL,
169 » » » StreamURL: lst.ArchiveStreamURL,
170 » » » Client: gs,
171 » » » Cache: svc.StorageCache(),
172 » » })
173 » » if err != nil {
174 » » » log.WithError(err).Errorf(c, "Failed to create Google St orage storage instance.")
175 » » » return nil, err
176 » » }
177 } 143 }
178 defer st.Close() 144 defer st.Close()
179 145
180 project, path := coordinator.Project(c), ls.Path() 146 project, path := coordinator.Project(c), ls.Path()
181 147
182 var fetchedLogs []*logpb.LogEntry
183 var err error
184 if tail { 148 if tail {
185 » » fetchedLogs, err = getTail(c, st, project, path) 149 » » resp.Logs, err = getTail(c, st, project, path)
186 » } else { 150 » } else if req.LogCount >= 0 {
187 » » fetchedLogs, err = getHead(c, req, st, project, path, byteLimit) 151 » » byteLimit := int(req.ByteCount)
152 » » if byteLimit <= 0 || byteLimit > getBytesLimit {
153 » » » byteLimit = getBytesLimit
154 » » }
155
156 » » resp.Logs, err = getHead(c, req, st, project, path, byteLimit)
188 } 157 }
189 if err != nil { 158 if err != nil {
190 log.WithError(err).Errorf(c, "Failed to fetch log records.") 159 log.WithError(err).Errorf(c, "Failed to fetch log records.")
191 » » return nil, err 160 » » return err
192 } 161 }
193 162
 194 » return fetchedLogs, nil 163 » // If we're requesting a signed URL, try to get that too.
164 » if signingRequest.HasWork() {
165 » » signedURLs, err := st.GetSignedURLs(c, &signingRequest)
166 » » switch {
167 » » case err != nil:
168 » » » return errors.Annotate(err).InternalReason("failed to ge nerate signed URL").Err()
169
170 » » case signedURLs == nil:
 171 » » » log.Debugf(c, "Signed URL was requested, but is not supported by storage.")
172
173 » » default:
174 » » » resp.SignedUrls = &logdog.GetResponse_SignedUrls{
175 » » » » Expiration: google.NewTimestamp(signedURLs.Expir ation),
176 » » » » Stream: signedURLs.Stream,
177 » » » » Index: signedURLs.Index,
178 » » » }
179 » » }
180 » }
181
182 » return nil
195 } 183 }
196 184
197 func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, proj ect config.ProjectName, 185 func getHead(c context.Context, req *logdog.GetRequest, st coordinator.Storage, project config.ProjectName,
198 path types.StreamPath, byteLimit int) ([]*logpb.LogEntry, error) { 186 path types.StreamPath, byteLimit int) ([]*logpb.LogEntry, error) {
199 log.Fields{ 187 log.Fields{
200 "project": project, 188 "project": project,
201 "path": path, 189 "path": path,
202 "index": req.Index, 190 "index": req.Index,
203 "count": req.LogCount, 191 "count": req.LogCount,
204 "bytes": req.ByteCount, 192 "bytes": req.ByteCount,
205 "noncontiguous": req.NonContiguous, 193 "noncontiguous": req.NonContiguous,
206 }.Debugf(c, "Issuing Get request.") 194 }.Debugf(c, "Issuing Get request.")
207 195
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
271 log.Fields{ 259 log.Fields{
272 log.ErrorKey: err, 260 log.ErrorKey: err,
273 "initialIndex": req.Index, 261 "initialIndex": req.Index,
274 "nextIndex": sreq.Index, 262 "nextIndex": sreq.Index,
275 "count": len(logs), 263 "count": len(logs),
276 }.Errorf(c, "Failed to execute range request.") 264 }.Errorf(c, "Failed to execute range request.")
277 return nil, err 265 return nil, err
278 } 266 }
279 } 267 }
280 268
281 func getTail(c context.Context, st storage.Storage, project config.ProjectName, path types.StreamPath) ( 269 func getTail(c context.Context, st coordinator.Storage, project config.ProjectNa me, path types.StreamPath) (
282 []*logpb.LogEntry, error) { 270 []*logpb.LogEntry, error) {
283 log.Fields{ 271 log.Fields{
284 "project": project, 272 "project": project,
285 "path": path, 273 "path": path,
286 }.Debugf(c, "Issuing Tail request.") 274 }.Debugf(c, "Issuing Tail request.")
287 275
288 var e *storage.Entry 276 var e *storage.Entry
289 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err er ror) { 277 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err er ror) {
290 e, err = st.Tail(project, path) 278 e, err = st.Tail(project, path)
291 return 279 return
(...skipping 13 matching lines...) Expand all
305 return []*logpb.LogEntry{le}, nil 293 return []*logpb.LogEntry{le}, nil
306 294
307 case storage.ErrDoesNotExist: 295 case storage.ErrDoesNotExist:
308 return nil, nil 296 return nil, nil
309 297
310 default: 298 default:
311 log.WithError(err).Errorf(c, "Failed to fetch tail log.") 299 log.WithError(err).Errorf(c, "Failed to fetch tail log.")
312 return nil, err 300 return nil, err
313 } 301 }
314 } 302 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698