Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(50)

Side by Side Diff: client/internal/logdog/butler/bundler/data.go

Issue 1412063008: logdog: Add bundler library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Updated from comments. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package bundler
6
7 import (
8 "sync"
9 "time"
10
11 "github.com/luci/luci-go/common/chunkstream"
12 )
13
14 type dataPoolRegistry struct {
15 sync.Mutex
16
17 // pools is a pool of Data instances. It is keyed on buffer size.
18 pools map[int]*dataPool
19 }
20
21 // globaldataPoolRegistry is the default data pool to use by the stream
22 // package.
23 var globalDataPoolRegistry = dataPoolRegistry{}
24
25 func (r *dataPoolRegistry) getPool(s int) *dataPool {
26 r.Lock()
27 defer r.Unlock()
28
29 if r.pools == nil {
30 r.pools = map[int]*dataPool{}
31 }
32
33 pool := r.pools[s]
34 if pool == nil {
35 pool = newPool(s)
36 r.pools[s] = pool
37 }
38 return pool
39 }
40
41 type dataPool struct {
42 sync.Pool
43 size int
44 }
45
46 func newPool(size int) *dataPool {
47 p := dataPool{
48 size: size,
49 }
50 p.New = p.newData
51 return &p
52 }
53
54 func (p *dataPool) newData() interface{} {
55 return &streamData{
56 bufferBase: make([]byte, p.size),
57 releaseFunc: p.release,
58 }
59 }
60
61 func (p *dataPool) getData() Data {
62 d := p.Get().(*streamData)
63 d.reset()
64 return d
65 }
66
67 func (p *dataPool) release(d *streamData) {
68 p.Put(d)
69 }
70
71 // Data is a reusable data buffer that is used by Stream instances to ingest
72 // data.
73 //
74 // Data is initially an empty buffer. Once data is loaded into it, the buffer is
75 // resized to the bound data and a timestamp is attached via Bind.
76 type Data interface {
77 chunkstream.Chunk
78
79 // Bind resizes the Chunk buffer and records a timestamp to associate wi th the
80 // data chunk.
81 Bind(int, time.Time) Data
82
83 // Timestamp returns the bound timestamp. This will be zero if no timest amp
84 // has been bound.
85 Timestamp() time.Time
86 }
87
88 // streamData is an implementation of the Chunk interface for Bundler chunks.
89 //
90 // It includes the ability to bind to a size/timestamp.
91 type streamData struct {
92 bufferBase []byte
93 buffer []byte
94 ts time.Time
95
96 releaseFunc func(*streamData)
97 }
98
99 var _ Data = (*streamData)(nil)
100
101 func (d *streamData) reset() {
102 d.buffer = d.bufferBase
103 }
104
105 func (d *streamData) Bytes() []byte {
106 return d.buffer
107 }
108
109 func (d *streamData) Len() int {
110 return len(d.buffer)
111 }
112
113 func (d *streamData) Bind(amount int, ts time.Time) Data {
114 d.buffer = d.buffer[:amount]
115 d.ts = ts
116 return d
117 }
118
119 func (d *streamData) Timestamp() time.Time {
120 return d.ts
121 }
122
123 func (d *streamData) Release() {
124 if d.releaseFunc != nil {
125 d.releaseFunc(d)
126 }
127 }
OLDNEW
« no previous file with comments | « client/internal/logdog/butler/bundler/counter.go ('k') | client/internal/logdog/butler/bundler/data_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698