Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(217)

Side by Side Diff: appengine/logdog/coordinator/endpoints/logs/get.go

Issue 1909073002: LogDog: Add project namespace to logs endpoint. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-storage
Patch Set: Rebase? Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | appengine/logdog/coordinator/endpoints/logs/get_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package logs 5 package logs
6 6
7 import ( 7 import (
8 "net/url"
9 "time" 8 "time"
10 9
11 "github.com/golang/protobuf/proto" 10 "github.com/golang/protobuf/proto"
12 ds "github.com/luci/gae/service/datastore" 11 ds "github.com/luci/gae/service/datastore"
13 "github.com/luci/luci-go/appengine/logdog/coordinator" 12 "github.com/luci/luci-go/appengine/logdog/coordinator"
14 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" 13 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1"
14 "github.com/luci/luci-go/common/config"
15 "github.com/luci/luci-go/common/grpcutil" 15 "github.com/luci/luci-go/common/grpcutil"
16 "github.com/luci/luci-go/common/logdog/types" 16 "github.com/luci/luci-go/common/logdog/types"
17 log "github.com/luci/luci-go/common/logging" 17 log "github.com/luci/luci-go/common/logging"
18 "github.com/luci/luci-go/common/proto/logdog/logpb" 18 "github.com/luci/luci-go/common/proto/logdog/logpb"
19 "github.com/luci/luci-go/common/retry" 19 "github.com/luci/luci-go/common/retry"
20 "github.com/luci/luci-go/server/logdog/storage" 20 "github.com/luci/luci-go/server/logdog/storage"
21 "github.com/luci/luci-go/server/logdog/storage/archive" 21 "github.com/luci/luci-go/server/logdog/storage/archive"
22 "golang.org/x/net/context" 22 "golang.org/x/net/context"
23 "google.golang.org/grpc/codes" 23 "google.golang.org/grpc/codes"
24 ) 24 )
(...skipping 14 matching lines...) Expand all
39 ) 39 )
40 40
41 // Get returns state and log data for a single log stream. 41 // Get returns state and log data for a single log stream.
42 func (s *server) Get(c context.Context, req *logdog.GetRequest) (*logdog.GetResponse, error) { 42 func (s *server) Get(c context.Context, req *logdog.GetRequest) (*logdog.GetResponse, error) {
43 return s.getImpl(c, req, false) 43 return s.getImpl(c, req, false)
44 } 44 }
45 45
46 // Tail returns the last log entry for a given log stream. 46 // Tail returns the last log entry for a given log stream.
47 func (s *server) Tail(c context.Context, req *logdog.TailRequest) (*logdog.GetResponse, error) { 47 func (s *server) Tail(c context.Context, req *logdog.TailRequest) (*logdog.GetResponse, error) {
48 r := logdog.GetRequest{ 48 r := logdog.GetRequest{
49 » » Path: req.Path, 49 » » Project: req.Project,
50 » » State: req.State, 50 » » Path: req.Path,
51 » » State: req.State,
51 } 52 }
52 return s.getImpl(c, &r, true) 53 return s.getImpl(c, &r, true)
53 } 54 }
54 55
55 // getImpl is common code shared between Get and Tail endpoints. 56 // getImpl is common code shared between Get and Tail endpoints.
56 func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (*logdog.GetResponse, error) { 57 func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (*logdog.GetResponse, error) {
57 log.Fields{ 58 log.Fields{
58 » » "path": req.Path, 59 » » "project": req.Project,
59 » » "index": req.Index, 60 » » "path": req.Path,
60 » » "tail": tail, 61 » » "index": req.Index,
62 » » "tail": tail,
61 }.Debugf(c, "Received get request.") 63 }.Debugf(c, "Received get request.")
62 64
63 » // Fetch the log stream state for this log stream. 65 » ls, err := coordinator.NewLogStream(req.Path)
64 » u, err := url.Parse(req.Path)
65 if err != nil { 66 if err != nil {
66 » » log.Fields{ 67 » » log.WithError(err).Errorf(c, "Invalid path supplied.")
67 » » » log.ErrorKey: err,
68 » » » "path": req.Path,
69 » » }.Errorf(c, "Could not parse path URL.")
70 » » return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path encoding")
71 » }
72 » ls, err := coordinator.NewLogStream(u.Path)
73 » if err != nil {
74 » » log.Fields{
75 » » » log.ErrorKey: err,
76 » » » "path": u.Path,
77 » » }.Errorf(c, "Invalid path supplied.")
78 return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path value") 68 return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path value")
79 } 69 }
80 70
71 // The user may supply a hash instead of a full path. Once resolved, log
72 // the original log stream.
73 path := ls.Path()
74 if req.Path != string(path) {
75 log.Fields{
76 "hashPath": req.Path,
77 "streamPath": path,
78 }.Debugf(c, "Resolved hash path.")
79 }
80
81 // If this log entry is Purged and we're not admin, pretend it doesn't exist. 81 // If this log entry is Purged and we're not admin, pretend it doesn't exist.
82 err = ds.Get(c).Get(ls) 82 err = ds.Get(c).Get(ls)
83 switch err { 83 switch err {
84 case nil: 84 case nil:
85 if ls.Purged { 85 if ls.Purged {
86 if authErr := coordinator.IsAdminUser(c); authErr != nil { 86 if authErr := coordinator.IsAdminUser(c); authErr != nil {
87 log.Fields{ 87 log.Fields{
88 log.ErrorKey: authErr, 88 log.ErrorKey: authErr,
89 }.Warningf(c, "Non-superuser requested purged log.") 89 }.Warningf(c, "Non-superuser requested purged log.")
90 return nil, grpcutil.Errf(codes.NotFound, "path not found") 90 return nil, grpcutil.Errf(codes.NotFound, "path not found")
91 } 91 }
92 } 92 }
93 93
94 case ds.ErrNoSuchEntity: 94 case ds.ErrNoSuchEntity:
95 » » log.Fields{ 95 » » log.Errorf(c, "Log stream does not exist.")
96 » » » "path": u.Path,
97 » » }.Errorf(c, "Log stream does not exist.")
98 return nil, grpcutil.Errf(codes.NotFound, "path not found") 96 return nil, grpcutil.Errf(codes.NotFound, "path not found")
99 97
100 default: 98 default:
101 » » log.Fields{ 99 » » log.WithError(err).Errorf(c, "Failed to look up log stream.")
102 » » » log.ErrorKey: err,
103 » » » "path": u.Path,
104 » » }.Errorf(c, "Failed to look up log stream.")
105 return nil, grpcutil.Internal 100 return nil, grpcutil.Internal
106 } 101 }
107 path := ls.Path()
108 102
109 // If nothing was requested, return nothing. 103 // If nothing was requested, return nothing.
110 resp := logdog.GetResponse{} 104 resp := logdog.GetResponse{}
111 if !(req.State || tail) && req.LogCount < 0 { 105 if !(req.State || tail) && req.LogCount < 0 {
112 return &resp, nil 106 return &resp, nil
113 } 107 }
114 108
115 if req.State { 109 if req.State {
116 resp.State = loadLogStreamState(ls) 110 resp.State = loadLogStreamState(ls)
117 111
118 var err error 112 var err error
119 resp.Desc, err = ls.DescriptorValue() 113 resp.Desc, err = ls.DescriptorValue()
120 if err != nil { 114 if err != nil {
121 log.WithError(err).Errorf(c, "Failed to deserialize descriptor protobuf.") 115 log.WithError(err).Errorf(c, "Failed to deserialize descriptor protobuf.")
122 return nil, grpcutil.Internal 116 return nil, grpcutil.Internal
123 } 117 }
124 } 118 }
125 119
126 // Retrieve requested logs from storage, if requested. 120 // Retrieve requested logs from storage, if requested.
127 if tail || req.LogCount >= 0 { 121 if tail || req.LogCount >= 0 {
128 resp.Logs, err = s.getLogs(c, req, tail, ls) 122 resp.Logs, err = s.getLogs(c, req, tail, ls)
129 if err != nil { 123 if err != nil {
130 » » » log.Fields{ 124 » » » log.WithError(err).Errorf(c, "Failed to get logs.")
131 » » » » log.ErrorKey: err,
132 » » » » "path": path,
133 » » » }.Errorf(c, "Failed to get logs.")
134 return nil, grpcutil.Internal 125 return nil, grpcutil.Internal
135 } 126 }
136 } 127 }
137 128
138 log.Fields{ 129 log.Fields{
139 "logCount": len(resp.Logs), 130 "logCount": len(resp.Logs),
140 }.Debugf(c, "Get request completed successfully.") 131 }.Debugf(c, "Get request completed successfully.")
141 return &resp, nil 132 return &resp, nil
142 } 133 }
143 134
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
184 Client: gs, 175 Client: gs,
185 MaxBytes: byteLimit, 176 MaxBytes: byteLimit,
186 }) 177 })
187 if err != nil { 178 if err != nil {
188 log.WithError(err).Errorf(c, "Failed to create Google Storage storage instance.") 179 log.WithError(err).Errorf(c, "Failed to create Google Storage storage instance.")
189 return nil, err 180 return nil, err
190 } 181 }
191 } 182 }
192 defer st.Close() 183 defer st.Close()
193 184
194 » path := ls.Path() 185 » project, path := coordinator.Project(c), ls.Path()
195 186
196 var fetchedLogs [][]byte 187 var fetchedLogs [][]byte
197 var err error 188 var err error
198 if tail { 189 if tail {
199 » » fetchedLogs, err = getTail(c, st, path) 190 » » fetchedLogs, err = getTail(c, st, project, path)
200 } else { 191 } else {
201 » » fetchedLogs, err = getHead(c, req, st, path, byteLimit) 192 » » fetchedLogs, err = getHead(c, req, st, project, path, byteLimit)
202 } 193 }
203 if err != nil { 194 if err != nil {
204 log.WithError(err).Errorf(c, "Failed to fetch log records.") 195 log.WithError(err).Errorf(c, "Failed to fetch log records.")
205 return nil, err 196 return nil, err
206 } 197 }
207 198
208 logEntries := make([]*logpb.LogEntry, len(fetchedLogs)) 199 logEntries := make([]*logpb.LogEntry, len(fetchedLogs))
209 for idx, ld := range fetchedLogs { 200 for idx, ld := range fetchedLogs {
210 // Deserialize the log entry, then convert it to output value. 201 // Deserialize the log entry, then convert it to output value.
211 le := logpb.LogEntry{} 202 le := logpb.LogEntry{}
212 if err := proto.Unmarshal(ld, &le); err != nil { 203 if err := proto.Unmarshal(ld, &le); err != nil {
213 log.Fields{ 204 log.Fields{
214 log.ErrorKey: err, 205 log.ErrorKey: err,
215 "index": idx, 206 "index": idx,
216 }.Errorf(c, "Failed to generate response log entry.") 207 }.Errorf(c, "Failed to generate response log entry.")
217 return nil, err 208 return nil, err
218 } 209 }
219 logEntries[idx] = &le 210 logEntries[idx] = &le
220 } 211 }
221 return logEntries, nil 212 return logEntries, nil
222 } 213 }
223 214
224 func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, p types.StreamPath, byteLimit int) ( 215 func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, project config.ProjectName,
225 » [][]byte, error) { 216 » path types.StreamPath, byteLimit int) ([][]byte, error) {
226 » c = log.SetFields(c, log.Fields{ 217 » log.Fields{
227 » » "path": p, 218 » » "project": project,
219 » » "path": path,
228 "index": req.Index, 220 "index": req.Index,
229 "count": req.LogCount, 221 "count": req.LogCount,
230 "bytes": req.ByteCount, 222 "bytes": req.ByteCount,
231 "noncontiguous": req.NonContiguous, 223 "noncontiguous": req.NonContiguous,
232 » }) 224 » }.Debugf(c, "Issuing Get request.")
233 225
234 // Allocate result logs array. 226 // Allocate result logs array.
235 logCount := int(req.LogCount) 227 logCount := int(req.LogCount)
236 asz := getInitialArraySize 228 asz := getInitialArraySize
237 if logCount > 0 && logCount < asz { 229 if logCount > 0 && logCount < asz {
238 asz = logCount 230 asz = logCount
239 } 231 }
240 logs := make([][]byte, 0, asz) 232 logs := make([][]byte, 0, asz)
241 233
242 sreq := storage.GetRequest{ 234 sreq := storage.GetRequest{
243 » » Path: p, 235 » » Project: project,
244 » » Index: types.MessageIndex(req.Index), 236 » » Path: path,
245 » » Limit: logCount, 237 » » Index: types.MessageIndex(req.Index),
238 » » Limit: logCount,
246 } 239 }
247 240
248 count := 0 241 count := 0
249 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error { 242 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error {
250 // Issue the Get request. This may return a transient error, in which case 243 // Issue the Get request. This may return a transient error, in which case
251 // we will retry. 244 // we will retry.
252 return st.Get(sreq, func(idx types.MessageIndex, ld []byte) bool { 245 return st.Get(sreq, func(idx types.MessageIndex, ld []byte) bool {
253 if count > 0 && byteLimit-len(ld) < 0 { 246 if count > 0 && byteLimit-len(ld) < 0 {
254 // Not the first log, and we've exceeded our byte limit. 247 // Not the first log, and we've exceeded our byte limit.
255 return false 248 return false
(...skipping 10 matching lines...) Expand all
266 }) 259 })
267 }, func(err error, delay time.Duration) { 260 }, func(err error, delay time.Duration) {
268 log.Fields{ 261 log.Fields{
269 log.ErrorKey: err, 262 log.ErrorKey: err,
270 "delay": delay, 263 "delay": delay,
271 "initialIndex": req.Index, 264 "initialIndex": req.Index,
272 "nextIndex": sreq.Index, 265 "nextIndex": sreq.Index,
273 "count": len(logs), 266 "count": len(logs),
274 }.Warningf(c, "Transient error while loading logs; retrying.") 267 }.Warningf(c, "Transient error while loading logs; retrying.")
275 }) 268 })
276 » if err != nil { 269 » switch err {
270 » case nil:
271 » » return logs, nil
272
273 » case storage.ErrDoesNotExist:
274 » » return nil, nil
275
276 » default:
277 log.Fields{ 277 log.Fields{
278 log.ErrorKey: err, 278 log.ErrorKey: err,
279 "initialIndex": req.Index, 279 "initialIndex": req.Index,
280 "nextIndex": sreq.Index, 280 "nextIndex": sreq.Index,
281 "count": len(logs), 281 "count": len(logs),
282 }.Errorf(c, "Failed to execute range request.") 282 }.Errorf(c, "Failed to execute range request.")
283 return nil, err 283 return nil, err
284 } 284 }
285
286 return logs, nil
287 } 285 }
288 286
289 func getTail(c context.Context, st storage.Storage, p types.StreamPath) ([][]byte, error) { 287 func getTail(c context.Context, st storage.Storage, project config.ProjectName, path types.StreamPath) ([][]byte, error) {
288 » log.Fields{
289 » » "project": project,
290 » » "path": path,
291 » }.Debugf(c, "Issuing Tail request.")
292
290 var data []byte 293 var data []byte
291 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err error) { 294 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err error) {
292 » » data, _, err = st.Tail("", p) 295 » » data, _, err = st.Tail(project, path)
293 return 296 return
294 }, func(err error, delay time.Duration) { 297 }, func(err error, delay time.Duration) {
295 log.Fields{ 298 log.Fields{
296 log.ErrorKey: err, 299 log.ErrorKey: err,
297 "delay": delay, 300 "delay": delay,
298 }.Warningf(c, "Transient error while fetching tail log; retrying.") 301 }.Warningf(c, "Transient error while fetching tail log; retrying.")
299 }) 302 })
300 » if err != nil { 303 » switch err {
304 » case nil:
305 » » return [][]byte{data}, nil
306
307 » case storage.ErrDoesNotExist:
308 » » return nil, nil
309
310 » default:
301 log.WithError(err).Errorf(c, "Failed to fetch tail log.") 311 log.WithError(err).Errorf(c, "Failed to fetch tail log.")
302 return nil, err 312 return nil, err
303 } 313 }
304 return [][]byte{data}, err
305 } 314 }
OLDNEW
« no previous file with comments | « no previous file | appengine/logdog/coordinator/endpoints/logs/get_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698