| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package logs | 5 package logs |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "time" | 8 "time" |
| 9 | 9 |
| 10 ds "github.com/luci/gae/service/datastore" | 10 ds "github.com/luci/gae/service/datastore" |
| (...skipping 150 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 161 defer func() { | 161 defer func() { |
| 162 if err := gs.Close(); err != nil { | 162 if err := gs.Close(); err != nil { |
| 163 log.WithError(err).Warningf(c, "Failed to close
Google Storage client.") | 163 log.WithError(err).Warningf(c, "Failed to close
Google Storage client.") |
| 164 } | 164 } |
| 165 }() | 165 }() |
| 166 | 166 |
| 167 st, err = archive.New(c, archive.Options{ | 167 st, err = archive.New(c, archive.Options{ |
| 168 IndexURL: lst.ArchiveIndexURL, | 168 IndexURL: lst.ArchiveIndexURL, |
| 169 StreamURL: lst.ArchiveStreamURL, | 169 StreamURL: lst.ArchiveStreamURL, |
| 170 Client: gs, | 170 Client: gs, |
| 171 Cache: svc.StorageCache(), |
| 171 }) | 172 }) |
| 172 if err != nil { | 173 if err != nil { |
| 173 log.WithError(err).Errorf(c, "Failed to create Google St
orage storage instance.") | 174 log.WithError(err).Errorf(c, "Failed to create Google St
orage storage instance.") |
| 174 return nil, err | 175 return nil, err |
| 175 } | 176 } |
| 176 } | 177 } |
| 177 defer st.Close() | 178 defer st.Close() |
| 178 | 179 |
| 179 project, path := coordinator.Project(c), ls.Path() | 180 project, path := coordinator.Project(c), ls.Path() |
| 180 | 181 |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 212 } | 213 } |
| 213 logs := make([]*logpb.LogEntry, 0, asz) | 214 logs := make([]*logpb.LogEntry, 0, asz) |
| 214 | 215 |
| 215 sreq := storage.GetRequest{ | 216 sreq := storage.GetRequest{ |
| 216 Project: project, | 217 Project: project, |
| 217 Path: path, | 218 Path: path, |
| 218 Index: types.MessageIndex(req.Index), | 219 Index: types.MessageIndex(req.Index), |
| 219 Limit: logCount, | 220 Limit: logCount, |
| 220 } | 221 } |
| 221 | 222 |
| 223 var ierr error |
| 222 count := 0 | 224 count := 0 |
| 223 var ierr error | |
| 224 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error { | 225 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error { |
| 225 // Issue the Get request. This may return a transient error, in
which case | 226 // Issue the Get request. This may return a transient error, in
which case |
| 226 // we will retry. | 227 // we will retry. |
| 227 return st.Get(sreq, func(e *storage.Entry) bool { | 228 return st.Get(sreq, func(e *storage.Entry) bool { |
| 228 var le *logpb.LogEntry | 229 var le *logpb.LogEntry |
| 229 if le, ierr = e.GetLogEntry(); ierr != nil { | 230 if le, ierr = e.GetLogEntry(); ierr != nil { |
| 230 return false | 231 return false |
| 231 } | 232 } |
| 232 | 233 |
| 233 if count > 0 && byteLimit-len(e.D) < 0 { | 234 if count > 0 && byteLimit-len(e.D) < 0 { |
| 234 // Not the first log, and we've exceeded our byt
e limit. | 235 // Not the first log, and we've exceeded our byt
e limit. |
| 235 return false | 236 return false |
| 236 } | 237 } |
| 237 byteLimit -= len(e.D) | |
| 238 | 238 |
| 239 sidx, _ := e.GetStreamIndex() // GetLogEntry succeeded,
so this must. | 239 sidx, _ := e.GetStreamIndex() // GetLogEntry succeeded,
so this must. |
| 240 if !(req.NonContiguous || sidx == sreq.Index) { | 240 if !(req.NonContiguous || sidx == sreq.Index) { |
| 241 return false | 241 return false |
| 242 } | 242 } |
| 243 |
| 243 logs = append(logs, le) | 244 logs = append(logs, le) |
| 244 sreq.Index = sidx + 1 | 245 sreq.Index = sidx + 1 |
| 246 byteLimit -= len(e.D) |
| 245 count++ | 247 count++ |
| 246 return !(logCount > 0 && count >= logCount) | 248 return !(logCount > 0 && count >= logCount) |
| 247 }) | 249 }) |
| 248 }, func(err error, delay time.Duration) { | 250 }, func(err error, delay time.Duration) { |
| 249 log.Fields{ | 251 log.Fields{ |
| 250 log.ErrorKey: err, | 252 log.ErrorKey: err, |
| 251 "delay": delay, | 253 "delay": delay, |
| 252 "initialIndex": req.Index, | 254 "initialIndex": req.Index, |
| 253 "nextIndex": sreq.Index, | 255 "nextIndex": sreq.Index, |
| 254 "count": len(logs), | 256 "count": len(logs), |
| (...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 303 return []*logpb.LogEntry{le}, nil | 305 return []*logpb.LogEntry{le}, nil |
| 304 | 306 |
| 305 case storage.ErrDoesNotExist: | 307 case storage.ErrDoesNotExist: |
| 306 return nil, nil | 308 return nil, nil |
| 307 | 309 |
| 308 default: | 310 default: |
| 309 log.WithError(err).Errorf(c, "Failed to fetch tail log.") | 311 log.WithError(err).Errorf(c, "Failed to fetch tail log.") |
| 310 return nil, err | 312 return nil, err |
| 311 } | 313 } |
| 312 } | 314 } |
| OLD | NEW |