Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 // Package archive implements a storage.Storage instance that retrieves logs | 5 // Package archive implements a storage.Storage instance that retrieves logs |
| 6 // from a Google Storage archive. | 6 // from a Google Storage archive. |
| 7 // | 7 // |
| 8 // This is a special implementation of storage.Storage, and does not fully | 8 // This is a special implementation of storage.Storage, and does not fully |
| 9 // conform to the API expecations. Namely: | 9 // conform to the API expecations. Namely: |
| 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. | 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 47 type Options struct { | 47 type Options struct { |
| 48 // IndexURL is the Google Storage URL for the stream's index. | 48 // IndexURL is the Google Storage URL for the stream's index. |
| 49 IndexURL string | 49 IndexURL string |
| 50 // StreamURL is the Google Storage URL for the stream's entries. | 50 // StreamURL is the Google Storage URL for the stream's entries. |
| 51 StreamURL string | 51 StreamURL string |
| 52 | 52 |
| 53 // Client is the HTTP client to use for authentication. | 53 // Client is the HTTP client to use for authentication. |
| 54 // | 54 // |
| 55 // Closing this Storage instance does not close the underlying Client. | 55 // Closing this Storage instance does not close the underlying Client. |
| 56 Client gs.Client | 56 Client gs.Client |
| 57 | |
| 58 // MaxBytes, if >0, is the maximum number of bytes to fetch in any given | |
| 59 // request. This should be set for GAE fetches, as large log streams may | |
| 60 // exceed the urlfetch system's maximum response size otherwise. | |
| 61 // | |
| 62 // This is the number of bytes to request, not the number of bytes of lo g data | |
| 63 // to return. The difference is that the former includes the RecordIO fr ame | |
| 64 // headers. | |
| 65 MaxBytes int | |
| 57 } | 66 } |
| 58 | 67 |
| 59 type storageImpl struct { | 68 type storageImpl struct { |
| 60 *Options | 69 *Options |
| 61 context.Context | 70 context.Context |
| 62 | 71 |
| 63 streamPath gs.Path | 72 streamPath gs.Path |
| 64 indexPath gs.Path | 73 indexPath gs.Path |
| 65 | 74 |
| 66 indexMu sync.Mutex | 75 indexMu sync.Mutex |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 103 return err | 112 return err |
| 104 } | 113 } |
| 105 | 114 |
| 106 // Identify the byte offsets that we want to fetch from the entries stre am. | 115 // Identify the byte offsets that we want to fetch from the entries stre am. |
| 107 st := s.buildGetStrategy(&req, idx) | 116 st := s.buildGetStrategy(&req, idx) |
| 108 if st.lastIndex == -1 || req.Index > st.lastIndex { | 117 if st.lastIndex == -1 || req.Index > st.lastIndex { |
| 109 // No records to read. | 118 // No records to read. |
| 110 return nil | 119 return nil |
| 111 } | 120 } |
| 112 | 121 |
| 113 » r, err := s.Client.NewReader(s.streamPath, gs.Options{ | 122 » offset := int64(st.startOffset) |
|
nodir
2016/04/19 23:41:17
st.offset()?
inline it to be consistent with lengt
dnj
2016/04/20 00:45:07
length is calculated, offset is not. There's no re
nodir
2016/04/20 16:42:57
make a function? It is already made. Otherwise why
dnj
2016/04/20 17:07:25
st.startOffset is not a struct member variable, no
dnj
2016/04/20 17:08:41
This should read, "st.startOffset *is* a struct me
nodir
2016/04/20 17:09:48
st variable is of type *getStrategy
*getStrategy h
dnj
2016/04/20 17:16:14
Ah I see, I didn't realize that was there. This is
| |
| 114 » » From: st.from, | 123 » log.Fields{ |
| 115 » » To: st.to, | 124 » » "offset": offset, |
| 116 » }) | 125 » » "length": st.length(), |
| 126 » » "path": s.streamPath, | |
| 127 » }.Debugf(s, "Creating stream reader for range.") | |
| 128 » r, err := s.Client.NewReader(s.streamPath, offset, st.length()) | |
| 117 if err != nil { | 129 if err != nil { |
| 118 log.WithError(err).Errorf(s, "Failed to create stream Reader.") | 130 log.WithError(err).Errorf(s, "Failed to create stream Reader.") |
| 119 return err | 131 return err |
| 120 } | 132 } |
| 121 defer func() { | 133 defer func() { |
| 122 if err := r.Close(); err != nil { | 134 if err := r.Close(); err != nil { |
| 123 log.WithError(err).Warningf(s, "Error closing stream Rea der.") | 135 log.WithError(err).Warningf(s, "Error closing stream Rea der.") |
| 124 } | 136 } |
| 125 }() | 137 }() |
| 126 cr := iotools.CountingReader{Reader: r} | 138 cr := iotools.CountingReader{Reader: r} |
| 127 rio := recordio.NewReader(&cr, maxStreamRecordSize) | 139 rio := recordio.NewReader(&cr, maxStreamRecordSize) |
| 128 | 140 |
| 129 buf := bytes.Buffer{} | 141 buf := bytes.Buffer{} |
| 130 le := logpb.LogEntry{} | 142 le := logpb.LogEntry{} |
| 131 » max := req.Limit | 143 » max := st.count |
| 132 for { | 144 for { |
| 133 » » offset := st.from + cr.Count() | 145 » » offset += cr.Count() |
| 134 | 146 |
| 135 sz, r, err := rio.ReadFrame() | 147 sz, r, err := rio.ReadFrame() |
| 136 switch err { | 148 switch err { |
| 137 case nil: | 149 case nil: |
| 138 break | 150 break |
| 139 | 151 |
| 140 case io.EOF: | 152 case io.EOF: |
| 141 return nil | 153 return nil |
| 142 | 154 |
| 143 default: | 155 default: |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 198 return nil, 0, err | 210 return nil, 0, err |
| 199 } | 211 } |
| 200 | 212 |
| 201 // Get the offset of the last record. | 213 // Get the offset of the last record. |
| 202 if len(idx.Entries) == 0 { | 214 if len(idx.Entries) == 0 { |
| 203 return nil, 0, nil | 215 return nil, 0, nil |
| 204 } | 216 } |
| 205 lle := idx.Entries[len(idx.Entries)-1] | 217 lle := idx.Entries[len(idx.Entries)-1] |
| 206 | 218 |
| 207 // Get a Reader for the Tail entry. | 219 // Get a Reader for the Tail entry. |
| 208 » r, err := s.Client.NewReader(s.streamPath, gs.Options{ | 220 » r, err := s.Client.NewReader(s.streamPath, int64(lle.Offset), -1) |
| 209 » » From: int64(lle.Offset), | |
| 210 » }) | |
| 211 if err != nil { | 221 if err != nil { |
| 212 log.Fields{ | 222 log.Fields{ |
| 213 log.ErrorKey: err, | 223 log.ErrorKey: err, |
| 214 "offset": lle.Offset, | 224 "offset": lle.Offset, |
| 215 }.Errorf(s, "Failed to create reader.") | 225 }.Errorf(s, "Failed to create reader.") |
| 216 return nil, 0, err | 226 return nil, 0, err |
| 217 } | 227 } |
| 218 defer func() { | 228 defer func() { |
| 219 if err := r.Close(); err != nil { | 229 if err := r.Close(); err != nil { |
| 220 log.WithError(err).Warningf(s, "Failed to close Reader." ) | 230 log.WithError(err).Warningf(s, "Failed to close Reader." ) |
| 221 } | 231 } |
| 222 }() | 232 }() |
| 223 | 233 |
| 224 rio := recordio.NewReader(r, maxStreamRecordSize) | 234 rio := recordio.NewReader(r, maxStreamRecordSize) |
| 225 d, err := rio.ReadFrameAll() | 235 d, err := rio.ReadFrameAll() |
| 226 if err != nil { | 236 if err != nil { |
| 227 log.WithError(err).Errorf(s, "Failed to read log frame.") | 237 log.WithError(err).Errorf(s, "Failed to read log frame.") |
| 228 return nil, 0, err | 238 return nil, 0, err |
| 229 } | 239 } |
| 230 | 240 |
| 231 return d, types.MessageIndex(lle.StreamIndex), nil | 241 return d, types.MessageIndex(lle.StreamIndex), nil |
| 232 } | 242 } |
| 233 | 243 |
| 234 // getIndex returns the cached log stream index, fetching it if necessary. | 244 // getIndex returns the cached log stream index, fetching it if necessary. |
| 235 func (s *storageImpl) getIndex() (*logpb.LogIndex, error) { | 245 func (s *storageImpl) getIndex() (*logpb.LogIndex, error) { |
| 236 s.indexMu.Lock() | 246 s.indexMu.Lock() |
| 237 defer s.indexMu.Unlock() | 247 defer s.indexMu.Unlock() |
| 238 | 248 |
| 239 if s.index == nil { | 249 if s.index == nil { |
| 240 » » r, err := s.Client.NewReader(s.indexPath, gs.Options{}) | 250 » » r, err := s.Client.NewReader(s.indexPath, 0, -1) |
| 241 if err != nil { | 251 if err != nil { |
| 242 log.WithError(err).Errorf(s, "Failed to create index Rea der.") | 252 log.WithError(err).Errorf(s, "Failed to create index Rea der.") |
| 243 return nil, err | 253 return nil, err |
| 244 } | 254 } |
| 245 defer func() { | 255 defer func() { |
| 246 if err := r.Close(); err != nil { | 256 if err := r.Close(); err != nil { |
| 247 log.WithError(err).Warningf(s, "Error closing in dex Reader.") | 257 log.WithError(err).Warningf(s, "Error closing in dex Reader.") |
| 248 } | 258 } |
| 249 }() | 259 }() |
| 250 indexData, err := ioutil.ReadAll(r) | 260 indexData, err := ioutil.ReadAll(r) |
| 251 if err != nil { | 261 if err != nil { |
| 252 log.WithError(err).Errorf(s, "Failed to read index.") | 262 log.WithError(err).Errorf(s, "Failed to read index.") |
| 253 return nil, err | 263 return nil, err |
| 254 } | 264 } |
| 255 | 265 |
| 256 index := logpb.LogIndex{} | 266 index := logpb.LogIndex{} |
| 257 if err := proto.Unmarshal(indexData, &index); err != nil { | 267 if err := proto.Unmarshal(indexData, &index); err != nil { |
| 258 log.WithError(err).Errorf(s, "Failed to unmarshal index. ") | 268 log.WithError(err).Errorf(s, "Failed to unmarshal index. ") |
| 259 return nil, err | 269 return nil, err |
| 260 } | 270 } |
| 261 | 271 |
| 262 s.index = &index | 272 s.index = &index |
| 263 } | 273 } |
| 264 return s.index, nil | 274 return s.index, nil |
| 265 } | 275 } |
| 266 | 276 |
| 267 type getStrategy struct { | 277 type getStrategy struct { |
| 268 » // from is the beginning byte offset of the log entry stream. | 278 » // startOffset is the beginning byte offset of the log entry stream. |
| 269 » from int64 | 279 » startOffset uint64 |
| 270 » // to is the ending byte offset of the log entry stream. | 280 » // endOffset is the ending byte offset of the log entry stream. |
| 271 » to int64 | 281 » endOffset uint64 |
| 272 | 282 |
| 283 // count is the number of log entries that will be fetched. | |
| 284 count int | |
| 273 // lastIndex is the last log entry index in the stream. This will be -1 if | 285 // lastIndex is the last log entry index in the stream. This will be -1 if |
| 274 // there are no entries in the stream. | 286 // there are no entries in the stream. |
| 275 lastIndex types.MessageIndex | 287 lastIndex types.MessageIndex |
| 276 } | 288 } |
| 277 | 289 |
| 290 func (gs *getStrategy) offset() int64 { | |
| 291 return int64(gs.startOffset) | |
| 292 } | |
| 293 | |
| 294 func (gs *getStrategy) length() int64 { | |
| 295 if gs.startOffset < gs.endOffset { | |
| 296 return int64(gs.endOffset - gs.startOffset) | |
| 297 } | |
| 298 return -1 | |
| 299 } | |
| 300 | |
| 301 // setCount sets the `count` field. If called multiple times, the smallest | |
| 302 // assigned value will be retained. | |
| 303 func (gs *getStrategy) setCount(v int) { | |
| 304 if gs.count <= 0 || gs.count > v { | |
| 305 gs.count = v | |
| 306 } | |
| 307 } | |
| 308 | |
| 309 // setEndOffset sets the `length` field. If called multiple times, the smallest | |
| 310 // assigned value will be retained. | |
| 311 func (gs *getStrategy) setEndOffset(v uint64) { | |
| 312 if gs.endOffset == 0 || gs.endOffset > v { | |
| 313 gs.endOffset = v | |
| 314 } | |
| 315 } | |
| 316 | |
| 278 func (s *storageImpl) buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIn dex) *getStrategy { | 317 func (s *storageImpl) buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIn dex) *getStrategy { |
| 279 st := getStrategy{} | 318 st := getStrategy{} |
| 280 | 319 |
| 281 if len(idx.Entries) == 0 { | 320 if len(idx.Entries) == 0 { |
| 282 st.lastIndex = -1 | 321 st.lastIndex = -1 |
| 283 return &st | 322 return &st |
| 284 } | 323 } |
| 285 | 324 |
| 286 st.lastIndex = types.MessageIndex(idx.Entries[len(idx.Entries)-1].Stream Index) | 325 st.lastIndex = types.MessageIndex(idx.Entries[len(idx.Entries)-1].Stream Index) |
| 287 startIdx := indexEntryFor(idx.Entries, req.Index) | 326 startIdx := indexEntryFor(idx.Entries, req.Index) |
| 288 if startIdx < 0 { | 327 if startIdx < 0 { |
| 289 startIdx = 0 | 328 startIdx = 0 |
| 290 } | 329 } |
| 291 le := idx.Entries[startIdx] | 330 le := idx.Entries[startIdx] |
| 292 » st.from = int64(le.Offset) | 331 » st.startOffset = le.Offset |
| 293 | 332 |
| 294 » // If we have a limit, and we have enough index entries to upper-bound o ur | 333 » // Determine an upper bound based on our limits. |
| 295 » // stream based on that limit, use that. | |
| 296 // | 334 // |
| 297 » // Note that this may overshoot if the index and/or stream is sparse. We know | 335 » // If we have a count limit, and we have enough index entries to upper-b ound |
| 298 » // for sure that we have one LogEntry per index entry, so that's the bes t we | 336 » // our stream based on that limit, use that. Note that this may overshoo t if |
| 299 » // can do. | 337 » // the index and/or stream is sparse. We know for sure that we have one |
| 338 » // LogEntry per index entry, so that's the best we can do. | |
| 300 if req.Limit > 0 { | 339 if req.Limit > 0 { |
| 301 if ub := startIdx + req.Limit; ub < len(idx.Entries) { | 340 if ub := startIdx + req.Limit; ub < len(idx.Entries) { |
| 302 » » » st.to = int64(idx.Entries[ub].Offset) | 341 » » » st.setEndOffset(idx.Entries[ub].Offset) |
| 342 » » } | |
| 343 » » st.setCount(req.Limit) | |
| 344 » } | |
| 345 | |
| 346 » // If we have a byte limit, count the entry sizes until we reach that li mit. | |
| 347 » if mb := int64(s.MaxBytes); mb > 0 { | |
| 348 » » mb := uint64(mb) | |
| 349 | |
| 350 » » for i, e := range idx.Entries[startIdx:] { | |
| 351 » » » if e.Offset < st.startOffset { | |
| 352 » » » » // This shouldn't really happen, but it could ha ppen if there is a | |
| 353 » » » » // corrupt index. | |
| 354 » » » » continue | |
| 355 » » » } | |
| 356 | |
| 357 » » » // Calculate the request offset and truncate if we've ex ceeded our maximum | |
| 358 » » » // request bytes. | |
| 359 » » » if size := (e.Offset - st.startOffset); size > mb { | |
| 360 » » » » st.setEndOffset(e.Offset) | |
| 361 » » » » st.setCount(i) | |
| 362 » » » » break | |
| 363 » » » } | |
| 303 } | 364 } |
| 304 } | 365 } |
| 305 return &st | 366 return &st |
| 306 } | 367 } |
| 307 | 368 |
| 308 // indexEntryFor identifies the log index entry closest (<=) to the specified | 369 // indexEntryFor identifies the log index entry closest (<=) to the specified |
| 309 // index. | 370 // index. |
| 310 // | 371 // |
| 311 // If the first index entry is greater than our search index, -1 will be | 372 // If the first index entry is greater than our search index, -1 will be |
| 312 // returned. This should never happen in practice, though, since our index | 373 // returned. This should never happen in practice, though, since our index |
| 313 // construction always indexes log entry #0. | 374 // construction always indexes log entry #0. |
| 314 // | 375 // |
| 315 // It does this by performing a binary search over the index entries. | 376 // It does this by performing a binary search over the index entries. |
| 316 func indexEntryFor(entries []*logpb.LogIndex_Entry, i types.MessageIndex) int { | 377 func indexEntryFor(entries []*logpb.LogIndex_Entry, i types.MessageIndex) int { |
| 317 ui := uint64(i) | 378 ui := uint64(i) |
| 318 s := sort.Search(len(entries), func(i int) bool { | 379 s := sort.Search(len(entries), func(i int) bool { |
| 319 return entries[i].StreamIndex > ui | 380 return entries[i].StreamIndex > ui |
| 320 }) | 381 }) |
| 321 | 382 |
| 322 // The returned index is the one immediately after the index that we wan t. If | 383 // The returned index is the one immediately after the index that we wan t. If |
| 323 // our search returned 0, the first index entry is > our search entry, a nd we | 384 // our search returned 0, the first index entry is > our search entry, a nd we |
| 324 // will return nil. | 385 // will return nil. |
| 325 return s - 1 | 386 return s - 1 |
| 326 } | 387 } |
| OLD | NEW |