| Index: common/recordio/reader.go
|
| diff --git a/common/recordio/reader.go b/common/recordio/reader.go
|
| index 97ae03237a5810997c07bef53af43c0151da9ea8..b71d1d266f4ebc72c775b8619977304e15d71dd3 100644
|
| --- a/common/recordio/reader.go
|
| +++ b/common/recordio/reader.go
|
| @@ -5,6 +5,7 @@
|
| package recordio
|
|
|
| import (
|
| + "bytes"
|
| "encoding/binary"
|
| "fmt"
|
| "io"
|
| @@ -102,3 +103,35 @@ func (r *simpleByteReader) ReadByte() (byte, error) {
|
| _, err := r.Read(r.buf[:])
|
| return r.buf[0], err
|
| }
|
| +
|
| +// Split splits the supplied buffer into its component records.
|
| +//
|
| +// This method implements zero-copy segmentation, so the individual records are
|
| +// slices of the original data set.
|
| +func Split(data []byte) (records [][]byte, err error) {
|
| + br := bytes.NewReader(data)
|
| +
|
| + for br.Len() > 0 {
|
| + var size uint64
|
| + size, err = binary.ReadUvarint(br)
|
| + if err != nil {
|
| + return
|
| + }
|
| + if size > uint64(br.Len()) {
|
| + err = ErrFrameTooLarge
|
| + return
|
| + }
|
| +
|
| + // Pull out the record from the original byte stream without copying.
|
| + // Casting size to an integer is safe at this point, since we have asserted
|
| + // that it is less than the remaining length in the buffer, which is an int.
|
| + offset := len(data) - br.Len()
|
| + records = append(records, data[offset:offset+int(size)])
|
| +
|
| + if _, err := br.Seek(int64(size), 1); err != nil {
|
| + // Our measurements should protect us from this being an invalid seek.
|
| + panic(err)
|
| + }
|
| + }
|
| + return records, nil
|
| +}
|
|
|