Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(854)

Side by Side Diff: client/internal/logdog/butler/streamserver/namedPipe.go

Issue 1429993002: LogDog: Add Butler stream server package. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-butlerproto
Patch Set: Bind POSIX test to POSIX domains. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package streamserver
6
7 import (
8 "fmt"
9 "io"
10 "net"
11 "sync"
12 "time"
13
14 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
15 "github.com/luci/luci-go/common/iotools"
16 log "github.com/luci/luci-go/common/logging"
17 "golang.org/x/net/context"
18 )
19
20 // An client ID, used for logging only
21 type clientID int
22
23 // namedPipeStreamParams are parameters representing a negotiated stream ready
24 // to deliver.
25 type namedPipeStreamParams struct {
26 // The stream's ReadCloser connection.
27 rc io.ReadCloser
28 // Negotiated stream properties.
29 properties *streamproto.Properties
30 }
31
32 type namedPipeGenFunc func() (net.Listener, error)
33
34 // Base class for OS-specific stream server implementations.
35 type namedPipeServer struct {
36 context.Context
37
38 gen func() (net.Listener, error)
39 streamParamsC chan *namedPipeStreamParams
40 closedC chan struct{}
41 acceptFinishedC chan struct{}
42 l net.Listener // The Listener to use for client connectio ns.
43 }
44
45 // Initializes the base structure members.
46 func createNamedPipeServer(ctx context.Context, gen namedPipeGenFunc) *namedPipe Server {
47 return &namedPipeServer{
48 Context: log.SetFilter(ctx, "streamServer"),
49 gen: gen,
50 streamParamsC: make(chan *namedPipeStreamParams),
51 closedC: make(chan struct{}),
52 acceptFinishedC: make(chan struct{}),
53 }
54 }
55
56 // Implements StreamServer.Connect
57 func (s *namedPipeServer) Listen() error {
58 // Create a listener (OS-specific).
59 var err error
60 s.l, err = s.gen()
61 if err != nil {
62 return err
63 }
64
65 // Poll the Listener for new connections in a separate goroutine. This w ill
66 // terminate when the server is Close()d.
67 go s.serve()
68 return nil
69 }
70
71 func (s *namedPipeServer) Next() (io.ReadCloser, *streamproto.Properties) {
72 if streamParams, ok := <-s.streamParamsC; ok {
73 return streamParams.rc, streamParams.properties
74 }
75 return nil, nil
76 }
77
78 // Implements StreamServer.Close
79 func (s *namedPipeServer) Close() {
80 if s.l == nil {
81 panic("server is not currently serving")
82 }
83
84 // Close our Listener. This will cause our 'Accept' goroutine to termina te.
85 s.l.Close()
86 close(s.closedC)
87 <-s.acceptFinishedC
88
89 // Close our streamParamsC to signal that we're closed. Any blocking Nex t will
90 // drain the channel, then return with nil.
91 close(s.streamParamsC)
92 s.l = nil
93 }
94
95 // Continuously pulls connections from the supplied Listener and returns them as
96 // connections to streamParamsC for consumption by Next().
97 func (s *namedPipeServer) serve() {
98 defer close(s.acceptFinishedC)
99
100 nextID := clientID(0)
101 clientWG := sync.WaitGroup{}
102 for {
103 log.Debugf(s, "Beginning Accept() loop cycle.")
104 conn, err := s.l.Accept()
105 if err != nil {
106 log.Fields{
107 log.ErrorKey: err,
108 }.Errorf(s, "Error during Accept().")
109 break
110 }
111
112 // Spawn a goroutine to handle this connection. This goroutine w ill take
113 // ownership of the connection, closing it as appropriate.
114 npConn := &namedPipeConn{
115 closedC: s.closedC,
116 id: nextID,
117 conn: conn,
118 name: conn.RemoteAddr().String(),
119 }
120 npConn.Context = log.SetFields(s, log.Fields{
121 "id": npConn.id,
122 "name": npConn.name,
123 })
124
125 clientWG.Add(1)
126 go func() {
127 defer clientWG.Done()
128
129 defer func() {
130 if r := recover(); r != nil {
131 log.Fields{
132 log.ErrorKey: r,
133 }.Errorf(npConn, "Failed to negotitate s tream client.")
134 }
135 }()
136
137 s.streamParamsC <- npConn.negotiate(s)
138 }()
139 nextID++
140 }
141
142 // Wait for client connections to finish.
143 clientWG.Wait()
144 log.Fields{
145 "streamServer": s,
146 "totalConnections": nextID,
147 }.Infof(s, "Exiting serve loop.")
148 }
149
150 //
151 // namedPipeClient
152 //
153
154 // Manages a single named pipe connection.
155 type namedPipeConn struct {
156 context.Context
157
158 closedC chan struct{} // Signal channel to indicate that the server has closed.
159 id clientID // Client ID, used for debugging correlation.
160 conn net.Conn // The underlying client connection.
161 name string // The name of this connection, for debugging purp oses.
162
163 // decoupleMu is used to ensure that decoupleConn is called at most one time.
164 decoupleMu sync.Mutex
165 }
166
167 // String conversion (for logging).
168 func (c *namedPipeConn) String() string {
169 return fmt.Sprintf("(%d %s)", c.id, c.name)
170 }
171
172 // negotiate will panic if an error occurs during stream negotiation.
173 func (c *namedPipeConn) negotiate(ctx context.Context) *namedPipeStreamParams {
174 // Close the connection as a failsafe. If we have already decoupled it, this
175 // will end up being a no-op.
176 defer c.closeConn()
177
178 ctx = log.SetFields(ctx, log.Fields{
179 "id": c.id,
180 "local": c.conn.LocalAddr(),
181 "remote": c.conn.RemoteAddr(),
182 })
183 log.Infof(ctx, "Received new connection.")
184
185 // Monitor goroutine that will close our connection if our server has sh ut
186 // down before it's finished handshaking.
187 handshakeFinishedC := make(chan struct{})
188 defer close(handshakeFinishedC)
189
190 go func() {
191 select {
192 case <-c.closedC:
193 log.Warningf(ctx, "Received server close signal; closing client.")
194 c.closeConn()
195 break
196
197 case <-handshakeFinishedC:
198 break
199 }
200 }()
201
202 p, err := c.handshake(ctx)
203 if err != nil {
204 panic(fmt.Errorf("failed to negotiate stream config: %v", err))
205 }
206
207 // If we have a timeout configured, set it.
208 if p.Timeout > 0 {
209 c.setDeadline(p.Timeout)
210 }
211
212 // Break off our handshake stream and send it to the Butler for reading.
213 return &namedPipeStreamParams{c.decoupleConn(), p}
214 }
215
216 // handshake handles the handshaking and registration of a single connection. If
217 // the connection successfully handshakes, it will be registered as a stream and
218 // supplied to the local streamParamsC; otherwise, it will be closed.
219 //
220 // The client connection opens with a handshake protocol. Once complete, the
221 // connection itself becomes the stream.
222 func (c *namedPipeConn) handshake(ctx context.Context) (*streamproto.Properties, error) {
223 // Read the handshake header.
224 log.Infof(ctx, "Beginning handshake.")
225
226 // Perform the handshake.
227 hs := handshakeProtocol{}
228 return hs.Handshake(ctx, c.conn)
229 }
230
231 // Closes the underlying connection.
232 func (c *namedPipeConn) closeConn() {
233 conn := c.decoupleConn()
234 if conn != nil {
235 if err := conn.Close(); err != nil {
236 log.Fields{
237 log.ErrorKey: err,
238 }.Warningf(c, "Error on connection close.")
239 }
240 }
241 }
242
243 func (c *namedPipeConn) setDeadline(d time.Duration) error {
244 c.conn = &iotools.DeadlineReader{
245 Conn: c.conn,
246 Deadline: d,
247 }
248 return nil
249 }
250
251 // Decouples the active connection, returning it and setting the connection to
252 // nil.
253 func (c *namedPipeConn) decoupleConn() (conn io.ReadCloser) {
254 c.decoupleMu.Lock()
255 defer c.decoupleMu.Unlock()
256
257 conn, c.conn = c.conn, nil
258 return
259 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698