Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(980)

Side by Side Diff: plugins/org.chromium.sdk.wipbackend.wk120709/src/org/chromium/sdk/internal/websocket/AbstractWsConnection.java

Issue 11829027: drop old backends (Closed) Base URL: https://chromedevtools.googlecode.com/svn/trunk
Patch Set: Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package org.chromium.sdk.internal.websocket;
6
7 import java.io.IOException;
8 import java.nio.ByteBuffer;
9 import java.nio.CharBuffer;
10 import java.nio.charset.Charset;
11 import java.nio.charset.CharsetDecoder;
12 import java.nio.charset.CharsetEncoder;
13 import java.nio.charset.CoderResult;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.LinkedBlockingQueue;
16 import java.util.logging.Level;
17 import java.util.logging.Logger;
18
19 import org.chromium.sdk.ConnectionLogger;
20 import org.chromium.sdk.RelayOk;
21 import org.chromium.sdk.SyncCallback;
22 import org.chromium.sdk.internal.transport.AbstractSocketWrapper;
23 import org.chromium.sdk.util.SignalRelay;
24 import org.chromium.sdk.util.SignalRelay.AlreadySignalledException;
25 import org.chromium.sdk.util.SignalRelay.SignalConverter;
26
27 public abstract class AbstractWsConnection<INPUT, OUTPUT> implements WsConnectio n {
28 protected static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
29 private static final Logger LOGGER = Logger.getLogger(Hybi00WsConnection.class .getName());
30 private final AbstractSocketWrapper<INPUT, OUTPUT> socketWrapper;
31 private final ConnectionLogger connectionLogger;
32 private volatile boolean isClosingGracefully = false;
33
34 private final BlockingQueue<MessageDispatcher> dispatchQueue =
35 new LinkedBlockingQueue<MessageDispatcher>();
36
37 // Access must be synchronized on dispatchQueue.
38 private boolean isDispatchQueueClosed = false;
39
40 // Access must be synchronized on this.
41 private boolean isOutputClosed = false;
42
43 protected AbstractWsConnection(AbstractSocketWrapper<INPUT, OUTPUT> socketWrap per,
44 ConnectionLogger connectionLogger) {
45 this.socketWrapper = socketWrapper;
46 this.connectionLogger = connectionLogger;
47 try {
48 linkedCloser.bind(socketWrapper.getShutdownRelay(), null, SOCKET_TO_CONNEC TION);
49 } catch (AlreadySignalledException e) {
50 throw new IllegalStateException(e);
51 }
52 }
53
54 public enum CloseReason {
55 /** Socket has been shut down. */
56 CONNECTION_CLOSED,
57
58 /**
59 * Some exception has terminated stream read thread.
60 * Occasionally {@link #CONNECTION_CLOSED} may be replaced with this reason (we are not
61 * accurate enough here).
62 */
63 INPUT_STREAM_PROBLEM,
64
65 /**
66 * Closed as requested by {@link WsConnection#close()}.
67 */
68 USER_REQUEST,
69
70 /**
71 * Connection close has been requested from remote side.
72 */
73 REMOTE_CLOSE_REQUEST,
74
75 /**
76 * Remote side silently closed connection (without breaking a message).
77 */
78 REMOTE_SILENTLY_CLOSED,
79 }
80
81 @Override
82 public RelayOk runInDispatchThread(final Runnable runnable, final SyncCallback syncCallback) {
83 MessageDispatcher messageDispatcher = new MessageDispatcher() {
84 @Override
85 boolean dispatch(Listener userListener) {
86 RuntimeException ex = null;
87 try {
88 runnable.run();
89 } catch (RuntimeException e) {
90 ex = e;
91 throw e;
92 } finally {
93 syncCallback.callbackDone(ex);
94 }
95 return false;
96 }
97 };
98 synchronized (dispatchQueue) {
99 if (isDispatchQueueClosed) {
100 throw new IllegalStateException("Connection is closed");
101 }
102 dispatchQueue.add(messageDispatcher);
103 }
104 return DISPATCH_THREAD_PROMISES_TO_RELAY_OK;
105 }
106
107 @Override
108 public void startListening(final Listener listener) {
109 final INPUT loggableReader = socketWrapper.getLoggableInput();
110 Runnable listenRunnable = new Runnable() {
111 @Override
112 public void run() {
113 Exception closeCause = null;
114 CloseReason closeReason = null;
115 try {
116 closeReason = runListenLoop(loggableReader);
117 if (closeReason == CloseReason.REMOTE_SILENTLY_CLOSED) {
118 LOGGER.log(Level.INFO,
119 "Remote side silently closed connection without 'close' message" );
120 }
121 } catch (IOException e) {
122 closeCause = e;
123 LOGGER.log(Level.SEVERE, "Connection read failure", e);
124 } catch (InterruptedException e) {
125 closeCause = e;
126 closeReason = CloseReason.USER_REQUEST;
127 LOGGER.log(Level.SEVERE, "Thread interruption", e);
128 } finally {
129 synchronized (dispatchQueue) {
130 dispatchQueue.add(EOS_MESSAGE_DISPATCHER);
131 isDispatchQueueClosed = true;
132 }
133
134 if (connectionLogger != null) {
135 connectionLogger.handleEos();
136 }
137 if (closeReason == null) {
138 closeReason = CloseReason.INPUT_STREAM_PROBLEM;
139 }
140 linkedCloser.sendSignal(closeReason, closeCause);
141 }
142 }
143 };
144 Thread readThread = new Thread(listenRunnable, "WebSocket listen thread");
145 readThread.setDaemon(true);
146 readThread.start();
147 if (connectionLogger != null) {
148 connectionLogger.start();
149 }
150
151 Runnable dispatchRunnable = new Runnable() {
152 @Override
153 public void run() {
154 try {
155 runImpl();
156 } catch (InterruptedException e) {
157 LOGGER.log(Level.SEVERE, "Thread interruption", e);
158 }
159 }
160 private void runImpl() throws InterruptedException {
161 while (true) {
162 MessageDispatcher next = dispatchQueue.take();
163 try {
164 boolean isLast = next.dispatch(listener);
165 if (isLast) {
166 return;
167 }
168 } catch (RuntimeException e) {
169 LOGGER.log(Level.SEVERE, "Exception in dispatch thread", e);
170 }
171 }
172 }
173 };
174 Thread dispatchThread = new Thread(dispatchRunnable, "WebSocket dispatch thr ead");
175 dispatchThread.setDaemon(true);
176 dispatchThread.start();
177 }
178
179 @Override
180 public abstract void sendTextualMessage(String message) throws IOException;
181
182 protected abstract CloseReason runListenLoop(INPUT loggableReader)
183 throws IOException, InterruptedException;
184
185 public SignalRelay<?> getCloser() {
186 return linkedCloser;
187 }
188
189 protected AbstractSocketWrapper<INPUT, OUTPUT> getSocketWrapper() {
190 return socketWrapper;
191 }
192
193 protected boolean isClosingGracefully() {
194 return isClosingGracefully;
195 }
196
197 /**
198 * Caller must be synchronized on this.
199 */
200 protected boolean isOutputClosed() {
201 return isOutputClosed;
202 }
203
204 /**
205 * Caller must be synchronized on this.
206 */
207 protected void setOutputClosed(boolean isOutputClosed) {
208 this.isOutputClosed = isOutputClosed;
209 }
210
211 protected BlockingQueue<MessageDispatcher> getDispatchQueue() {
212 return dispatchQueue;
213 }
214
215 private final SignalRelay<CloseReason> linkedCloser =
216 SignalRelay.create(new SignalRelay.Callback<CloseReason>() {
217 @Override public void onSignal(CloseReason param, Exception cause) {
218 isClosingGracefully = true;
219 }
220 });
221
222 /**
223 * A debug charset that simply encodes all non-ascii symbols as %DDD.
224 * We need it for log console because web-socket connection is essentially a r andom
225 * sequence of bytes.
226 */
227 protected static final Charset LOGGER_CHARSET =
228 new Charset("Chromium_Logger_Charset", new String[0]) {
229 @Override
230 public boolean contains(Charset cs) {
231 return this == cs;
232 }
233
234 @Override
235 public CharsetDecoder newDecoder() {
236 return new CharsetDecoder(this, 4 / 2, 4) {
237 @Override
238 protected CoderResult decodeLoop(ByteBuffer in, CharBuffer out) {
239 while (in.hasRemaining()) {
240 byte b = in.get();
241 if (b < 20 && b != (byte) '\n') {
242 if (out.remaining() < 4) {
243 return CoderResult.OVERFLOW;
244 }
245 out.put('%');
246 int code = b;
247 int d1 = code / 100 % 10;
248 int d2 = code / 10 % 10;
249 int d3 = code % 10;
250 out.put((char) ('0' + d1));
251 out.put((char) ('0' + d2));
252 out.put((char) ('0' + d3));
253 } else {
254 char ch = (char) b;
255 if (ch == '%') {
256 if (out.remaining() < 2) {
257 return CoderResult.OVERFLOW;
258 }
259 out.put('%');
260 out.put('%');
261 } else {
262 if (!out.hasRemaining()) {
263 return CoderResult.OVERFLOW;
264 }
265 out.put(ch);
266 }
267 }
268 }
269 return CoderResult.UNDERFLOW;
270 }
271 };
272 }
273
274 @Override
275 public CharsetEncoder newEncoder() {
276 throw new UnsupportedOperationException();
277 }
278 };
279
280 protected static void dumpByte(byte b, StringBuilder output) {
281 output.append('%');
282 int code = (b + 256) % 256;
283 int d1 = code / 100 % 10;
284 int d2 = code / 10 % 10;
285 int d3 = code % 10;
286 output.append((char) ('0' + d1));
287 output.append((char) ('0' + d2));
288 output.append((char) ('0' + d3));
289 }
290
291 static abstract class MessageDispatcher {
292 /**
293 * Dispatches message to user.
294 * @return true if it was a last message in queue
295 */
296 abstract boolean dispatch(Listener userListener);
297 }
298
299 private static final MessageDispatcher EOS_MESSAGE_DISPATCHER = new MessageDis patcher() {
300 @Override
301 boolean dispatch(Listener userListener) {
302 userListener.eofMessage();
303 return true;
304 }
305 };
306
307 private static final SignalConverter<AbstractSocketWrapper.ShutdownSignal, Clo seReason>
308 SOCKET_TO_CONNECTION =
309 new SignalConverter<AbstractSocketWrapper.ShutdownSignal, CloseReason>() {
310 @Override public CloseReason convert(AbstractSocketWrapper.ShutdownSigna l source) {
311 return CloseReason.CONNECTION_CLOSED;
312 }
313 };
314
315 private static final RelayOk DISPATCH_THREAD_PROMISES_TO_RELAY_OK = new RelayO k() {};
316 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698