OLD | NEW |
| (Empty) |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package org.chromium.mojo.bindings; | |
6 | |
7 import org.chromium.mojo.system.AsyncWaiter; | |
8 import org.chromium.mojo.system.Core; | |
9 import org.chromium.mojo.system.MessagePipeHandle; | |
10 import org.chromium.mojo.system.MessagePipeHandle.ReadMessageResult; | |
11 import org.chromium.mojo.system.MojoException; | |
12 import org.chromium.mojo.system.MojoResult; | |
13 import org.chromium.mojo.system.ResultAnd; | |
14 | |
15 import java.nio.ByteBuffer; | |
16 | |
17 /** | |
18 * A {@link Connector} owns a {@link MessagePipeHandle} and will send any receiv
ed messages to the | |
19 * registered {@link MessageReceiver}. It also acts as a {@link MessageReceiver}
and will send any | |
20 * message through the handle. | |
21 * <p> | |
22 * The method |start| must be called before the {@link Connector} will start lis
tening to incoming | |
23 * messages. | |
24 */ | |
25 public class Connector implements MessageReceiver, HandleOwner<MessagePipeHandle
> { | |
26 | |
27 /** | |
28 * The callback that is notified when the state of the owned handle changes. | |
29 */ | |
30 private final AsyncWaiterCallback mAsyncWaiterCallback = new AsyncWaiterCall
back(); | |
31 | |
32 /** | |
33 * The owned message pipe. | |
34 */ | |
35 private final MessagePipeHandle mMessagePipeHandle; | |
36 | |
37 /** | |
38 * A waiter which is notified when a new message is available on the owned m
essage pipe. | |
39 */ | |
40 private final AsyncWaiter mAsyncWaiter; | |
41 | |
42 /** | |
43 * The {@link MessageReceiver} to which received messages are sent. | |
44 */ | |
45 private MessageReceiver mIncomingMessageReceiver; | |
46 | |
47 /** | |
48 * The Cancellable for the current wait. Is |null| when not currently waitin
g for new messages. | |
49 */ | |
50 private AsyncWaiter.Cancellable mCancellable; | |
51 | |
52 /** | |
53 * The error handler to notify of errors. | |
54 */ | |
55 private ConnectionErrorHandler mErrorHandler; | |
56 | |
57 /** | |
58 * Create a new connector over a |messagePipeHandle|. The created connector
will use the default | |
59 * {@link AsyncWaiter} from the {@link Core} implementation of |messagePipeH
andle|. | |
60 */ | |
61 public Connector(MessagePipeHandle messagePipeHandle) { | |
62 this(messagePipeHandle, BindingsHelper.getDefaultAsyncWaiterForHandle(me
ssagePipeHandle)); | |
63 } | |
64 | |
65 /** | |
66 * Create a new connector over a |messagePipeHandle| using the given {@link
AsyncWaiter} to get | |
67 * notified of changes on the handle. | |
68 */ | |
69 public Connector(MessagePipeHandle messagePipeHandle, AsyncWaiter asyncWaite
r) { | |
70 mCancellable = null; | |
71 mMessagePipeHandle = messagePipeHandle; | |
72 mAsyncWaiter = asyncWaiter; | |
73 } | |
74 | |
75 /** | |
76 * Set the {@link MessageReceiver} that will receive message from the owned
message pipe. | |
77 */ | |
78 public void setIncomingMessageReceiver(MessageReceiver incomingMessageReceiv
er) { | |
79 mIncomingMessageReceiver = incomingMessageReceiver; | |
80 } | |
81 | |
82 /** | |
83 * Set the {@link ConnectionErrorHandler} that will be notified of errors on
the owned message | |
84 * pipe. | |
85 */ | |
86 public void setErrorHandler(ConnectionErrorHandler errorHandler) { | |
87 mErrorHandler = errorHandler; | |
88 } | |
89 | |
90 /** | |
91 * Start listening for incoming messages. | |
92 */ | |
93 public void start() { | |
94 assert mCancellable == null; | |
95 registerAsyncWaiterForRead(); | |
96 } | |
97 | |
98 /** | |
99 * @see MessageReceiver#accept(Message) | |
100 */ | |
101 @Override | |
102 public boolean accept(Message message) { | |
103 try { | |
104 mMessagePipeHandle.writeMessage(message.getData(), | |
105 message.getHandles(), MessagePipeHandle.WriteFlags.NONE); | |
106 return true; | |
107 } catch (MojoException e) { | |
108 onError(e); | |
109 return false; | |
110 } | |
111 } | |
112 | |
113 /** | |
114 * Pass the owned handle of the connector. After this, the connector is disc
onnected. It cannot | |
115 * accept new message and it isn't listening to the handle anymore. | |
116 * | |
117 * @see org.chromium.mojo.bindings.HandleOwner#passHandle() | |
118 */ | |
119 @Override | |
120 public MessagePipeHandle passHandle() { | |
121 cancelIfActive(); | |
122 MessagePipeHandle handle = mMessagePipeHandle.pass(); | |
123 if (mIncomingMessageReceiver != null) { | |
124 mIncomingMessageReceiver.close(); | |
125 } | |
126 return handle; | |
127 } | |
128 | |
129 /** | |
130 * @see java.io.Closeable#close() | |
131 */ | |
132 @Override | |
133 public void close() { | |
134 cancelIfActive(); | |
135 mMessagePipeHandle.close(); | |
136 if (mIncomingMessageReceiver != null) { | |
137 MessageReceiver incomingMessageReceiver = mIncomingMessageReceiver; | |
138 mIncomingMessageReceiver = null; | |
139 incomingMessageReceiver.close(); | |
140 } | |
141 } | |
142 | |
143 private class AsyncWaiterCallback implements AsyncWaiter.Callback { | |
144 | |
145 /** | |
146 * @see org.chromium.mojo.system.AsyncWaiter.Callback#onResult(int) | |
147 */ | |
148 @Override | |
149 public void onResult(int result) { | |
150 Connector.this.onAsyncWaiterResult(result); | |
151 } | |
152 | |
153 /** | |
154 * @see org.chromium.mojo.system.AsyncWaiter.Callback#onError(MojoExcept
ion) | |
155 */ | |
156 @Override | |
157 public void onError(MojoException exception) { | |
158 mCancellable = null; | |
159 Connector.this.onError(exception); | |
160 } | |
161 | |
162 } | |
163 | |
164 /** | |
165 * @see org.chromium.mojo.system.AsyncWaiter.Callback#onResult(int) | |
166 */ | |
167 private void onAsyncWaiterResult(int result) { | |
168 mCancellable = null; | |
169 if (result == MojoResult.OK) { | |
170 readOutstandingMessages(); | |
171 } else { | |
172 onError(new MojoException(result)); | |
173 } | |
174 } | |
175 | |
176 private void onError(MojoException exception) { | |
177 close(); | |
178 assert mCancellable == null; | |
179 if (mErrorHandler != null) { | |
180 mErrorHandler.onConnectionError(exception); | |
181 } | |
182 } | |
183 | |
184 /** | |
185 * Register to be called back when a new message is available on the owned m
essage pipe. | |
186 */ | |
187 private void registerAsyncWaiterForRead() { | |
188 assert mCancellable == null; | |
189 if (mAsyncWaiter != null) { | |
190 mCancellable = mAsyncWaiter.asyncWait(mMessagePipeHandle, Core.Handl
eSignals.READABLE, | |
191 Core.DEADLINE_INFINITE, mAsyncWaiterCallback); | |
192 } else { | |
193 onError(new MojoException(MojoResult.INVALID_ARGUMENT)); | |
194 } | |
195 } | |
196 | |
197 /** | |
198 * Read all available messages on the owned message pipe. | |
199 */ | |
200 private void readOutstandingMessages() { | |
201 ResultAnd<Boolean> result; | |
202 do { | |
203 try { | |
204 result = readAndDispatchMessage(mMessagePipeHandle, mIncomingMes
sageReceiver); | |
205 } catch (MojoException e) { | |
206 onError(e); | |
207 return; | |
208 } | |
209 } while (result.getValue()); | |
210 if (result.getMojoResult() == MojoResult.SHOULD_WAIT) { | |
211 registerAsyncWaiterForRead(); | |
212 } else { | |
213 onError(new MojoException(result.getMojoResult())); | |
214 } | |
215 } | |
216 | |
217 private void cancelIfActive() { | |
218 if (mCancellable != null) { | |
219 mCancellable.cancel(); | |
220 mCancellable = null; | |
221 } | |
222 } | |
223 | |
224 /** | |
225 * Read a message, and pass it to the given |MessageReceiver| if not null. I
f the | |
226 * |MessageReceiver| is null, the message is lost. | |
227 * | |
228 * @param receiver The {@link MessageReceiver} that will receive the read {@
link Message}. Can | |
229 * be <code>null</code>, in which case the message is discarded. | |
230 */ | |
231 static ResultAnd<Boolean> readAndDispatchMessage( | |
232 MessagePipeHandle handle, MessageReceiver receiver) { | |
233 // TODO(qsr) Allow usage of a pool of pre-allocated buffer for performan
ce. | |
234 ResultAnd<ReadMessageResult> result = | |
235 handle.readMessage(null, 0, MessagePipeHandle.ReadFlags.NONE); | |
236 if (result.getMojoResult() != MojoResult.RESOURCE_EXHAUSTED) { | |
237 return new ResultAnd<Boolean>(result.getMojoResult(), false); | |
238 } | |
239 ReadMessageResult readResult = result.getValue(); | |
240 assert readResult != null; | |
241 ByteBuffer buffer = ByteBuffer.allocateDirect(readResult.getMessageSize(
)); | |
242 result = handle.readMessage( | |
243 buffer, readResult.getHandlesCount(), MessagePipeHandle.ReadFlag
s.NONE); | |
244 if (receiver != null && result.getMojoResult() == MojoResult.OK) { | |
245 boolean accepted = receiver.accept(new Message(buffer, result.getVal
ue().getHandles())); | |
246 return new ResultAnd<Boolean>(result.getMojoResult(), accepted); | |
247 } | |
248 return new ResultAnd<Boolean>(result.getMojoResult(), false); | |
249 } | |
250 } | |
OLD | NEW |