Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(210)

Side by Side Diff: java/src/org/apache/tomcat/jni/socket/AprSocketContext.java

Issue 2842333002: Updated netty-tcnative to version 2.0.0.Final (Closed)
Patch Set: Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.tomcat.jni.socket;
18
19 import java.io.IOException;
20 import java.net.InetSocketAddress;
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.LinkedBlockingQueue;
30 import java.util.concurrent.RejectedExecutionHandler;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.ThreadPoolExecutor;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicBoolean;
35 import java.util.concurrent.atomic.AtomicInteger;
36 import java.util.concurrent.atomic.AtomicLong;
37 import java.util.logging.Level;
38 import java.util.logging.Logger;
39
40 import org.apache.tomcat.jni.Address;
41 import org.apache.tomcat.jni.Error;
42 import org.apache.tomcat.jni.Library;
43 import org.apache.tomcat.jni.OS;
44 import org.apache.tomcat.jni.Poll;
45 import org.apache.tomcat.jni.Pool;
46 import org.apache.tomcat.jni.SSL;
47 import org.apache.tomcat.jni.SSLContext;
48 import org.apache.tomcat.jni.SSLExt;
49 import org.apache.tomcat.jni.Socket;
50 import org.apache.tomcat.jni.Status;
51
52 public class AprSocketContext {
53 /**
54 * Called when a chunk of data is sent or received. This is very low
55 * level, used mostly for debugging or stats.
56 */
57 public static interface RawDataHandler {
58 public void rawData(AprSocket ch, boolean input, byte[] data, int pos,
59 int len, int requested, boolean closed);
60 }
61
62 /**
63 * Called in SSL mode after the handshake is completed.
64 *
65 * @see AprSocketContext#customVerification(TlsCertVerifier)
66 */
67 public static interface TlsCertVerifier {
68 public void handshakeDone(AprSocket ch);
69 }
70
71 /**
72 * Delegates loading of persistent info about a host - public certs,
73 * tickets, config, persistent info etc.
74 */
75 public static interface HostInfoLoader {
76 public HostInfo getHostInfo(String name, int port, boolean ssl);
77 }
78
79 private static final Logger log = Logger.getLogger("AprSocketCtx");
80
81 // If interrupt() or thread-safe poll update are not supported - the
82 // poll updates will happen after the poll() timeout.
83 // The poll timeout with interrupt/thread safe updates can be much higher/
84 private static final int FALLBACK_POLL_TIME = 2000;
85
86 // It seems to send the ticket, get server helo / ChangeCipherSpec, but than
87 // SSL3_GET_RECORD:decryption failed or bad record mac in s3_pkt.c:480:
88 // Either bug in openssl, or some combination of ciphers - needs more debugg ing.
89 // ( this can save a roundtrip and CPU on TLS handshake )
90 boolean USE_TICKETS = false;
91
92 private final AprSocket END = new AprSocket(this);
93
94 private static final AtomicInteger contextNumber = new AtomicInteger();
95 private int contextId;
96
97 private final AtomicInteger threadNumber = new AtomicInteger();
98
99 /**
100 * For now - single acceptor thread per connector.
101 */
102 private AcceptorThread acceptor;
103 private AcceptorDispatchThread acceptorDispatch;
104
105 // APR/JNI is thread safe
106 private boolean threadSafe = true;
107
108 /**
109 * Pollers.
110 */
111 private final List<AprPoller> pollers = new ArrayList<>();
112
113 // Set on all accepted or connected sockets.
114 // TODO: add the other properties
115 boolean tcpNoDelay = true;
116
117 protected boolean running = true;
118
119 protected boolean sslMode;
120
121 // onSocket() will be called in accept thread.
122 // If false: use executor ( but that may choke the acceptor thread )
123 private boolean nonBlockingAccept = false;
124
125 private final BlockingQueue<AprSocket> acceptedQueue =
126 new LinkedBlockingQueue<>();
127
128 /**
129 * Root APR memory pool.
130 */
131 private long rootPool = 0;
132
133 /**
134 * SSL context.
135 */
136 private volatile long sslCtx = 0;
137
138 TlsCertVerifier tlsCertVerifier;
139
140 //
141 final int connectTimeout = 20000;
142 final int defaultTimeout = 100000;
143 // TODO: Use this
144 final int keepAliveTimeout = 20000;
145
146 final AtomicInteger open = new AtomicInteger();
147
148 /**
149 * Poll interval, in microseconds. If the platform doesn't support
150 * poll interrupt - it'll take this time to stop the poller.
151 *
152 */
153 private int pollTime = 5 * 1000000;
154
155 private HostInfoLoader hostInfoLoader;
156
157 final RawDataHandler rawDataHandler = null;
158
159 // TODO: do we need this here ?
160 private final Map<String, HostInfo> hosts = new HashMap<>();
161
162 private String certFile;
163 private String keyFile;
164
165 private byte[] spdyNPN;
166
167 private byte[] ticketKey;
168
169 // For resolving DNS ( i.e. connect ), callbacks
170 private ExecutorService threadPool;
171
172 // Separate executor for connect/handshakes
173 final ExecutorService connectExecutor;
174
175 final boolean debugSSL = false;
176 private boolean debugPoll = false;
177
178 private boolean deferAccept = false;
179
180 private int backlog = 100;
181
182 private boolean useSendfile;
183
184 private int sslProtocol = SSL.SSL_PROTOCOL_TLSV1 | SSL.SSL_PROTOCOL_TLSV1_1 | SSL.SSL_PROTOCOL_TLSV1_2;
185
186 /**
187 * Max time spent in a callback ( will be longer for blocking )
188 */
189 final AtomicLong maxHandlerTime = new AtomicLong();
190 final AtomicLong totalHandlerTime = new AtomicLong();
191 final AtomicLong handlerCount = new AtomicLong();
192
193 /**
194 * Total connections handled ( accepted or connected ).
195 */
196 private final AtomicInteger connectionsCount = new AtomicInteger();
197
198
199 public AprSocketContext() {
200 connectExecutor =new ThreadPoolExecutor(0, 64, 5, TimeUnit.SECONDS,
201 new LinkedBlockingQueue<Runnable>(), new RejectedExecutionHandle r() {
202 @Override
203 public void rejectedExecution(Runnable r,
204 java.util.concurrent.ThreadPoolExecutor executor) {
205 AprSocket s = (AprSocket) r;
206 log.severe("Rejecting " + s);
207 s.reset();
208 }
209 });
210 contextId = contextNumber.incrementAndGet();
211 }
212
213 /**
214 * Poller thread count.
215 */
216 private int pollerThreadCount = 4;
217 public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadC ount = pollerThreadCount; }
218 public int getPollerThreadCount() { return pollerThreadCount; }
219
220 // to test the limits - default should be lower
221 private int maxConnections = 64 * 1024;
222 public void setMaxconnections(int maxCon) {
223 this.maxConnections = maxCon;
224 }
225
226 public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlo g; }
227 public int getBacklog() { return backlog; }
228
229 /**
230 * Defer accept.
231 */
232 public void setDeferAccept(boolean deferAccept) { this.deferAccept = deferAc cept; }
233 public boolean getDeferAccept() { return deferAccept; }
234
235 /**
236 * For client:
237 * - ClientHello will include the npn extension ( the ID == 0x3374)
238 * - if ServerHello includes a list of protocols - select one
239 * - send it after ChangeCipherSpec and before Finish
240 *
241 * For server:
242 * - if ClientHello includes the npn extension
243 * -- will send this string as list of supported protocols in ServerHello
244 * - read the selection before Finish.
245 * @param npn
246 */
247 public void setNpn(String npn) {
248 byte[] data = npn.getBytes();
249 byte[] npnB = new byte[data.length + 2];
250
251 System.arraycopy(data, 0, npnB, 1, data.length);
252 npnB[0] = (byte) data.length;
253 npnB[npnB.length - 1] = 0;
254 spdyNPN = npnB;
255
256 }
257
258 public void setNpn(byte[] data) {
259 spdyNPN = data;
260 }
261
262 public void setHostLoader(HostInfoLoader handler) {
263 this.hostInfoLoader = handler;
264 }
265
266 public boolean isServer() {
267 return acceptor != null;
268 }
269
270 protected Executor getExecutor() {
271 if (threadPool == null) {
272 threadPool = Executors.newCachedThreadPool(new ThreadFactory( ) {
273 @Override
274 public Thread newThread(Runnable r) {
275 Thread t = new Thread(r, "AprThread-" + contextId + "-" +
276 threadNumber.incrementAndGet());
277 t.setDaemon(true);
278 return t;
279 }
280 });
281 }
282 return threadPool;
283 }
284
285 /**
286 * All accepted/connected sockets will start handshake automatically.
287 */
288 public AprSocketContext setTls() {
289 this.sslMode = true;
290 return this;
291 }
292
293 public void setTcpNoDelay(boolean b) {
294 tcpNoDelay = b;
295 }
296
297 public void setSslProtocol(String protocol) {
298 protocol = protocol.trim();
299 if ("SSLv2".equalsIgnoreCase(protocol)) {
300 sslProtocol = SSL.SSL_PROTOCOL_SSLV2;
301 } else if ("SSLv3".equalsIgnoreCase(protocol)) {
302 sslProtocol = SSL.SSL_PROTOCOL_SSLV3;
303 } else if ("TLSv1".equalsIgnoreCase(protocol)) {
304 sslProtocol = SSL.SSL_PROTOCOL_TLSV1;
305 } else if ("TLSv1.1".equalsIgnoreCase(protocol)) {
306 sslProtocol = SSL.SSL_PROTOCOL_TLSV1_1;
307 } else if ("TLSv1.2".equalsIgnoreCase(protocol)) {
308 sslProtocol = SSL.SSL_PROTOCOL_TLSV1_2;
309 } else if ("all".equalsIgnoreCase(protocol)) {
310 sslProtocol = SSL.SSL_PROTOCOL_ALL;
311 }
312 }
313
314 public void setTicketKey(byte[] key48Bytes) {
315 if(key48Bytes.length != 48) {
316 throw new RuntimeException("Key must be 48 bytes");
317 }
318 this.ticketKey = key48Bytes;
319 }
320
321 public void customVerification(TlsCertVerifier verifier) {
322 tlsCertVerifier = verifier;
323 }
324
325 // TODO: should have a separate method for switching to tls later.
326 /**
327 * Set certificate, will also enable TLS mode.
328 */
329 public AprSocketContext setKeys(String certPemFile, String keyDerFile) {
330 this.sslMode = true;
331 setTls();
332 certFile = certPemFile;
333 keyFile = keyDerFile;
334 return this;
335 }
336
337 /**
338 * SSL cipher suite.
339 */
340 private String SSLCipherSuite = "ALL";
341 public String getSSLCipherSuite() { return SSLCipherSuite; }
342 public void setSSLCipherSuite(String SSLCipherSuite) { this.SSLCipherSuite = SSLCipherSuite; }
343
344 /**
345 * Override or use hostInfoLoader to implement persistent/memcache storage.
346 */
347 public HostInfo getHostInfo(String host, int port, boolean ssl) {
348 if (hostInfoLoader != null) {
349 return hostInfoLoader.getHostInfo(host, port, ssl);
350 }
351 // Use local cache
352 String key = host + ":" + port;
353 HostInfo pi = hosts.get(key);
354 if (pi != null) {
355 return pi;
356 }
357 pi = new HostInfo(host, port, ssl);
358 hosts.put(key, pi);
359 return pi;
360 }
361
362 protected void rawData(AprSocket ch, boolean inp, byte[] data, int pos,
363 int len, int requested, boolean closed) {
364 if (rawDataHandler != null) {
365 rawDataHandler.rawData(ch, inp, data, pos, len, requested, closed);
366 }
367 }
368
369 public void listen(final int port) throws IOException {
370 if (acceptor != null) {
371 throw new IOException("Already accepting on " + acceptor.port);
372 }
373 if (sslMode && certFile == null) {
374 throw new IOException("Missing certificates for server");
375 }
376 if (sslMode || !nonBlockingAccept) {
377 acceptorDispatch = new AcceptorDispatchThread();
378 acceptorDispatch.setName("AprAcceptorDispatch-" + port);
379 acceptorDispatch.start();
380 }
381
382 acceptor = new AcceptorThread(port);
383 acceptor.prepare();
384 acceptor.setName("AprAcceptor-" + port);
385 acceptor.start();
386
387
388 }
389
390 /**
391 * Get a socket for connectiong to host:port.
392 */
393 public AprSocket socket(String host, int port, boolean ssl) {
394 HostInfo hi = getHostInfo(host, port, ssl);
395 return socket(hi);
396 }
397
398 public AprSocket socket(HostInfo hi) {
399 AprSocket sock = newSocket(this);
400 sock.setHost(hi);
401 return sock;
402 }
403
404 public AprSocket socket(long socket) {
405 AprSocket sock = newSocket(this);
406 // Tomcat doesn't set this
407 SSLExt.sslSetMode(socket, SSLExt.SSL_MODE_ENABLE_PARTIAL_WRITE |
408 SSLExt.SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER);
409 sock.setStatus(AprSocket.ACCEPTED);
410 sock.socket = socket;
411 return sock;
412 }
413
414
415 void destroySocket(AprSocket socket) {
416 // TODO: does it need to be done in io thread ?
417 synchronized (socket) {
418 if (socket.socket != 0) {
419 long s = socket.socket;
420 socket.socket = 0;
421 log.info("DESTROY: " + Long.toHexString(s));
422 Socket.destroy(s);
423 }
424 }
425 }
426
427 protected void connectBlocking(AprSocket apr) throws IOException {
428 try {
429 if (!running) {
430 throw new IOException("Stopped");
431 }
432 HostInfo hi = apr.getHost();
433
434 long clientSockP;
435 synchronized (pollers) {
436 long socketpool = Pool.create(getRootPool());
437
438 int family = Socket.APR_INET;
439
440 clientSockP = Socket.create(family,
441 Socket.SOCK_STREAM,
442 Socket.APR_PROTO_TCP, socketpool); // or rootPool ?
443 }
444 Socket.timeoutSet(clientSockP, connectTimeout * 1000);
445 if (OS.IS_UNIX) {
446 Socket.optSet(clientSockP, Socket.APR_SO_REUSEADDR, 1);
447 }
448
449 Socket.optSet(clientSockP, Socket.APR_SO_KEEPALIVE, 1);
450
451 // Blocking
452 // TODO: use socket pool
453 // TODO: cache it ( and TTL ) in hi
454 long inetAddress = Address.info(hi.host, Socket.APR_INET,
455 hi.port, 0, rootPool);
456 // this may take a long time - stop/destroy must wait
457 // at least connect timeout
458 int rc = Socket.connect(clientSockP, inetAddress);
459
460 if (rc != 0) {
461 synchronized (pollers) {
462 Socket.close(clientSockP);
463 Socket.destroy(clientSockP);
464 }
465 /////Pool.destroy(socketpool);
466 throw new IOException("Socket.connect(): " + rc + " " + Error.st rerror(rc) + " " + connectTimeout);
467 }
468 if (!running) {
469 throw new IOException("Stopped");
470 }
471
472 connectionsCount.incrementAndGet();
473 if (tcpNoDelay) {
474 Socket.optSet(clientSockP, Socket.APR_TCP_NODELAY, 1);
475 }
476
477 Socket.timeoutSet(clientSockP, defaultTimeout * 1000);
478
479 apr.socket = clientSockP;
480
481 apr.afterConnect();
482 } catch (IOException e) {
483 apr.reset();
484 throw e;
485 } catch (Throwable e) {
486 apr.reset();
487 e.printStackTrace();
488 throw new IOException(e);
489 }
490 }
491
492 AprSocket newSocket(AprSocketContext context) {
493 return new AprSocket(context);
494 }
495
496 /**
497 * To clean the pools - we could track if all channels are
498 * closed, but this seems simpler and safer.
499 */
500 @Override
501 protected void finalize() throws Throwable {
502 if (rootPool != 0) {
503 log.warning(this + " GC without stop()");
504 try {
505 stop();
506 } catch (Exception e) {
507 //TODO Auto-generated catch block
508 e.printStackTrace();
509 }
510 }
511 super.finalize();
512 }
513
514
515 public void stop() {
516 synchronized (pollers) {
517 if (!running) {
518 return;
519 }
520 running = false;
521 }
522
523 if (rootPool != 0) {
524 if (acceptor != null) {
525 try {
526 acceptor.unblock();
527 acceptor.join();
528 } catch (InterruptedException e) {
529 e.printStackTrace();
530 }
531 }
532 if (acceptorDispatch != null) {
533 acceptedQueue.add(END);
534 try {
535 acceptorDispatch.join();
536 } catch (InterruptedException e) {
537 e.printStackTrace();
538 }
539 }
540 if (threadPool != null) {
541 threadPool.shutdownNow();
542 }
543
544 log.info("Stopping pollers " + contextId);
545
546 while (true) {
547 AprPoller a;
548 synchronized (pollers) {
549 if (pollers.size() == 0) {
550 break;
551 }
552 a = pollers.remove(0);
553 }
554 a.interruptPoll();
555 try {
556 a.join();
557 log.info("Poller " + a.id + " done ");
558 } catch (InterruptedException e) {
559 e.printStackTrace();
560 }
561 }
562 }
563 }
564
565
566 // Called when the last poller has been destroyed.
567 void destroy() {
568 synchronized (pollers) {
569 if (pollers.size() != 0) {
570 return;
571 }
572
573 if (rootPool == 0) {
574 return;
575 }
576 log.info("Destroy root pool " + rootPool);
577 //Pool.destroy(rootPool);
578 //rootPool = 0;
579 }
580 }
581
582 private static IOException noApr;
583 static {
584
585 try {
586 Library.initialize(null);
587 SSL.initialize(null);
588 } catch (Exception e) {
589 noApr = new IOException("APR not present", e);
590 }
591
592 }
593
594 private long getRootPool() throws IOException {
595 if (rootPool == 0) {
596 if (noApr != null) {
597 throw noApr;
598 }
599 // Create the root APR memory pool
600 rootPool = Pool.create(0);
601
602 // Adjust poller sizes
603 if ((OS.IS_WIN32 || OS.IS_WIN64) && (maxConnections > 1024)) {
604 // The maximum per poller to get reasonable performance is 1024
605 pollerThreadCount = maxConnections / 1024;
606 // Adjust poller size so that it won't reach the limit
607 maxConnections = maxConnections - (maxConnections % 1024);
608 }
609 }
610 return rootPool;
611 }
612
613 long getSslCtx() throws Exception {
614 if (sslCtx == 0) {
615 synchronized (AprSocketContext.class) {
616 if (sslCtx == 0) {
617 boolean serverMode = acceptor != null;
618 sslCtx = SSLContext.make(getRootPool(),
619 sslProtocol,
620 serverMode ? SSL.SSL_MODE_SERVER : SSL.SSL_MODE_CLIE NT);
621
622
623 // SSL.SSL_OP_NO_SSLv3
624 int opts = SSL.SSL_OP_NO_SSLv2 |
625 SSL.SSL_OP_SINGLE_DH_USE;
626
627 if (!USE_TICKETS || serverMode && ticketKey == null) {
628 opts |= SSL.SSL_OP_NO_TICKET;
629 }
630
631 SSLContext.setOptions(sslCtx, opts);
632 // Set revocation
633 // SSLContext.setCARevocation(sslContext, SSLCARevoca tionFile, SSLCARevocationPath);
634
635 // Client certificate verification - maybe make it option
636 try {
637 SSLContext.setCipherSuite(sslCtx, SSLCipherSuite);
638
639
640 if (serverMode) {
641 if (ticketKey != null) {
642 //SSLExt.setTicketKeys(sslCtx, ticketKey, ticket Key.length);
643 }
644 if (certFile != null) {
645 boolean rc = SSLContext.setCertificate(sslCtx,
646 certFile,
647 keyFile, null, SSL.SSL_AIDX_DSA);
648 if (!rc) {
649 throw new IOException("Can't set keys");
650 }
651 }
652 SSLContext.setVerify(sslCtx, SSL.SSL_CVERIFY_NONE, 1 0);
653
654 if (spdyNPN != null) {
655 SSLExt.setNPN(sslCtx, spdyNPN, spdyNPN.length);
656 }
657 } else {
658 if (tlsCertVerifier != null) {
659 // NONE ?
660 SSLContext.setVerify(sslCtx,
661 SSL.SSL_CVERIFY_NONE, 10);
662 } else {
663 SSLContext.setCACertificate(sslCtx,
664 "/etc/ssl/certs/ca-certificates.crt",
665 "/etc/ssl/certs");
666 SSLContext.setVerify(sslCtx,
667 SSL.SSL_CVERIFY_REQUIRE, 10);
668 }
669
670 if (spdyNPN != null) {
671 SSLExt.setNPN(sslCtx, spdyNPN, spdyNPN.length);
672 }
673 }
674 } catch (IOException e) {
675 throw e;
676 } catch (Exception e) {
677 throw new IOException(e);
678 }
679 }
680 // TODO: try release buffers
681 }
682 }
683 return sslCtx;
684 }
685
686 void findPollerAndAdd(AprSocket ch) throws IOException {
687 if (ch.poller != null) {
688 ch.poller.requestUpdate(ch);
689 return;
690 }
691 assignPoller(ch);
692 }
693
694 void assignPoller(AprSocket ch) throws IOException {
695 AprPoller target = null;
696 synchronized (pollers) {
697 // Make sure we have min number of pollers
698 int needPollers = pollerThreadCount - pollers.size();
699 if (needPollers > 0) {
700 for (int i = needPollers; i > 0; i--) {
701 pollers.add(allocatePoller());
702 }
703 }
704 int max = 0;
705 for (AprPoller poller: pollers) {
706 int rem = poller.remaining();
707 if (rem > max) {
708 target = poller;
709 max = rem;
710 }
711 }
712 }
713 if (target != null && target.add(ch)) {
714 return;
715 }
716
717 // can't be added - add a new poller
718 synchronized (pollers) {
719 AprPoller poller = allocatePoller();
720 poller.add(ch);
721 pollers.add(poller);
722 }
723 }
724
725 /**
726 * Called on each accepted socket (for servers) or after connection (client)
727 * after handshake.
728 */
729 protected void onSocket(@SuppressWarnings("unused") AprSocket s) {
730 // Defaults to NO-OP. Parameter is used by sub-classes.
731 }
732
733 private class AcceptorThread extends Thread {
734 private final int port;
735 private long serverSockPool = 0;
736 private long serverSock = 0;
737
738 private long inetAddress;
739
740 AcceptorThread(int port) {
741 this.port = port;
742 setDaemon(true);
743 }
744
745 void prepare() throws IOException {
746 try {
747 // Create the pool for the server socket
748 serverSockPool = Pool.create(getRootPool());
749
750 int family = Socket.APR_INET;
751 inetAddress =
752 Address.info(null, family, port, 0, serverSockPool);
753
754 // Create the APR server socket
755 serverSock = Socket.create(family,
756 Socket.SOCK_STREAM,
757 Socket.APR_PROTO_TCP, serverSockPool);
758
759 if (OS.IS_UNIX) {
760 Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1);
761 }
762 // Deal with the firewalls that tend to drop the inactive socket s
763 Socket.optSet(serverSock, Socket.APR_SO_KEEPALIVE, 1);
764 // Bind the server socket
765 int ret = Socket.bind(serverSock, inetAddress);
766 if (ret != 0) {
767 throw new IOException("Socket.bind " + ret + " " +
768 Error.strerror(ret) + " port=" + port);
769 }
770 // Start listening on the server socket
771 ret = Socket.listen(serverSock, backlog );
772 if (ret != 0) {
773 throw new IOException("endpoint.init.listen"
774 + ret + " " + Error.strerror(ret));
775 }
776 if (OS.IS_WIN32 || OS.IS_WIN64) {
777 // On Windows set the reuseaddr flag after the bind/listen
778 Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1);
779 }
780
781 // Sendfile usage on systems which don't support it cause major problems
782 if (useSendfile && !Library.APR_HAS_SENDFILE) {
783 useSendfile = false;
784 }
785
786 // Delay accepting of new connections until data is available
787 // Only Linux kernels 2.4 + have that implemented
788 // on other platforms this call is noop and will return APR_ENOT IMPL.
789 if (deferAccept) {
790 if (Socket.optSet(serverSock, Socket.APR_TCP_DEFER_ACCEPT, 1 ) == Status.APR_ENOTIMPL) {
791 deferAccept = false;
792 }
793 }
794 } catch (Throwable t) {
795 throw new IOException(t);
796 }
797 }
798
799 void unblock() {
800 try (java.net.Socket sock = new java.net.Socket()) {
801 // Easiest ( maybe safest ) way to interrupt accept
802 // we could have it in non-blocking mode, etc
803 sock.connect(new InetSocketAddress("127.0.0.1", port));
804 } catch (Exception ex) {
805 // ignore - the acceptor may have shut down by itself.
806 }
807 }
808
809 @Override
810 public void run() {
811 while (running) {
812 try {
813 // each socket has a pool.
814 final AprSocket ch = newSocket(AprSocketContext.this);
815 ch.setStatus(AprSocket.ACCEPTED);
816
817 ch.socket = Socket.accept(serverSock);
818 if (!running) {
819 break;
820 }
821 connectionsCount.incrementAndGet();
822 if (connectionsCount.get() % 1000 == 0) {
823 System.err.println("Accepted: " + connectionsCount.get() );
824 }
825
826 if (nonBlockingAccept && !sslMode) {
827 ch.setStatus(AprSocket.CONNECTED);
828 // TODO: SSL really needs a thread.
829 onSocket(ch);
830 } else {
831 acceptedQueue.add(ch);
832 }
833 } catch (Throwable e) {
834 e.printStackTrace();
835 }
836 }
837 Socket.close(serverSock);
838 }
839 }
840
841 private class AcceptorDispatchThread extends Thread {
842
843 AcceptorDispatchThread() {
844 setDaemon(true);
845 }
846
847 @Override
848 public void run() {
849 while(running) {
850 try {
851 AprSocket ch = acceptedQueue.take();
852 if (ch == END) {
853 return;
854 }
855 connectExecutor.execute(ch);
856 } catch (InterruptedException e) {
857 }
858 }
859 }
860 }
861
862 /**
863 * Create the poller. With some versions of APR, the maximum poller size wil l
864 * be 62 (recompiling APR is necessary to remove this limitation).
865 * @throws IOException
866 */
867 AprPoller allocatePoller() throws IOException {
868 long pool = Pool.create(getRootPool());
869 int size = maxConnections / pollerThreadCount;
870
871 long serverPollset = allocatePoller(size, pool);
872
873 if (serverPollset == 0 && size > 1024) {
874 log.severe("Falling back to 1024-sized poll, won't scale");
875 size = 1024;
876 serverPollset = allocatePoller(size, pool);
877 }
878 if (serverPollset == 0) {
879 log.severe("Falling back to 62-sized poll, won't scale");
880 size = 62;
881 serverPollset = allocatePoller(size, pool);
882 }
883
884 AprPoller res = new AprPoller();
885 res.pool = pool;
886 res.serverPollset = serverPollset;
887 res.desc = new long[size * 2];
888 res.size = size;
889 res.id = contextId++;
890 res.setDaemon(true);
891 res.setName("AprPoller-" + res.id);
892 res.start();
893 if (debugPoll && !sizeLogged) {
894 sizeLogged = true;
895 log.info("Poller size " + (res.desc.length / 2));
896 }
897 return res;
898 }
899
900 // Removed the 'thread safe' updates for now, to simplify the code
901 // last test shows a small improvement, can switch later.
902 private static boolean sizeLogged = false;
903
904 protected long allocatePoller(int size, long pool) {
905 int flag = threadSafe ? Poll.APR_POLLSET_THREADSAFE: 0;
906 for (int i = 0; i < 2; i++) {
907 try {
908 // timeout must be -1 - or ttl will take effect, strange result s.
909 return Poll.create(size, pool, flag, -1);
910 } catch (Error e) {
911 e.printStackTrace();
912 if (Status.APR_STATUS_IS_EINVAL(e.getError())) {
913 log.info(" endpoint.poll.limitedpollsize " + size);
914 return 0;
915 } else if (Status.APR_STATUS_IS_ENOTIMPL(e.getError())) {
916 // thread safe not supported
917 log.severe("THREAD SAFE NOT SUPPORTED" + e);
918 threadSafe = false;
919 // try again without the flags
920 continue;
921 } else {
922 log.severe("endpoint.poll.initfail" + e);
923 return 0;
924 }
925 }
926 }
927 log.severe("Unexpected ENOTIMPL with flag==0");
928 return 0;
929 }
930
931 class AprPoller extends Thread {
932
933 private int id;
934 private int size;
935 private long serverPollset = 0;
936 private long pool = 0;
937 private long[] desc;
938
939 private long lastPoll;
940 private long lastPollTime;
941 private final AtomicBoolean inPoll = new AtomicBoolean(false);
942
943 // Should be replaced with socket data.
944 // used only to lookup by socket
945 private final Map<Long, AprSocket> channels = new HashMap<>();
946
947 // Active + pending, must be < desc.length / 2
948 // The channel will also have poller=this when active or pending
949 // How many sockets have poller == this
950 private final AtomicInteger keepAliveCount = new AtomicInteger();
951 // Tracks desc, how many sockets are actively polled
952 private final AtomicInteger polledCount = new AtomicInteger();
953
954 private final AtomicInteger pollCount = new AtomicInteger();
955
956 private final List<AprSocket> updates = new ArrayList<>();
957
958 @Override
959 public void run() {
960 if (!running) {
961 return;
962 }
963 if (debugPoll) {
964 log.info("Starting poller " + id + " " + (isServer() ? "SRV ": " CLI "));
965 }
966 long t0 = System.currentTimeMillis();
967 while (running) {
968 try {
969 updates();
970
971 // Pool for the specified interval. Remove signaled sockets
972 synchronized (this) {
973 inPoll.set(true);
974 }
975 // if updates are added after updates and poll - interrupt w ill have still
976 // work
977
978 int rv = Poll.poll(serverPollset, pollTime, desc, true);
979 synchronized (this) {
980 inPoll.set(false);
981 if (!running) {
982 break;
983 }
984 }
985
986 pollCount.incrementAndGet();
987 lastPoll = System.currentTimeMillis();
988 lastPollTime = lastPoll - t0;
989
990 if (rv > 0) {
991 if (debugPoll) {
992 log.info(" Poll() id=" + id + " rv=" + rv + " keepAl iveCount=" + keepAliveCount +
993 " polled = " + polledCount.get()
994 + " time=" + lastPollTime);
995 }
996 polledCount.addAndGet(-rv);
997 for (int pollIdx = 0; pollIdx < rv; pollIdx++) {
998 long sock = desc[pollIdx * 2 + 1];
999 AprSocket ch;
1000 boolean blocking = false;
1001
1002 synchronized (channels) {
1003 ch = channels.get(Long.valueOf(sock));
1004 if (ch != null) {
1005 blocking = ch.isBlocking();
1006 } else {
1007 log.severe("Polled socket not found !!!!!" + Long.toHexString(sock));
1008 // TODO: destroy/close the raw socket
1009 continue;
1010 }
1011 }
1012 // was removed from polling
1013 ch.clearStatus(AprSocket.POLL);
1014
1015 // We just removed it ( see last param to poll()).
1016 // Check for failed sockets and hand this socket off to a worker
1017 long mask = desc[pollIdx * 2];
1018
1019 boolean err = ((mask & Poll.APR_POLLERR) == Poll.APR _POLLERR);
1020 boolean nval = ((mask & Poll.APR_POLLNVAL) != 0);
1021 if (err || nval) {
1022 System.err.println("ERR " + err + " NVAL " + nva l);
1023 }
1024
1025 boolean out = (mask & Poll.APR_POLLOUT) == Poll.APR_ POLLOUT;
1026 boolean in = (mask & Poll.APR_POLLIN) == Poll.APR_PO LLIN;
1027 if (debugPoll) {
1028 log.info(" Poll channel: " + Long.toHexString(ma sk) +
1029 (out ? " OUT" :"") +
1030 (in ? " IN": "") +
1031 (err ? " ERR" : "") +
1032 " Ch: " + ch);
1033 }
1034
1035 // will be set again in process(), if all read/write is done
1036 ch.clearStatus(AprSocket.POLLOUT);
1037 ch.clearStatus(AprSocket.POLLIN);
1038
1039 // try to send if needed
1040 if (blocking) {
1041 synchronized (ch) {
1042 ch.notifyAll();
1043 }
1044 getExecutor().execute(ch);
1045 } else {
1046 ((AprSocketContext.NonBlockingPollHandler) ch.ha ndler).process(ch, in, out, false);
1047
1048 // Update polling for the channel (in IO thread, safe)
1049 updateIOThread(ch);
1050 }
1051 }
1052 } else if (rv < 0) {
1053 int errn = -rv;
1054 if (errn == Status.TIMEUP) {
1055 // to or interrupt
1056 // if (debugPoll) {
1057 // log.info(" Poll() timeup" + " keepAliveCount=" + keepAliveCount +
1058 // " polled = " + polledCount.get()
1059 // + " time=" + lastPollTime);
1060 // }
1061 } else if (errn == Status.EINTR) {
1062 // interrupt - no need to log
1063 } else {
1064 if (debugPoll) {
1065 log.info(" Poll() rv=" + rv + " keepAliveCount=" + keepAliveCount +
1066 " polled = " + polledCount.get()
1067 + " time=" + lastPollTime);
1068 }
1069 /* Any non timeup or interrupted error is critical * /
1070 if (errn > Status.APR_OS_START_USERERR) {
1071 errn -= Status.APR_OS_START_USERERR;
1072 }
1073 log.severe("endpoint.poll.fail " + errn + " " + Erro r.strerror(errn));
1074 // Handle poll critical failure
1075 synchronized (this) {
1076 destroyPoller(); // will close all sockets
1077 }
1078 continue;
1079 }
1080 }
1081 // TODO: timeouts
1082 } catch (Throwable t) {
1083 log.log(Level.SEVERE, "endpoint.poll.error", t);
1084 }
1085
1086 }
1087 if (!running) {
1088 destroyPoller();
1089 }
1090 }
1091
1092 /**
1093 * Destroy the poller.
1094 */
1095 protected void destroyPoller() {
1096 synchronized (pollers) {
1097 pollers.remove(this);
1098 }
1099 log.info("Poller stopped after cnt=" +
1100 pollCount.get() +
1101 " sockets=" + channels.size() +
1102 " lastPoll=" + lastPoll);
1103
1104 // Close all sockets
1105 synchronized (this) {
1106 if (serverPollset == 0) {
1107 return;
1108 }
1109
1110 // for (AprSocket ch: channels.values()) {
1111 // ch.poller = null;
1112 // ch.reset();
1113 // }
1114 keepAliveCount.set(0);
1115 log.warning("Destroy pollset");
1116 //serverPollset = 0;
1117 }
1118 Pool.destroy(pool);
1119 pool = 0;
1120 synchronized (pollers) {
1121 // Now we can destroy the root pool
1122 if (pollers.size() == 0 && !running) {
1123 log.info("Destroy server context");
1124 // AprSocketContext.this.destroy();
1125 }
1126 }
1127 }
1128
1129 /**
1130 * Called only in poller thread, only used if not thread safe
1131 * @throws IOException
1132 */
1133 protected void updates() throws IOException {
1134 synchronized (this) {
1135 for (AprSocket up: updates) {
1136 updateIOThread(up);
1137 }
1138 updates.clear();
1139 }
1140 }
1141
1142 void interruptPoll() {
1143 try {
1144 int rc = Status.APR_SUCCESS;
1145 synchronized (this) {
1146 if (serverPollset != 0) {
1147 rc = Poll.interrupt(serverPollset);
1148 } else {
1149 log.severe("Interrupt with closed pollset");
1150 }
1151 }
1152 if (rc != Status.APR_SUCCESS) {
1153 log.severe("Failed interrupt and not thread safe");
1154 }
1155 } catch (Throwable t) {
1156 t.printStackTrace();
1157 if (pollTime > FALLBACK_POLL_TIME) {
1158 pollTime = FALLBACK_POLL_TIME;
1159 }
1160 }
1161 }
1162
1163
1164 int remaining() {
1165 synchronized (channels) {
1166 return (desc.length - channels.size() * 2);
1167 }
1168 }
1169
1170
1171
1172 /**
1173 * Called from any thread, return true if we could add it
1174 * to pending.
1175 */
1176 boolean add(AprSocket ch) throws IOException {
1177 synchronized (this) {
1178 if (!running) {
1179 return false;
1180 }
1181 if (keepAliveCount.get() >= size) {
1182 return false;
1183 }
1184 keepAliveCount.incrementAndGet();
1185 ch.poller = this;
1186 }
1187
1188 requestUpdate(ch);
1189
1190 return true;
1191 }
1192
1193 /**
1194 * May be called outside of IOThread.
1195 */
1196 protected void requestUpdate(AprSocket ch) throws IOException {
1197 synchronized (this) {
1198 if (!running) {
1199 return;
1200 }
1201 }
1202 if (isPollerThread()) {
1203 updateIOThread(ch);
1204 } else {
1205 synchronized (this) {
1206 if (!updates.contains(ch)) {
1207 updates.add(ch);
1208 }
1209 interruptPoll();
1210 }
1211 if (debugPoll) {
1212 log.info("Poll: requestUpdate " + id + " " + ch);
1213 }
1214 }
1215 }
1216
1217 private void updateIOThread(AprSocket ch) throws IOException {
1218 if (!running || ch.socket == 0) {
1219 return;
1220 }
1221 // called from IO thread, either in 'updates' or after
1222 // poll.
1223 //synchronized (ch)
1224 boolean polling = ch.checkPreConnect(AprSocket.POLL);
1225
1226 int requested = ch.requestedPolling();
1227 if (requested == 0) {
1228 if (polling) {
1229 removeSafe(ch);
1230 }
1231 if (ch.isClosed()) {
1232 synchronized (channels) {
1233 ch.poller = null;
1234 channels.remove(Long.valueOf(ch.socket));
1235 }
1236 keepAliveCount.decrementAndGet();
1237 ch.reset();
1238 }
1239 } else {
1240 if (polling) {
1241 removeSafe(ch);
1242 }
1243 // will close if error
1244 pollAdd(ch, requested);
1245 }
1246 if (debugPoll) {
1247 log.info("Poll: updated=" + id + " " + ch);
1248 }
1249 }
1250
1251 /**
1252 * Called only from IO thread
1253 */
1254 private void pollAdd(AprSocket up, int req) throws IOException {
1255 boolean failed = false;
1256 int rv;
1257 synchronized (channels) {
1258 if (up.isClosed()) {
1259 return;
1260 }
1261 rv = Poll.add(serverPollset, up.socket, req);
1262 if (rv != Status.APR_SUCCESS) {
1263 up.poller = null;
1264 keepAliveCount.decrementAndGet();
1265 failed = true;
1266 } else {
1267 polledCount.incrementAndGet();
1268 channels.put(Long.valueOf(up.socket), up);
1269 up.setStatus(AprSocket.POLL);
1270 }
1271 }
1272 if (failed) {
1273 up.reset();
1274 throw new IOException("poll add error " + rv + " " + up + " " + Error.strerror(rv));
1275 }
1276 }
1277
1278 /**
1279 * Called only from IO thread. Remove from Poll and channels,
1280 * set POLL bit to false.
1281 */
1282 private void removeSafe(AprSocket up) {
1283 int rv = Status.APR_EGENERAL;
1284 if (running && serverPollset != 0 && up.socket != 0
1285 && !up.isClosed()) {
1286 rv = Poll.remove(serverPollset, up.socket);
1287 }
1288 up.clearStatus(AprSocket.POLL);
1289
1290 if (rv != Status.APR_SUCCESS) {
1291 log.severe("poll remove error " + Error.strerror(rv) + " " + up );
1292 } else {
1293 polledCount.decrementAndGet();
1294 }
1295 }
1296
1297
1298 public boolean isPollerThread() {
1299 return Thread.currentThread() == this;
1300 }
1301
1302 }
1303
1304 /**
1305 * Callback for poll events, will be invoked in a thread pool.
1306 *
1307 */
1308 public static interface BlockingPollHandler {
1309
1310 /**
1311 * Called when the socket has been polled for in, out or closed.
1312 *
1313 *
1314 */
1315 public void process(AprSocket ch, boolean in, boolean out, boolean close );
1316
1317
1318 /**
1319 * Called just before the socket is destroyed
1320 */
1321 public void closed(AprSocket ch);
1322 }
1323
1324 /**
1325 * Additional callbacks for non-blocking.
1326 * This can be much faster - but it's harder to code, should be used only
1327 * for low-level protocol implementation, proxies, etc.
1328 *
1329 * The model is restricted in many ways to avoid complexity and bugs:
1330 *
1331 * - read can only happen in the IO thread associated with the poller
1332 * - user doesn't control poll interest - it is set automatically based
1333 * on read/write results
1334 * - it is only possible to suspend read, for TCP flow control - also
1335 * only from the IO thread. Resume can happen from any thread.
1336 * - it is also possible to call write() from any thread
1337 */
1338 public static interface NonBlockingPollHandler extends BlockingPollHandler {
1339
1340 /**
1341 * Called after connection is established, in a thread pool.
1342 * Process will be called next.
1343 */
1344 public void connected(AprSocket ch);
1345
1346 /**
1347 * Before close, if an exception happens.
1348 */
1349 public void error(AprSocket ch, Throwable t);
1350 }
1351
1352 }
OLDNEW
« no previous file with comments | « java/src/org/apache/tomcat/jni/socket/AprSocket.java ('k') | java/src/org/apache/tomcat/jni/socket/HostInfo.java » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698