Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(36)

Side by Side Diff: common/chunkstream/reader.go

Issue 1413923013: Add `chunk` segmented data library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package chunkstream
6
7 import (
8 "bytes"
9 "fmt"
10 "io"
11 )
12
13 // Reader is an io.Reader implementation bound to a view of a chunk Buffer.
14 // A Reader attempts to present the Buffer's ordered Chunk instances as a
15 // contiguous stream of bytes.
16 //
17 // A Reader is tightly coupled to the state of its Buffer. If data is Consumed
18 // from its source Buffer, the Reader becomes invalid, and any further accesses
19 // to its methods are undefined.
20 //
21 // In addition to an io.Reader and io.ByteReader interface, Reader offers a
22 // series of utility functions to efficiently traverse the underlying chunk
23 // space.
24 type Reader struct {
iannucci 2015/11/05 01:10:08 rename to View
dnj 2015/11/13 23:22:04 Done.
25 cur *chunkNode
26 cidx int
iannucci 2015/11/05 01:10:08 wat r this? comment pls?
dnj 2015/11/13 23:22:04 Done.
27 size int64
28 consumed int64
29
30 // gen is the generation from which
31 b *Buffer
32 gen int64
33 walkBuf bytes.Buffer
34 }
35
36 var _ interface {
37 io.Reader
38 io.ByteReader
39 } = (*Reader)(nil)
40
41 func (r *Reader) Read(b []byte) (int, error) {
42 total := int64(0)
43 err := error(nil)
44 for len(b) > 0 {
45 chunk := r.chunkBytes()
46 if len(chunk) == 0 {
47 err = io.EOF
48 break
49 }
50
51 amount := copy(b, chunk)
52 total += int64(amount)
53 b = b[amount:]
54 r.Skip(int64(amount))
55 }
56 if r.Remaining() == 0 {
57 err = io.EOF
58 }
59 return int(total), err
60 }
61
62 // ReadByte implements io.ByteReader, reading a single byte from the buffer.
63 func (r *Reader) ReadByte() (byte, error) {
64 chunk := r.chunkBytes()
65 if len(chunk) == 0 {
66 return 0, io.EOF
67 }
68 r.Skip(1)
69 return chunk[0], nil
70 }
71
72 // Remaining returns the number of bytes remaining in the Reader view.
73 func (r *Reader) Remaining() int64 {
74 return r.size
75 }
76
77 // Consumed returns the number of bytes that have been skipped via Skip or
78 // higher-level calls.
79 func (r *Reader) Consumed() int64 {
80 return r.consumed
81 }
82
83 // Skip advances the Reader's view forwards a fixed number of bytes.
84 func (r *Reader) Skip(count int64) int64 {
iannucci 2015/11/05 01:10:08 why return anything?
dnj 2015/11/13 23:22:04 No good reason.
85 start := r.consumed
86 for count > 0 {
87 if r.cur == nil {
88 panic("cannot skip past end buffer")
89 }
90
91 amount := r.chunkRemaining()
92 if count < int64(amount) {
93 amount = int(count)
94 r.cidx += amount
95 } else {
96 // Finished consuming this chunk, move on to the next.
97 r.cur = r.cur.next
98 r.cidx = 0
99 }
100
101 count -= int64(amount)
102 r.consumed += int64(amount)
103 r.size -= int64(amount)
104 }
105 return (r.consumed - start)
106 }
107
108 // Index scans the Reader for the specified needle bytes. If they are
109 // found, their index in the Reader is returned. Otherwise, Index returns
110 // -1.
111 //
112 // The Reader is not modified during the search.
113 func (r *Reader) Index(needle []byte) int64 {
114 if r.Remaining() == 0 {
115 return -1
116 }
117 if len(needle) == 0 {
118 return 0
119 }
120 return r.Clone().indexDestructive(needle)
121 }
122
123 // indexDestructive implements Index by actively mutating the Reader.
124 func (r *Reader) indexDestructive(needle []byte) int64 {
iannucci 2015/11/05 01:10:08 What about making this public like `SkipTo(needle
dnj 2015/11/13 23:22:04 I did clean the internal interface up a bit, but I
125 tbuf := make([]byte, 2*len(needle))
126 initialConsumed := r.consumed
127 idx := int(0)
128 for {
129 data := r.chunkBytes()
130 if len(data) == 0 {
131 return -1
132 }
133
134 // Scan the current chunk for needle. Note that if the current c hunk is too
135 // small to hold needle, this is a no-op.
136 if idx = bytes.Index(data, needle); idx >= 0 {
137 break
138 }
139 if len(data) > len(needle) {
140 // The needle is definitely not in this space.
141 r.Skip(int64(len(data) - len(needle)))
142 }
143
144 // needle isn't in the current chunk; however, it may begin at t he end of
145 // the current chunk and complete in future chunks.
146 //
147 // We will scan a space twice the size of the needle, as otherwi se, this
148 // would end up scanning for one possibility, incrementing by on e, and
149 // repeating via 'for' loop iterations.
150 //
151 // Afterwards, we advance only the size of the needle, as we don 't want to
152 // preclude the needle starting after our last scan range.
153 //
154 // For example, to find needle "NDL":
155 //
156 // AAAAND|L|AAAA
157 // |------|^- [NDLAAA], 0
158 //
159 // AAAAN|D|NDL|AAAA
160 // |------| [ANDNDL], 3
161 //
162 // AAAA|A|A|NDL
163 // |-------| [AAAAND], -1, consume 3 => A|NDL|
164 //
165 //
166 // Note that we perform the read with a cloned Reader so we don' t
167 // actually consume this data.
168 pr := r.Clone()
169 amt, _ := pr.Read(tbuf)
170 if amt < len(needle) {
171 // All remaining buffers cannot hold the needle.
172 return -1
173 }
174
175 if idx = bytes.Index(tbuf[:amt], needle); idx >= 0 {
176 break
177 }
178 r.Skip(int64(len(needle)))
179 }
180 return r.consumed - initialConsumed + int64(idx)
181 }
182
183 // Clone returns a copy of the Reader view.
184 //
185 // The clone is bound to the same underlying Buffer as the source.
186 func (r *Reader) Clone() *Reader {
187 return r.CloneLimit(r.size)
188 }
189
190 // CloneLimit returns a copy of the Reader view, optionally truncating it.
191 //
192 // The clone is bound to the same underlying Buffer as the source.
193 func (r *Reader) CloneLimit(limit int64) *Reader {
194 c := *r
195 if c.size > limit {
196 c.size = limit
197 }
198 return &c
199 }
200
201 func (r *Reader) chunkRemaining() int {
202 r.checkGen()
203
204 if r.cur == nil {
205 return 0
206 }
207 result := r.cur.Len() - r.cidx
208 if int64(result) > r.size {
209 result = int(r.size)
210 }
211 return result
212 }
213
214 func (r *Reader) chunkBytes() []byte {
215 r.checkGen()
216
217 if r.cur == nil {
218 return nil
219 }
220 data := r.cur.Bytes()[r.cidx:]
221 remaining := r.Remaining()
222 if int64(len(data)) > remaining {
223 data = data[:remaining]
224 }
225 return data
226 }
227
228 func (r *Reader) checkGen() {
229 if r.b.gen != r.gen {
230 panic(fmt.Errorf("generation mismatch (%v != %v)", r.b.gen, r.ge n))
231 }
232 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698