Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(385)

Unified Diff: java/src/org/apache/tomcat/jni/socket/AprSocketContext.java

Issue 2842333002: Updated netty-tcnative to version 2.0.0.Final (Closed)
Patch Set: Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: java/src/org/apache/tomcat/jni/socket/AprSocketContext.java
diff --git a/java/src/org/apache/tomcat/jni/socket/AprSocketContext.java b/java/src/org/apache/tomcat/jni/socket/AprSocketContext.java
deleted file mode 100644
index 03036c9d23302465cf198163cd48156363edea37..0000000000000000000000000000000000000000
--- a/java/src/org/apache/tomcat/jni/socket/AprSocketContext.java
+++ /dev/null
@@ -1,1352 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tomcat.jni.socket;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.tomcat.jni.Address;
-import org.apache.tomcat.jni.Error;
-import org.apache.tomcat.jni.Library;
-import org.apache.tomcat.jni.OS;
-import org.apache.tomcat.jni.Poll;
-import org.apache.tomcat.jni.Pool;
-import org.apache.tomcat.jni.SSL;
-import org.apache.tomcat.jni.SSLContext;
-import org.apache.tomcat.jni.SSLExt;
-import org.apache.tomcat.jni.Socket;
-import org.apache.tomcat.jni.Status;
-
-public class AprSocketContext {
- /**
- * Called when a chunk of data is sent or received. This is very low
- * level, used mostly for debugging or stats.
- */
- public static interface RawDataHandler {
- public void rawData(AprSocket ch, boolean input, byte[] data, int pos,
- int len, int requested, boolean closed);
- }
-
- /**
- * Called in SSL mode after the handshake is completed.
- *
- * @see AprSocketContext#customVerification(TlsCertVerifier)
- */
- public static interface TlsCertVerifier {
- public void handshakeDone(AprSocket ch);
- }
-
- /**
- * Delegates loading of persistent info about a host - public certs,
- * tickets, config, persistent info etc.
- */
- public static interface HostInfoLoader {
- public HostInfo getHostInfo(String name, int port, boolean ssl);
- }
-
- private static final Logger log = Logger.getLogger("AprSocketCtx");
-
- // If interrupt() or thread-safe poll update are not supported - the
- // poll updates will happen after the poll() timeout.
- // The poll timeout with interrupt/thread safe updates can be much higher/
- private static final int FALLBACK_POLL_TIME = 2000;
-
- // It seems to send the ticket, get server helo / ChangeCipherSpec, but than
- // SSL3_GET_RECORD:decryption failed or bad record mac in s3_pkt.c:480:
- // Either bug in openssl, or some combination of ciphers - needs more debugging.
- // ( this can save a roundtrip and CPU on TLS handshake )
- boolean USE_TICKETS = false;
-
- private final AprSocket END = new AprSocket(this);
-
- private static final AtomicInteger contextNumber = new AtomicInteger();
- private int contextId;
-
- private final AtomicInteger threadNumber = new AtomicInteger();
-
- /**
- * For now - single acceptor thread per connector.
- */
- private AcceptorThread acceptor;
- private AcceptorDispatchThread acceptorDispatch;
-
- // APR/JNI is thread safe
- private boolean threadSafe = true;
-
- /**
- * Pollers.
- */
- private final List<AprPoller> pollers = new ArrayList<>();
-
- // Set on all accepted or connected sockets.
- // TODO: add the other properties
- boolean tcpNoDelay = true;
-
- protected boolean running = true;
-
- protected boolean sslMode;
-
- // onSocket() will be called in accept thread.
- // If false: use executor ( but that may choke the acceptor thread )
- private boolean nonBlockingAccept = false;
-
- private final BlockingQueue<AprSocket> acceptedQueue =
- new LinkedBlockingQueue<>();
-
- /**
- * Root APR memory pool.
- */
- private long rootPool = 0;
-
- /**
- * SSL context.
- */
- private volatile long sslCtx = 0;
-
- TlsCertVerifier tlsCertVerifier;
-
- //
- final int connectTimeout = 20000;
- final int defaultTimeout = 100000;
- // TODO: Use this
- final int keepAliveTimeout = 20000;
-
- final AtomicInteger open = new AtomicInteger();
-
- /**
- * Poll interval, in microseconds. If the platform doesn't support
- * poll interrupt - it'll take this time to stop the poller.
- *
- */
- private int pollTime = 5 * 1000000;
-
- private HostInfoLoader hostInfoLoader;
-
- final RawDataHandler rawDataHandler = null;
-
- // TODO: do we need this here ?
- private final Map<String, HostInfo> hosts = new HashMap<>();
-
- private String certFile;
- private String keyFile;
-
- private byte[] spdyNPN;
-
- private byte[] ticketKey;
-
- // For resolving DNS ( i.e. connect ), callbacks
- private ExecutorService threadPool;
-
- // Separate executor for connect/handshakes
- final ExecutorService connectExecutor;
-
- final boolean debugSSL = false;
- private boolean debugPoll = false;
-
- private boolean deferAccept = false;
-
- private int backlog = 100;
-
- private boolean useSendfile;
-
- private int sslProtocol = SSL.SSL_PROTOCOL_TLSV1 | SSL.SSL_PROTOCOL_TLSV1_1 | SSL.SSL_PROTOCOL_TLSV1_2;
-
- /**
- * Max time spent in a callback ( will be longer for blocking )
- */
- final AtomicLong maxHandlerTime = new AtomicLong();
- final AtomicLong totalHandlerTime = new AtomicLong();
- final AtomicLong handlerCount = new AtomicLong();
-
- /**
- * Total connections handled ( accepted or connected ).
- */
- private final AtomicInteger connectionsCount = new AtomicInteger();
-
-
- public AprSocketContext() {
- connectExecutor =new ThreadPoolExecutor(0, 64, 5, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(), new RejectedExecutionHandler() {
- @Override
- public void rejectedExecution(Runnable r,
- java.util.concurrent.ThreadPoolExecutor executor) {
- AprSocket s = (AprSocket) r;
- log.severe("Rejecting " + s);
- s.reset();
- }
- });
- contextId = contextNumber.incrementAndGet();
- }
-
- /**
- * Poller thread count.
- */
- private int pollerThreadCount = 4;
- public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadCount = pollerThreadCount; }
- public int getPollerThreadCount() { return pollerThreadCount; }
-
- // to test the limits - default should be lower
- private int maxConnections = 64 * 1024;
- public void setMaxconnections(int maxCon) {
- this.maxConnections = maxCon;
- }
-
- public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; }
- public int getBacklog() { return backlog; }
-
- /**
- * Defer accept.
- */
- public void setDeferAccept(boolean deferAccept) { this.deferAccept = deferAccept; }
- public boolean getDeferAccept() { return deferAccept; }
-
- /**
- * For client:
- * - ClientHello will include the npn extension ( the ID == 0x3374)
- * - if ServerHello includes a list of protocols - select one
- * - send it after ChangeCipherSpec and before Finish
- *
- * For server:
- * - if ClientHello includes the npn extension
- * -- will send this string as list of supported protocols in ServerHello
- * - read the selection before Finish.
- * @param npn
- */
- public void setNpn(String npn) {
- byte[] data = npn.getBytes();
- byte[] npnB = new byte[data.length + 2];
-
- System.arraycopy(data, 0, npnB, 1, data.length);
- npnB[0] = (byte) data.length;
- npnB[npnB.length - 1] = 0;
- spdyNPN = npnB;
-
- }
-
- public void setNpn(byte[] data) {
- spdyNPN = data;
- }
-
- public void setHostLoader(HostInfoLoader handler) {
- this.hostInfoLoader = handler;
- }
-
- public boolean isServer() {
- return acceptor != null;
- }
-
- protected Executor getExecutor() {
- if (threadPool == null) {
- threadPool = Executors.newCachedThreadPool(new ThreadFactory( ) {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, "AprThread-" + contextId + "-" +
- threadNumber.incrementAndGet());
- t.setDaemon(true);
- return t;
- }
- });
- }
- return threadPool;
- }
-
- /**
- * All accepted/connected sockets will start handshake automatically.
- */
- public AprSocketContext setTls() {
- this.sslMode = true;
- return this;
- }
-
- public void setTcpNoDelay(boolean b) {
- tcpNoDelay = b;
- }
-
- public void setSslProtocol(String protocol) {
- protocol = protocol.trim();
- if ("SSLv2".equalsIgnoreCase(protocol)) {
- sslProtocol = SSL.SSL_PROTOCOL_SSLV2;
- } else if ("SSLv3".equalsIgnoreCase(protocol)) {
- sslProtocol = SSL.SSL_PROTOCOL_SSLV3;
- } else if ("TLSv1".equalsIgnoreCase(protocol)) {
- sslProtocol = SSL.SSL_PROTOCOL_TLSV1;
- } else if ("TLSv1.1".equalsIgnoreCase(protocol)) {
- sslProtocol = SSL.SSL_PROTOCOL_TLSV1_1;
- } else if ("TLSv1.2".equalsIgnoreCase(protocol)) {
- sslProtocol = SSL.SSL_PROTOCOL_TLSV1_2;
- } else if ("all".equalsIgnoreCase(protocol)) {
- sslProtocol = SSL.SSL_PROTOCOL_ALL;
- }
- }
-
- public void setTicketKey(byte[] key48Bytes) {
- if(key48Bytes.length != 48) {
- throw new RuntimeException("Key must be 48 bytes");
- }
- this.ticketKey = key48Bytes;
- }
-
- public void customVerification(TlsCertVerifier verifier) {
- tlsCertVerifier = verifier;
- }
-
- // TODO: should have a separate method for switching to tls later.
- /**
- * Set certificate, will also enable TLS mode.
- */
- public AprSocketContext setKeys(String certPemFile, String keyDerFile) {
- this.sslMode = true;
- setTls();
- certFile = certPemFile;
- keyFile = keyDerFile;
- return this;
- }
-
- /**
- * SSL cipher suite.
- */
- private String SSLCipherSuite = "ALL";
- public String getSSLCipherSuite() { return SSLCipherSuite; }
- public void setSSLCipherSuite(String SSLCipherSuite) { this.SSLCipherSuite = SSLCipherSuite; }
-
- /**
- * Override or use hostInfoLoader to implement persistent/memcache storage.
- */
- public HostInfo getHostInfo(String host, int port, boolean ssl) {
- if (hostInfoLoader != null) {
- return hostInfoLoader.getHostInfo(host, port, ssl);
- }
- // Use local cache
- String key = host + ":" + port;
- HostInfo pi = hosts.get(key);
- if (pi != null) {
- return pi;
- }
- pi = new HostInfo(host, port, ssl);
- hosts.put(key, pi);
- return pi;
- }
-
- protected void rawData(AprSocket ch, boolean inp, byte[] data, int pos,
- int len, int requested, boolean closed) {
- if (rawDataHandler != null) {
- rawDataHandler.rawData(ch, inp, data, pos, len, requested, closed);
- }
- }
-
- public void listen(final int port) throws IOException {
- if (acceptor != null) {
- throw new IOException("Already accepting on " + acceptor.port);
- }
- if (sslMode && certFile == null) {
- throw new IOException("Missing certificates for server");
- }
- if (sslMode || !nonBlockingAccept) {
- acceptorDispatch = new AcceptorDispatchThread();
- acceptorDispatch.setName("AprAcceptorDispatch-" + port);
- acceptorDispatch.start();
- }
-
- acceptor = new AcceptorThread(port);
- acceptor.prepare();
- acceptor.setName("AprAcceptor-" + port);
- acceptor.start();
-
-
- }
-
- /**
- * Get a socket for connectiong to host:port.
- */
- public AprSocket socket(String host, int port, boolean ssl) {
- HostInfo hi = getHostInfo(host, port, ssl);
- return socket(hi);
- }
-
- public AprSocket socket(HostInfo hi) {
- AprSocket sock = newSocket(this);
- sock.setHost(hi);
- return sock;
- }
-
- public AprSocket socket(long socket) {
- AprSocket sock = newSocket(this);
- // Tomcat doesn't set this
- SSLExt.sslSetMode(socket, SSLExt.SSL_MODE_ENABLE_PARTIAL_WRITE |
- SSLExt.SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER);
- sock.setStatus(AprSocket.ACCEPTED);
- sock.socket = socket;
- return sock;
- }
-
-
- void destroySocket(AprSocket socket) {
- // TODO: does it need to be done in io thread ?
- synchronized (socket) {
- if (socket.socket != 0) {
- long s = socket.socket;
- socket.socket = 0;
- log.info("DESTROY: " + Long.toHexString(s));
- Socket.destroy(s);
- }
- }
- }
-
- protected void connectBlocking(AprSocket apr) throws IOException {
- try {
- if (!running) {
- throw new IOException("Stopped");
- }
- HostInfo hi = apr.getHost();
-
- long clientSockP;
- synchronized (pollers) {
- long socketpool = Pool.create(getRootPool());
-
- int family = Socket.APR_INET;
-
- clientSockP = Socket.create(family,
- Socket.SOCK_STREAM,
- Socket.APR_PROTO_TCP, socketpool); // or rootPool ?
- }
- Socket.timeoutSet(clientSockP, connectTimeout * 1000);
- if (OS.IS_UNIX) {
- Socket.optSet(clientSockP, Socket.APR_SO_REUSEADDR, 1);
- }
-
- Socket.optSet(clientSockP, Socket.APR_SO_KEEPALIVE, 1);
-
- // Blocking
- // TODO: use socket pool
- // TODO: cache it ( and TTL ) in hi
- long inetAddress = Address.info(hi.host, Socket.APR_INET,
- hi.port, 0, rootPool);
- // this may take a long time - stop/destroy must wait
- // at least connect timeout
- int rc = Socket.connect(clientSockP, inetAddress);
-
- if (rc != 0) {
- synchronized (pollers) {
- Socket.close(clientSockP);
- Socket.destroy(clientSockP);
- }
- /////Pool.destroy(socketpool);
- throw new IOException("Socket.connect(): " + rc + " " + Error.strerror(rc) + " " + connectTimeout);
- }
- if (!running) {
- throw new IOException("Stopped");
- }
-
- connectionsCount.incrementAndGet();
- if (tcpNoDelay) {
- Socket.optSet(clientSockP, Socket.APR_TCP_NODELAY, 1);
- }
-
- Socket.timeoutSet(clientSockP, defaultTimeout * 1000);
-
- apr.socket = clientSockP;
-
- apr.afterConnect();
- } catch (IOException e) {
- apr.reset();
- throw e;
- } catch (Throwable e) {
- apr.reset();
- e.printStackTrace();
- throw new IOException(e);
- }
- }
-
- AprSocket newSocket(AprSocketContext context) {
- return new AprSocket(context);
- }
-
- /**
- * To clean the pools - we could track if all channels are
- * closed, but this seems simpler and safer.
- */
- @Override
- protected void finalize() throws Throwable {
- if (rootPool != 0) {
- log.warning(this + " GC without stop()");
- try {
- stop();
- } catch (Exception e) {
- //TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- super.finalize();
- }
-
-
- public void stop() {
- synchronized (pollers) {
- if (!running) {
- return;
- }
- running = false;
- }
-
- if (rootPool != 0) {
- if (acceptor != null) {
- try {
- acceptor.unblock();
- acceptor.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- if (acceptorDispatch != null) {
- acceptedQueue.add(END);
- try {
- acceptorDispatch.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- if (threadPool != null) {
- threadPool.shutdownNow();
- }
-
- log.info("Stopping pollers " + contextId);
-
- while (true) {
- AprPoller a;
- synchronized (pollers) {
- if (pollers.size() == 0) {
- break;
- }
- a = pollers.remove(0);
- }
- a.interruptPoll();
- try {
- a.join();
- log.info("Poller " + a.id + " done ");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
-
- // Called when the last poller has been destroyed.
- void destroy() {
- synchronized (pollers) {
- if (pollers.size() != 0) {
- return;
- }
-
- if (rootPool == 0) {
- return;
- }
- log.info("Destroy root pool " + rootPool);
- //Pool.destroy(rootPool);
- //rootPool = 0;
- }
- }
-
- private static IOException noApr;
- static {
-
- try {
- Library.initialize(null);
- SSL.initialize(null);
- } catch (Exception e) {
- noApr = new IOException("APR not present", e);
- }
-
- }
-
- private long getRootPool() throws IOException {
- if (rootPool == 0) {
- if (noApr != null) {
- throw noApr;
- }
- // Create the root APR memory pool
- rootPool = Pool.create(0);
-
- // Adjust poller sizes
- if ((OS.IS_WIN32 || OS.IS_WIN64) && (maxConnections > 1024)) {
- // The maximum per poller to get reasonable performance is 1024
- pollerThreadCount = maxConnections / 1024;
- // Adjust poller size so that it won't reach the limit
- maxConnections = maxConnections - (maxConnections % 1024);
- }
- }
- return rootPool;
- }
-
- long getSslCtx() throws Exception {
- if (sslCtx == 0) {
- synchronized (AprSocketContext.class) {
- if (sslCtx == 0) {
- boolean serverMode = acceptor != null;
- sslCtx = SSLContext.make(getRootPool(),
- sslProtocol,
- serverMode ? SSL.SSL_MODE_SERVER : SSL.SSL_MODE_CLIENT);
-
-
- // SSL.SSL_OP_NO_SSLv3
- int opts = SSL.SSL_OP_NO_SSLv2 |
- SSL.SSL_OP_SINGLE_DH_USE;
-
- if (!USE_TICKETS || serverMode && ticketKey == null) {
- opts |= SSL.SSL_OP_NO_TICKET;
- }
-
- SSLContext.setOptions(sslCtx, opts);
- // Set revocation
- // SSLContext.setCARevocation(sslContext, SSLCARevocationFile, SSLCARevocationPath);
-
- // Client certificate verification - maybe make it option
- try {
- SSLContext.setCipherSuite(sslCtx, SSLCipherSuite);
-
-
- if (serverMode) {
- if (ticketKey != null) {
- //SSLExt.setTicketKeys(sslCtx, ticketKey, ticketKey.length);
- }
- if (certFile != null) {
- boolean rc = SSLContext.setCertificate(sslCtx,
- certFile,
- keyFile, null, SSL.SSL_AIDX_DSA);
- if (!rc) {
- throw new IOException("Can't set keys");
- }
- }
- SSLContext.setVerify(sslCtx, SSL.SSL_CVERIFY_NONE, 10);
-
- if (spdyNPN != null) {
- SSLExt.setNPN(sslCtx, spdyNPN, spdyNPN.length);
- }
- } else {
- if (tlsCertVerifier != null) {
- // NONE ?
- SSLContext.setVerify(sslCtx,
- SSL.SSL_CVERIFY_NONE, 10);
- } else {
- SSLContext.setCACertificate(sslCtx,
- "/etc/ssl/certs/ca-certificates.crt",
- "/etc/ssl/certs");
- SSLContext.setVerify(sslCtx,
- SSL.SSL_CVERIFY_REQUIRE, 10);
- }
-
- if (spdyNPN != null) {
- SSLExt.setNPN(sslCtx, spdyNPN, spdyNPN.length);
- }
- }
- } catch (IOException e) {
- throw e;
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
- // TODO: try release buffers
- }
- }
- return sslCtx;
- }
-
- void findPollerAndAdd(AprSocket ch) throws IOException {
- if (ch.poller != null) {
- ch.poller.requestUpdate(ch);
- return;
- }
- assignPoller(ch);
- }
-
- void assignPoller(AprSocket ch) throws IOException {
- AprPoller target = null;
- synchronized (pollers) {
- // Make sure we have min number of pollers
- int needPollers = pollerThreadCount - pollers.size();
- if (needPollers > 0) {
- for (int i = needPollers; i > 0; i--) {
- pollers.add(allocatePoller());
- }
- }
- int max = 0;
- for (AprPoller poller: pollers) {
- int rem = poller.remaining();
- if (rem > max) {
- target = poller;
- max = rem;
- }
- }
- }
- if (target != null && target.add(ch)) {
- return;
- }
-
- // can't be added - add a new poller
- synchronized (pollers) {
- AprPoller poller = allocatePoller();
- poller.add(ch);
- pollers.add(poller);
- }
- }
-
- /**
- * Called on each accepted socket (for servers) or after connection (client)
- * after handshake.
- */
- protected void onSocket(@SuppressWarnings("unused") AprSocket s) {
- // Defaults to NO-OP. Parameter is used by sub-classes.
- }
-
- private class AcceptorThread extends Thread {
- private final int port;
- private long serverSockPool = 0;
- private long serverSock = 0;
-
- private long inetAddress;
-
- AcceptorThread(int port) {
- this.port = port;
- setDaemon(true);
- }
-
- void prepare() throws IOException {
- try {
- // Create the pool for the server socket
- serverSockPool = Pool.create(getRootPool());
-
- int family = Socket.APR_INET;
- inetAddress =
- Address.info(null, family, port, 0, serverSockPool);
-
- // Create the APR server socket
- serverSock = Socket.create(family,
- Socket.SOCK_STREAM,
- Socket.APR_PROTO_TCP, serverSockPool);
-
- if (OS.IS_UNIX) {
- Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1);
- }
- // Deal with the firewalls that tend to drop the inactive sockets
- Socket.optSet(serverSock, Socket.APR_SO_KEEPALIVE, 1);
- // Bind the server socket
- int ret = Socket.bind(serverSock, inetAddress);
- if (ret != 0) {
- throw new IOException("Socket.bind " + ret + " " +
- Error.strerror(ret) + " port=" + port);
- }
- // Start listening on the server socket
- ret = Socket.listen(serverSock, backlog );
- if (ret != 0) {
- throw new IOException("endpoint.init.listen"
- + ret + " " + Error.strerror(ret));
- }
- if (OS.IS_WIN32 || OS.IS_WIN64) {
- // On Windows set the reuseaddr flag after the bind/listen
- Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1);
- }
-
- // Sendfile usage on systems which don't support it cause major problems
- if (useSendfile && !Library.APR_HAS_SENDFILE) {
- useSendfile = false;
- }
-
- // Delay accepting of new connections until data is available
- // Only Linux kernels 2.4 + have that implemented
- // on other platforms this call is noop and will return APR_ENOTIMPL.
- if (deferAccept) {
- if (Socket.optSet(serverSock, Socket.APR_TCP_DEFER_ACCEPT, 1) == Status.APR_ENOTIMPL) {
- deferAccept = false;
- }
- }
- } catch (Throwable t) {
- throw new IOException(t);
- }
- }
-
- void unblock() {
- try (java.net.Socket sock = new java.net.Socket()) {
- // Easiest ( maybe safest ) way to interrupt accept
- // we could have it in non-blocking mode, etc
- sock.connect(new InetSocketAddress("127.0.0.1", port));
- } catch (Exception ex) {
- // ignore - the acceptor may have shut down by itself.
- }
- }
-
- @Override
- public void run() {
- while (running) {
- try {
- // each socket has a pool.
- final AprSocket ch = newSocket(AprSocketContext.this);
- ch.setStatus(AprSocket.ACCEPTED);
-
- ch.socket = Socket.accept(serverSock);
- if (!running) {
- break;
- }
- connectionsCount.incrementAndGet();
- if (connectionsCount.get() % 1000 == 0) {
- System.err.println("Accepted: " + connectionsCount.get());
- }
-
- if (nonBlockingAccept && !sslMode) {
- ch.setStatus(AprSocket.CONNECTED);
- // TODO: SSL really needs a thread.
- onSocket(ch);
- } else {
- acceptedQueue.add(ch);
- }
- } catch (Throwable e) {
- e.printStackTrace();
- }
- }
- Socket.close(serverSock);
- }
- }
-
- private class AcceptorDispatchThread extends Thread {
-
- AcceptorDispatchThread() {
- setDaemon(true);
- }
-
- @Override
- public void run() {
- while(running) {
- try {
- AprSocket ch = acceptedQueue.take();
- if (ch == END) {
- return;
- }
- connectExecutor.execute(ch);
- } catch (InterruptedException e) {
- }
- }
- }
- }
-
- /**
- * Create the poller. With some versions of APR, the maximum poller size will
- * be 62 (recompiling APR is necessary to remove this limitation).
- * @throws IOException
- */
- AprPoller allocatePoller() throws IOException {
- long pool = Pool.create(getRootPool());
- int size = maxConnections / pollerThreadCount;
-
- long serverPollset = allocatePoller(size, pool);
-
- if (serverPollset == 0 && size > 1024) {
- log.severe("Falling back to 1024-sized poll, won't scale");
- size = 1024;
- serverPollset = allocatePoller(size, pool);
- }
- if (serverPollset == 0) {
- log.severe("Falling back to 62-sized poll, won't scale");
- size = 62;
- serverPollset = allocatePoller(size, pool);
- }
-
- AprPoller res = new AprPoller();
- res.pool = pool;
- res.serverPollset = serverPollset;
- res.desc = new long[size * 2];
- res.size = size;
- res.id = contextId++;
- res.setDaemon(true);
- res.setName("AprPoller-" + res.id);
- res.start();
- if (debugPoll && !sizeLogged) {
- sizeLogged = true;
- log.info("Poller size " + (res.desc.length / 2));
- }
- return res;
- }
-
- // Removed the 'thread safe' updates for now, to simplify the code
- // last test shows a small improvement, can switch later.
- private static boolean sizeLogged = false;
-
- protected long allocatePoller(int size, long pool) {
- int flag = threadSafe ? Poll.APR_POLLSET_THREADSAFE: 0;
- for (int i = 0; i < 2; i++) {
- try {
- // timeout must be -1 - or ttl will take effect, strange results.
- return Poll.create(size, pool, flag, -1);
- } catch (Error e) {
- e.printStackTrace();
- if (Status.APR_STATUS_IS_EINVAL(e.getError())) {
- log.info(" endpoint.poll.limitedpollsize " + size);
- return 0;
- } else if (Status.APR_STATUS_IS_ENOTIMPL(e.getError())) {
- // thread safe not supported
- log.severe("THREAD SAFE NOT SUPPORTED" + e);
- threadSafe = false;
- // try again without the flags
- continue;
- } else {
- log.severe("endpoint.poll.initfail" + e);
- return 0;
- }
- }
- }
- log.severe("Unexpected ENOTIMPL with flag==0");
- return 0;
- }
-
- class AprPoller extends Thread {
-
- private int id;
- private int size;
- private long serverPollset = 0;
- private long pool = 0;
- private long[] desc;
-
- private long lastPoll;
- private long lastPollTime;
- private final AtomicBoolean inPoll = new AtomicBoolean(false);
-
- // Should be replaced with socket data.
- // used only to lookup by socket
- private final Map<Long, AprSocket> channels = new HashMap<>();
-
- // Active + pending, must be < desc.length / 2
- // The channel will also have poller=this when active or pending
- // How many sockets have poller == this
- private final AtomicInteger keepAliveCount = new AtomicInteger();
- // Tracks desc, how many sockets are actively polled
- private final AtomicInteger polledCount = new AtomicInteger();
-
- private final AtomicInteger pollCount = new AtomicInteger();
-
- private final List<AprSocket> updates = new ArrayList<>();
-
- @Override
- public void run() {
- if (!running) {
- return;
- }
- if (debugPoll) {
- log.info("Starting poller " + id + " " + (isServer() ? "SRV ": "CLI "));
- }
- long t0 = System.currentTimeMillis();
- while (running) {
- try {
- updates();
-
- // Pool for the specified interval. Remove signaled sockets
- synchronized (this) {
- inPoll.set(true);
- }
- // if updates are added after updates and poll - interrupt will have still
- // work
-
- int rv = Poll.poll(serverPollset, pollTime, desc, true);
- synchronized (this) {
- inPoll.set(false);
- if (!running) {
- break;
- }
- }
-
- pollCount.incrementAndGet();
- lastPoll = System.currentTimeMillis();
- lastPollTime = lastPoll - t0;
-
- if (rv > 0) {
- if (debugPoll) {
- log.info(" Poll() id=" + id + " rv=" + rv + " keepAliveCount=" + keepAliveCount +
- " polled = " + polledCount.get()
- + " time=" + lastPollTime);
- }
- polledCount.addAndGet(-rv);
- for (int pollIdx = 0; pollIdx < rv; pollIdx++) {
- long sock = desc[pollIdx * 2 + 1];
- AprSocket ch;
- boolean blocking = false;
-
- synchronized (channels) {
- ch = channels.get(Long.valueOf(sock));
- if (ch != null) {
- blocking = ch.isBlocking();
- } else {
- log.severe("Polled socket not found !!!!!" + Long.toHexString(sock));
- // TODO: destroy/close the raw socket
- continue;
- }
- }
- // was removed from polling
- ch.clearStatus(AprSocket.POLL);
-
- // We just removed it ( see last param to poll()).
- // Check for failed sockets and hand this socket off to a worker
- long mask = desc[pollIdx * 2];
-
- boolean err = ((mask & Poll.APR_POLLERR) == Poll.APR_POLLERR);
- boolean nval = ((mask & Poll.APR_POLLNVAL) != 0);
- if (err || nval) {
- System.err.println("ERR " + err + " NVAL " + nval);
- }
-
- boolean out = (mask & Poll.APR_POLLOUT) == Poll.APR_POLLOUT;
- boolean in = (mask & Poll.APR_POLLIN) == Poll.APR_POLLIN;
- if (debugPoll) {
- log.info(" Poll channel: " + Long.toHexString(mask) +
- (out ? " OUT" :"") +
- (in ? " IN": "") +
- (err ? " ERR" : "") +
- " Ch: " + ch);
- }
-
- // will be set again in process(), if all read/write is done
- ch.clearStatus(AprSocket.POLLOUT);
- ch.clearStatus(AprSocket.POLLIN);
-
- // try to send if needed
- if (blocking) {
- synchronized (ch) {
- ch.notifyAll();
- }
- getExecutor().execute(ch);
- } else {
- ((AprSocketContext.NonBlockingPollHandler) ch.handler).process(ch, in, out, false);
-
- // Update polling for the channel (in IO thread, safe)
- updateIOThread(ch);
- }
- }
- } else if (rv < 0) {
- int errn = -rv;
- if (errn == Status.TIMEUP) {
- // to or interrupt
-// if (debugPoll) {
-// log.info(" Poll() timeup" + " keepAliveCount=" + keepAliveCount +
-// " polled = " + polledCount.get()
-// + " time=" + lastPollTime);
-// }
- } else if (errn == Status.EINTR) {
- // interrupt - no need to log
- } else {
- if (debugPoll) {
- log.info(" Poll() rv=" + rv + " keepAliveCount=" + keepAliveCount +
- " polled = " + polledCount.get()
- + " time=" + lastPollTime);
- }
- /* Any non timeup or interrupted error is critical */
- if (errn > Status.APR_OS_START_USERERR) {
- errn -= Status.APR_OS_START_USERERR;
- }
- log.severe("endpoint.poll.fail " + errn + " " + Error.strerror(errn));
- // Handle poll critical failure
- synchronized (this) {
- destroyPoller(); // will close all sockets
- }
- continue;
- }
- }
- // TODO: timeouts
- } catch (Throwable t) {
- log.log(Level.SEVERE, "endpoint.poll.error", t);
- }
-
- }
- if (!running) {
- destroyPoller();
- }
- }
-
- /**
- * Destroy the poller.
- */
- protected void destroyPoller() {
- synchronized (pollers) {
- pollers.remove(this);
- }
- log.info("Poller stopped after cnt=" +
- pollCount.get() +
- " sockets=" + channels.size() +
- " lastPoll=" + lastPoll);
-
- // Close all sockets
- synchronized (this) {
- if (serverPollset == 0) {
- return;
- }
-
-// for (AprSocket ch: channels.values()) {
-// ch.poller = null;
-// ch.reset();
-// }
- keepAliveCount.set(0);
- log.warning("Destroy pollset");
- //serverPollset = 0;
- }
- Pool.destroy(pool);
- pool = 0;
- synchronized (pollers) {
- // Now we can destroy the root pool
- if (pollers.size() == 0 && !running) {
- log.info("Destroy server context");
-// AprSocketContext.this.destroy();
- }
- }
- }
-
- /**
- * Called only in poller thread, only used if not thread safe
- * @throws IOException
- */
- protected void updates() throws IOException {
- synchronized (this) {
- for (AprSocket up: updates) {
- updateIOThread(up);
- }
- updates.clear();
- }
- }
-
- void interruptPoll() {
- try {
- int rc = Status.APR_SUCCESS;
- synchronized (this) {
- if (serverPollset != 0) {
- rc = Poll.interrupt(serverPollset);
- } else {
- log.severe("Interrupt with closed pollset");
- }
- }
- if (rc != Status.APR_SUCCESS) {
- log.severe("Failed interrupt and not thread safe");
- }
- } catch (Throwable t) {
- t.printStackTrace();
- if (pollTime > FALLBACK_POLL_TIME) {
- pollTime = FALLBACK_POLL_TIME;
- }
- }
- }
-
-
- int remaining() {
- synchronized (channels) {
- return (desc.length - channels.size() * 2);
- }
- }
-
-
-
- /**
- * Called from any thread, return true if we could add it
- * to pending.
- */
- boolean add(AprSocket ch) throws IOException {
- synchronized (this) {
- if (!running) {
- return false;
- }
- if (keepAliveCount.get() >= size) {
- return false;
- }
- keepAliveCount.incrementAndGet();
- ch.poller = this;
- }
-
- requestUpdate(ch);
-
- return true;
- }
-
- /**
- * May be called outside of IOThread.
- */
- protected void requestUpdate(AprSocket ch) throws IOException {
- synchronized (this) {
- if (!running) {
- return;
- }
- }
- if (isPollerThread()) {
- updateIOThread(ch);
- } else {
- synchronized (this) {
- if (!updates.contains(ch)) {
- updates.add(ch);
- }
- interruptPoll();
- }
- if (debugPoll) {
- log.info("Poll: requestUpdate " + id + " " + ch);
- }
- }
- }
-
- private void updateIOThread(AprSocket ch) throws IOException {
- if (!running || ch.socket == 0) {
- return;
- }
- // called from IO thread, either in 'updates' or after
- // poll.
- //synchronized (ch)
- boolean polling = ch.checkPreConnect(AprSocket.POLL);
-
- int requested = ch.requestedPolling();
- if (requested == 0) {
- if (polling) {
- removeSafe(ch);
- }
- if (ch.isClosed()) {
- synchronized (channels) {
- ch.poller = null;
- channels.remove(Long.valueOf(ch.socket));
- }
- keepAliveCount.decrementAndGet();
- ch.reset();
- }
- } else {
- if (polling) {
- removeSafe(ch);
- }
- // will close if error
- pollAdd(ch, requested);
- }
- if (debugPoll) {
- log.info("Poll: updated=" + id + " " + ch);
- }
- }
-
- /**
- * Called only from IO thread
- */
- private void pollAdd(AprSocket up, int req) throws IOException {
- boolean failed = false;
- int rv;
- synchronized (channels) {
- if (up.isClosed()) {
- return;
- }
- rv = Poll.add(serverPollset, up.socket, req);
- if (rv != Status.APR_SUCCESS) {
- up.poller = null;
- keepAliveCount.decrementAndGet();
- failed = true;
- } else {
- polledCount.incrementAndGet();
- channels.put(Long.valueOf(up.socket), up);
- up.setStatus(AprSocket.POLL);
- }
- }
- if (failed) {
- up.reset();
- throw new IOException("poll add error " + rv + " " + up + " " + Error.strerror(rv));
- }
- }
-
- /**
- * Called only from IO thread. Remove from Poll and channels,
- * set POLL bit to false.
- */
- private void removeSafe(AprSocket up) {
- int rv = Status.APR_EGENERAL;
- if (running && serverPollset != 0 && up.socket != 0
- && !up.isClosed()) {
- rv = Poll.remove(serverPollset, up.socket);
- }
- up.clearStatus(AprSocket.POLL);
-
- if (rv != Status.APR_SUCCESS) {
- log.severe("poll remove error " + Error.strerror(rv) + " " + up);
- } else {
- polledCount.decrementAndGet();
- }
- }
-
-
- public boolean isPollerThread() {
- return Thread.currentThread() == this;
- }
-
- }
-
- /**
- * Callback for poll events, will be invoked in a thread pool.
- *
- */
- public static interface BlockingPollHandler {
-
- /**
- * Called when the socket has been polled for in, out or closed.
- *
- *
- */
- public void process(AprSocket ch, boolean in, boolean out, boolean close);
-
-
- /**
- * Called just before the socket is destroyed
- */
- public void closed(AprSocket ch);
- }
-
- /**
- * Additional callbacks for non-blocking.
- * This can be much faster - but it's harder to code, should be used only
- * for low-level protocol implementation, proxies, etc.
- *
- * The model is restricted in many ways to avoid complexity and bugs:
- *
- * - read can only happen in the IO thread associated with the poller
- * - user doesn't control poll interest - it is set automatically based
- * on read/write results
- * - it is only possible to suspend read, for TCP flow control - also
- * only from the IO thread. Resume can happen from any thread.
- * - it is also possible to call write() from any thread
- */
- public static interface NonBlockingPollHandler extends BlockingPollHandler {
-
- /**
- * Called after connection is established, in a thread pool.
- * Process will be called next.
- */
- public void connected(AprSocket ch);
-
- /**
- * Before close, if an exception happens.
- */
- public void error(AprSocket ch, Throwable t);
- }
-
-}
« no previous file with comments | « java/src/org/apache/tomcat/jni/socket/AprSocket.java ('k') | java/src/org/apache/tomcat/jni/socket/HostInfo.java » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698