Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(148)

Side by Side Diff: common/chunkstream/view.go

Issue 1413923013: Add `chunk` segmented data library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Code review comments. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package chunkstream
6
7 import (
8 "bytes"
9 "io"
10 )
11
12 // View is static read-only snapshot of the contents of the Buffer, presented
13 // as a contiguous stream of bytes.
14 //
15 // View implements the io.Reader and io.ByteReader interfaces. It also offers a
16 // series of utility functions optimized for the chunks.
17 type View struct {
18 // cur is the first node of the view.
19 cur *chunkNode
20 // cidx is the byte offset within cur of the current byte.
21 cidx int
22 // size is the size of thew view. Accesses beyond this size will fail.
23 size int64
24 // consumed is a count of the number of bytes in the view that have been
25 // consumed via Skip().
26 consumed int64
27
28 // b is the Buffer from which this View's snapshot was taken.
29 b *Buffer
30 }
31
32 var _ interface {
33 io.Reader
34 io.ByteReader
35 } = (*View)(nil)
36
37 func (r *View) Read(b []byte) (int, error) {
38 total := int64(0)
39 err := error(nil)
40 for len(b) > 0 {
41 chunk := r.chunkBytes()
42 if len(chunk) == 0 {
43 err = io.EOF
44 break
45 }
46
47 amount := copy(b, chunk)
48 total += int64(amount)
49 b = b[amount:]
50 r.Skip(int64(amount))
51 }
52 if r.Remaining() == 0 {
53 err = io.EOF
54 }
55 return int(total), err
56 }
57
58 // ReadByte implements io.ByteReader, reading a single byte from the buffer.
59 func (r *View) ReadByte() (byte, error) {
60 chunk := r.chunkBytes()
61 if len(chunk) == 0 {
62 return 0, io.EOF
63 }
64 r.Skip(1)
65 return chunk[0], nil
66 }
67
68 // Remaining returns the number of bytes remaining in the Reader view.
69 func (r *View) Remaining() int64 {
70 return r.size
71 }
72
73 // Consumed returns the number of bytes that have been skipped via Skip or
74 // higher-level calls.
75 func (r *View) Consumed() int64 {
76 return r.consumed
77 }
78
79 // Skip advances the View forwards a fixed number of bytes.
80 func (r *View) Skip(count int64) {
81 for count > 0 {
82 if r.cur == nil {
83 panic("cannot skip past end buffer")
84 }
85
86 amount := r.chunkRemaining()
87 if count < int64(amount) {
88 amount = int(count)
89 r.cidx += amount
90 } else {
91 // Finished consuming this chunk, move on to the next.
92 r.cur = r.cur.next
93 r.cidx = 0
94 }
95
96 count -= int64(amount)
97 r.consumed += int64(amount)
98 r.size -= int64(amount)
99 }
100 }
101
102 // Index scans the View for the specified needle bytes. If they are
103 // found, their index in the View is returned. Otherwise, Index returns
104 // -1.
105 //
106 // The View is not modified during the search.
107 func (r *View) Index(needle []byte) int64 {
108 if r.Remaining() == 0 {
109 return -1
110 }
111 if len(needle) == 0 {
112 return 0
113 }
114
115 rc := r.Clone()
116 if !rc.indexDestructive(needle) {
117 return -1
118 }
119 return rc.consumed - r.consumed
120 }
121
122 // indexDestructive implements Index by actively mutating the View.
123 //
124 // It returns true if the needle was found, and false if not. The view will be
125 // mutated regardless.
126 func (r *View) indexDestructive(needle []byte) bool {
127 tbuf := make([]byte, 2*len(needle))
128 idx := int64(0)
129 for {
130 data := r.chunkBytes()
131 if len(data) == 0 {
132 return false
133 }
134
135 // Scan the current chunk for needle. Note that if the current c hunk is too
136 // small to hold needle, this is a no-op.
137 if idx = int64(bytes.Index(data, needle)); idx >= 0 {
138 r.Skip(idx)
139 return true
140 }
141 if len(data) > len(needle) {
142 // The needle is definitely not in this space.
143 r.Skip(int64(len(data) - len(needle)))
144 }
145
146 // needle isn't in the current chunk; however, it may begin at t he end of
147 // the current chunk and complete in future chunks.
148 //
149 // We will scan a space twice the size of the needle, as otherwi se, this
150 // would end up scanning for one possibility, incrementing by on e, and
151 // repeating via 'for' loop iterations.
152 //
153 // Afterwards, we advance only the size of the needle, as we don 't want to
154 // preclude the needle starting after our last scan range.
155 //
156 // For example, to find needle "NDL":
157 //
158 // AAAAND|L|AAAA
159 // |------|^- [NDLAAA], 0
160 //
161 // AAAAN|D|NDL|AAAA
162 // |------| [ANDNDL], 3
163 //
164 // AAAA|A|A|NDL
165 // |-------| [AAAAND], -1, consume 3 => A|NDL|
166 //
167 //
168 // Note that we perform the read with a cloned View so we don't
169 // actually consume this data.
170 pr := r.Clone()
171 amt, _ := pr.Read(tbuf)
172 if amt < len(needle) {
173 // All remaining buffers cannot hold the needle.
174 return false
175 }
176
177 if idx = int64(bytes.Index(tbuf[:amt], needle)); idx >= 0 {
178 r.Skip(idx)
179 return true
180 }
181 r.Skip(int64(len(needle)))
182 }
183 }
184
185 // Clone returns a copy of the View view.
186 //
187 // The clone is bound to the same underlying Buffer as the source.
188 func (r *View) Clone() *View {
189 return r.CloneLimit(r.size)
190 }
191
192 // CloneLimit returns a copy of the View view, optionally truncating it.
193 //
194 // The clone is bound to the same underlying Buffer as the source.
195 func (r *View) CloneLimit(limit int64) *View {
196 c := *r
197 if c.size > limit {
198 c.size = limit
199 }
200 return &c
201 }
202
203 func (r *View) chunkRemaining() int {
204 if r.cur == nil {
205 return 0
206 }
207 result := r.cur.length() - r.cidx
208 if int64(result) > r.size {
209 result = int(r.size)
210 }
211 return result
212 }
213
214 func (r *View) chunkBytes() []byte {
215 if r.cur == nil {
216 return nil
217 }
218 data := r.cur.Bytes()[r.cidx:]
219 remaining := r.Remaining()
220 if int64(len(data)) > remaining {
221 data = data[:remaining]
222 }
223 return data
224 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698