OLD | NEW |
| (Empty) |
1 /* | |
2 * Licensed to the Apache Software Foundation (ASF) under one or more | |
3 * contributor license agreements. See the NOTICE file distributed with | |
4 * this work for additional information regarding copyright ownership. | |
5 * The ASF licenses this file to You under the Apache License, Version 2.0 | |
6 * (the "License"); you may not use this file except in compliance with | |
7 * the License. You may obtain a copy of the License at | |
8 * | |
9 * http://www.apache.org/licenses/LICENSE-2.0 | |
10 * | |
11 * Unless required by applicable law or agreed to in writing, software | |
12 * distributed under the License is distributed on an "AS IS" BASIS, | |
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 * See the License for the specific language governing permissions and | |
15 * limitations under the License. | |
16 */ | |
17 package org.apache.tomcat.jni.socket; | |
18 | |
19 import java.io.IOException; | |
20 import java.net.InetSocketAddress; | |
21 import java.util.ArrayList; | |
22 import java.util.HashMap; | |
23 import java.util.List; | |
24 import java.util.Map; | |
25 import java.util.concurrent.BlockingQueue; | |
26 import java.util.concurrent.Executor; | |
27 import java.util.concurrent.ExecutorService; | |
28 import java.util.concurrent.Executors; | |
29 import java.util.concurrent.LinkedBlockingQueue; | |
30 import java.util.concurrent.RejectedExecutionHandler; | |
31 import java.util.concurrent.ThreadFactory; | |
32 import java.util.concurrent.ThreadPoolExecutor; | |
33 import java.util.concurrent.TimeUnit; | |
34 import java.util.concurrent.atomic.AtomicBoolean; | |
35 import java.util.concurrent.atomic.AtomicInteger; | |
36 import java.util.concurrent.atomic.AtomicLong; | |
37 import java.util.logging.Level; | |
38 import java.util.logging.Logger; | |
39 | |
40 import org.apache.tomcat.jni.Address; | |
41 import org.apache.tomcat.jni.Error; | |
42 import org.apache.tomcat.jni.Library; | |
43 import org.apache.tomcat.jni.OS; | |
44 import org.apache.tomcat.jni.Poll; | |
45 import org.apache.tomcat.jni.Pool; | |
46 import org.apache.tomcat.jni.SSL; | |
47 import org.apache.tomcat.jni.SSLContext; | |
48 import org.apache.tomcat.jni.SSLExt; | |
49 import org.apache.tomcat.jni.Socket; | |
50 import org.apache.tomcat.jni.Status; | |
51 | |
52 public class AprSocketContext { | |
53 /** | |
54 * Called when a chunk of data is sent or received. This is very low | |
55 * level, used mostly for debugging or stats. | |
56 */ | |
57 public static interface RawDataHandler { | |
58 public void rawData(AprSocket ch, boolean input, byte[] data, int pos, | |
59 int len, int requested, boolean closed); | |
60 } | |
61 | |
62 /** | |
63 * Called in SSL mode after the handshake is completed. | |
64 * | |
65 * @see AprSocketContext#customVerification(TlsCertVerifier) | |
66 */ | |
67 public static interface TlsCertVerifier { | |
68 public void handshakeDone(AprSocket ch); | |
69 } | |
70 | |
71 /** | |
72 * Delegates loading of persistent info about a host - public certs, | |
73 * tickets, config, persistent info etc. | |
74 */ | |
75 public static interface HostInfoLoader { | |
76 public HostInfo getHostInfo(String name, int port, boolean ssl); | |
77 } | |
78 | |
79 private static final Logger log = Logger.getLogger("AprSocketCtx"); | |
80 | |
81 // If interrupt() or thread-safe poll update are not supported - the | |
82 // poll updates will happen after the poll() timeout. | |
83 // The poll timeout with interrupt/thread safe updates can be much higher/ | |
84 private static final int FALLBACK_POLL_TIME = 2000; | |
85 | |
86 // It seems to send the ticket, get server helo / ChangeCipherSpec, but than | |
87 // SSL3_GET_RECORD:decryption failed or bad record mac in s3_pkt.c:480: | |
88 // Either bug in openssl, or some combination of ciphers - needs more debugg
ing. | |
89 // ( this can save a roundtrip and CPU on TLS handshake ) | |
90 boolean USE_TICKETS = false; | |
91 | |
92 private final AprSocket END = new AprSocket(this); | |
93 | |
94 private static final AtomicInteger contextNumber = new AtomicInteger(); | |
95 private int contextId; | |
96 | |
97 private final AtomicInteger threadNumber = new AtomicInteger(); | |
98 | |
99 /** | |
100 * For now - single acceptor thread per connector. | |
101 */ | |
102 private AcceptorThread acceptor; | |
103 private AcceptorDispatchThread acceptorDispatch; | |
104 | |
105 // APR/JNI is thread safe | |
106 private boolean threadSafe = true; | |
107 | |
108 /** | |
109 * Pollers. | |
110 */ | |
111 private final List<AprPoller> pollers = new ArrayList<>(); | |
112 | |
113 // Set on all accepted or connected sockets. | |
114 // TODO: add the other properties | |
115 boolean tcpNoDelay = true; | |
116 | |
117 protected boolean running = true; | |
118 | |
119 protected boolean sslMode; | |
120 | |
121 // onSocket() will be called in accept thread. | |
122 // If false: use executor ( but that may choke the acceptor thread ) | |
123 private boolean nonBlockingAccept = false; | |
124 | |
125 private final BlockingQueue<AprSocket> acceptedQueue = | |
126 new LinkedBlockingQueue<>(); | |
127 | |
128 /** | |
129 * Root APR memory pool. | |
130 */ | |
131 private long rootPool = 0; | |
132 | |
133 /** | |
134 * SSL context. | |
135 */ | |
136 private volatile long sslCtx = 0; | |
137 | |
138 TlsCertVerifier tlsCertVerifier; | |
139 | |
140 // | |
141 final int connectTimeout = 20000; | |
142 final int defaultTimeout = 100000; | |
143 // TODO: Use this | |
144 final int keepAliveTimeout = 20000; | |
145 | |
146 final AtomicInteger open = new AtomicInteger(); | |
147 | |
148 /** | |
149 * Poll interval, in microseconds. If the platform doesn't support | |
150 * poll interrupt - it'll take this time to stop the poller. | |
151 * | |
152 */ | |
153 private int pollTime = 5 * 1000000; | |
154 | |
155 private HostInfoLoader hostInfoLoader; | |
156 | |
157 final RawDataHandler rawDataHandler = null; | |
158 | |
159 // TODO: do we need this here ? | |
160 private final Map<String, HostInfo> hosts = new HashMap<>(); | |
161 | |
162 private String certFile; | |
163 private String keyFile; | |
164 | |
165 private byte[] spdyNPN; | |
166 | |
167 private byte[] ticketKey; | |
168 | |
169 // For resolving DNS ( i.e. connect ), callbacks | |
170 private ExecutorService threadPool; | |
171 | |
172 // Separate executor for connect/handshakes | |
173 final ExecutorService connectExecutor; | |
174 | |
175 final boolean debugSSL = false; | |
176 private boolean debugPoll = false; | |
177 | |
178 private boolean deferAccept = false; | |
179 | |
180 private int backlog = 100; | |
181 | |
182 private boolean useSendfile; | |
183 | |
184 private int sslProtocol = SSL.SSL_PROTOCOL_TLSV1 | SSL.SSL_PROTOCOL_TLSV1_1
| SSL.SSL_PROTOCOL_TLSV1_2; | |
185 | |
186 /** | |
187 * Max time spent in a callback ( will be longer for blocking ) | |
188 */ | |
189 final AtomicLong maxHandlerTime = new AtomicLong(); | |
190 final AtomicLong totalHandlerTime = new AtomicLong(); | |
191 final AtomicLong handlerCount = new AtomicLong(); | |
192 | |
193 /** | |
194 * Total connections handled ( accepted or connected ). | |
195 */ | |
196 private final AtomicInteger connectionsCount = new AtomicInteger(); | |
197 | |
198 | |
199 public AprSocketContext() { | |
200 connectExecutor =new ThreadPoolExecutor(0, 64, 5, TimeUnit.SECONDS, | |
201 new LinkedBlockingQueue<Runnable>(), new RejectedExecutionHandle
r() { | |
202 @Override | |
203 public void rejectedExecution(Runnable r, | |
204 java.util.concurrent.ThreadPoolExecutor executor) { | |
205 AprSocket s = (AprSocket) r; | |
206 log.severe("Rejecting " + s); | |
207 s.reset(); | |
208 } | |
209 }); | |
210 contextId = contextNumber.incrementAndGet(); | |
211 } | |
212 | |
213 /** | |
214 * Poller thread count. | |
215 */ | |
216 private int pollerThreadCount = 4; | |
217 public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadC
ount = pollerThreadCount; } | |
218 public int getPollerThreadCount() { return pollerThreadCount; } | |
219 | |
220 // to test the limits - default should be lower | |
221 private int maxConnections = 64 * 1024; | |
222 public void setMaxconnections(int maxCon) { | |
223 this.maxConnections = maxCon; | |
224 } | |
225 | |
226 public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlo
g; } | |
227 public int getBacklog() { return backlog; } | |
228 | |
229 /** | |
230 * Defer accept. | |
231 */ | |
232 public void setDeferAccept(boolean deferAccept) { this.deferAccept = deferAc
cept; } | |
233 public boolean getDeferAccept() { return deferAccept; } | |
234 | |
235 /** | |
236 * For client: | |
237 * - ClientHello will include the npn extension ( the ID == 0x3374) | |
238 * - if ServerHello includes a list of protocols - select one | |
239 * - send it after ChangeCipherSpec and before Finish | |
240 * | |
241 * For server: | |
242 * - if ClientHello includes the npn extension | |
243 * -- will send this string as list of supported protocols in ServerHello | |
244 * - read the selection before Finish. | |
245 * @param npn | |
246 */ | |
247 public void setNpn(String npn) { | |
248 byte[] data = npn.getBytes(); | |
249 byte[] npnB = new byte[data.length + 2]; | |
250 | |
251 System.arraycopy(data, 0, npnB, 1, data.length); | |
252 npnB[0] = (byte) data.length; | |
253 npnB[npnB.length - 1] = 0; | |
254 spdyNPN = npnB; | |
255 | |
256 } | |
257 | |
258 public void setNpn(byte[] data) { | |
259 spdyNPN = data; | |
260 } | |
261 | |
262 public void setHostLoader(HostInfoLoader handler) { | |
263 this.hostInfoLoader = handler; | |
264 } | |
265 | |
266 public boolean isServer() { | |
267 return acceptor != null; | |
268 } | |
269 | |
270 protected Executor getExecutor() { | |
271 if (threadPool == null) { | |
272 threadPool = Executors.newCachedThreadPool(new ThreadFactory( ) { | |
273 @Override | |
274 public Thread newThread(Runnable r) { | |
275 Thread t = new Thread(r, "AprThread-" + contextId + "-" + | |
276 threadNumber.incrementAndGet()); | |
277 t.setDaemon(true); | |
278 return t; | |
279 } | |
280 }); | |
281 } | |
282 return threadPool; | |
283 } | |
284 | |
285 /** | |
286 * All accepted/connected sockets will start handshake automatically. | |
287 */ | |
288 public AprSocketContext setTls() { | |
289 this.sslMode = true; | |
290 return this; | |
291 } | |
292 | |
293 public void setTcpNoDelay(boolean b) { | |
294 tcpNoDelay = b; | |
295 } | |
296 | |
297 public void setSslProtocol(String protocol) { | |
298 protocol = protocol.trim(); | |
299 if ("SSLv2".equalsIgnoreCase(protocol)) { | |
300 sslProtocol = SSL.SSL_PROTOCOL_SSLV2; | |
301 } else if ("SSLv3".equalsIgnoreCase(protocol)) { | |
302 sslProtocol = SSL.SSL_PROTOCOL_SSLV3; | |
303 } else if ("TLSv1".equalsIgnoreCase(protocol)) { | |
304 sslProtocol = SSL.SSL_PROTOCOL_TLSV1; | |
305 } else if ("TLSv1.1".equalsIgnoreCase(protocol)) { | |
306 sslProtocol = SSL.SSL_PROTOCOL_TLSV1_1; | |
307 } else if ("TLSv1.2".equalsIgnoreCase(protocol)) { | |
308 sslProtocol = SSL.SSL_PROTOCOL_TLSV1_2; | |
309 } else if ("all".equalsIgnoreCase(protocol)) { | |
310 sslProtocol = SSL.SSL_PROTOCOL_ALL; | |
311 } | |
312 } | |
313 | |
314 public void setTicketKey(byte[] key48Bytes) { | |
315 if(key48Bytes.length != 48) { | |
316 throw new RuntimeException("Key must be 48 bytes"); | |
317 } | |
318 this.ticketKey = key48Bytes; | |
319 } | |
320 | |
321 public void customVerification(TlsCertVerifier verifier) { | |
322 tlsCertVerifier = verifier; | |
323 } | |
324 | |
325 // TODO: should have a separate method for switching to tls later. | |
326 /** | |
327 * Set certificate, will also enable TLS mode. | |
328 */ | |
329 public AprSocketContext setKeys(String certPemFile, String keyDerFile) { | |
330 this.sslMode = true; | |
331 setTls(); | |
332 certFile = certPemFile; | |
333 keyFile = keyDerFile; | |
334 return this; | |
335 } | |
336 | |
337 /** | |
338 * SSL cipher suite. | |
339 */ | |
340 private String SSLCipherSuite = "ALL"; | |
341 public String getSSLCipherSuite() { return SSLCipherSuite; } | |
342 public void setSSLCipherSuite(String SSLCipherSuite) { this.SSLCipherSuite =
SSLCipherSuite; } | |
343 | |
344 /** | |
345 * Override or use hostInfoLoader to implement persistent/memcache storage. | |
346 */ | |
347 public HostInfo getHostInfo(String host, int port, boolean ssl) { | |
348 if (hostInfoLoader != null) { | |
349 return hostInfoLoader.getHostInfo(host, port, ssl); | |
350 } | |
351 // Use local cache | |
352 String key = host + ":" + port; | |
353 HostInfo pi = hosts.get(key); | |
354 if (pi != null) { | |
355 return pi; | |
356 } | |
357 pi = new HostInfo(host, port, ssl); | |
358 hosts.put(key, pi); | |
359 return pi; | |
360 } | |
361 | |
362 protected void rawData(AprSocket ch, boolean inp, byte[] data, int pos, | |
363 int len, int requested, boolean closed) { | |
364 if (rawDataHandler != null) { | |
365 rawDataHandler.rawData(ch, inp, data, pos, len, requested, closed); | |
366 } | |
367 } | |
368 | |
369 public void listen(final int port) throws IOException { | |
370 if (acceptor != null) { | |
371 throw new IOException("Already accepting on " + acceptor.port); | |
372 } | |
373 if (sslMode && certFile == null) { | |
374 throw new IOException("Missing certificates for server"); | |
375 } | |
376 if (sslMode || !nonBlockingAccept) { | |
377 acceptorDispatch = new AcceptorDispatchThread(); | |
378 acceptorDispatch.setName("AprAcceptorDispatch-" + port); | |
379 acceptorDispatch.start(); | |
380 } | |
381 | |
382 acceptor = new AcceptorThread(port); | |
383 acceptor.prepare(); | |
384 acceptor.setName("AprAcceptor-" + port); | |
385 acceptor.start(); | |
386 | |
387 | |
388 } | |
389 | |
390 /** | |
391 * Get a socket for connectiong to host:port. | |
392 */ | |
393 public AprSocket socket(String host, int port, boolean ssl) { | |
394 HostInfo hi = getHostInfo(host, port, ssl); | |
395 return socket(hi); | |
396 } | |
397 | |
398 public AprSocket socket(HostInfo hi) { | |
399 AprSocket sock = newSocket(this); | |
400 sock.setHost(hi); | |
401 return sock; | |
402 } | |
403 | |
404 public AprSocket socket(long socket) { | |
405 AprSocket sock = newSocket(this); | |
406 // Tomcat doesn't set this | |
407 SSLExt.sslSetMode(socket, SSLExt.SSL_MODE_ENABLE_PARTIAL_WRITE | | |
408 SSLExt.SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER); | |
409 sock.setStatus(AprSocket.ACCEPTED); | |
410 sock.socket = socket; | |
411 return sock; | |
412 } | |
413 | |
414 | |
415 void destroySocket(AprSocket socket) { | |
416 // TODO: does it need to be done in io thread ? | |
417 synchronized (socket) { | |
418 if (socket.socket != 0) { | |
419 long s = socket.socket; | |
420 socket.socket = 0; | |
421 log.info("DESTROY: " + Long.toHexString(s)); | |
422 Socket.destroy(s); | |
423 } | |
424 } | |
425 } | |
426 | |
427 protected void connectBlocking(AprSocket apr) throws IOException { | |
428 try { | |
429 if (!running) { | |
430 throw new IOException("Stopped"); | |
431 } | |
432 HostInfo hi = apr.getHost(); | |
433 | |
434 long clientSockP; | |
435 synchronized (pollers) { | |
436 long socketpool = Pool.create(getRootPool()); | |
437 | |
438 int family = Socket.APR_INET; | |
439 | |
440 clientSockP = Socket.create(family, | |
441 Socket.SOCK_STREAM, | |
442 Socket.APR_PROTO_TCP, socketpool); // or rootPool ? | |
443 } | |
444 Socket.timeoutSet(clientSockP, connectTimeout * 1000); | |
445 if (OS.IS_UNIX) { | |
446 Socket.optSet(clientSockP, Socket.APR_SO_REUSEADDR, 1); | |
447 } | |
448 | |
449 Socket.optSet(clientSockP, Socket.APR_SO_KEEPALIVE, 1); | |
450 | |
451 // Blocking | |
452 // TODO: use socket pool | |
453 // TODO: cache it ( and TTL ) in hi | |
454 long inetAddress = Address.info(hi.host, Socket.APR_INET, | |
455 hi.port, 0, rootPool); | |
456 // this may take a long time - stop/destroy must wait | |
457 // at least connect timeout | |
458 int rc = Socket.connect(clientSockP, inetAddress); | |
459 | |
460 if (rc != 0) { | |
461 synchronized (pollers) { | |
462 Socket.close(clientSockP); | |
463 Socket.destroy(clientSockP); | |
464 } | |
465 /////Pool.destroy(socketpool); | |
466 throw new IOException("Socket.connect(): " + rc + " " + Error.st
rerror(rc) + " " + connectTimeout); | |
467 } | |
468 if (!running) { | |
469 throw new IOException("Stopped"); | |
470 } | |
471 | |
472 connectionsCount.incrementAndGet(); | |
473 if (tcpNoDelay) { | |
474 Socket.optSet(clientSockP, Socket.APR_TCP_NODELAY, 1); | |
475 } | |
476 | |
477 Socket.timeoutSet(clientSockP, defaultTimeout * 1000); | |
478 | |
479 apr.socket = clientSockP; | |
480 | |
481 apr.afterConnect(); | |
482 } catch (IOException e) { | |
483 apr.reset(); | |
484 throw e; | |
485 } catch (Throwable e) { | |
486 apr.reset(); | |
487 e.printStackTrace(); | |
488 throw new IOException(e); | |
489 } | |
490 } | |
491 | |
492 AprSocket newSocket(AprSocketContext context) { | |
493 return new AprSocket(context); | |
494 } | |
495 | |
496 /** | |
497 * To clean the pools - we could track if all channels are | |
498 * closed, but this seems simpler and safer. | |
499 */ | |
500 @Override | |
501 protected void finalize() throws Throwable { | |
502 if (rootPool != 0) { | |
503 log.warning(this + " GC without stop()"); | |
504 try { | |
505 stop(); | |
506 } catch (Exception e) { | |
507 //TODO Auto-generated catch block | |
508 e.printStackTrace(); | |
509 } | |
510 } | |
511 super.finalize(); | |
512 } | |
513 | |
514 | |
515 public void stop() { | |
516 synchronized (pollers) { | |
517 if (!running) { | |
518 return; | |
519 } | |
520 running = false; | |
521 } | |
522 | |
523 if (rootPool != 0) { | |
524 if (acceptor != null) { | |
525 try { | |
526 acceptor.unblock(); | |
527 acceptor.join(); | |
528 } catch (InterruptedException e) { | |
529 e.printStackTrace(); | |
530 } | |
531 } | |
532 if (acceptorDispatch != null) { | |
533 acceptedQueue.add(END); | |
534 try { | |
535 acceptorDispatch.join(); | |
536 } catch (InterruptedException e) { | |
537 e.printStackTrace(); | |
538 } | |
539 } | |
540 if (threadPool != null) { | |
541 threadPool.shutdownNow(); | |
542 } | |
543 | |
544 log.info("Stopping pollers " + contextId); | |
545 | |
546 while (true) { | |
547 AprPoller a; | |
548 synchronized (pollers) { | |
549 if (pollers.size() == 0) { | |
550 break; | |
551 } | |
552 a = pollers.remove(0); | |
553 } | |
554 a.interruptPoll(); | |
555 try { | |
556 a.join(); | |
557 log.info("Poller " + a.id + " done "); | |
558 } catch (InterruptedException e) { | |
559 e.printStackTrace(); | |
560 } | |
561 } | |
562 } | |
563 } | |
564 | |
565 | |
566 // Called when the last poller has been destroyed. | |
567 void destroy() { | |
568 synchronized (pollers) { | |
569 if (pollers.size() != 0) { | |
570 return; | |
571 } | |
572 | |
573 if (rootPool == 0) { | |
574 return; | |
575 } | |
576 log.info("Destroy root pool " + rootPool); | |
577 //Pool.destroy(rootPool); | |
578 //rootPool = 0; | |
579 } | |
580 } | |
581 | |
582 private static IOException noApr; | |
583 static { | |
584 | |
585 try { | |
586 Library.initialize(null); | |
587 SSL.initialize(null); | |
588 } catch (Exception e) { | |
589 noApr = new IOException("APR not present", e); | |
590 } | |
591 | |
592 } | |
593 | |
594 private long getRootPool() throws IOException { | |
595 if (rootPool == 0) { | |
596 if (noApr != null) { | |
597 throw noApr; | |
598 } | |
599 // Create the root APR memory pool | |
600 rootPool = Pool.create(0); | |
601 | |
602 // Adjust poller sizes | |
603 if ((OS.IS_WIN32 || OS.IS_WIN64) && (maxConnections > 1024)) { | |
604 // The maximum per poller to get reasonable performance is 1024 | |
605 pollerThreadCount = maxConnections / 1024; | |
606 // Adjust poller size so that it won't reach the limit | |
607 maxConnections = maxConnections - (maxConnections % 1024); | |
608 } | |
609 } | |
610 return rootPool; | |
611 } | |
612 | |
613 long getSslCtx() throws Exception { | |
614 if (sslCtx == 0) { | |
615 synchronized (AprSocketContext.class) { | |
616 if (sslCtx == 0) { | |
617 boolean serverMode = acceptor != null; | |
618 sslCtx = SSLContext.make(getRootPool(), | |
619 sslProtocol, | |
620 serverMode ? SSL.SSL_MODE_SERVER : SSL.SSL_MODE_CLIE
NT); | |
621 | |
622 | |
623 // SSL.SSL_OP_NO_SSLv3 | |
624 int opts = SSL.SSL_OP_NO_SSLv2 | | |
625 SSL.SSL_OP_SINGLE_DH_USE; | |
626 | |
627 if (!USE_TICKETS || serverMode && ticketKey == null) { | |
628 opts |= SSL.SSL_OP_NO_TICKET; | |
629 } | |
630 | |
631 SSLContext.setOptions(sslCtx, opts); | |
632 // Set revocation | |
633 // SSLContext.setCARevocation(sslContext, SSLCARevoca
tionFile, SSLCARevocationPath); | |
634 | |
635 // Client certificate verification - maybe make it option | |
636 try { | |
637 SSLContext.setCipherSuite(sslCtx, SSLCipherSuite); | |
638 | |
639 | |
640 if (serverMode) { | |
641 if (ticketKey != null) { | |
642 //SSLExt.setTicketKeys(sslCtx, ticketKey, ticket
Key.length); | |
643 } | |
644 if (certFile != null) { | |
645 boolean rc = SSLContext.setCertificate(sslCtx, | |
646 certFile, | |
647 keyFile, null, SSL.SSL_AIDX_DSA); | |
648 if (!rc) { | |
649 throw new IOException("Can't set keys"); | |
650 } | |
651 } | |
652 SSLContext.setVerify(sslCtx, SSL.SSL_CVERIFY_NONE, 1
0); | |
653 | |
654 if (spdyNPN != null) { | |
655 SSLExt.setNPN(sslCtx, spdyNPN, spdyNPN.length); | |
656 } | |
657 } else { | |
658 if (tlsCertVerifier != null) { | |
659 // NONE ? | |
660 SSLContext.setVerify(sslCtx, | |
661 SSL.SSL_CVERIFY_NONE, 10); | |
662 } else { | |
663 SSLContext.setCACertificate(sslCtx, | |
664 "/etc/ssl/certs/ca-certificates.crt", | |
665 "/etc/ssl/certs"); | |
666 SSLContext.setVerify(sslCtx, | |
667 SSL.SSL_CVERIFY_REQUIRE, 10); | |
668 } | |
669 | |
670 if (spdyNPN != null) { | |
671 SSLExt.setNPN(sslCtx, spdyNPN, spdyNPN.length); | |
672 } | |
673 } | |
674 } catch (IOException e) { | |
675 throw e; | |
676 } catch (Exception e) { | |
677 throw new IOException(e); | |
678 } | |
679 } | |
680 // TODO: try release buffers | |
681 } | |
682 } | |
683 return sslCtx; | |
684 } | |
685 | |
686 void findPollerAndAdd(AprSocket ch) throws IOException { | |
687 if (ch.poller != null) { | |
688 ch.poller.requestUpdate(ch); | |
689 return; | |
690 } | |
691 assignPoller(ch); | |
692 } | |
693 | |
694 void assignPoller(AprSocket ch) throws IOException { | |
695 AprPoller target = null; | |
696 synchronized (pollers) { | |
697 // Make sure we have min number of pollers | |
698 int needPollers = pollerThreadCount - pollers.size(); | |
699 if (needPollers > 0) { | |
700 for (int i = needPollers; i > 0; i--) { | |
701 pollers.add(allocatePoller()); | |
702 } | |
703 } | |
704 int max = 0; | |
705 for (AprPoller poller: pollers) { | |
706 int rem = poller.remaining(); | |
707 if (rem > max) { | |
708 target = poller; | |
709 max = rem; | |
710 } | |
711 } | |
712 } | |
713 if (target != null && target.add(ch)) { | |
714 return; | |
715 } | |
716 | |
717 // can't be added - add a new poller | |
718 synchronized (pollers) { | |
719 AprPoller poller = allocatePoller(); | |
720 poller.add(ch); | |
721 pollers.add(poller); | |
722 } | |
723 } | |
724 | |
725 /** | |
726 * Called on each accepted socket (for servers) or after connection (client) | |
727 * after handshake. | |
728 */ | |
729 protected void onSocket(@SuppressWarnings("unused") AprSocket s) { | |
730 // Defaults to NO-OP. Parameter is used by sub-classes. | |
731 } | |
732 | |
733 private class AcceptorThread extends Thread { | |
734 private final int port; | |
735 private long serverSockPool = 0; | |
736 private long serverSock = 0; | |
737 | |
738 private long inetAddress; | |
739 | |
740 AcceptorThread(int port) { | |
741 this.port = port; | |
742 setDaemon(true); | |
743 } | |
744 | |
745 void prepare() throws IOException { | |
746 try { | |
747 // Create the pool for the server socket | |
748 serverSockPool = Pool.create(getRootPool()); | |
749 | |
750 int family = Socket.APR_INET; | |
751 inetAddress = | |
752 Address.info(null, family, port, 0, serverSockPool); | |
753 | |
754 // Create the APR server socket | |
755 serverSock = Socket.create(family, | |
756 Socket.SOCK_STREAM, | |
757 Socket.APR_PROTO_TCP, serverSockPool); | |
758 | |
759 if (OS.IS_UNIX) { | |
760 Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1); | |
761 } | |
762 // Deal with the firewalls that tend to drop the inactive socket
s | |
763 Socket.optSet(serverSock, Socket.APR_SO_KEEPALIVE, 1); | |
764 // Bind the server socket | |
765 int ret = Socket.bind(serverSock, inetAddress); | |
766 if (ret != 0) { | |
767 throw new IOException("Socket.bind " + ret + " " + | |
768 Error.strerror(ret) + " port=" + port); | |
769 } | |
770 // Start listening on the server socket | |
771 ret = Socket.listen(serverSock, backlog ); | |
772 if (ret != 0) { | |
773 throw new IOException("endpoint.init.listen" | |
774 + ret + " " + Error.strerror(ret)); | |
775 } | |
776 if (OS.IS_WIN32 || OS.IS_WIN64) { | |
777 // On Windows set the reuseaddr flag after the bind/listen | |
778 Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1); | |
779 } | |
780 | |
781 // Sendfile usage on systems which don't support it cause major
problems | |
782 if (useSendfile && !Library.APR_HAS_SENDFILE) { | |
783 useSendfile = false; | |
784 } | |
785 | |
786 // Delay accepting of new connections until data is available | |
787 // Only Linux kernels 2.4 + have that implemented | |
788 // on other platforms this call is noop and will return APR_ENOT
IMPL. | |
789 if (deferAccept) { | |
790 if (Socket.optSet(serverSock, Socket.APR_TCP_DEFER_ACCEPT, 1
) == Status.APR_ENOTIMPL) { | |
791 deferAccept = false; | |
792 } | |
793 } | |
794 } catch (Throwable t) { | |
795 throw new IOException(t); | |
796 } | |
797 } | |
798 | |
799 void unblock() { | |
800 try (java.net.Socket sock = new java.net.Socket()) { | |
801 // Easiest ( maybe safest ) way to interrupt accept | |
802 // we could have it in non-blocking mode, etc | |
803 sock.connect(new InetSocketAddress("127.0.0.1", port)); | |
804 } catch (Exception ex) { | |
805 // ignore - the acceptor may have shut down by itself. | |
806 } | |
807 } | |
808 | |
809 @Override | |
810 public void run() { | |
811 while (running) { | |
812 try { | |
813 // each socket has a pool. | |
814 final AprSocket ch = newSocket(AprSocketContext.this); | |
815 ch.setStatus(AprSocket.ACCEPTED); | |
816 | |
817 ch.socket = Socket.accept(serverSock); | |
818 if (!running) { | |
819 break; | |
820 } | |
821 connectionsCount.incrementAndGet(); | |
822 if (connectionsCount.get() % 1000 == 0) { | |
823 System.err.println("Accepted: " + connectionsCount.get()
); | |
824 } | |
825 | |
826 if (nonBlockingAccept && !sslMode) { | |
827 ch.setStatus(AprSocket.CONNECTED); | |
828 // TODO: SSL really needs a thread. | |
829 onSocket(ch); | |
830 } else { | |
831 acceptedQueue.add(ch); | |
832 } | |
833 } catch (Throwable e) { | |
834 e.printStackTrace(); | |
835 } | |
836 } | |
837 Socket.close(serverSock); | |
838 } | |
839 } | |
840 | |
841 private class AcceptorDispatchThread extends Thread { | |
842 | |
843 AcceptorDispatchThread() { | |
844 setDaemon(true); | |
845 } | |
846 | |
847 @Override | |
848 public void run() { | |
849 while(running) { | |
850 try { | |
851 AprSocket ch = acceptedQueue.take(); | |
852 if (ch == END) { | |
853 return; | |
854 } | |
855 connectExecutor.execute(ch); | |
856 } catch (InterruptedException e) { | |
857 } | |
858 } | |
859 } | |
860 } | |
861 | |
862 /** | |
863 * Create the poller. With some versions of APR, the maximum poller size wil
l | |
864 * be 62 (recompiling APR is necessary to remove this limitation). | |
865 * @throws IOException | |
866 */ | |
867 AprPoller allocatePoller() throws IOException { | |
868 long pool = Pool.create(getRootPool()); | |
869 int size = maxConnections / pollerThreadCount; | |
870 | |
871 long serverPollset = allocatePoller(size, pool); | |
872 | |
873 if (serverPollset == 0 && size > 1024) { | |
874 log.severe("Falling back to 1024-sized poll, won't scale"); | |
875 size = 1024; | |
876 serverPollset = allocatePoller(size, pool); | |
877 } | |
878 if (serverPollset == 0) { | |
879 log.severe("Falling back to 62-sized poll, won't scale"); | |
880 size = 62; | |
881 serverPollset = allocatePoller(size, pool); | |
882 } | |
883 | |
884 AprPoller res = new AprPoller(); | |
885 res.pool = pool; | |
886 res.serverPollset = serverPollset; | |
887 res.desc = new long[size * 2]; | |
888 res.size = size; | |
889 res.id = contextId++; | |
890 res.setDaemon(true); | |
891 res.setName("AprPoller-" + res.id); | |
892 res.start(); | |
893 if (debugPoll && !sizeLogged) { | |
894 sizeLogged = true; | |
895 log.info("Poller size " + (res.desc.length / 2)); | |
896 } | |
897 return res; | |
898 } | |
899 | |
900 // Removed the 'thread safe' updates for now, to simplify the code | |
901 // last test shows a small improvement, can switch later. | |
902 private static boolean sizeLogged = false; | |
903 | |
904 protected long allocatePoller(int size, long pool) { | |
905 int flag = threadSafe ? Poll.APR_POLLSET_THREADSAFE: 0; | |
906 for (int i = 0; i < 2; i++) { | |
907 try { | |
908 // timeout must be -1 - or ttl will take effect, strange result
s. | |
909 return Poll.create(size, pool, flag, -1); | |
910 } catch (Error e) { | |
911 e.printStackTrace(); | |
912 if (Status.APR_STATUS_IS_EINVAL(e.getError())) { | |
913 log.info(" endpoint.poll.limitedpollsize " + size); | |
914 return 0; | |
915 } else if (Status.APR_STATUS_IS_ENOTIMPL(e.getError())) { | |
916 // thread safe not supported | |
917 log.severe("THREAD SAFE NOT SUPPORTED" + e); | |
918 threadSafe = false; | |
919 // try again without the flags | |
920 continue; | |
921 } else { | |
922 log.severe("endpoint.poll.initfail" + e); | |
923 return 0; | |
924 } | |
925 } | |
926 } | |
927 log.severe("Unexpected ENOTIMPL with flag==0"); | |
928 return 0; | |
929 } | |
930 | |
931 class AprPoller extends Thread { | |
932 | |
933 private int id; | |
934 private int size; | |
935 private long serverPollset = 0; | |
936 private long pool = 0; | |
937 private long[] desc; | |
938 | |
939 private long lastPoll; | |
940 private long lastPollTime; | |
941 private final AtomicBoolean inPoll = new AtomicBoolean(false); | |
942 | |
943 // Should be replaced with socket data. | |
944 // used only to lookup by socket | |
945 private final Map<Long, AprSocket> channels = new HashMap<>(); | |
946 | |
947 // Active + pending, must be < desc.length / 2 | |
948 // The channel will also have poller=this when active or pending | |
949 // How many sockets have poller == this | |
950 private final AtomicInteger keepAliveCount = new AtomicInteger(); | |
951 // Tracks desc, how many sockets are actively polled | |
952 private final AtomicInteger polledCount = new AtomicInteger(); | |
953 | |
954 private final AtomicInteger pollCount = new AtomicInteger(); | |
955 | |
956 private final List<AprSocket> updates = new ArrayList<>(); | |
957 | |
958 @Override | |
959 public void run() { | |
960 if (!running) { | |
961 return; | |
962 } | |
963 if (debugPoll) { | |
964 log.info("Starting poller " + id + " " + (isServer() ? "SRV ": "
CLI ")); | |
965 } | |
966 long t0 = System.currentTimeMillis(); | |
967 while (running) { | |
968 try { | |
969 updates(); | |
970 | |
971 // Pool for the specified interval. Remove signaled sockets | |
972 synchronized (this) { | |
973 inPoll.set(true); | |
974 } | |
975 // if updates are added after updates and poll - interrupt w
ill have still | |
976 // work | |
977 | |
978 int rv = Poll.poll(serverPollset, pollTime, desc, true); | |
979 synchronized (this) { | |
980 inPoll.set(false); | |
981 if (!running) { | |
982 break; | |
983 } | |
984 } | |
985 | |
986 pollCount.incrementAndGet(); | |
987 lastPoll = System.currentTimeMillis(); | |
988 lastPollTime = lastPoll - t0; | |
989 | |
990 if (rv > 0) { | |
991 if (debugPoll) { | |
992 log.info(" Poll() id=" + id + " rv=" + rv + " keepAl
iveCount=" + keepAliveCount + | |
993 " polled = " + polledCount.get() | |
994 + " time=" + lastPollTime); | |
995 } | |
996 polledCount.addAndGet(-rv); | |
997 for (int pollIdx = 0; pollIdx < rv; pollIdx++) { | |
998 long sock = desc[pollIdx * 2 + 1]; | |
999 AprSocket ch; | |
1000 boolean blocking = false; | |
1001 | |
1002 synchronized (channels) { | |
1003 ch = channels.get(Long.valueOf(sock)); | |
1004 if (ch != null) { | |
1005 blocking = ch.isBlocking(); | |
1006 } else { | |
1007 log.severe("Polled socket not found !!!!!" +
Long.toHexString(sock)); | |
1008 // TODO: destroy/close the raw socket | |
1009 continue; | |
1010 } | |
1011 } | |
1012 // was removed from polling | |
1013 ch.clearStatus(AprSocket.POLL); | |
1014 | |
1015 // We just removed it ( see last param to poll()). | |
1016 // Check for failed sockets and hand this socket off
to a worker | |
1017 long mask = desc[pollIdx * 2]; | |
1018 | |
1019 boolean err = ((mask & Poll.APR_POLLERR) == Poll.APR
_POLLERR); | |
1020 boolean nval = ((mask & Poll.APR_POLLNVAL) != 0); | |
1021 if (err || nval) { | |
1022 System.err.println("ERR " + err + " NVAL " + nva
l); | |
1023 } | |
1024 | |
1025 boolean out = (mask & Poll.APR_POLLOUT) == Poll.APR_
POLLOUT; | |
1026 boolean in = (mask & Poll.APR_POLLIN) == Poll.APR_PO
LLIN; | |
1027 if (debugPoll) { | |
1028 log.info(" Poll channel: " + Long.toHexString(ma
sk) + | |
1029 (out ? " OUT" :"") + | |
1030 (in ? " IN": "") + | |
1031 (err ? " ERR" : "") + | |
1032 " Ch: " + ch); | |
1033 } | |
1034 | |
1035 // will be set again in process(), if all read/write
is done | |
1036 ch.clearStatus(AprSocket.POLLOUT); | |
1037 ch.clearStatus(AprSocket.POLLIN); | |
1038 | |
1039 // try to send if needed | |
1040 if (blocking) { | |
1041 synchronized (ch) { | |
1042 ch.notifyAll(); | |
1043 } | |
1044 getExecutor().execute(ch); | |
1045 } else { | |
1046 ((AprSocketContext.NonBlockingPollHandler) ch.ha
ndler).process(ch, in, out, false); | |
1047 | |
1048 // Update polling for the channel (in IO thread,
safe) | |
1049 updateIOThread(ch); | |
1050 } | |
1051 } | |
1052 } else if (rv < 0) { | |
1053 int errn = -rv; | |
1054 if (errn == Status.TIMEUP) { | |
1055 // to or interrupt | |
1056 // if (debugPoll) { | |
1057 // log.info(" Poll() timeup" + " keepAliveCount="
+ keepAliveCount + | |
1058 // " polled = " + polledCount.get() | |
1059 // + " time=" + lastPollTime); | |
1060 // } | |
1061 } else if (errn == Status.EINTR) { | |
1062 // interrupt - no need to log | |
1063 } else { | |
1064 if (debugPoll) { | |
1065 log.info(" Poll() rv=" + rv + " keepAliveCount="
+ keepAliveCount + | |
1066 " polled = " + polledCount.get() | |
1067 + " time=" + lastPollTime); | |
1068 } | |
1069 /* Any non timeup or interrupted error is critical *
/ | |
1070 if (errn > Status.APR_OS_START_USERERR) { | |
1071 errn -= Status.APR_OS_START_USERERR; | |
1072 } | |
1073 log.severe("endpoint.poll.fail " + errn + " " + Erro
r.strerror(errn)); | |
1074 // Handle poll critical failure | |
1075 synchronized (this) { | |
1076 destroyPoller(); // will close all sockets | |
1077 } | |
1078 continue; | |
1079 } | |
1080 } | |
1081 // TODO: timeouts | |
1082 } catch (Throwable t) { | |
1083 log.log(Level.SEVERE, "endpoint.poll.error", t); | |
1084 } | |
1085 | |
1086 } | |
1087 if (!running) { | |
1088 destroyPoller(); | |
1089 } | |
1090 } | |
1091 | |
1092 /** | |
1093 * Destroy the poller. | |
1094 */ | |
1095 protected void destroyPoller() { | |
1096 synchronized (pollers) { | |
1097 pollers.remove(this); | |
1098 } | |
1099 log.info("Poller stopped after cnt=" + | |
1100 pollCount.get() + | |
1101 " sockets=" + channels.size() + | |
1102 " lastPoll=" + lastPoll); | |
1103 | |
1104 // Close all sockets | |
1105 synchronized (this) { | |
1106 if (serverPollset == 0) { | |
1107 return; | |
1108 } | |
1109 | |
1110 // for (AprSocket ch: channels.values()) { | |
1111 // ch.poller = null; | |
1112 // ch.reset(); | |
1113 // } | |
1114 keepAliveCount.set(0); | |
1115 log.warning("Destroy pollset"); | |
1116 //serverPollset = 0; | |
1117 } | |
1118 Pool.destroy(pool); | |
1119 pool = 0; | |
1120 synchronized (pollers) { | |
1121 // Now we can destroy the root pool | |
1122 if (pollers.size() == 0 && !running) { | |
1123 log.info("Destroy server context"); | |
1124 // AprSocketContext.this.destroy(); | |
1125 } | |
1126 } | |
1127 } | |
1128 | |
1129 /** | |
1130 * Called only in poller thread, only used if not thread safe | |
1131 * @throws IOException | |
1132 */ | |
1133 protected void updates() throws IOException { | |
1134 synchronized (this) { | |
1135 for (AprSocket up: updates) { | |
1136 updateIOThread(up); | |
1137 } | |
1138 updates.clear(); | |
1139 } | |
1140 } | |
1141 | |
1142 void interruptPoll() { | |
1143 try { | |
1144 int rc = Status.APR_SUCCESS; | |
1145 synchronized (this) { | |
1146 if (serverPollset != 0) { | |
1147 rc = Poll.interrupt(serverPollset); | |
1148 } else { | |
1149 log.severe("Interrupt with closed pollset"); | |
1150 } | |
1151 } | |
1152 if (rc != Status.APR_SUCCESS) { | |
1153 log.severe("Failed interrupt and not thread safe"); | |
1154 } | |
1155 } catch (Throwable t) { | |
1156 t.printStackTrace(); | |
1157 if (pollTime > FALLBACK_POLL_TIME) { | |
1158 pollTime = FALLBACK_POLL_TIME; | |
1159 } | |
1160 } | |
1161 } | |
1162 | |
1163 | |
1164 int remaining() { | |
1165 synchronized (channels) { | |
1166 return (desc.length - channels.size() * 2); | |
1167 } | |
1168 } | |
1169 | |
1170 | |
1171 | |
1172 /** | |
1173 * Called from any thread, return true if we could add it | |
1174 * to pending. | |
1175 */ | |
1176 boolean add(AprSocket ch) throws IOException { | |
1177 synchronized (this) { | |
1178 if (!running) { | |
1179 return false; | |
1180 } | |
1181 if (keepAliveCount.get() >= size) { | |
1182 return false; | |
1183 } | |
1184 keepAliveCount.incrementAndGet(); | |
1185 ch.poller = this; | |
1186 } | |
1187 | |
1188 requestUpdate(ch); | |
1189 | |
1190 return true; | |
1191 } | |
1192 | |
1193 /** | |
1194 * May be called outside of IOThread. | |
1195 */ | |
1196 protected void requestUpdate(AprSocket ch) throws IOException { | |
1197 synchronized (this) { | |
1198 if (!running) { | |
1199 return; | |
1200 } | |
1201 } | |
1202 if (isPollerThread()) { | |
1203 updateIOThread(ch); | |
1204 } else { | |
1205 synchronized (this) { | |
1206 if (!updates.contains(ch)) { | |
1207 updates.add(ch); | |
1208 } | |
1209 interruptPoll(); | |
1210 } | |
1211 if (debugPoll) { | |
1212 log.info("Poll: requestUpdate " + id + " " + ch); | |
1213 } | |
1214 } | |
1215 } | |
1216 | |
1217 private void updateIOThread(AprSocket ch) throws IOException { | |
1218 if (!running || ch.socket == 0) { | |
1219 return; | |
1220 } | |
1221 // called from IO thread, either in 'updates' or after | |
1222 // poll. | |
1223 //synchronized (ch) | |
1224 boolean polling = ch.checkPreConnect(AprSocket.POLL); | |
1225 | |
1226 int requested = ch.requestedPolling(); | |
1227 if (requested == 0) { | |
1228 if (polling) { | |
1229 removeSafe(ch); | |
1230 } | |
1231 if (ch.isClosed()) { | |
1232 synchronized (channels) { | |
1233 ch.poller = null; | |
1234 channels.remove(Long.valueOf(ch.socket)); | |
1235 } | |
1236 keepAliveCount.decrementAndGet(); | |
1237 ch.reset(); | |
1238 } | |
1239 } else { | |
1240 if (polling) { | |
1241 removeSafe(ch); | |
1242 } | |
1243 // will close if error | |
1244 pollAdd(ch, requested); | |
1245 } | |
1246 if (debugPoll) { | |
1247 log.info("Poll: updated=" + id + " " + ch); | |
1248 } | |
1249 } | |
1250 | |
1251 /** | |
1252 * Called only from IO thread | |
1253 */ | |
1254 private void pollAdd(AprSocket up, int req) throws IOException { | |
1255 boolean failed = false; | |
1256 int rv; | |
1257 synchronized (channels) { | |
1258 if (up.isClosed()) { | |
1259 return; | |
1260 } | |
1261 rv = Poll.add(serverPollset, up.socket, req); | |
1262 if (rv != Status.APR_SUCCESS) { | |
1263 up.poller = null; | |
1264 keepAliveCount.decrementAndGet(); | |
1265 failed = true; | |
1266 } else { | |
1267 polledCount.incrementAndGet(); | |
1268 channels.put(Long.valueOf(up.socket), up); | |
1269 up.setStatus(AprSocket.POLL); | |
1270 } | |
1271 } | |
1272 if (failed) { | |
1273 up.reset(); | |
1274 throw new IOException("poll add error " + rv + " " + up + " " +
Error.strerror(rv)); | |
1275 } | |
1276 } | |
1277 | |
1278 /** | |
1279 * Called only from IO thread. Remove from Poll and channels, | |
1280 * set POLL bit to false. | |
1281 */ | |
1282 private void removeSafe(AprSocket up) { | |
1283 int rv = Status.APR_EGENERAL; | |
1284 if (running && serverPollset != 0 && up.socket != 0 | |
1285 && !up.isClosed()) { | |
1286 rv = Poll.remove(serverPollset, up.socket); | |
1287 } | |
1288 up.clearStatus(AprSocket.POLL); | |
1289 | |
1290 if (rv != Status.APR_SUCCESS) { | |
1291 log.severe("poll remove error " + Error.strerror(rv) + " " + up
); | |
1292 } else { | |
1293 polledCount.decrementAndGet(); | |
1294 } | |
1295 } | |
1296 | |
1297 | |
1298 public boolean isPollerThread() { | |
1299 return Thread.currentThread() == this; | |
1300 } | |
1301 | |
1302 } | |
1303 | |
1304 /** | |
1305 * Callback for poll events, will be invoked in a thread pool. | |
1306 * | |
1307 */ | |
1308 public static interface BlockingPollHandler { | |
1309 | |
1310 /** | |
1311 * Called when the socket has been polled for in, out or closed. | |
1312 * | |
1313 * | |
1314 */ | |
1315 public void process(AprSocket ch, boolean in, boolean out, boolean close
); | |
1316 | |
1317 | |
1318 /** | |
1319 * Called just before the socket is destroyed | |
1320 */ | |
1321 public void closed(AprSocket ch); | |
1322 } | |
1323 | |
1324 /** | |
1325 * Additional callbacks for non-blocking. | |
1326 * This can be much faster - but it's harder to code, should be used only | |
1327 * for low-level protocol implementation, proxies, etc. | |
1328 * | |
1329 * The model is restricted in many ways to avoid complexity and bugs: | |
1330 * | |
1331 * - read can only happen in the IO thread associated with the poller | |
1332 * - user doesn't control poll interest - it is set automatically based | |
1333 * on read/write results | |
1334 * - it is only possible to suspend read, for TCP flow control - also | |
1335 * only from the IO thread. Resume can happen from any thread. | |
1336 * - it is also possible to call write() from any thread | |
1337 */ | |
1338 public static interface NonBlockingPollHandler extends BlockingPollHandler { | |
1339 | |
1340 /** | |
1341 * Called after connection is established, in a thread pool. | |
1342 * Process will be called next. | |
1343 */ | |
1344 public void connected(AprSocket ch); | |
1345 | |
1346 /** | |
1347 * Before close, if an exception happens. | |
1348 */ | |
1349 public void error(AprSocket ch, Throwable t); | |
1350 } | |
1351 | |
1352 } | |
OLD | NEW |