| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package collector |
| 6 |
| 7 import ( |
| 8 "fmt" |
| 9 "sync" |
| 10 "sync/atomic" |
| 11 |
| 12 "github.com/luci/luci-go/common/api/logdog_coordinator/service/v1" |
| 13 "github.com/luci/luci-go/common/logdog/types" |
| 14 cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient" |
| 15 "github.com/luci/luci-go/server/logdog/storage" |
| 16 "golang.org/x/net/context" |
| 17 ) |
| 18 |
| 19 // testCoordinatorClient is an implementation of CoordinatorClient that can be |
| 20 // used for testing. |
| 21 type testCoordinatorClient struct { |
| 22 sync.Mutex |
| 23 |
| 24 // calls is the number of calls made to the interface's methods. |
| 25 calls int32 |
| 26 // callC, if not nil, will have a token pushed to it when a call is made
. |
| 27 callC chan struct{} |
| 28 // errC is a channel that error status will be read from if not nil. |
| 29 errC chan error |
| 30 |
| 31 // state is the latest tracked stream state. |
| 32 state map[string]*stateProxy |
| 33 } |
| 34 |
| 35 func (c *testCoordinatorClient) register(s stateProxy) stateProxy { |
| 36 c.Lock() |
| 37 defer c.Unlock() |
| 38 |
| 39 // Update our state. |
| 40 if c.state == nil { |
| 41 c.state = make(map[string]*stateProxy) |
| 42 } |
| 43 if sp := c.state[string(s.path)]; sp != nil { |
| 44 return *sp |
| 45 } |
| 46 c.state[string(s.path)] = &s |
| 47 return s |
| 48 } |
| 49 |
| 50 func (c *testCoordinatorClient) RegisterStream(ctx context.Context, s cc.State)
(*cc.State, error) { |
| 51 if err := c.incCalls(); err != nil { |
| 52 return nil, err |
| 53 } |
| 54 |
| 55 // Update our state. |
| 56 sp := c.register(stateProxy{ |
| 57 path: s.Path, |
| 58 proto: s.ProtoVersion, |
| 59 secret: s.Secret, |
| 60 terminalIndex: -1, |
| 61 archived: s.Archived(), |
| 62 purged: (s.State != nil && s.State.Purged), |
| 63 }) |
| 64 |
| 65 // Set the ProtoVersion to differentiate the output State from the input
. |
| 66 rs := cc.State{ |
| 67 Path: s.Path, |
| 68 ProtoVersion: "remote", |
| 69 Secret: s.Secret, |
| 70 Descriptor: s.Descriptor, |
| 71 State: &service.LogStreamState{ |
| 72 TerminalIndex: int64(sp.terminalIndex), |
| 73 Purged: sp.purged, |
| 74 }, |
| 75 } |
| 76 if sp.archived { |
| 77 rs.State.ArchiveStreamURL = "something so we are marked as archi
ved" |
| 78 } |
| 79 return &rs, nil |
| 80 } |
| 81 |
| 82 func (c *testCoordinatorClient) TerminateStream(ctx context.Context, p types.Str
eamPath, s []byte, idx types.MessageIndex) error { |
| 83 if err := c.incCalls(); err != nil { |
| 84 return err |
| 85 } |
| 86 |
| 87 c.Lock() |
| 88 defer c.Unlock() |
| 89 |
| 90 // Update our state. |
| 91 sp, ok := c.state[string(p)] |
| 92 if !ok { |
| 93 return fmt.Errorf("no such stream: %v", p) |
| 94 } |
| 95 if sp.terminalIndex >= 0 && idx != sp.terminalIndex { |
| 96 return fmt.Errorf("incompatible terminal indexes: %d != %d", idx
, sp.terminalIndex) |
| 97 } |
| 98 |
| 99 sp.terminalIndex = idx |
| 100 return nil |
| 101 } |
| 102 |
| 103 // incCalls is an entry point for client goroutines. It offers the opportunity |
| 104 // to track call count as well as trap executing goroutines within client calls. |
| 105 // |
| 106 // This must not be called while the lock is held, else it could lead to |
| 107 // deadlock if multiple goroutines are trapped. |
| 108 func (c *testCoordinatorClient) incCalls() error { |
| 109 if c.callC != nil { |
| 110 c.callC <- struct{}{} |
| 111 } |
| 112 |
| 113 atomic.AddInt32(&c.calls, 1) |
| 114 |
| 115 if c.errC != nil { |
| 116 return <-c.errC |
| 117 } |
| 118 return nil |
| 119 } |
| 120 |
| 121 func (c *testCoordinatorClient) stream(name string) (int, bool) { |
| 122 c.Lock() |
| 123 defer c.Unlock() |
| 124 |
| 125 sp, ok := c.state[name] |
| 126 if !ok { |
| 127 return 0, false |
| 128 } |
| 129 return int(sp.terminalIndex), true |
| 130 } |
| 131 |
| 132 // testStorage is a testing storage instance that returns errors. |
| 133 type testStorage struct { |
| 134 storage.Storage |
| 135 err func() error |
| 136 } |
| 137 |
| 138 func (s *testStorage) Put(r *storage.PutRequest) error { |
| 139 if s.err != nil { |
| 140 if err := s.err(); err != nil { |
| 141 return err |
| 142 } |
| 143 } |
| 144 return s.Storage.Put(r) |
| 145 } |
| OLD | NEW |