Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(442)

Side by Side Diff: logdog/appengine/coordinator/endpoints/logs/get.go

Issue 2538203002: LogDog: Add signed GS URL fetching. (Closed)
Patch Set: Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package logs 5 package logs
6 6
7 import ( 7 import (
8 "time" 8 "time"
9 9
10 ds "github.com/luci/gae/service/datastore"
11 "github.com/luci/luci-go/common/config" 10 "github.com/luci/luci-go/common/config"
11 "github.com/luci/luci-go/common/errors"
12 log "github.com/luci/luci-go/common/logging" 12 log "github.com/luci/luci-go/common/logging"
13 "github.com/luci/luci-go/common/proto/google"
13 "github.com/luci/luci-go/common/retry" 14 "github.com/luci/luci-go/common/retry"
14 "github.com/luci/luci-go/grpc/grpcutil" 15 "github.com/luci/luci-go/grpc/grpcutil"
15 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" 16 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1"
16 "github.com/luci/luci-go/logdog/api/logpb" 17 "github.com/luci/luci-go/logdog/api/logpb"
17 "github.com/luci/luci-go/logdog/appengine/coordinator" 18 "github.com/luci/luci-go/logdog/appengine/coordinator"
18 "github.com/luci/luci-go/logdog/common/storage" 19 "github.com/luci/luci-go/logdog/common/storage"
19 "github.com/luci/luci-go/logdog/common/storage/archive"
20 "github.com/luci/luci-go/logdog/common/types" 20 "github.com/luci/luci-go/logdog/common/types"
21 21
22 ds "github.com/luci/gae/service/datastore"
23
22 "golang.org/x/net/context" 24 "golang.org/x/net/context"
23 "google.golang.org/grpc/codes" 25 "google.golang.org/grpc/codes"
24 ) 26 )
25 27
26 const ( 28 const (
27 // getInitialArraySize is the initial amount of log slots to allocate fo r a 29 // getInitialArraySize is the initial amount of log slots to allocate fo r a
28 // Get request. 30 // Get request.
29 getInitialArraySize = 256 31 getInitialArraySize = 256
30 32
31 // getBytesLimit is the maximum amount of data that we are willing to qu ery. 33 // getBytesLimit is the maximum amount of data that we are willing to qu ery.
(...skipping 20 matching lines...) Expand all
52 } 54 }
53 return s.getImpl(c, &r, true) 55 return s.getImpl(c, &r, true)
54 } 56 }
55 57
56 // getImpl is common code shared between Get and Tail endpoints. 58 // getImpl is common code shared between Get and Tail endpoints.
57 func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) ( *logdog.GetResponse, error) { 59 func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) ( *logdog.GetResponse, error) {
58 log.Fields{ 60 log.Fields{
59 "project": req.Project, 61 "project": req.Project,
60 "path": req.Path, 62 "path": req.Path,
61 "index": req.Index, 63 "index": req.Index,
64 "sign": req.SignEntryUrlLifetime.Duration(),
62 "tail": tail, 65 "tail": tail,
63 }.Debugf(c, "Received get request.") 66 }.Debugf(c, "Received get request.")
64 67
65 path := types.StreamPath(req.Path) 68 path := types.StreamPath(req.Path)
66 if err := path.Validate(); err != nil { 69 if err := path.Validate(); err != nil {
67 log.WithError(err).Errorf(c, "Invalid path supplied.") 70 log.WithError(err).Errorf(c, "Invalid path supplied.")
68 return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path v alue") 71 return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path v alue")
69 } 72 }
70 73
71 ls := &coordinator.LogStream{ID: coordinator.LogStreamID(path)} 74 ls := &coordinator.LogStream{ID: coordinator.LogStreamID(path)}
(...skipping 15 matching lines...) Expand all
87 // If this log entry is Purged and we're not admin, pretend it doesn't e xist. 90 // If this log entry is Purged and we're not admin, pretend it doesn't e xist.
88 if ls.Purged { 91 if ls.Purged {
89 if authErr := coordinator.IsAdminUser(c); authErr != nil { 92 if authErr := coordinator.IsAdminUser(c); authErr != nil {
90 log.Fields{ 93 log.Fields{
91 log.ErrorKey: authErr, 94 log.ErrorKey: authErr,
92 }.Warningf(c, "Non-superuser requested purged log.") 95 }.Warningf(c, "Non-superuser requested purged log.")
93 return nil, grpcutil.Errf(codes.NotFound, "path not foun d") 96 return nil, grpcutil.Errf(codes.NotFound, "path not foun d")
94 } 97 }
95 } 98 }
96 99
97 // If nothing was requested, return nothing.
98 resp := logdog.GetResponse{} 100 resp := logdog.GetResponse{}
99 if !(req.State || tail) && req.LogCount < 0 {
100 return &resp, nil
101 }
102
103 if req.State { 101 if req.State {
104 resp.State = buildLogStreamState(ls, lst) 102 resp.State = buildLogStreamState(ls, lst)
105 103
106 var err error 104 var err error
107 resp.Desc, err = ls.DescriptorValue() 105 resp.Desc, err = ls.DescriptorValue()
108 if err != nil { 106 if err != nil {
109 log.WithError(err).Errorf(c, "Failed to deserialize desc riptor protobuf.") 107 log.WithError(err).Errorf(c, "Failed to deserialize desc riptor protobuf.")
110 return nil, grpcutil.Internal 108 return nil, grpcutil.Internal
111 } 109 }
112 } 110 }
113 111
114 // Retrieve requested logs from storage, if requested. 112 // Retrieve requested logs from storage, if requested.
115 » if tail || req.LogCount >= 0 { 113 » if err := s.getLogs(c, req, &resp, tail, ls, lst); err != nil {
116 » » var err error 114 » » log.WithError(err).Errorf(c, "Failed to get logs.")
117 » » resp.Logs, err = s.getLogs(c, req, tail, ls, lst) 115 » » return nil, grpcutil.Internal
118 » » if err != nil {
119 » » » log.WithError(err).Errorf(c, "Failed to get logs.")
120 » » » return nil, grpcutil.Internal
121 » » }
122 } 116 }
123 117
124 log.Fields{ 118 log.Fields{
125 "logCount": len(resp.Logs), 119 "logCount": len(resp.Logs),
126 }.Debugf(c, "Get request completed successfully.") 120 }.Debugf(c, "Get request completed successfully.")
127 return &resp, nil 121 return &resp, nil
128 } 122 }
129 123
130 func (s *server) getLogs(c context.Context, req *logdog.GetRequest, tail bool, l s *coordinator.LogStream, 124 func (s *server) getLogs(c context.Context, req *logdog.GetRequest, resp *logdog .GetResponse,
131 » lst *coordinator.LogStreamState) ([]*logpb.LogEntry, error) { 125 » tail bool, ls *coordinator.LogStream, lst *coordinator.LogStreamState) e rror {
132 » byteLimit := int(req.ByteCount) 126
133 » if byteLimit <= 0 || byteLimit > getBytesLimit { 127 » signURLLifetime := req.SignEntryUrlLifetime.Duration()
134 » » byteLimit = getBytesLimit 128 » if !tail && req.LogCount < 0 && signURLLifetime <= 0 {
129 » » // No log operations are acutally needed, so don't bother instan ting our
130 » » // Storage instance only to do nothing.
131 » » return nil
135 } 132 }
136 133
137 svc := coordinator.GetServices(c) 134 svc := coordinator.GetServices(c)
138 » var st storage.Storage 135 » st, err := svc.StorageForStream(c, lst)
139 » if !lst.ArchivalState().Archived() { 136 » if err != nil {
140 » » log.Debugf(c, "Log is not archived. Fetching from intermediate s torage.") 137 » » return errors.Annotate(err).InternalReason("failed to create sto rage instance").Err()
141
142 » » // Logs are not archived. Fetch from intermediate storage.
143 » » var err error
144 » » st, err = svc.IntermediateStorage(c)
145 » » if err != nil {
146 » » » return nil, err
147 » » }
148 » } else {
149 » » log.Fields{
150 » » » "indexURL": lst.ArchiveIndexURL,
151 » » » "streamURL": lst.ArchiveStreamURL,
152 » » » "archiveTime": lst.ArchivedTime,
153 » » }.Debugf(c, "Log is archived. Fetching from archive storage.")
154
155 » » var err error
156 » » gs, err := svc.GSClient(c)
157 » » if err != nil {
158 » » » log.WithError(err).Errorf(c, "Failed to create Google St orage client.")
159 » » » return nil, err
160 » » }
161 » » defer func() {
162 » » » if err := gs.Close(); err != nil {
163 » » » » log.WithError(err).Warningf(c, "Failed to close Google Storage client.")
164 » » » }
165 » » }()
166
167 » » st, err = archive.New(c, archive.Options{
168 » » » IndexURL: lst.ArchiveIndexURL,
169 » » » StreamURL: lst.ArchiveStreamURL,
170 » » » Client: gs,
171 » » » Cache: svc.StorageCache(),
172 » » })
173 » » if err != nil {
174 » » » log.WithError(err).Errorf(c, "Failed to create Google St orage storage instance.")
175 » » » return nil, err
176 » » }
177 } 138 }
178 defer st.Close() 139 defer st.Close()
179 140
180 project, path := coordinator.Project(c), ls.Path() 141 project, path := coordinator.Project(c), ls.Path()
181 142
182 var fetchedLogs []*logpb.LogEntry
183 var err error
184 if tail { 143 if tail {
185 » » fetchedLogs, err = getTail(c, st, project, path) 144 » » resp.Logs, err = getTail(c, st, project, path)
186 » } else { 145 » } else if req.LogCount >= 0 {
187 » » fetchedLogs, err = getHead(c, req, st, project, path, byteLimit) 146 » » byteLimit := int(req.ByteCount)
147 » » if byteLimit <= 0 || byteLimit > getBytesLimit {
148 » » » byteLimit = getBytesLimit
149 » » }
150
151 » » resp.Logs, err = getHead(c, req, st, project, path, byteLimit)
188 } 152 }
189 if err != nil { 153 if err != nil {
190 log.WithError(err).Errorf(c, "Failed to fetch log records.") 154 log.WithError(err).Errorf(c, "Failed to fetch log records.")
191 » » return nil, err 155 » » return err
192 } 156 }
193 157
194 » return fetchedLogs, nil 158 » // If we're requesting a signedl URL, try and get that too.
159 » if signURLLifetime > 0 {
Vadim Sh. 2016/11/30 21:03:52 is a request with 'tail == true' and 'signURLLifet
dnj 2016/12/01 17:39:30 Nope, b/c that parameter is not part of the TailRe
160 » » value, expire, err := st.SignStreamURL(c, signURLLifetime)
161 » » switch err {
162 » » case nil:
163 » » » resp.SignedEntryUrl = &logdog.GetResponse_SignedEntryUrl {
164 » » » » Value: value,
165 » » » » Expiration: google.NewTimestamp(expire),
166 » » » }
167
168 » » case coordinator.ErrSigningNotSupported:
169 » » » log.Debugf(c, "Signed URL was requested, but is not supp orted by storage.")
170 » » » break
171
172 » » default:
173 » » » return errors.Annotate(err).InternalReason("failed to ge nerate signed URL").Err()
174 » » }
175 » }
176
177 » return nil
195 } 178 }
196 179
197 func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, proj ect config.ProjectName, 180 func getHead(c context.Context, req *logdog.GetRequest, st coordinator.Storage, project config.ProjectName,
198 path types.StreamPath, byteLimit int) ([]*logpb.LogEntry, error) { 181 path types.StreamPath, byteLimit int) ([]*logpb.LogEntry, error) {
199 log.Fields{ 182 log.Fields{
200 "project": project, 183 "project": project,
201 "path": path, 184 "path": path,
202 "index": req.Index, 185 "index": req.Index,
203 "count": req.LogCount, 186 "count": req.LogCount,
204 "bytes": req.ByteCount, 187 "bytes": req.ByteCount,
205 "noncontiguous": req.NonContiguous, 188 "noncontiguous": req.NonContiguous,
206 }.Debugf(c, "Issuing Get request.") 189 }.Debugf(c, "Issuing Get request.")
207 190
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
271 log.Fields{ 254 log.Fields{
272 log.ErrorKey: err, 255 log.ErrorKey: err,
273 "initialIndex": req.Index, 256 "initialIndex": req.Index,
274 "nextIndex": sreq.Index, 257 "nextIndex": sreq.Index,
275 "count": len(logs), 258 "count": len(logs),
276 }.Errorf(c, "Failed to execute range request.") 259 }.Errorf(c, "Failed to execute range request.")
277 return nil, err 260 return nil, err
278 } 261 }
279 } 262 }
280 263
281 func getTail(c context.Context, st storage.Storage, project config.ProjectName, path types.StreamPath) ( 264 func getTail(c context.Context, st coordinator.Storage, project config.ProjectNa me, path types.StreamPath) (
282 []*logpb.LogEntry, error) { 265 []*logpb.LogEntry, error) {
283 log.Fields{ 266 log.Fields{
284 "project": project, 267 "project": project,
285 "path": path, 268 "path": path,
286 }.Debugf(c, "Issuing Tail request.") 269 }.Debugf(c, "Issuing Tail request.")
287 270
288 var e *storage.Entry 271 var e *storage.Entry
289 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err er ror) { 272 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err er ror) {
290 e, err = st.Tail(project, path) 273 e, err = st.Tail(project, path)
291 return 274 return
(...skipping 13 matching lines...) Expand all
305 return []*logpb.LogEntry{le}, nil 288 return []*logpb.LogEntry{le}, nil
306 289
307 case storage.ErrDoesNotExist: 290 case storage.ErrDoesNotExist:
308 return nil, nil 291 return nil, nil
309 292
310 default: 293 default:
311 log.WithError(err).Errorf(c, "Failed to fetch tail log.") 294 log.WithError(err).Errorf(c, "Failed to fetch tail log.")
312 return nil, err 295 return nil, err
313 } 296 }
314 } 297 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698