Index: java/src/org/apache/tomcat/jni/socket/AprSocketContext.java |
diff --git a/java/src/org/apache/tomcat/jni/socket/AprSocketContext.java b/java/src/org/apache/tomcat/jni/socket/AprSocketContext.java |
deleted file mode 100644 |
index 03036c9d23302465cf198163cd48156363edea37..0000000000000000000000000000000000000000 |
--- a/java/src/org/apache/tomcat/jni/socket/AprSocketContext.java |
+++ /dev/null |
@@ -1,1352 +0,0 @@ |
-/* |
- * Licensed to the Apache Software Foundation (ASF) under one or more |
- * contributor license agreements. See the NOTICE file distributed with |
- * this work for additional information regarding copyright ownership. |
- * The ASF licenses this file to You under the Apache License, Version 2.0 |
- * (the "License"); you may not use this file except in compliance with |
- * the License. You may obtain a copy of the License at |
- * |
- * http://www.apache.org/licenses/LICENSE-2.0 |
- * |
- * Unless required by applicable law or agreed to in writing, software |
- * distributed under the License is distributed on an "AS IS" BASIS, |
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
- * See the License for the specific language governing permissions and |
- * limitations under the License. |
- */ |
-package org.apache.tomcat.jni.socket; |
- |
-import java.io.IOException; |
-import java.net.InetSocketAddress; |
-import java.util.ArrayList; |
-import java.util.HashMap; |
-import java.util.List; |
-import java.util.Map; |
-import java.util.concurrent.BlockingQueue; |
-import java.util.concurrent.Executor; |
-import java.util.concurrent.ExecutorService; |
-import java.util.concurrent.Executors; |
-import java.util.concurrent.LinkedBlockingQueue; |
-import java.util.concurrent.RejectedExecutionHandler; |
-import java.util.concurrent.ThreadFactory; |
-import java.util.concurrent.ThreadPoolExecutor; |
-import java.util.concurrent.TimeUnit; |
-import java.util.concurrent.atomic.AtomicBoolean; |
-import java.util.concurrent.atomic.AtomicInteger; |
-import java.util.concurrent.atomic.AtomicLong; |
-import java.util.logging.Level; |
-import java.util.logging.Logger; |
- |
-import org.apache.tomcat.jni.Address; |
-import org.apache.tomcat.jni.Error; |
-import org.apache.tomcat.jni.Library; |
-import org.apache.tomcat.jni.OS; |
-import org.apache.tomcat.jni.Poll; |
-import org.apache.tomcat.jni.Pool; |
-import org.apache.tomcat.jni.SSL; |
-import org.apache.tomcat.jni.SSLContext; |
-import org.apache.tomcat.jni.SSLExt; |
-import org.apache.tomcat.jni.Socket; |
-import org.apache.tomcat.jni.Status; |
- |
-public class AprSocketContext { |
- /** |
- * Called when a chunk of data is sent or received. This is very low |
- * level, used mostly for debugging or stats. |
- */ |
- public static interface RawDataHandler { |
- public void rawData(AprSocket ch, boolean input, byte[] data, int pos, |
- int len, int requested, boolean closed); |
- } |
- |
- /** |
- * Called in SSL mode after the handshake is completed. |
- * |
- * @see AprSocketContext#customVerification(TlsCertVerifier) |
- */ |
- public static interface TlsCertVerifier { |
- public void handshakeDone(AprSocket ch); |
- } |
- |
- /** |
- * Delegates loading of persistent info about a host - public certs, |
- * tickets, config, persistent info etc. |
- */ |
- public static interface HostInfoLoader { |
- public HostInfo getHostInfo(String name, int port, boolean ssl); |
- } |
- |
- private static final Logger log = Logger.getLogger("AprSocketCtx"); |
- |
- // If interrupt() or thread-safe poll update are not supported - the |
- // poll updates will happen after the poll() timeout. |
- // The poll timeout with interrupt/thread safe updates can be much higher/ |
- private static final int FALLBACK_POLL_TIME = 2000; |
- |
- // It seems to send the ticket, get server helo / ChangeCipherSpec, but than |
- // SSL3_GET_RECORD:decryption failed or bad record mac in s3_pkt.c:480: |
- // Either bug in openssl, or some combination of ciphers - needs more debugging. |
- // ( this can save a roundtrip and CPU on TLS handshake ) |
- boolean USE_TICKETS = false; |
- |
- private final AprSocket END = new AprSocket(this); |
- |
- private static final AtomicInteger contextNumber = new AtomicInteger(); |
- private int contextId; |
- |
- private final AtomicInteger threadNumber = new AtomicInteger(); |
- |
- /** |
- * For now - single acceptor thread per connector. |
- */ |
- private AcceptorThread acceptor; |
- private AcceptorDispatchThread acceptorDispatch; |
- |
- // APR/JNI is thread safe |
- private boolean threadSafe = true; |
- |
- /** |
- * Pollers. |
- */ |
- private final List<AprPoller> pollers = new ArrayList<>(); |
- |
- // Set on all accepted or connected sockets. |
- // TODO: add the other properties |
- boolean tcpNoDelay = true; |
- |
- protected boolean running = true; |
- |
- protected boolean sslMode; |
- |
- // onSocket() will be called in accept thread. |
- // If false: use executor ( but that may choke the acceptor thread ) |
- private boolean nonBlockingAccept = false; |
- |
- private final BlockingQueue<AprSocket> acceptedQueue = |
- new LinkedBlockingQueue<>(); |
- |
- /** |
- * Root APR memory pool. |
- */ |
- private long rootPool = 0; |
- |
- /** |
- * SSL context. |
- */ |
- private volatile long sslCtx = 0; |
- |
- TlsCertVerifier tlsCertVerifier; |
- |
- // |
- final int connectTimeout = 20000; |
- final int defaultTimeout = 100000; |
- // TODO: Use this |
- final int keepAliveTimeout = 20000; |
- |
- final AtomicInteger open = new AtomicInteger(); |
- |
- /** |
- * Poll interval, in microseconds. If the platform doesn't support |
- * poll interrupt - it'll take this time to stop the poller. |
- * |
- */ |
- private int pollTime = 5 * 1000000; |
- |
- private HostInfoLoader hostInfoLoader; |
- |
- final RawDataHandler rawDataHandler = null; |
- |
- // TODO: do we need this here ? |
- private final Map<String, HostInfo> hosts = new HashMap<>(); |
- |
- private String certFile; |
- private String keyFile; |
- |
- private byte[] spdyNPN; |
- |
- private byte[] ticketKey; |
- |
- // For resolving DNS ( i.e. connect ), callbacks |
- private ExecutorService threadPool; |
- |
- // Separate executor for connect/handshakes |
- final ExecutorService connectExecutor; |
- |
- final boolean debugSSL = false; |
- private boolean debugPoll = false; |
- |
- private boolean deferAccept = false; |
- |
- private int backlog = 100; |
- |
- private boolean useSendfile; |
- |
- private int sslProtocol = SSL.SSL_PROTOCOL_TLSV1 | SSL.SSL_PROTOCOL_TLSV1_1 | SSL.SSL_PROTOCOL_TLSV1_2; |
- |
- /** |
- * Max time spent in a callback ( will be longer for blocking ) |
- */ |
- final AtomicLong maxHandlerTime = new AtomicLong(); |
- final AtomicLong totalHandlerTime = new AtomicLong(); |
- final AtomicLong handlerCount = new AtomicLong(); |
- |
- /** |
- * Total connections handled ( accepted or connected ). |
- */ |
- private final AtomicInteger connectionsCount = new AtomicInteger(); |
- |
- |
- public AprSocketContext() { |
- connectExecutor =new ThreadPoolExecutor(0, 64, 5, TimeUnit.SECONDS, |
- new LinkedBlockingQueue<Runnable>(), new RejectedExecutionHandler() { |
- @Override |
- public void rejectedExecution(Runnable r, |
- java.util.concurrent.ThreadPoolExecutor executor) { |
- AprSocket s = (AprSocket) r; |
- log.severe("Rejecting " + s); |
- s.reset(); |
- } |
- }); |
- contextId = contextNumber.incrementAndGet(); |
- } |
- |
- /** |
- * Poller thread count. |
- */ |
- private int pollerThreadCount = 4; |
- public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadCount = pollerThreadCount; } |
- public int getPollerThreadCount() { return pollerThreadCount; } |
- |
- // to test the limits - default should be lower |
- private int maxConnections = 64 * 1024; |
- public void setMaxconnections(int maxCon) { |
- this.maxConnections = maxCon; |
- } |
- |
- public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; } |
- public int getBacklog() { return backlog; } |
- |
- /** |
- * Defer accept. |
- */ |
- public void setDeferAccept(boolean deferAccept) { this.deferAccept = deferAccept; } |
- public boolean getDeferAccept() { return deferAccept; } |
- |
- /** |
- * For client: |
- * - ClientHello will include the npn extension ( the ID == 0x3374) |
- * - if ServerHello includes a list of protocols - select one |
- * - send it after ChangeCipherSpec and before Finish |
- * |
- * For server: |
- * - if ClientHello includes the npn extension |
- * -- will send this string as list of supported protocols in ServerHello |
- * - read the selection before Finish. |
- * @param npn |
- */ |
- public void setNpn(String npn) { |
- byte[] data = npn.getBytes(); |
- byte[] npnB = new byte[data.length + 2]; |
- |
- System.arraycopy(data, 0, npnB, 1, data.length); |
- npnB[0] = (byte) data.length; |
- npnB[npnB.length - 1] = 0; |
- spdyNPN = npnB; |
- |
- } |
- |
- public void setNpn(byte[] data) { |
- spdyNPN = data; |
- } |
- |
- public void setHostLoader(HostInfoLoader handler) { |
- this.hostInfoLoader = handler; |
- } |
- |
- public boolean isServer() { |
- return acceptor != null; |
- } |
- |
- protected Executor getExecutor() { |
- if (threadPool == null) { |
- threadPool = Executors.newCachedThreadPool(new ThreadFactory( ) { |
- @Override |
- public Thread newThread(Runnable r) { |
- Thread t = new Thread(r, "AprThread-" + contextId + "-" + |
- threadNumber.incrementAndGet()); |
- t.setDaemon(true); |
- return t; |
- } |
- }); |
- } |
- return threadPool; |
- } |
- |
- /** |
- * All accepted/connected sockets will start handshake automatically. |
- */ |
- public AprSocketContext setTls() { |
- this.sslMode = true; |
- return this; |
- } |
- |
- public void setTcpNoDelay(boolean b) { |
- tcpNoDelay = b; |
- } |
- |
- public void setSslProtocol(String protocol) { |
- protocol = protocol.trim(); |
- if ("SSLv2".equalsIgnoreCase(protocol)) { |
- sslProtocol = SSL.SSL_PROTOCOL_SSLV2; |
- } else if ("SSLv3".equalsIgnoreCase(protocol)) { |
- sslProtocol = SSL.SSL_PROTOCOL_SSLV3; |
- } else if ("TLSv1".equalsIgnoreCase(protocol)) { |
- sslProtocol = SSL.SSL_PROTOCOL_TLSV1; |
- } else if ("TLSv1.1".equalsIgnoreCase(protocol)) { |
- sslProtocol = SSL.SSL_PROTOCOL_TLSV1_1; |
- } else if ("TLSv1.2".equalsIgnoreCase(protocol)) { |
- sslProtocol = SSL.SSL_PROTOCOL_TLSV1_2; |
- } else if ("all".equalsIgnoreCase(protocol)) { |
- sslProtocol = SSL.SSL_PROTOCOL_ALL; |
- } |
- } |
- |
- public void setTicketKey(byte[] key48Bytes) { |
- if(key48Bytes.length != 48) { |
- throw new RuntimeException("Key must be 48 bytes"); |
- } |
- this.ticketKey = key48Bytes; |
- } |
- |
- public void customVerification(TlsCertVerifier verifier) { |
- tlsCertVerifier = verifier; |
- } |
- |
- // TODO: should have a separate method for switching to tls later. |
- /** |
- * Set certificate, will also enable TLS mode. |
- */ |
- public AprSocketContext setKeys(String certPemFile, String keyDerFile) { |
- this.sslMode = true; |
- setTls(); |
- certFile = certPemFile; |
- keyFile = keyDerFile; |
- return this; |
- } |
- |
- /** |
- * SSL cipher suite. |
- */ |
- private String SSLCipherSuite = "ALL"; |
- public String getSSLCipherSuite() { return SSLCipherSuite; } |
- public void setSSLCipherSuite(String SSLCipherSuite) { this.SSLCipherSuite = SSLCipherSuite; } |
- |
- /** |
- * Override or use hostInfoLoader to implement persistent/memcache storage. |
- */ |
- public HostInfo getHostInfo(String host, int port, boolean ssl) { |
- if (hostInfoLoader != null) { |
- return hostInfoLoader.getHostInfo(host, port, ssl); |
- } |
- // Use local cache |
- String key = host + ":" + port; |
- HostInfo pi = hosts.get(key); |
- if (pi != null) { |
- return pi; |
- } |
- pi = new HostInfo(host, port, ssl); |
- hosts.put(key, pi); |
- return pi; |
- } |
- |
- protected void rawData(AprSocket ch, boolean inp, byte[] data, int pos, |
- int len, int requested, boolean closed) { |
- if (rawDataHandler != null) { |
- rawDataHandler.rawData(ch, inp, data, pos, len, requested, closed); |
- } |
- } |
- |
- public void listen(final int port) throws IOException { |
- if (acceptor != null) { |
- throw new IOException("Already accepting on " + acceptor.port); |
- } |
- if (sslMode && certFile == null) { |
- throw new IOException("Missing certificates for server"); |
- } |
- if (sslMode || !nonBlockingAccept) { |
- acceptorDispatch = new AcceptorDispatchThread(); |
- acceptorDispatch.setName("AprAcceptorDispatch-" + port); |
- acceptorDispatch.start(); |
- } |
- |
- acceptor = new AcceptorThread(port); |
- acceptor.prepare(); |
- acceptor.setName("AprAcceptor-" + port); |
- acceptor.start(); |
- |
- |
- } |
- |
- /** |
- * Get a socket for connectiong to host:port. |
- */ |
- public AprSocket socket(String host, int port, boolean ssl) { |
- HostInfo hi = getHostInfo(host, port, ssl); |
- return socket(hi); |
- } |
- |
- public AprSocket socket(HostInfo hi) { |
- AprSocket sock = newSocket(this); |
- sock.setHost(hi); |
- return sock; |
- } |
- |
- public AprSocket socket(long socket) { |
- AprSocket sock = newSocket(this); |
- // Tomcat doesn't set this |
- SSLExt.sslSetMode(socket, SSLExt.SSL_MODE_ENABLE_PARTIAL_WRITE | |
- SSLExt.SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER); |
- sock.setStatus(AprSocket.ACCEPTED); |
- sock.socket = socket; |
- return sock; |
- } |
- |
- |
- void destroySocket(AprSocket socket) { |
- // TODO: does it need to be done in io thread ? |
- synchronized (socket) { |
- if (socket.socket != 0) { |
- long s = socket.socket; |
- socket.socket = 0; |
- log.info("DESTROY: " + Long.toHexString(s)); |
- Socket.destroy(s); |
- } |
- } |
- } |
- |
- protected void connectBlocking(AprSocket apr) throws IOException { |
- try { |
- if (!running) { |
- throw new IOException("Stopped"); |
- } |
- HostInfo hi = apr.getHost(); |
- |
- long clientSockP; |
- synchronized (pollers) { |
- long socketpool = Pool.create(getRootPool()); |
- |
- int family = Socket.APR_INET; |
- |
- clientSockP = Socket.create(family, |
- Socket.SOCK_STREAM, |
- Socket.APR_PROTO_TCP, socketpool); // or rootPool ? |
- } |
- Socket.timeoutSet(clientSockP, connectTimeout * 1000); |
- if (OS.IS_UNIX) { |
- Socket.optSet(clientSockP, Socket.APR_SO_REUSEADDR, 1); |
- } |
- |
- Socket.optSet(clientSockP, Socket.APR_SO_KEEPALIVE, 1); |
- |
- // Blocking |
- // TODO: use socket pool |
- // TODO: cache it ( and TTL ) in hi |
- long inetAddress = Address.info(hi.host, Socket.APR_INET, |
- hi.port, 0, rootPool); |
- // this may take a long time - stop/destroy must wait |
- // at least connect timeout |
- int rc = Socket.connect(clientSockP, inetAddress); |
- |
- if (rc != 0) { |
- synchronized (pollers) { |
- Socket.close(clientSockP); |
- Socket.destroy(clientSockP); |
- } |
- /////Pool.destroy(socketpool); |
- throw new IOException("Socket.connect(): " + rc + " " + Error.strerror(rc) + " " + connectTimeout); |
- } |
- if (!running) { |
- throw new IOException("Stopped"); |
- } |
- |
- connectionsCount.incrementAndGet(); |
- if (tcpNoDelay) { |
- Socket.optSet(clientSockP, Socket.APR_TCP_NODELAY, 1); |
- } |
- |
- Socket.timeoutSet(clientSockP, defaultTimeout * 1000); |
- |
- apr.socket = clientSockP; |
- |
- apr.afterConnect(); |
- } catch (IOException e) { |
- apr.reset(); |
- throw e; |
- } catch (Throwable e) { |
- apr.reset(); |
- e.printStackTrace(); |
- throw new IOException(e); |
- } |
- } |
- |
- AprSocket newSocket(AprSocketContext context) { |
- return new AprSocket(context); |
- } |
- |
- /** |
- * To clean the pools - we could track if all channels are |
- * closed, but this seems simpler and safer. |
- */ |
- @Override |
- protected void finalize() throws Throwable { |
- if (rootPool != 0) { |
- log.warning(this + " GC without stop()"); |
- try { |
- stop(); |
- } catch (Exception e) { |
- //TODO Auto-generated catch block |
- e.printStackTrace(); |
- } |
- } |
- super.finalize(); |
- } |
- |
- |
- public void stop() { |
- synchronized (pollers) { |
- if (!running) { |
- return; |
- } |
- running = false; |
- } |
- |
- if (rootPool != 0) { |
- if (acceptor != null) { |
- try { |
- acceptor.unblock(); |
- acceptor.join(); |
- } catch (InterruptedException e) { |
- e.printStackTrace(); |
- } |
- } |
- if (acceptorDispatch != null) { |
- acceptedQueue.add(END); |
- try { |
- acceptorDispatch.join(); |
- } catch (InterruptedException e) { |
- e.printStackTrace(); |
- } |
- } |
- if (threadPool != null) { |
- threadPool.shutdownNow(); |
- } |
- |
- log.info("Stopping pollers " + contextId); |
- |
- while (true) { |
- AprPoller a; |
- synchronized (pollers) { |
- if (pollers.size() == 0) { |
- break; |
- } |
- a = pollers.remove(0); |
- } |
- a.interruptPoll(); |
- try { |
- a.join(); |
- log.info("Poller " + a.id + " done "); |
- } catch (InterruptedException e) { |
- e.printStackTrace(); |
- } |
- } |
- } |
- } |
- |
- |
- // Called when the last poller has been destroyed. |
- void destroy() { |
- synchronized (pollers) { |
- if (pollers.size() != 0) { |
- return; |
- } |
- |
- if (rootPool == 0) { |
- return; |
- } |
- log.info("Destroy root pool " + rootPool); |
- //Pool.destroy(rootPool); |
- //rootPool = 0; |
- } |
- } |
- |
- private static IOException noApr; |
- static { |
- |
- try { |
- Library.initialize(null); |
- SSL.initialize(null); |
- } catch (Exception e) { |
- noApr = new IOException("APR not present", e); |
- } |
- |
- } |
- |
- private long getRootPool() throws IOException { |
- if (rootPool == 0) { |
- if (noApr != null) { |
- throw noApr; |
- } |
- // Create the root APR memory pool |
- rootPool = Pool.create(0); |
- |
- // Adjust poller sizes |
- if ((OS.IS_WIN32 || OS.IS_WIN64) && (maxConnections > 1024)) { |
- // The maximum per poller to get reasonable performance is 1024 |
- pollerThreadCount = maxConnections / 1024; |
- // Adjust poller size so that it won't reach the limit |
- maxConnections = maxConnections - (maxConnections % 1024); |
- } |
- } |
- return rootPool; |
- } |
- |
- long getSslCtx() throws Exception { |
- if (sslCtx == 0) { |
- synchronized (AprSocketContext.class) { |
- if (sslCtx == 0) { |
- boolean serverMode = acceptor != null; |
- sslCtx = SSLContext.make(getRootPool(), |
- sslProtocol, |
- serverMode ? SSL.SSL_MODE_SERVER : SSL.SSL_MODE_CLIENT); |
- |
- |
- // SSL.SSL_OP_NO_SSLv3 |
- int opts = SSL.SSL_OP_NO_SSLv2 | |
- SSL.SSL_OP_SINGLE_DH_USE; |
- |
- if (!USE_TICKETS || serverMode && ticketKey == null) { |
- opts |= SSL.SSL_OP_NO_TICKET; |
- } |
- |
- SSLContext.setOptions(sslCtx, opts); |
- // Set revocation |
- // SSLContext.setCARevocation(sslContext, SSLCARevocationFile, SSLCARevocationPath); |
- |
- // Client certificate verification - maybe make it option |
- try { |
- SSLContext.setCipherSuite(sslCtx, SSLCipherSuite); |
- |
- |
- if (serverMode) { |
- if (ticketKey != null) { |
- //SSLExt.setTicketKeys(sslCtx, ticketKey, ticketKey.length); |
- } |
- if (certFile != null) { |
- boolean rc = SSLContext.setCertificate(sslCtx, |
- certFile, |
- keyFile, null, SSL.SSL_AIDX_DSA); |
- if (!rc) { |
- throw new IOException("Can't set keys"); |
- } |
- } |
- SSLContext.setVerify(sslCtx, SSL.SSL_CVERIFY_NONE, 10); |
- |
- if (spdyNPN != null) { |
- SSLExt.setNPN(sslCtx, spdyNPN, spdyNPN.length); |
- } |
- } else { |
- if (tlsCertVerifier != null) { |
- // NONE ? |
- SSLContext.setVerify(sslCtx, |
- SSL.SSL_CVERIFY_NONE, 10); |
- } else { |
- SSLContext.setCACertificate(sslCtx, |
- "/etc/ssl/certs/ca-certificates.crt", |
- "/etc/ssl/certs"); |
- SSLContext.setVerify(sslCtx, |
- SSL.SSL_CVERIFY_REQUIRE, 10); |
- } |
- |
- if (spdyNPN != null) { |
- SSLExt.setNPN(sslCtx, spdyNPN, spdyNPN.length); |
- } |
- } |
- } catch (IOException e) { |
- throw e; |
- } catch (Exception e) { |
- throw new IOException(e); |
- } |
- } |
- // TODO: try release buffers |
- } |
- } |
- return sslCtx; |
- } |
- |
- void findPollerAndAdd(AprSocket ch) throws IOException { |
- if (ch.poller != null) { |
- ch.poller.requestUpdate(ch); |
- return; |
- } |
- assignPoller(ch); |
- } |
- |
- void assignPoller(AprSocket ch) throws IOException { |
- AprPoller target = null; |
- synchronized (pollers) { |
- // Make sure we have min number of pollers |
- int needPollers = pollerThreadCount - pollers.size(); |
- if (needPollers > 0) { |
- for (int i = needPollers; i > 0; i--) { |
- pollers.add(allocatePoller()); |
- } |
- } |
- int max = 0; |
- for (AprPoller poller: pollers) { |
- int rem = poller.remaining(); |
- if (rem > max) { |
- target = poller; |
- max = rem; |
- } |
- } |
- } |
- if (target != null && target.add(ch)) { |
- return; |
- } |
- |
- // can't be added - add a new poller |
- synchronized (pollers) { |
- AprPoller poller = allocatePoller(); |
- poller.add(ch); |
- pollers.add(poller); |
- } |
- } |
- |
- /** |
- * Called on each accepted socket (for servers) or after connection (client) |
- * after handshake. |
- */ |
- protected void onSocket(@SuppressWarnings("unused") AprSocket s) { |
- // Defaults to NO-OP. Parameter is used by sub-classes. |
- } |
- |
- private class AcceptorThread extends Thread { |
- private final int port; |
- private long serverSockPool = 0; |
- private long serverSock = 0; |
- |
- private long inetAddress; |
- |
- AcceptorThread(int port) { |
- this.port = port; |
- setDaemon(true); |
- } |
- |
- void prepare() throws IOException { |
- try { |
- // Create the pool for the server socket |
- serverSockPool = Pool.create(getRootPool()); |
- |
- int family = Socket.APR_INET; |
- inetAddress = |
- Address.info(null, family, port, 0, serverSockPool); |
- |
- // Create the APR server socket |
- serverSock = Socket.create(family, |
- Socket.SOCK_STREAM, |
- Socket.APR_PROTO_TCP, serverSockPool); |
- |
- if (OS.IS_UNIX) { |
- Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1); |
- } |
- // Deal with the firewalls that tend to drop the inactive sockets |
- Socket.optSet(serverSock, Socket.APR_SO_KEEPALIVE, 1); |
- // Bind the server socket |
- int ret = Socket.bind(serverSock, inetAddress); |
- if (ret != 0) { |
- throw new IOException("Socket.bind " + ret + " " + |
- Error.strerror(ret) + " port=" + port); |
- } |
- // Start listening on the server socket |
- ret = Socket.listen(serverSock, backlog ); |
- if (ret != 0) { |
- throw new IOException("endpoint.init.listen" |
- + ret + " " + Error.strerror(ret)); |
- } |
- if (OS.IS_WIN32 || OS.IS_WIN64) { |
- // On Windows set the reuseaddr flag after the bind/listen |
- Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1); |
- } |
- |
- // Sendfile usage on systems which don't support it cause major problems |
- if (useSendfile && !Library.APR_HAS_SENDFILE) { |
- useSendfile = false; |
- } |
- |
- // Delay accepting of new connections until data is available |
- // Only Linux kernels 2.4 + have that implemented |
- // on other platforms this call is noop and will return APR_ENOTIMPL. |
- if (deferAccept) { |
- if (Socket.optSet(serverSock, Socket.APR_TCP_DEFER_ACCEPT, 1) == Status.APR_ENOTIMPL) { |
- deferAccept = false; |
- } |
- } |
- } catch (Throwable t) { |
- throw new IOException(t); |
- } |
- } |
- |
- void unblock() { |
- try (java.net.Socket sock = new java.net.Socket()) { |
- // Easiest ( maybe safest ) way to interrupt accept |
- // we could have it in non-blocking mode, etc |
- sock.connect(new InetSocketAddress("127.0.0.1", port)); |
- } catch (Exception ex) { |
- // ignore - the acceptor may have shut down by itself. |
- } |
- } |
- |
- @Override |
- public void run() { |
- while (running) { |
- try { |
- // each socket has a pool. |
- final AprSocket ch = newSocket(AprSocketContext.this); |
- ch.setStatus(AprSocket.ACCEPTED); |
- |
- ch.socket = Socket.accept(serverSock); |
- if (!running) { |
- break; |
- } |
- connectionsCount.incrementAndGet(); |
- if (connectionsCount.get() % 1000 == 0) { |
- System.err.println("Accepted: " + connectionsCount.get()); |
- } |
- |
- if (nonBlockingAccept && !sslMode) { |
- ch.setStatus(AprSocket.CONNECTED); |
- // TODO: SSL really needs a thread. |
- onSocket(ch); |
- } else { |
- acceptedQueue.add(ch); |
- } |
- } catch (Throwable e) { |
- e.printStackTrace(); |
- } |
- } |
- Socket.close(serverSock); |
- } |
- } |
- |
- private class AcceptorDispatchThread extends Thread { |
- |
- AcceptorDispatchThread() { |
- setDaemon(true); |
- } |
- |
- @Override |
- public void run() { |
- while(running) { |
- try { |
- AprSocket ch = acceptedQueue.take(); |
- if (ch == END) { |
- return; |
- } |
- connectExecutor.execute(ch); |
- } catch (InterruptedException e) { |
- } |
- } |
- } |
- } |
- |
- /** |
- * Create the poller. With some versions of APR, the maximum poller size will |
- * be 62 (recompiling APR is necessary to remove this limitation). |
- * @throws IOException |
- */ |
- AprPoller allocatePoller() throws IOException { |
- long pool = Pool.create(getRootPool()); |
- int size = maxConnections / pollerThreadCount; |
- |
- long serverPollset = allocatePoller(size, pool); |
- |
- if (serverPollset == 0 && size > 1024) { |
- log.severe("Falling back to 1024-sized poll, won't scale"); |
- size = 1024; |
- serverPollset = allocatePoller(size, pool); |
- } |
- if (serverPollset == 0) { |
- log.severe("Falling back to 62-sized poll, won't scale"); |
- size = 62; |
- serverPollset = allocatePoller(size, pool); |
- } |
- |
- AprPoller res = new AprPoller(); |
- res.pool = pool; |
- res.serverPollset = serverPollset; |
- res.desc = new long[size * 2]; |
- res.size = size; |
- res.id = contextId++; |
- res.setDaemon(true); |
- res.setName("AprPoller-" + res.id); |
- res.start(); |
- if (debugPoll && !sizeLogged) { |
- sizeLogged = true; |
- log.info("Poller size " + (res.desc.length / 2)); |
- } |
- return res; |
- } |
- |
- // Removed the 'thread safe' updates for now, to simplify the code |
- // last test shows a small improvement, can switch later. |
- private static boolean sizeLogged = false; |
- |
- protected long allocatePoller(int size, long pool) { |
- int flag = threadSafe ? Poll.APR_POLLSET_THREADSAFE: 0; |
- for (int i = 0; i < 2; i++) { |
- try { |
- // timeout must be -1 - or ttl will take effect, strange results. |
- return Poll.create(size, pool, flag, -1); |
- } catch (Error e) { |
- e.printStackTrace(); |
- if (Status.APR_STATUS_IS_EINVAL(e.getError())) { |
- log.info(" endpoint.poll.limitedpollsize " + size); |
- return 0; |
- } else if (Status.APR_STATUS_IS_ENOTIMPL(e.getError())) { |
- // thread safe not supported |
- log.severe("THREAD SAFE NOT SUPPORTED" + e); |
- threadSafe = false; |
- // try again without the flags |
- continue; |
- } else { |
- log.severe("endpoint.poll.initfail" + e); |
- return 0; |
- } |
- } |
- } |
- log.severe("Unexpected ENOTIMPL with flag==0"); |
- return 0; |
- } |
- |
- class AprPoller extends Thread { |
- |
- private int id; |
- private int size; |
- private long serverPollset = 0; |
- private long pool = 0; |
- private long[] desc; |
- |
- private long lastPoll; |
- private long lastPollTime; |
- private final AtomicBoolean inPoll = new AtomicBoolean(false); |
- |
- // Should be replaced with socket data. |
- // used only to lookup by socket |
- private final Map<Long, AprSocket> channels = new HashMap<>(); |
- |
- // Active + pending, must be < desc.length / 2 |
- // The channel will also have poller=this when active or pending |
- // How many sockets have poller == this |
- private final AtomicInteger keepAliveCount = new AtomicInteger(); |
- // Tracks desc, how many sockets are actively polled |
- private final AtomicInteger polledCount = new AtomicInteger(); |
- |
- private final AtomicInteger pollCount = new AtomicInteger(); |
- |
- private final List<AprSocket> updates = new ArrayList<>(); |
- |
- @Override |
- public void run() { |
- if (!running) { |
- return; |
- } |
- if (debugPoll) { |
- log.info("Starting poller " + id + " " + (isServer() ? "SRV ": "CLI ")); |
- } |
- long t0 = System.currentTimeMillis(); |
- while (running) { |
- try { |
- updates(); |
- |
- // Pool for the specified interval. Remove signaled sockets |
- synchronized (this) { |
- inPoll.set(true); |
- } |
- // if updates are added after updates and poll - interrupt will have still |
- // work |
- |
- int rv = Poll.poll(serverPollset, pollTime, desc, true); |
- synchronized (this) { |
- inPoll.set(false); |
- if (!running) { |
- break; |
- } |
- } |
- |
- pollCount.incrementAndGet(); |
- lastPoll = System.currentTimeMillis(); |
- lastPollTime = lastPoll - t0; |
- |
- if (rv > 0) { |
- if (debugPoll) { |
- log.info(" Poll() id=" + id + " rv=" + rv + " keepAliveCount=" + keepAliveCount + |
- " polled = " + polledCount.get() |
- + " time=" + lastPollTime); |
- } |
- polledCount.addAndGet(-rv); |
- for (int pollIdx = 0; pollIdx < rv; pollIdx++) { |
- long sock = desc[pollIdx * 2 + 1]; |
- AprSocket ch; |
- boolean blocking = false; |
- |
- synchronized (channels) { |
- ch = channels.get(Long.valueOf(sock)); |
- if (ch != null) { |
- blocking = ch.isBlocking(); |
- } else { |
- log.severe("Polled socket not found !!!!!" + Long.toHexString(sock)); |
- // TODO: destroy/close the raw socket |
- continue; |
- } |
- } |
- // was removed from polling |
- ch.clearStatus(AprSocket.POLL); |
- |
- // We just removed it ( see last param to poll()). |
- // Check for failed sockets and hand this socket off to a worker |
- long mask = desc[pollIdx * 2]; |
- |
- boolean err = ((mask & Poll.APR_POLLERR) == Poll.APR_POLLERR); |
- boolean nval = ((mask & Poll.APR_POLLNVAL) != 0); |
- if (err || nval) { |
- System.err.println("ERR " + err + " NVAL " + nval); |
- } |
- |
- boolean out = (mask & Poll.APR_POLLOUT) == Poll.APR_POLLOUT; |
- boolean in = (mask & Poll.APR_POLLIN) == Poll.APR_POLLIN; |
- if (debugPoll) { |
- log.info(" Poll channel: " + Long.toHexString(mask) + |
- (out ? " OUT" :"") + |
- (in ? " IN": "") + |
- (err ? " ERR" : "") + |
- " Ch: " + ch); |
- } |
- |
- // will be set again in process(), if all read/write is done |
- ch.clearStatus(AprSocket.POLLOUT); |
- ch.clearStatus(AprSocket.POLLIN); |
- |
- // try to send if needed |
- if (blocking) { |
- synchronized (ch) { |
- ch.notifyAll(); |
- } |
- getExecutor().execute(ch); |
- } else { |
- ((AprSocketContext.NonBlockingPollHandler) ch.handler).process(ch, in, out, false); |
- |
- // Update polling for the channel (in IO thread, safe) |
- updateIOThread(ch); |
- } |
- } |
- } else if (rv < 0) { |
- int errn = -rv; |
- if (errn == Status.TIMEUP) { |
- // to or interrupt |
-// if (debugPoll) { |
-// log.info(" Poll() timeup" + " keepAliveCount=" + keepAliveCount + |
-// " polled = " + polledCount.get() |
-// + " time=" + lastPollTime); |
-// } |
- } else if (errn == Status.EINTR) { |
- // interrupt - no need to log |
- } else { |
- if (debugPoll) { |
- log.info(" Poll() rv=" + rv + " keepAliveCount=" + keepAliveCount + |
- " polled = " + polledCount.get() |
- + " time=" + lastPollTime); |
- } |
- /* Any non timeup or interrupted error is critical */ |
- if (errn > Status.APR_OS_START_USERERR) { |
- errn -= Status.APR_OS_START_USERERR; |
- } |
- log.severe("endpoint.poll.fail " + errn + " " + Error.strerror(errn)); |
- // Handle poll critical failure |
- synchronized (this) { |
- destroyPoller(); // will close all sockets |
- } |
- continue; |
- } |
- } |
- // TODO: timeouts |
- } catch (Throwable t) { |
- log.log(Level.SEVERE, "endpoint.poll.error", t); |
- } |
- |
- } |
- if (!running) { |
- destroyPoller(); |
- } |
- } |
- |
- /** |
- * Destroy the poller. |
- */ |
- protected void destroyPoller() { |
- synchronized (pollers) { |
- pollers.remove(this); |
- } |
- log.info("Poller stopped after cnt=" + |
- pollCount.get() + |
- " sockets=" + channels.size() + |
- " lastPoll=" + lastPoll); |
- |
- // Close all sockets |
- synchronized (this) { |
- if (serverPollset == 0) { |
- return; |
- } |
- |
-// for (AprSocket ch: channels.values()) { |
-// ch.poller = null; |
-// ch.reset(); |
-// } |
- keepAliveCount.set(0); |
- log.warning("Destroy pollset"); |
- //serverPollset = 0; |
- } |
- Pool.destroy(pool); |
- pool = 0; |
- synchronized (pollers) { |
- // Now we can destroy the root pool |
- if (pollers.size() == 0 && !running) { |
- log.info("Destroy server context"); |
-// AprSocketContext.this.destroy(); |
- } |
- } |
- } |
- |
- /** |
- * Called only in poller thread, only used if not thread safe |
- * @throws IOException |
- */ |
- protected void updates() throws IOException { |
- synchronized (this) { |
- for (AprSocket up: updates) { |
- updateIOThread(up); |
- } |
- updates.clear(); |
- } |
- } |
- |
- void interruptPoll() { |
- try { |
- int rc = Status.APR_SUCCESS; |
- synchronized (this) { |
- if (serverPollset != 0) { |
- rc = Poll.interrupt(serverPollset); |
- } else { |
- log.severe("Interrupt with closed pollset"); |
- } |
- } |
- if (rc != Status.APR_SUCCESS) { |
- log.severe("Failed interrupt and not thread safe"); |
- } |
- } catch (Throwable t) { |
- t.printStackTrace(); |
- if (pollTime > FALLBACK_POLL_TIME) { |
- pollTime = FALLBACK_POLL_TIME; |
- } |
- } |
- } |
- |
- |
- int remaining() { |
- synchronized (channels) { |
- return (desc.length - channels.size() * 2); |
- } |
- } |
- |
- |
- |
- /** |
- * Called from any thread, return true if we could add it |
- * to pending. |
- */ |
- boolean add(AprSocket ch) throws IOException { |
- synchronized (this) { |
- if (!running) { |
- return false; |
- } |
- if (keepAliveCount.get() >= size) { |
- return false; |
- } |
- keepAliveCount.incrementAndGet(); |
- ch.poller = this; |
- } |
- |
- requestUpdate(ch); |
- |
- return true; |
- } |
- |
- /** |
- * May be called outside of IOThread. |
- */ |
- protected void requestUpdate(AprSocket ch) throws IOException { |
- synchronized (this) { |
- if (!running) { |
- return; |
- } |
- } |
- if (isPollerThread()) { |
- updateIOThread(ch); |
- } else { |
- synchronized (this) { |
- if (!updates.contains(ch)) { |
- updates.add(ch); |
- } |
- interruptPoll(); |
- } |
- if (debugPoll) { |
- log.info("Poll: requestUpdate " + id + " " + ch); |
- } |
- } |
- } |
- |
- private void updateIOThread(AprSocket ch) throws IOException { |
- if (!running || ch.socket == 0) { |
- return; |
- } |
- // called from IO thread, either in 'updates' or after |
- // poll. |
- //synchronized (ch) |
- boolean polling = ch.checkPreConnect(AprSocket.POLL); |
- |
- int requested = ch.requestedPolling(); |
- if (requested == 0) { |
- if (polling) { |
- removeSafe(ch); |
- } |
- if (ch.isClosed()) { |
- synchronized (channels) { |
- ch.poller = null; |
- channels.remove(Long.valueOf(ch.socket)); |
- } |
- keepAliveCount.decrementAndGet(); |
- ch.reset(); |
- } |
- } else { |
- if (polling) { |
- removeSafe(ch); |
- } |
- // will close if error |
- pollAdd(ch, requested); |
- } |
- if (debugPoll) { |
- log.info("Poll: updated=" + id + " " + ch); |
- } |
- } |
- |
- /** |
- * Called only from IO thread |
- */ |
- private void pollAdd(AprSocket up, int req) throws IOException { |
- boolean failed = false; |
- int rv; |
- synchronized (channels) { |
- if (up.isClosed()) { |
- return; |
- } |
- rv = Poll.add(serverPollset, up.socket, req); |
- if (rv != Status.APR_SUCCESS) { |
- up.poller = null; |
- keepAliveCount.decrementAndGet(); |
- failed = true; |
- } else { |
- polledCount.incrementAndGet(); |
- channels.put(Long.valueOf(up.socket), up); |
- up.setStatus(AprSocket.POLL); |
- } |
- } |
- if (failed) { |
- up.reset(); |
- throw new IOException("poll add error " + rv + " " + up + " " + Error.strerror(rv)); |
- } |
- } |
- |
- /** |
- * Called only from IO thread. Remove from Poll and channels, |
- * set POLL bit to false. |
- */ |
- private void removeSafe(AprSocket up) { |
- int rv = Status.APR_EGENERAL; |
- if (running && serverPollset != 0 && up.socket != 0 |
- && !up.isClosed()) { |
- rv = Poll.remove(serverPollset, up.socket); |
- } |
- up.clearStatus(AprSocket.POLL); |
- |
- if (rv != Status.APR_SUCCESS) { |
- log.severe("poll remove error " + Error.strerror(rv) + " " + up); |
- } else { |
- polledCount.decrementAndGet(); |
- } |
- } |
- |
- |
- public boolean isPollerThread() { |
- return Thread.currentThread() == this; |
- } |
- |
- } |
- |
- /** |
- * Callback for poll events, will be invoked in a thread pool. |
- * |
- */ |
- public static interface BlockingPollHandler { |
- |
- /** |
- * Called when the socket has been polled for in, out or closed. |
- * |
- * |
- */ |
- public void process(AprSocket ch, boolean in, boolean out, boolean close); |
- |
- |
- /** |
- * Called just before the socket is destroyed |
- */ |
- public void closed(AprSocket ch); |
- } |
- |
- /** |
- * Additional callbacks for non-blocking. |
- * This can be much faster - but it's harder to code, should be used only |
- * for low-level protocol implementation, proxies, etc. |
- * |
- * The model is restricted in many ways to avoid complexity and bugs: |
- * |
- * - read can only happen in the IO thread associated with the poller |
- * - user doesn't control poll interest - it is set automatically based |
- * on read/write results |
- * - it is only possible to suspend read, for TCP flow control - also |
- * only from the IO thread. Resume can happen from any thread. |
- * - it is also possible to call write() from any thread |
- */ |
- public static interface NonBlockingPollHandler extends BlockingPollHandler { |
- |
- /** |
- * Called after connection is established, in a thread pool. |
- * Process will be called next. |
- */ |
- public void connected(AprSocket ch); |
- |
- /** |
- * Before close, if an exception happens. |
- */ |
- public void error(AprSocket ch, Throwable t); |
- } |
- |
-} |