Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(459)

Side by Side Diff: common/chunkstream/view.go

Issue 1413923013: Add `chunk` segmented data library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Use errors in pacnic. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « common/chunkstream/chunk_test.go ('k') | common/chunkstream/view_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package chunkstream
6
7 import (
8 "bytes"
9 "errors"
10 "io"
11 )
12
13 // View is static read-only snapshot of the contents of the Buffer, presented
14 // as a contiguous stream of bytes.
15 //
16 // View implements the io.Reader and io.ByteReader interfaces. It also offers a
17 // series of utility functions optimized for the chunks.
18 type View struct {
19 // cur is the first node of the view.
20 cur *chunkNode
21 // cidx is the byte offset within cur of the current byte.
22 cidx int
23 // size is the size of thew view. Accesses beyond this size will fail.
24 size int64
25 // consumed is a count of the number of bytes in the view that have been
26 // consumed via Skip().
27 consumed int64
28
29 // b is the Buffer from which this View's snapshot was taken.
30 b *Buffer
31 }
32
33 var _ interface {
34 io.Reader
35 io.ByteReader
36 } = (*View)(nil)
37
38 func (r *View) Read(b []byte) (int, error) {
39 total := int64(0)
40 err := error(nil)
41 for len(b) > 0 {
42 chunk := r.chunkBytes()
43 if len(chunk) == 0 {
44 err = io.EOF
45 break
46 }
47
48 amount := copy(b, chunk)
49 total += int64(amount)
50 b = b[amount:]
51 r.Skip(int64(amount))
52 }
53 if r.Remaining() == 0 {
54 err = io.EOF
55 }
56 return int(total), err
57 }
58
59 // ReadByte implements io.ByteReader, reading a single byte from the buffer.
60 func (r *View) ReadByte() (byte, error) {
61 chunk := r.chunkBytes()
62 if len(chunk) == 0 {
63 return 0, io.EOF
64 }
65 r.Skip(1)
66 return chunk[0], nil
67 }
68
69 // Remaining returns the number of bytes remaining in the Reader view.
70 func (r *View) Remaining() int64 {
71 return r.size
72 }
73
74 // Consumed returns the number of bytes that have been skipped via Skip or
75 // higher-level calls.
76 func (r *View) Consumed() int64 {
77 return r.consumed
78 }
79
80 // Skip advances the View forwards a fixed number of bytes.
81 func (r *View) Skip(count int64) {
82 for count > 0 {
83 if r.cur == nil {
84 panic(errors.New("cannot skip past end buffer"))
85 }
86
87 amount := r.chunkRemaining()
88 if count < int64(amount) {
89 amount = int(count)
90 r.cidx += amount
91 } else {
92 // Finished consuming this chunk, move on to the next.
93 r.cur = r.cur.next
94 r.cidx = 0
95 }
96
97 count -= int64(amount)
98 r.consumed += int64(amount)
99 r.size -= int64(amount)
100 }
101 }
102
103 // Index scans the View for the specified needle bytes. If they are
104 // found, their index in the View is returned. Otherwise, Index returns
105 // -1.
106 //
107 // The View is not modified during the search.
108 func (r *View) Index(needle []byte) int64 {
109 if r.Remaining() == 0 {
110 return -1
111 }
112 if len(needle) == 0 {
113 return 0
114 }
115
116 rc := r.Clone()
117 if !rc.indexDestructive(needle) {
118 return -1
119 }
120 return rc.consumed - r.consumed
121 }
122
123 // indexDestructive implements Index by actively mutating the View.
124 //
125 // It returns true if the needle was found, and false if not. The view will be
126 // mutated regardless.
127 func (r *View) indexDestructive(needle []byte) bool {
128 tbuf := make([]byte, 2*len(needle))
129 idx := int64(0)
130 for {
131 data := r.chunkBytes()
132 if len(data) == 0 {
133 return false
134 }
135
136 // Scan the current chunk for needle. Note that if the current c hunk is too
137 // small to hold needle, this is a no-op.
138 if idx = int64(bytes.Index(data, needle)); idx >= 0 {
139 r.Skip(idx)
140 return true
141 }
142 if len(data) > len(needle) {
143 // The needle is definitely not in this space.
144 r.Skip(int64(len(data) - len(needle)))
145 }
146
147 // needle isn't in the current chunk; however, it may begin at t he end of
148 // the current chunk and complete in future chunks.
149 //
150 // We will scan a space twice the size of the needle, as otherwi se, this
151 // would end up scanning for one possibility, incrementing by on e, and
152 // repeating via 'for' loop iterations.
153 //
154 // Afterwards, we advance only the size of the needle, as we don 't want to
155 // preclude the needle starting after our last scan range.
156 //
157 // For example, to find needle "NDL":
158 //
159 // AAAAND|L|AAAA
160 // |------|^- [NDLAAA], 0
161 //
162 // AAAAN|D|NDL|AAAA
163 // |------| [ANDNDL], 3
164 //
165 // AAAA|A|A|NDL
166 // |-------| [AAAAND], -1, consume 3 => A|NDL|
167 //
168 //
169 // Note that we perform the read with a cloned View so we don't
170 // actually consume this data.
171 pr := r.Clone()
172 amt, _ := pr.Read(tbuf)
173 if amt < len(needle) {
174 // All remaining buffers cannot hold the needle.
175 return false
176 }
177
178 if idx = int64(bytes.Index(tbuf[:amt], needle)); idx >= 0 {
179 r.Skip(idx)
180 return true
181 }
182 r.Skip(int64(len(needle)))
183 }
184 }
185
186 // Clone returns a copy of the View view.
187 //
188 // The clone is bound to the same underlying Buffer as the source.
189 func (r *View) Clone() *View {
190 return r.CloneLimit(r.size)
191 }
192
193 // CloneLimit returns a copy of the View view, optionally truncating it.
194 //
195 // The clone is bound to the same underlying Buffer as the source.
196 func (r *View) CloneLimit(limit int64) *View {
197 c := *r
198 if c.size > limit {
199 c.size = limit
200 }
201 return &c
202 }
203
204 func (r *View) chunkRemaining() int {
205 if r.cur == nil {
206 return 0
207 }
208 result := r.cur.length() - r.cidx
209 if int64(result) > r.size {
210 result = int(r.size)
211 }
212 return result
213 }
214
215 func (r *View) chunkBytes() []byte {
216 if r.cur == nil {
217 return nil
218 }
219 data := r.cur.Bytes()[r.cidx:]
220 remaining := r.Remaining()
221 if int64(len(data)) > remaining {
222 data = data[:remaining]
223 }
224 return data
225 }
OLDNEW
« no previous file with comments | « common/chunkstream/chunk_test.go ('k') | common/chunkstream/view_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698