| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package bundler |
| 6 |
| 7 import ( |
| 8 "sync" |
| 9 "time" |
| 10 |
| 11 "github.com/luci/luci-go/common/chunkstream" |
| 12 ) |
| 13 |
| 14 type dataPoolRegistry struct { |
| 15 sync.Mutex |
| 16 |
| 17 // pools is a pool of Data instances. It is keyed on buffer size. |
| 18 pools map[int]*dataPool |
| 19 } |
| 20 |
| 21 // globaldataPoolRegistry is the default data pool to use by the stream |
| 22 // package. |
| 23 var globalDataPoolRegistry = dataPoolRegistry{} |
| 24 |
| 25 func (r *dataPoolRegistry) getPool(s int) *dataPool { |
| 26 r.Lock() |
| 27 defer r.Unlock() |
| 28 |
| 29 if r.pools == nil { |
| 30 r.pools = map[int]*dataPool{} |
| 31 } |
| 32 |
| 33 pool := r.pools[s] |
| 34 if pool == nil { |
| 35 pool = newPool(s) |
| 36 r.pools[s] = pool |
| 37 } |
| 38 return pool |
| 39 } |
| 40 |
| 41 type dataPool struct { |
| 42 sync.Pool |
| 43 size int |
| 44 } |
| 45 |
| 46 func newPool(size int) *dataPool { |
| 47 p := dataPool{ |
| 48 size: size, |
| 49 } |
| 50 p.New = p.newData |
| 51 return &p |
| 52 } |
| 53 |
| 54 func (p *dataPool) newData() interface{} { |
| 55 return &streamData{ |
| 56 bufferBase: make([]byte, p.size), |
| 57 releaseFunc: p.release, |
| 58 } |
| 59 } |
| 60 |
| 61 func (p *dataPool) getData() Data { |
| 62 d := p.Get().(*streamData) |
| 63 d.reset() |
| 64 return d |
| 65 } |
| 66 |
| 67 func (p *dataPool) release(d *streamData) { |
| 68 p.Put(d) |
| 69 } |
| 70 |
| 71 // Data is a reusable data buffer that is used by Stream instances to ingest |
| 72 // data. |
| 73 // |
| 74 // Data is initially an empty buffer. Once data is loaded into it, the buffer is |
| 75 // resized to the bound data and a timestamp is attached via Bind. |
| 76 type Data interface { |
| 77 chunkstream.Chunk |
| 78 |
| 79 // Bind resizes the Chunk buffer and records a timestamp to associate wi
th the |
| 80 // data chunk. |
| 81 Bind(int, time.Time) Data |
| 82 |
| 83 // Timestamp returns the bound timestamp. This will be zero if no timest
amp |
| 84 // has been bound. |
| 85 Timestamp() time.Time |
| 86 } |
| 87 |
| 88 // streamData is an implementation of the Chunk interface for Bundler chunks. |
| 89 // |
| 90 // It includes the ability to bind to a size/timestamp. |
| 91 type streamData struct { |
| 92 bufferBase []byte |
| 93 buffer []byte |
| 94 ts time.Time |
| 95 |
| 96 releaseFunc func(*streamData) |
| 97 } |
| 98 |
| 99 var _ Data = (*streamData)(nil) |
| 100 |
| 101 func (d *streamData) reset() { |
| 102 d.buffer = d.bufferBase |
| 103 } |
| 104 |
| 105 func (d *streamData) Bytes() []byte { |
| 106 return d.buffer |
| 107 } |
| 108 |
| 109 func (d *streamData) Len() int { |
| 110 return len(d.buffer) |
| 111 } |
| 112 |
| 113 func (d *streamData) Bind(amount int, ts time.Time) Data { |
| 114 d.buffer = d.buffer[:amount] |
| 115 d.ts = ts |
| 116 return d |
| 117 } |
| 118 |
| 119 func (d *streamData) Timestamp() time.Time { |
| 120 return d.ts |
| 121 } |
| 122 |
| 123 func (d *streamData) Release() { |
| 124 if d.releaseFunc != nil { |
| 125 d.releaseFunc(d) |
| 126 } |
| 127 } |
| OLD | NEW |