| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package streamserver |
| 6 |
| 7 import ( |
| 8 "fmt" |
| 9 "io" |
| 10 "net" |
| 11 "sync" |
| 12 "time" |
| 13 |
| 14 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" |
| 15 "github.com/luci/luci-go/common/iotools" |
| 16 log "github.com/luci/luci-go/common/logging" |
| 17 "golang.org/x/net/context" |
| 18 ) |
| 19 |
| 20 // An client ID, used for logging only |
| 21 type clientID int |
| 22 |
| 23 // namedPipeStreamParams are parameters representing a negotiated stream ready |
| 24 // to deliver. |
| 25 type namedPipeStreamParams struct { |
| 26 // The stream's ReadCloser connection. |
| 27 rc io.ReadCloser |
| 28 // Negotiated stream properties. |
| 29 properties *streamproto.Properties |
| 30 } |
| 31 |
| 32 type namedPipeGenFunc func() (net.Listener, error) |
| 33 |
| 34 // Base class for OS-specific stream server implementations. |
| 35 type namedPipeServer struct { |
| 36 context.Context |
| 37 |
| 38 gen func() (net.Listener, error) |
| 39 streamParamsC chan *namedPipeStreamParams |
| 40 closedC chan struct{} |
| 41 acceptFinishedC chan struct{} |
| 42 l net.Listener // The Listener to use for client connectio
ns. |
| 43 } |
| 44 |
| 45 // Initializes the base structure members. |
| 46 func createNamedPipeServer(ctx context.Context, gen namedPipeGenFunc) *namedPipe
Server { |
| 47 return &namedPipeServer{ |
| 48 Context: log.SetFilter(ctx, "streamServer"), |
| 49 gen: gen, |
| 50 streamParamsC: make(chan *namedPipeStreamParams), |
| 51 closedC: make(chan struct{}), |
| 52 acceptFinishedC: make(chan struct{}), |
| 53 } |
| 54 } |
| 55 |
| 56 // Implements StreamServer.Connect |
| 57 func (s *namedPipeServer) Listen() error { |
| 58 // Create a listener (OS-specific). |
| 59 var err error |
| 60 s.l, err = s.gen() |
| 61 if err != nil { |
| 62 return err |
| 63 } |
| 64 |
| 65 // Poll the Listener for new connections in a separate goroutine. This w
ill |
| 66 // terminate when the server is Close()d. |
| 67 go s.serve() |
| 68 return nil |
| 69 } |
| 70 |
| 71 func (s *namedPipeServer) Next() (io.ReadCloser, *streamproto.Properties) { |
| 72 if streamParams, ok := <-s.streamParamsC; ok { |
| 73 return streamParams.rc, streamParams.properties |
| 74 } |
| 75 return nil, nil |
| 76 } |
| 77 |
| 78 // Implements StreamServer.Close |
| 79 func (s *namedPipeServer) Close() { |
| 80 if s.l == nil { |
| 81 panic("server is not currently serving") |
| 82 } |
| 83 |
| 84 // Close our Listener. This will cause our 'Accept' goroutine to termina
te. |
| 85 s.l.Close() |
| 86 close(s.closedC) |
| 87 <-s.acceptFinishedC |
| 88 |
| 89 // Close our streamParamsC to signal that we're closed. Any blocking Nex
t will |
| 90 // drain the channel, then return with nil. |
| 91 close(s.streamParamsC) |
| 92 s.l = nil |
| 93 } |
| 94 |
| 95 // Continuously pulls connections from the supplied Listener and returns them as |
| 96 // connections to streamParamsC for consumption by Next(). |
| 97 func (s *namedPipeServer) serve() { |
| 98 defer close(s.acceptFinishedC) |
| 99 |
| 100 nextID := clientID(0) |
| 101 clientWG := sync.WaitGroup{} |
| 102 for { |
| 103 log.Debugf(s, "Beginning Accept() loop cycle.") |
| 104 conn, err := s.l.Accept() |
| 105 if err != nil { |
| 106 log.Fields{ |
| 107 log.ErrorKey: err, |
| 108 }.Errorf(s, "Error during Accept().") |
| 109 break |
| 110 } |
| 111 |
| 112 // Spawn a goroutine to handle this connection. This goroutine w
ill take |
| 113 // ownership of the connection, closing it as appropriate. |
| 114 npConn := &namedPipeConn{ |
| 115 closedC: s.closedC, |
| 116 id: nextID, |
| 117 conn: conn, |
| 118 name: conn.RemoteAddr().String(), |
| 119 } |
| 120 npConn.Context = log.SetFields(s, log.Fields{ |
| 121 "id": npConn.id, |
| 122 "name": npConn.name, |
| 123 }) |
| 124 |
| 125 clientWG.Add(1) |
| 126 go func() { |
| 127 defer clientWG.Done() |
| 128 |
| 129 defer func() { |
| 130 if r := recover(); r != nil { |
| 131 log.Fields{ |
| 132 log.ErrorKey: r, |
| 133 }.Errorf(npConn, "Failed to negotitate s
tream client.") |
| 134 } |
| 135 }() |
| 136 |
| 137 s.streamParamsC <- npConn.negotiate(s) |
| 138 }() |
| 139 nextID++ |
| 140 } |
| 141 |
| 142 // Wait for client connections to finish. |
| 143 clientWG.Wait() |
| 144 log.Fields{ |
| 145 "streamServer": s, |
| 146 "totalConnections": nextID, |
| 147 }.Infof(s, "Exiting serve loop.") |
| 148 } |
| 149 |
| 150 // |
| 151 // namedPipeClient |
| 152 // |
| 153 |
| 154 // Manages a single named pipe connection. |
| 155 type namedPipeConn struct { |
| 156 context.Context |
| 157 |
| 158 closedC chan struct{} // Signal channel to indicate that the server has
closed. |
| 159 id clientID // Client ID, used for debugging correlation. |
| 160 conn net.Conn // The underlying client connection. |
| 161 name string // The name of this connection, for debugging purp
oses. |
| 162 |
| 163 // decoupleMu is used to ensure that decoupleConn is called at most one
time. |
| 164 decoupleMu sync.Mutex |
| 165 } |
| 166 |
| 167 // String conversion (for logging). |
| 168 func (c *namedPipeConn) String() string { |
| 169 return fmt.Sprintf("(%d %s)", c.id, c.name) |
| 170 } |
| 171 |
| 172 // negotiate will panic if an error occurs during stream negotiation. |
| 173 func (c *namedPipeConn) negotiate(ctx context.Context) *namedPipeStreamParams { |
| 174 // Close the connection as a failsafe. If we have already decoupled it,
this |
| 175 // will end up being a no-op. |
| 176 defer c.closeConn() |
| 177 |
| 178 ctx = log.SetFields(ctx, log.Fields{ |
| 179 "id": c.id, |
| 180 "local": c.conn.LocalAddr(), |
| 181 "remote": c.conn.RemoteAddr(), |
| 182 }) |
| 183 log.Infof(ctx, "Received new connection.") |
| 184 |
| 185 // Monitor goroutine that will close our connection if our server has sh
ut |
| 186 // down before it's finished handshaking. |
| 187 handshakeFinishedC := make(chan struct{}) |
| 188 defer close(handshakeFinishedC) |
| 189 |
| 190 go func() { |
| 191 select { |
| 192 case <-c.closedC: |
| 193 log.Warningf(ctx, "Received server close signal; closing
client.") |
| 194 c.closeConn() |
| 195 break |
| 196 |
| 197 case <-handshakeFinishedC: |
| 198 break |
| 199 } |
| 200 }() |
| 201 |
| 202 p, err := c.handshake(ctx) |
| 203 if err != nil { |
| 204 panic(fmt.Errorf("failed to negotiate stream config: %v", err)) |
| 205 } |
| 206 |
| 207 // If we have a timeout configured, set it. |
| 208 if p.Timeout > 0 { |
| 209 c.setDeadline(p.Timeout) |
| 210 } |
| 211 |
| 212 // Break off our handshake stream and send it to the Butler for reading. |
| 213 return &namedPipeStreamParams{c.decoupleConn(), p} |
| 214 } |
| 215 |
| 216 // handshake handles the handshaking and registration of a single connection. If |
| 217 // the connection successfully handshakes, it will be registered as a stream and |
| 218 // supplied to the local streamParamsC; otherwise, it will be closed. |
| 219 // |
| 220 // The client connection opens with a handshake protocol. Once complete, the |
| 221 // connection itself becomes the stream. |
| 222 func (c *namedPipeConn) handshake(ctx context.Context) (*streamproto.Properties,
error) { |
| 223 // Read the handshake header. |
| 224 log.Infof(ctx, "Beginning handshake.") |
| 225 |
| 226 // Perform the handshake. |
| 227 hs := handshakeProtocol{} |
| 228 return hs.Handshake(ctx, c.conn) |
| 229 } |
| 230 |
| 231 // Closes the underlying connection. |
| 232 func (c *namedPipeConn) closeConn() { |
| 233 conn := c.decoupleConn() |
| 234 if conn != nil { |
| 235 if err := conn.Close(); err != nil { |
| 236 log.Fields{ |
| 237 log.ErrorKey: err, |
| 238 }.Warningf(c, "Error on connection close.") |
| 239 } |
| 240 } |
| 241 } |
| 242 |
| 243 func (c *namedPipeConn) setDeadline(d time.Duration) error { |
| 244 c.conn = &iotools.DeadlineReader{ |
| 245 Conn: c.conn, |
| 246 Deadline: d, |
| 247 } |
| 248 return nil |
| 249 } |
| 250 |
| 251 // Decouples the active connection, returning it and setting the connection to |
| 252 // nil. |
| 253 func (c *namedPipeConn) decoupleConn() (conn io.ReadCloser) { |
| 254 c.decoupleMu.Lock() |
| 255 defer c.decoupleMu.Unlock() |
| 256 |
| 257 conn, c.conn = c.conn, nil |
| 258 return |
| 259 } |
| OLD | NEW |