| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package streamserver |
| 6 |
| 7 import ( |
| 8 "errors" |
| 9 "fmt" |
| 10 "io" |
| 11 "net" |
| 12 "sync" |
| 13 "time" |
| 14 |
| 15 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" |
| 16 "github.com/luci/luci-go/common/iotools" |
| 17 log "github.com/luci/luci-go/common/logging" |
| 18 "golang.org/x/net/context" |
| 19 ) |
| 20 |
| 21 // An client ID, used for logging only |
| 22 type clientID int |
| 23 |
| 24 // namedPipeStreamParams are parameters representing a negotiated stream ready |
| 25 // to deliver. |
| 26 type namedPipeStreamParams struct { |
| 27 // The stream's ReadCloser connection. |
| 28 rc io.ReadCloser |
| 29 // Negotiated stream properties. |
| 30 properties *streamproto.Properties |
| 31 } |
| 32 |
| 33 // Base class for OS-specific stream server implementations. |
| 34 type namedPipeServerBase struct { |
| 35 ctx context.Context |
| 36 |
| 37 // serving, if true, indicates that the server is currently serving. |
| 38 serving bool |
| 39 |
| 40 streamParamsC chan *namedPipeStreamParams |
| 41 closedC chan struct{} |
| 42 acceptFinishedC chan struct{} |
| 43 l net.Listener // The Listener to use for client connectio
ns. |
| 44 } |
| 45 |
| 46 // Initializes the base structure members. |
| 47 func (s *namedPipeServerBase) initBase(ctx context.Context) { |
| 48 s.ctx = log.SetFilter(ctx, "streamServer") |
| 49 } |
| 50 |
| 51 // Implements StreamServer.Connect |
| 52 func (s *namedPipeServer) Listen() error { |
| 53 // Initialize our stream channel. |
| 54 s.streamParamsC = make(chan *namedPipeStreamParams) |
| 55 s.closedC = make(chan struct{}) |
| 56 s.acceptFinishedC = make(chan struct{}) |
| 57 |
| 58 // Create a listener (OS-specific). |
| 59 var err error |
| 60 s.l, err = s.createListener() |
| 61 if err != nil { |
| 62 return err |
| 63 } |
| 64 |
| 65 // Poll the Listener for new connections in a separate goroutine. This w
ill |
| 66 // terminate when the server is Close()d. |
| 67 s.serving = true |
| 68 go s.serve() |
| 69 return nil |
| 70 } |
| 71 |
| 72 func (s *namedPipeServer) Next() (io.ReadCloser, *streamproto.Properties) { |
| 73 if streamParams, ok := <-s.streamParamsC; ok { |
| 74 return streamParams.rc, streamParams.properties |
| 75 } |
| 76 return nil, nil |
| 77 } |
| 78 |
| 79 // Implements StreamServer.Close |
| 80 func (s *namedPipeServer) Close() { |
| 81 if !s.serving { |
| 82 panic("server is not currently serving") |
| 83 } |
| 84 |
| 85 // Close our Listener. This will cause our 'Accept' goroutine to termina
te. |
| 86 s.l.Close() |
| 87 close(s.closedC) |
| 88 <-s.acceptFinishedC |
| 89 |
| 90 // Close our streamParamsC to signal that we're closed. Any blocking Nex
t will |
| 91 // drain the channel, then return with nil. |
| 92 close(s.streamParamsC) |
| 93 s.serving = false |
| 94 } |
| 95 |
| 96 // Continuously pulls connections from the supplied Listener and returns them as |
| 97 // connections to streamParamsC for consumption by Next(). |
| 98 func (s *namedPipeServer) serve() { |
| 99 defer close(s.acceptFinishedC) |
| 100 |
| 101 nextID := clientID(0) |
| 102 clientWG := sync.WaitGroup{} |
| 103 for { |
| 104 log.Debugf(s.ctx, "Beginning Accept() loop cycle.") |
| 105 conn, err := s.l.Accept() |
| 106 if err != nil { |
| 107 log.Errorf(log.SetError(s.ctx, err), "Error during Accep
t().") |
| 108 break |
| 109 } |
| 110 |
| 111 // Spawn a goroutine to handle this connection. This goroutine w
ill take |
| 112 // ownership of the connection, closing it as appropriate. |
| 113 npConn := &namedPipeConn{ |
| 114 closedC: s.closedC, |
| 115 id: nextID, |
| 116 conn: conn, |
| 117 name: conn.RemoteAddr().String(), |
| 118 } |
| 119 clientWG.Add(1) |
| 120 go func() { |
| 121 defer clientWG.Done() |
| 122 |
| 123 c := log.SetFields(s.ctx, log.Fields{ |
| 124 "id": npConn.id, |
| 125 "name": npConn.name, |
| 126 }) |
| 127 defer func() { |
| 128 if r := recover(); r != nil { |
| 129 log.Fields{ |
| 130 log.ErrorKey: r, |
| 131 }.Errorf(c, "Failed to negotitate stream
client.") |
| 132 } |
| 133 }() |
| 134 |
| 135 s.streamParamsC <- npConn.negotiate(s.ctx) |
| 136 }() |
| 137 nextID++ |
| 138 } |
| 139 |
| 140 // Wait for client connections to finish. |
| 141 clientWG.Wait() |
| 142 log.Infof(log.SetFields(s.ctx, log.Fields{ |
| 143 "streamServer": s, |
| 144 "totalConnections": nextID, |
| 145 }), "Exiting serve loop.") |
| 146 } |
| 147 |
| 148 // |
| 149 // namedPipeClient |
| 150 // |
| 151 |
| 152 // Manages a single named pipe connection. |
| 153 type namedPipeConn struct { |
| 154 closedC chan struct{} // Signal channel to indicate that the server has
closed. |
| 155 id clientID // Client ID, used for debugging correlation. |
| 156 conn net.Conn // The underlying client connection. |
| 157 name string // The name of this connection, for debugging purp
oses. |
| 158 |
| 159 // decoupleMu is used to ensure that decoupleConn is called at most one
time. |
| 160 decoupleMu sync.Mutex |
| 161 } |
| 162 |
| 163 // String conversion (for logging). |
| 164 func (c *namedPipeConn) String() string { |
| 165 return fmt.Sprintf("(%d %s)", c.id, c.name) |
| 166 } |
| 167 |
| 168 // negotiate will panic if an error occurs during stream negotiation. |
| 169 func (c *namedPipeConn) negotiate(ctx context.Context) *namedPipeStreamParams { |
| 170 // Close the connection as a failsafe. If we have already decoupled it,
this |
| 171 // will end up being a no-op. |
| 172 defer c.closeConn() |
| 173 |
| 174 ctx = log.SetFields(ctx, log.Fields{ |
| 175 "id": c.id, |
| 176 "local": c.conn.LocalAddr(), |
| 177 "remote": c.conn.RemoteAddr(), |
| 178 }) |
| 179 log.Infof(ctx, "Received new connection.") |
| 180 |
| 181 // Monitor goroutine that will close our connection if our server has sh
ut |
| 182 // down before it's finished handshaking. |
| 183 handshakeFinishedC := make(chan struct{}) |
| 184 defer close(handshakeFinishedC) |
| 185 |
| 186 go func() { |
| 187 select { |
| 188 case <-c.closedC: |
| 189 log.Warningf(ctx, "Received server close signal; closing
client.") |
| 190 c.closeConn() |
| 191 break |
| 192 |
| 193 case <-handshakeFinishedC: |
| 194 break |
| 195 } |
| 196 }() |
| 197 |
| 198 p, err := c.handshake(ctx) |
| 199 if err != nil { |
| 200 panic(fmt.Errorf("failed to negotiate stream config: %v", err)) |
| 201 } |
| 202 |
| 203 // If we have a timeout configured, set it. |
| 204 if p.Timeout > 0 { |
| 205 c.setDeadline(p.Timeout) |
| 206 } |
| 207 |
| 208 // Break off our handshake stream and send it to the Butler for reading. |
| 209 return &namedPipeStreamParams{c.decoupleConn(), p} |
| 210 } |
| 211 |
| 212 // handshake handles the handshaking and registration of a single connection. If |
| 213 // the connection successfully handshakes, it will be registered as a stream and |
| 214 // supplied to the local streamParamsC; otherwise, it will be closed. |
| 215 // |
| 216 // The client connection opens with a handshake protocol. Once complete, the |
| 217 // connection itself becomes the stream. |
| 218 func (c *namedPipeConn) handshake(ctx context.Context) (*streamproto.Properties,
error) { |
| 219 // Read the handshake header. |
| 220 log.Infof(ctx, "Beginning handshake.") |
| 221 |
| 222 // Determine which protocol we're using by reading the first (magic) byt
es. |
| 223 protocol, err := getProtocolFromMagic(ctx, c.conn) |
| 224 if err != nil { |
| 225 log.Errorf(log.SetError(ctx, err), |
| 226 "Failed to determine protocol from header.") |
| 227 return nil, errors.New("Failed to determine protocol.") |
| 228 } |
| 229 |
| 230 // Perform the handshake. |
| 231 return protocol.Handshake(ctx, c.conn) |
| 232 } |
| 233 |
| 234 // Closes the underlying connection. |
| 235 func (c *namedPipeConn) closeConn() { |
| 236 conn := c.decoupleConn() |
| 237 if conn != nil { |
| 238 if err := conn.Close(); err != nil { |
| 239 } |
| 240 } |
| 241 } |
| 242 |
| 243 func (c *namedPipeConn) setDeadline(d time.Duration) error { |
| 244 c.conn = &iotools.DeadlineReader{ |
| 245 Conn: c.conn, |
| 246 Deadline: d, |
| 247 } |
| 248 return nil |
| 249 } |
| 250 |
| 251 // Decouples the active connection, returning it and setting the connection to |
| 252 // nil. |
| 253 func (c *namedPipeConn) decoupleConn() (conn io.ReadCloser) { |
| 254 c.decoupleMu.Lock() |
| 255 defer c.decoupleMu.Unlock() |
| 256 |
| 257 conn, c.conn = c.conn, nil |
| 258 return |
| 259 } |
| OLD | NEW |