| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package bundler |
| 6 |
| 7 import ( |
| 8 "bytes" |
| 9 "time" |
| 10 "unicode/utf8" |
| 11 |
| 12 "github.com/luci/luci-go/common/logdog/protocol" |
| 13 ) |
| 14 |
const (
	// posixNewline is the line delimiter on POSIX systems (and modern macOS).
	posixNewline = "\n"
	// windowsNewline is the line delimiter on Windows ("\r\n").
	windowsNewline = "\r\n"
)

var (
	// posixNewlineBytes is posixNewline as a []byte, precomputed once so the
	// per-line delimiter scan doesn't allocate it on every iteration.
	posixNewlineBytes = []byte(posixNewline)
)
| 23 |
// textParser is a parser implementation for the LogDog TEXT stream type.
//
// It splits buffered stream data into newline-delimited lines, handling both
// POSIX ("\n") and Windows ("\r\n") delimiters.
type textParser struct {
	baseParser

	// sequence is the sequence number assigned to the next emitted LogEntry.
	// It advances by the number of complete (delimiter-terminated) lines in
	// each entry; partial (split) lines do not advance it.
	sequence int64
	// buf is a reusable scratch buffer holding the line currently being
	// extracted. It is Reset() at the start of each line, so its backing
	// storage is recycled across calls.
	buf bytes.Buffer
}

// Compile-time assertion that *textParser satisfies the parser interface.
var _ parser = (*textParser)(nil)
| 33 |
// nextEntry extracts as many complete lines as fit within c.limit from the
// buffered stream data and packages them into a single TEXT-typed LogEntry.
//
// All lines in one entry carry the timestamp of the first buffered chunk; a
// chunk with a differing timestamp terminates the entry so the next call can
// start a new one. If no delimiter is found and c.allowSplit is set, a
// partial line (empty Delimiter) may be emitted, truncated so it never ends
// mid-UTF8-sequence.
//
// Returns (nil, nil) when no line data could be extracted. The error return
// is always nil in this implementation (kept for the parser interface).
func (p *textParser) nextEntry(c *constraints) (*protocol.LogEntry, error) {
	limit := int64(c.limit)
	ts := time.Time{}
	txt := protocol.Text{}
	// lineCount tracks only complete (delimited) lines; it is what advances
	// p.sequence at the end.
	lineCount := 0
	for limit > 0 {
		// View at most `limit` bytes of buffered data without consuming them.
		br := p.ViewLimit(limit)
		if br.Remaining() == 0 {
			// Exceeded either limit or available buffer data.
			break
		}

		// Use the timestamp of the first data chunk.
		if len(txt.Lines) == 0 {
			ts, _ = p.firstChunkTime()
		} else if ct, _ := p.firstChunkTime(); !ct.Equal(ts) {
			// New timestamp, so need new LogEntry.
			break
		}

		// Find the index of our delimiter.
		//
		// We do this using a cross-platform approach that works on POSIX systems,
		// Mac (>=OSX), and Windows: we scan for "\n", then look backwards to see if
		// it was preceded by "\r" (for Windows-style newlines, "\r\n").
		idx := br.Index(posixNewlineBytes)

		newline := ""
		if idx >= 0 {
			// Restrict the view to the bytes before the "\n".
			br = br.CloneLimit(idx)
			newline = posixNewline
		} else if !c.allowSplit {
			// No delimiter within our limit, and we're not allowed to split, so we're
			// done.
			break
		}

		// Load the exportable data into our buffer.
		// NOTE(review): the ReadFrom error is ignored; presumably br is an
		// in-memory view whose reads cannot fail — confirm against baseParser.
		p.buf.Reset()
		p.buf.ReadFrom(br)

		// Does our exportable buffer end with "\r"? If so, treat it as a possible
		// Windows newline sequence.
		if p.buf.Len() > 0 && p.buf.Bytes()[p.buf.Len()-1] == byte('\r') {
			split := false
			if newline != "" {
				// "\n" => "\r\n"
				newline = windowsNewline
				split = true
			} else {
				// If we're closed and this is the last byte in the stream, it is a
				// dangling "\r" and we should include it. Otherwise, leave it for the
				// next round (it may be the first half of a "\r\n" whose "\n" has
				// not arrived yet).
				split = !(c.closed && int64(p.buf.Len()) == p.Len())
			}

			if split {
				// Drop the trailing "\r" from the exported line value.
				p.buf.Truncate(p.buf.Len() - 1)
			}
		}

		// A line is "partial" when it was cut by the limit rather than
		// terminated by a delimiter.
		partial := (idx < 0)
		if !partial {
			lineCount++
		}

		// If we didn't have a delimiter, make sure we don't terminate in the middle
		// of a UTF8 character.
		if partial {
			// Walk the buffer rune-by-rune; lidx ends up as the byte offset just
			// past the last fully-decodable rune.
			count := 0
			lidx := -1
			b := p.buf.Bytes()
			for len(b) > 0 {
				r, sz := utf8.DecodeRune(b)
				count += sz
				if r != utf8.RuneError {
					lidx = count
				}
				b = b[sz:]
			}
			// NOTE(review): a literal U+FFFD in the input also decodes as
			// RuneError (sz=3) and so won't advance lidx — presumably
			// acceptable for this stream type; confirm.
			if lidx < 0 {
				// No complete rune fits; wait for more data.
				break
			}
			p.buf.Truncate(lidx)
		}

		txt.Lines = append(txt.Lines, &protocol.Text_Line{
			Value:     p.buf.String(),
			Delimiter: newline,
		})
		// Consume exactly the exported bytes plus the delimiter. In the
		// "\r\n" case buf was truncated by one, so buf.Len()+len("\r\n")
		// still lands just past the "\n"; truncated UTF-8 tail bytes remain
		// buffered for the next round.
		p.Consume(int64(p.buf.Len() + len(newline)))
		limit -= int64(p.buf.Len() + len(newline))
	}

	if len(txt.Lines) == 0 {
		// Nothing extractable this round.
		return nil, nil
	}
	le := p.baseLogEntry(ts)
	le.Sequence = uint64(p.sequence)
	le.Content = &protocol.LogEntry_Text{Text: &txt}

	// Advance the sequence by the number of COMPLETE lines only, so a line
	// split across entries keeps a stable sequence number.
	p.sequence += int64(lineCount)
	return le, nil
}
| OLD | NEW |