Chromium Code Reviews| Index: sdk/lib/io/secure_socket.dart |
| diff --git a/sdk/lib/io/secure_socket.dart b/sdk/lib/io/secure_socket.dart |
| index 05ec18cf4fe326d33f6f686cd9b5adafbbff2b6e..c96d311b4377d9de8de3426e69bd8c3300d15f35 100644 |
| --- a/sdk/lib/io/secure_socket.dart |
| +++ b/sdk/lib/io/secure_socket.dart |
| @@ -103,7 +103,7 @@ abstract class SecureSocket implements Socket { |
| * the right thing to do. |
| * |
| * If some of the data of the TLS handshake has already been read |
| - * from the socket this data can be passed in the [carryOverData] |
| + * from the socket this data can be passed in the [bufferedData] |
| * parameter. This data will be processed before any other data |
| * available on the socket. |
| * |
| @@ -114,7 +114,7 @@ abstract class SecureSocket implements Socket { |
| static Future<SecureSocket> secureServer( |
| Socket socket, |
| String certificateName, |
| - {List<int> carryOverData, |
| + {List<int> bufferedData, |
| bool requestClientCertificate: false, |
| bool requireClientCertificate: false}) { |
| var completer = new Completer(); |
| @@ -124,7 +124,7 @@ abstract class SecureSocket implements Socket { |
| detachedRaw[0], |
| certificateName, |
| subscription: detachedRaw[1], |
| - carryOverData: carryOverData, |
| + bufferedData: bufferedData, |
| requestClientCertificate: requestClientCertificate, |
| requireClientCertificate: requireClientCertificate); |
| }) |
| @@ -284,7 +284,7 @@ abstract class RawSecureSocket implements RawSocket { |
| * events. |
| * |
| * If some of the data of the TLS handshake has already been read |
| - * from the socket this data can be passed in the [carryOverData] |
| + * from the socket this data can be passed in the [bufferedData] |
| * parameter. This data will be processed before any other data |
| * available on the socket. |
| * |
| @@ -296,7 +296,7 @@ abstract class RawSecureSocket implements RawSocket { |
| RawSocket socket, |
| String certificateName, |
| {StreamSubscription subscription, |
| - List<int> carryOverData, |
| + List<int> bufferedData, |
|
Anders Johnsen
2013/06/13 13:14:17
Can you factor out this change?
|
| bool requestClientCertificate: false, |
| bool requireClientCertificate: false}) { |
| socket.readEventsEnabled = false; |
| @@ -308,7 +308,7 @@ abstract class RawSecureSocket implements RawSocket { |
| is_server: true, |
| socket: socket, |
| subscription: subscription, |
| - carryOverData: carryOverData, |
| + bufferedData: bufferedData, |
| requestClientCertificate: requestClientCertificate, |
| requireClientCertificate: requireClientCertificate); |
| } |
| @@ -343,7 +343,6 @@ class X509Certificate { |
| class _RawSecureSocket extends Stream<RawSocketEvent> |
| implements RawSecureSocket { |
| // Status states |
| - static final int NOT_CONNECTED = 200; |
| static final int HANDSHAKE = 201; |
| static final int CONNECTED = 202; |
| static final int CLOSED = 203; |
| @@ -356,14 +355,17 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
| static final int WRITE_ENCRYPTED = 3; |
| static final int NUM_BUFFERS = 4; |
| + // Is a buffer identifier for an encrypted buffer? |
| + static bool isEncrypted(int identifier) => identifier >= READ_ENCRYPTED; |
| + |
| RawSocket _socket; |
| final Completer<_RawSecureSocket> _handshakeComplete = |
| new Completer<_RawSecureSocket>(); |
| StreamController<RawSocketEvent> _controller; |
| Stream<RawSocketEvent> _stream; |
| StreamSubscription<RawSocketEvent> _socketSubscription; |
| - List<int> _carryOverData; |
| - int _carryOverDataIndex = 0; |
| + List<int> _bufferedData; |
| + int _bufferedDataIndex = 0; |
| final InternetAddress address; |
| final bool is_server; |
| final String certificateName; |
| @@ -372,17 +374,24 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
| final bool sendClientCertificate; |
| final Function onBadCertificate; |
| - var _status = NOT_CONNECTED; |
| + var _status = HANDSHAKE; |
| bool _writeEventsEnabled = true; |
| bool _readEventsEnabled = true; |
| + int _pauseCount = 0; |
| + bool _pendingReadEvent = false; |
| bool _socketClosedRead = false; // The network socket is closed for reading. |
| bool _socketClosedWrite = false; // The network socket is closed for writing. |
| bool _closedRead = false; // The secure socket has fired an onClosed event. |
| bool _closedWrite = false; // The secure socket has been closed for writing. |
| - bool _filterReadEmpty = true; // There is no buffered data to read. |
| - bool _filterWriteEmpty = true; // There is no buffered data to be written. |
| + bool _readEmpty = true; // There is no buffered data to read. |
| + bool _writeEmpty = true; // There is no buffered data to be written. |
| bool _connectPending = false; |
| + bool _filterPending = false; |
| + bool _filterActive = false; |
| + |
| _SecureFilter _secureFilter = new _SecureFilter(); |
| + int _filterPointer; |
| + SendPort _filterService; |
| static Future<_RawSecureSocket> connect( |
| host, |
| @@ -391,7 +400,7 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
| {bool is_server, |
| RawSocket socket, |
| StreamSubscription subscription, |
| - List<int> carryOverData, |
| + List<int> bufferedData, |
| bool requestClientCertificate: false, |
| bool requireClientCertificate: false, |
| bool sendClientCertificate: false, |
| @@ -409,7 +418,7 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
| is_server, |
| socket, |
| subscription, |
| - carryOverData, |
| + bufferedData, |
| requestClientCertificate, |
| requireClientCertificate, |
| sendClientCertificate, |
| @@ -425,7 +434,7 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
| bool this.is_server, |
| RawSocket socket, |
| StreamSubscription this._socketSubscription, |
| - List<int> this._carryOverData, |
| + List<int> this._bufferedData, |
| bool this.requestClientCertificate, |
| bool this.requireClientCertificate, |
| bool this.sendClientCertificate, |
| @@ -441,7 +450,7 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
| // errors will be reported through the future or the stream. |
| _verifyFields(); |
| _secureFilter.init(); |
| - if (_carryOverData != null) _readFromCarryOver(); |
| + _filterPointer = _secureFilter._pointer(); |
| _secureFilter.registerHandshakeCompleteCallback( |
| _secureHandshakeCompleteHandler); |
| if (onBadCertificate != null) { |
| @@ -477,7 +486,6 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
| requireClientCertificate, |
| requireClientCertificate, |
| sendClientCertificate); |
| - _status = HANDSHAKE; |
| _secureHandshake(); |
| }) |
| .catchError((error) { |
| @@ -490,10 +498,7 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
| {void onError(error), |
| void onDone(), |
| bool cancelOnError}) { |
| - if (_writeEventsEnabled) { |
| - _writeEventsEnabled = false; |
| - _controller.add(RawSocketEvent.WRITE); |
| - } |
| + _sendWriteEvent(); |
| return _stream.listen(onData, |
| onError: onError, |
| onDone: onDone, |
| @@ -535,7 +540,6 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
| int available() { |
| if (_status != CONNECTED) return 0; |
| - _readEncryptedData(); |
| return _secureFilter.buffers[READ_PLAINTEXT].length; |
| } |
| @@ -551,7 +555,7 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
| } |
| _socketClosedWrite = true; |
| _socketClosedRead = true; |
| - if (_secureFilter != null) { |
| + if (!_filterActive && _secureFilter != null) { |
| _secureFilter.destroy(); |
| _secureFilter = null; |
| } |
| @@ -566,8 +570,7 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
| if (direction == SocketDirection.SEND || |
| direction == SocketDirection.BOTH) { |
| _closedWrite = true; |
| - _writeEncryptedData(); |
| - if (_filterWriteEmpty) { |
| + if (_writeEmpty) { |
| _socket.shutdown(SocketDirection.SEND); |
| _socketClosedWrite = true; |
| if (_closedRead) { |
| @@ -589,30 +592,20 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
| bool get writeEventsEnabled => _writeEventsEnabled; |
| void set writeEventsEnabled(bool value) { |
| - if (value && |
| - _controller.hasListener && |
| - _secureFilter != null && |
| - _secureFilter.buffers[WRITE_PLAINTEXT].free > 0) { |
| - Timer.run(() => _controller.add(RawSocketEvent.WRITE)); |
| - } else { |
| - _writeEventsEnabled = value; |
| + _writeEventsEnabled = value; |
| + if (value) { |
| + Timer.run(() => _sendWriteEvent()); |
| } |
| } |
| bool get readEventsEnabled => _readEventsEnabled; |
| void set readEventsEnabled(bool value) { |
| - _readEventsEnabled = value; |
| - if (value && |
| - ((_secureFilter != null && |
| - _secureFilter.buffers[READ_PLAINTEXT].length > 0) || |
| - _socketClosedRead)) { |
| - // We might not have no underlying socket to set off read events. |
| - Timer.run(_readHandler); |
| - } |
| + _readEventsEnabled = value; |
| + _scheduleReadEvent(); |
| } |
| - List<int> read([int len]) { |
| + List<int> read([int length]) { |
| if (_closedRead) { |
| throw new SocketException("Reading from a closed socket"); |
| } |
| @@ -620,47 +613,31 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
| return null; |
| } |
| var buffer = _secureFilter.buffers[READ_PLAINTEXT]; |
| - _readEncryptedData(); |
| - int toRead = buffer.length; |
| - if (len != null) { |
| - if (len is! int || len < 0) { |
| + if (length == null) { |
| + length = buffer.length; |
| + } else if (length is! int || length < 0) { |
| throw new ArgumentError( |
| - "Invalid len parameter in SecureSocket.read (len: $len)"); |
| - } |
| - if (len < toRead) { |
| - toRead = len; |
| - } |
| + "Invalid length parameter in SecureSocket.read (length: $length)"); |
| + } else { |
| + length = min(length, buffer.length); |
| } |
| - List<int> result = (toRead == 0) ? null : |
| - buffer.data.sublist(buffer.start, buffer.start + toRead); |
| - buffer.advanceStart(toRead); |
| - |
| - // Set up a read event if the filter still has data. |
| - if (!_filterReadEmpty) { |
| - Timer.run(_readHandler); |
| + if (length == 0) return null; |
| + List<int> result = new Uint8List(length); |
| + int bytesRead = 0; |
| + while (bytesRead < length) { // Loop over 2 data ranges in circular buffer. |
| + int toRead = buffer.linearLength; |
| + result.setRange(bytesRead, |
| + bytesRead + toRead, |
| + buffer.data, |
| + buffer.start); |
| + buffer.advanceStart(toRead); |
| + bytesRead += toRead; |
| } |
| - |
| - if (_socketClosedRead) { // An onClose event is pending. |
| - // _closedRead is false, since we are in a read call. |
| - if (!_filterReadEmpty) { |
| - // _filterReadEmpty may be out of date since read empties |
| - // the plaintext buffer after calling _readEncryptedData. |
| - // TODO(whesse): Fix this as part of fixing read. |
| - _readEncryptedData(); |
| - } |
| - if (_filterReadEmpty) { |
| - // This can't be an else clause: the value of _filterReadEmpty changes. |
| - // This must be asynchronous, because we are in a read call. |
| - Timer.run(_closeHandler); |
| - } |
| - } |
| - |
| + _runFilter(); |
| return result; |
| } |
| - // Write the data to the socket, and flush it as much as possible |
| - // until it would block. If the write would block, _writeEncryptedData sets |
| - // up handlers to flush the pipeline when possible. |
| + // Write the data to the socket, and schedule the filter to encrypt it. |
| int write(List<int> data, [int offset, int bytes]) { |
| if (_closedWrite) { |
| _controller.addError(new SocketException("Writing to a closed socket")); |
| @@ -675,13 +652,18 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
| if (bytes > buffer.free) { |
| bytes = buffer.free; |
| } |
| - if (bytes > 0) { |
| - int startIndex = buffer.start + buffer.length; |
| - buffer.data.setRange(startIndex, startIndex + bytes, data, offset); |
| - buffer.length += bytes; |
| + int written = 0; |
| + int toWrite = min(bytes, buffer.linearFree); |
| + while (toWrite > 0) { // The buffer may contain two linear free ranges. |
| + _writeEmpty = false; |
| + buffer.data.setRange(buffer.end, buffer.end + toWrite, data, offset); |
| + buffer.advanceEnd(toWrite); |
| + offset += toWrite; |
| + written += toWrite; |
| + toWrite = min(bytes - written, buffer.linearFree); |
| } |
| - _writeEncryptedData(); // Tries to flush all pipeline stages. |
| - return bytes; |
| + _runFilter(); |
| + return written; |
| } |
| X509Certificate get peerCertificate => _secureFilter.peerCertificate; |
| @@ -691,28 +673,6 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
| return _socket.setOption(option, enabled); |
| } |
| - void _writeHandler() { |
| - if (_status == CLOSED) return; |
| - _writeEncryptedData(); |
| - if (_filterWriteEmpty && _closedWrite && !_socketClosedWrite) { |
| - // Close _socket for write, by calling shutdown(), to avoid cloning the |
| - // socket closing code in shutdown(). |
| - shutdown(SocketDirection.SEND); |
| - } |
| - if (_status == HANDSHAKE) { |
| - try { |
| - _secureHandshake(); |
| - } catch (e) { _reportError(e, "RawSecureSocket error"); } |
| - } else if (_status == CONNECTED && |
| - _controller.hasListener && |
| - _writeEventsEnabled && |
| - _secureFilter.buffers[WRITE_PLAINTEXT].free > 0) { |
| - // Reset the one-shot handler. |
| - _writeEventsEnabled = false; |
| - _controller.add(RawSocketEvent.WRITE); |
| - } |
| - } |
| - |
| void _eventDispatcher(RawSocketEvent event) { |
| if (event == RawSocketEvent.READ) { |
| _readHandler(); |
| @@ -723,56 +683,18 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
| } |
| } |
| - void _readFromCarryOver() { |
| - assert(_carryOverData != null); |
| - var encrypted = _secureFilter.buffers[READ_ENCRYPTED]; |
| - var bytes = _carryOverData.length - _carryOverDataIndex; |
| - int startIndex = encrypted.start + encrypted.length; |
| - encrypted.data.setRange(startIndex, |
| - startIndex + bytes, |
| - _carryOverData, |
| - _carryOverDataIndex); |
| - encrypted.length += bytes; |
| - _carryOverDataIndex += bytes; |
| - if (_carryOverData.length == _carryOverDataIndex) { |
| - _carryOverData = null; |
| - } |
| + void _readHandler() { |
| + _readSocket(); |
| + _runFilter(); |
| } |
| - void _readHandler() { |
| - if (_status == CLOSED) { |
| - return; |
| - } else if (_status == HANDSHAKE) { |
| - try { |
| - _secureHandshake(); |
| - if (_status != HANDSHAKE) _readHandler(); |
| - } catch (e) { _reportError(e, "RawSecureSocket error"); } |
| - } else { |
| - if (_status != CONNECTED) { |
| - // Cannot happen. |
| - throw new SocketException("Internal SocketIO Error"); |
| - } |
| - try { |
| - _readEncryptedData(); |
| - } catch (e) { _reportError(e, "RawSecureSocket error"); } |
| - if (!_filterReadEmpty) { |
| - if (_readEventsEnabled) { |
| - if (_secureFilter.buffers[READ_PLAINTEXT].length > 0) { |
| - _controller.add(RawSocketEvent.READ); |
| - } |
| - if (_socketClosedRead) { |
| - // Keep firing read events until we are paused or buffer is empty. |
| - Timer.run(_readHandler); |
| - } |
| - } |
| - } else if (_socketClosedRead) { |
| - _closeHandler(); |
| - } |
| - } |
| + void _writeHandler() { |
| + _writeSocket(); |
| + _runFilter(); |
| } |
| void _doneHandler() { |
| - if (_filterReadEmpty) { |
| + if (_readEmpty) { |
| _close(); |
| } |
| } |
| @@ -802,38 +724,59 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
| if (_status == CONNECTED) { |
| if (_closedRead) return; |
| _socketClosedRead = true; |
| - if (_filterReadEmpty) { |
| + if (_readEmpty) { |
| _closedRead = true; |
| _controller.add(RawSocketEvent.READ_CLOSED); |
| if (_socketClosedWrite) { |
| _close(); |
| } |
| + } else { |
| + _runFilter(); |
| } |
| } else if (_status == HANDSHAKE) { |
| + _socketClosedRead = true; |
| + if (_readEmpty) { |
| _reportError( |
| new SocketException('Connection terminated during handshake'), |
| - 'handshake error'); |
| + 'RawSecureSocket error'); |
| + } else { |
| + _secureHandshake(); |
| + } |
| } |
| } |
| void _secureHandshake() { |
| - _readEncryptedData(); |
| - _secureFilter.handshake(); |
| - _writeEncryptedData(); |
| + try { |
| + _secureFilter.handshake(); |
| + _writeEmpty = false; |
| + _readSocket(); |
| + _writeSocket(); |
| + _runFilter(); |
| + } catch (e) { |
| + _reportError(e, "RawSecureSocket error"); |
| + } |
| } |
| void _secureHandshakeCompleteHandler() { |
| _status = CONNECTED; |
| if (_connectPending) { |
| _connectPending = false; |
| - // If we complete the future synchronously, user code will run here, |
| - // and modify the state of the RawSecureSocket. For example, it |
| - // could close the socket, and set _filter to null. |
| + // We don't want user code to run synchronously in this callback. |
| Timer.run(() => _handshakeComplete.complete(this)); |
| } |
| } |
| void _onPauseStateChange() { |
| + if (_controller.isPaused) { |
| + _pauseCount++; |
| + } else { |
| + _pauseCount--; |
| + if (_pauseCount == 0) { |
| + _scheduleReadEvent(); |
| + _sendWriteEvent(); // Can send event synchronously. |
| + } |
| + } |
| + |
| if (!_socketClosedRead || !_socketClosedWrite) { |
| if (_controller.isPaused) { |
| _socketSubscription.pause(); |
| @@ -849,120 +792,294 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
| } |
| } |
| - void _readEncryptedData() { |
| - // Read from the socket, and push it through the filter as far as |
| - // possible. |
| + void _readSocket() { |
| + if (_status == CLOSED) return; |
| var encrypted = _secureFilter.buffers[READ_ENCRYPTED]; |
| - var plaintext = _secureFilter.buffers[READ_PLAINTEXT]; |
| - bool progress = true; |
| - while (progress) { |
| - progress = false; |
| - // Do not try to read plaintext from the filter while handshaking. |
| - if ((_status == CONNECTED) && plaintext.free > 0) { |
| - int bytes = _secureFilter.processBuffer(READ_PLAINTEXT); |
| - if (bytes > 0) { |
| - plaintext.length += bytes; |
| - progress = true; |
| + while (true) { // Loop over both linear segments in the buffer. |
| + var encrypted_free = encrypted.linearFree; |
| + if (encrypted_free == 0) break; |
| + List<int> data = readSocketOrBufferedData(encrypted_free); |
| + if (data == null) break; |
| + int bytes = data.length; |
| + encrypted.data.setRange(encrypted.end, encrypted.end + bytes, data); |
| + encrypted.advanceEnd(bytes); |
| + _readEmpty = false; |
| + } |
| + } |
| + |
| + void _runFilter() { |
| + _filterPending = true; |
| + _tryFilter(); |
| + } |
| + |
| + void _tryFilter() { |
| + if (_status == CLOSED) return; |
| + if (_filterPending && !_filterActive) { |
| + _filterActive = true; |
| + _filterPending = false; |
| + _pushAllFilterStages().then((progress) { |
| + _filterActive = false; |
| + if (_status == CLOSED) { |
| + _secureFilter.destroy(); |
| + _secureFilter = null; |
| + return; |
| } |
| - } |
| - if (encrypted.length > 0) { |
| - int bytes = _secureFilter.processBuffer(READ_ENCRYPTED); |
| - if (bytes > 0) { |
| - encrypted.advanceStart(bytes); |
| - progress = true; |
| + if (_writeEmpty && _closedWrite && !_socketClosedWrite) { |
| + // Checks for and handles all cases of partially closed sockets. |
| + shutdown(SocketDirection.SEND); |
| + if (_status == CLOSED) return; |
| } |
| - } |
| - if (!_socketClosedRead && encrypted.free > 0) { |
| - if (_carryOverData != null) { |
| - _readFromCarryOver(); |
| - progress = true; |
| - } else { |
| - List<int> data = _socket.read(encrypted.free); |
| - if (data != null) { |
| - int bytes = data.length; |
| - int startIndex = encrypted.start + encrypted.length; |
| - encrypted.data.setRange(startIndex, startIndex + bytes, data); |
| - encrypted.length += bytes; |
| - progress = true; |
| + if (_readEmpty && _socketClosedRead && !_closedRead) { |
| + if (_status == HANDSHAKE) { |
| + _secureFilter.handshake(); |
| + if (_status == HANDSHAKE) { |
| + _reportError( |
| + new SocketException('Connection terminated during handshake'), |
| + 'RawSecureSocket error'); |
| + } |
| + } |
| + _closeHandler(); |
| + } |
| + if (_status == CLOSED) return; |
| + if (progress) { |
| + _filterPending = true; |
| + _readSocket(); |
| + _writeSocket(); |
| + if (_status == HANDSHAKE) { |
| + _secureHandshake(); |
| } |
| } |
| + _tryFilter(); |
| + }); |
| + } |
| + } |
| + |
| + List<int> readSocketOrBufferedData(int bytes) { |
| + if (_bufferedData != null) { |
| + if (bytes > _bufferedData.length - _bufferedDataIndex) { |
| + bytes = _bufferedData.length - _bufferedDataIndex; |
| } |
| + var result = _bufferedData.sublist(_bufferedDataIndex, |
| + _bufferedDataIndex + bytes); |
| + _bufferedDataIndex += bytes; |
| + if (_bufferedData.length == _bufferedDataIndex) { |
| + _bufferedData = null; |
| + } |
| + return result; |
| + } else if (!_socketClosedRead) { |
| + try { |
| + return _socket.read(bytes); |
| + } catch (e) { |
| + _reportError(e, "RawSecureSocket error reading encrypted socket"); |
| + return null; |
| + } |
| + } else { |
| + return null; |
| } |
| - // If there is any data in any stages of the filter, there should |
| - // be data in the plaintext buffer after this process. |
| - // TODO(whesse): Verify that this is true, and there can be no |
| - // partial encrypted block stuck in the secureFilter. |
| - _filterReadEmpty = (plaintext.length == 0); |
| } |
| - void _writeEncryptedData() { |
| + void _writeSocket() { |
| if (_socketClosedWrite) return; |
| var encrypted = _secureFilter.buffers[WRITE_ENCRYPTED]; |
| - var plaintext = _secureFilter.buffers[WRITE_PLAINTEXT]; |
| - while (true) { |
| - if (encrypted.length > 0) { |
| - // Write from the filter to the socket. |
| - int bytes = _socket.write(encrypted.data, |
| - encrypted.start, |
| - encrypted.length); |
| - encrypted.advanceStart(bytes); |
| - if (encrypted.length > 0) { |
| + while (true) { // Loop over both linear ranges in circular buffer. |
| + // Get the length of the first data segment in circular buffer. |
| + var encrypted = _secureFilter.buffers[WRITE_ENCRYPTED]; |
| + var encrypted_length = encrypted.linearLength; |
| + if (encrypted_length == 0) break; |
| + int bytes = _socket.write(encrypted.data, |
| + encrypted.start, |
| + encrypted_length); |
| + encrypted.advanceStart(bytes); |
| + if (bytes < encrypted_length) { |
| // The socket has blocked while we have data to write. |
| // We must be notified when it becomes unblocked. |
| _socket.writeEventsEnabled = true; |
| - _filterWriteEmpty = false; |
| break; |
| + } |
| + } |
| + } |
| + |
| + // If a read event should be sent, add it to the controller. |
| + _scheduleReadEvent() { |
| + if (!_pendingReadEvent && |
| + _readEventsEnabled && |
| + _pauseCount == 0 && |
| + _secureFilter != null && |
| + !_secureFilter.buffers[READ_PLAINTEXT].isEmpty) { |
| + _pendingReadEvent = true; |
| + Timer.run(_sendReadEvent); |
| + } |
| + } |
| + |
| + _sendReadEvent() { |
| + _pendingReadEvent = false; |
| + if (_readEventsEnabled && |
| + _pauseCount == 0 && |
| + _secureFilter != null && |
| + !_secureFilter.buffers[READ_PLAINTEXT].isEmpty) { |
| + _controller.add(RawSocketEvent.READ); |
| + _scheduleReadEvent(); |
| + } |
| + } |
| + |
| + // If a write event should be sent, add it to the controller. |
| + _sendWriteEvent() { |
| + if (!_closedWrite && |
| + _writeEventsEnabled && |
| + _pauseCount == 0 && |
| + _secureFilter != null && |
| + _secureFilter.buffers[WRITE_PLAINTEXT].free > 0) { |
| + _writeEventsEnabled = false; |
| + _controller.add(RawSocketEvent.WRITE); |
| + } |
| + } |
| + |
| + Future<bool> _pushAllFilterStages() { |
| + if (_filterService == null) { |
| + _filterService = _SecureFilter._newServicePort(); |
| + } |
| + List args = [_filterPointer, _status != CONNECTED]; |
| + var bufs = _secureFilter.buffers; |
| + for (var i = 0; i < NUM_BUFFERS; ++i) { |
| + args.add(bufs[i].start); |
| + args.add(bufs[i].end); |
| + } |
| + |
| + return _filterService.call(args).then((positions) { |
| + // Compute writeEmpty as "write plaintext buffer and write encrypted |
| + // buffer were empty when we started and are empty now". |
| + _writeEmpty = bufs[WRITE_PLAINTEXT].isEmpty && |
| + positions[2 * WRITE_ENCRYPTED + 2] == |
| + positions[2 * WRITE_ENCRYPTED + 3]; |
| + // If we were in handshake when this started, _writeEmpty may be false |
| + // because the handshake wrote data after we checked. |
| + if (positions[1]) _writeEmpty = false; |
| + |
| + // Compute readEmpty as "both read buffers were empty when we started |
| + // and are empty now". |
| + _readEmpty = bufs[READ_ENCRYPTED].isEmpty && |
| + positions[2 * READ_PLAINTEXT + 2] == |
| + positions[2 * READ_PLAINTEXT + 3]; |
| + |
| + bool progress = false; |
| + ExternalBuffer buffer = bufs[WRITE_PLAINTEXT]; |
| + int new_start = positions[2 * WRITE_PLAINTEXT + 2]; |
| + if (new_start != buffer.start) { |
| + progress = true; |
| + // If WRITE_PLAINTEXT goes from full to not full: |
| + if (buffer.free == 0) { |
| + buffer.start = new_start; |
| + _sendWriteEvent(); |
| + } else { |
| + buffer.start = new_start; |
| } |
| - } else { |
| - var plaintext = _secureFilter.buffers[WRITE_PLAINTEXT]; |
| - if (plaintext.length > 0) { |
| - int plaintext_bytes = _secureFilter.processBuffer(WRITE_PLAINTEXT); |
| - plaintext.advanceStart(plaintext_bytes); |
| + } |
| + buffer = bufs[READ_ENCRYPTED]; |
| + new_start = positions[2 * READ_ENCRYPTED + 2]; |
| + if (new_start != buffer.start) { |
| + progress = true; |
| + // If READ_ENCRYPTED goes from full to not full: |
| + if (buffer.free == 0) { |
| + buffer.start = new_start; |
| + _readSocket(); |
| + } else { |
| + buffer.start = new_start; |
| } |
| - int bytes = _secureFilter.processBuffer(WRITE_ENCRYPTED); |
| - if (bytes <= 0) { |
| - // We know the WRITE_ENCRYPTED buffer is empty, and the |
| - // filter wrote zero bytes to it, so the filter must be empty. |
| - // Also, the WRITE_PLAINTEXT buffer must have been empty, or |
| - // it would have written to the filter. |
| - // TODO(whesse): Verify that the filter works this way. |
| - _filterWriteEmpty = true; |
| - break; |
| + } |
| + buffer = bufs[WRITE_ENCRYPTED]; |
| + int new_end = positions[2 * WRITE_ENCRYPTED + 3]; |
| + if (new_end != buffer.end) { |
| + progress = true; |
| + // If WRITE_ENCRYPTED goes from empty to non-empty: |
| + if (buffer.length == 0) { |
| + buffer.end = new_end; |
| + _writeSocket(); |
| + } else { |
| + buffer.end = new_end; |
| } |
| - encrypted.length += bytes; |
| } |
| - } |
| + buffer = bufs[READ_PLAINTEXT]; |
| + new_end = positions[2 * READ_PLAINTEXT + 3]; |
| + if (new_end != buffer.end) { |
| + progress = true; |
| + // If READ_PLAINTEXT goes from empty to non-empty: |
| + if (buffer.length == 0) { |
| + buffer.end = new_end; |
| + _scheduleReadEvent(); |
| + } else { |
| + buffer.end = new_end; |
| + } |
| + } |
| + return progress; |
| + }); |
| } |
| } |
| +/** |
| + * A circular buffer backed by an external byte array. Accessed from |
| + * both C++ and Dart code in an unsynchronized way, with one reading |
| + * and one writing. All updates to start and end are done by Dart code. |
| + */ |
| class _ExternalBuffer { |
| - // Performance is improved if a full buffer of plaintext fits |
| - // in the encrypted buffer, when encrypted. |
| - static final int SIZE = 8 * 1024; |
| - static final int ENCRYPTED_SIZE = 10 * 1024; |
| - _ExternalBuffer() : start = 0, length = 0; |
| + _ExternalBuffer(this.size) : start = 0, end = 0; |
| + |
| + void advanceStart(int bytes) { |
| + assert(start > end || start + bytes <= end); |
| + start += bytes; |
| + if (start >= size) { |
| + start -= size; |
| + assert(start <= end); |
| + assert(start < size); |
| + } |
| + } |
| - // TODO(whesse): Consider making this a circular buffer. Only if it helps. |
| - void advanceStart(int numBytes) { |
| - start += numBytes; |
| - length -= numBytes; |
| - if (length == 0) { |
| - start = 0; |
| + void advanceEnd(int bytes) { |
| + assert(start <= end || start > end + bytes); |
| + end += bytes; |
| + if (end >= size) { |
| + end -= size; |
| + assert(end < start); |
| + assert(end < size); |
| } |
| } |
| - int get free => data.length - (start + length); |
| + bool get isEmpty => end == start; |
| + |
| + int get length { |
| + if (start > end) return size + end - start; |
| + return end - start; |
| + } |
| + |
| + int get linearLength { |
| + if (start > end) return size - start; |
| + return end - start; |
| + } |
| + |
| + int get free { |
| + if (start > end) return start - end - 1; |
| + return size + start - end - 1; |
| + } |
| + |
| + int get linearFree { |
| + if (start > end) return start - end - 1; |
| + if (start == 0) return size - end - 1; |
| + return size - end; |
| + } |
| + |
| List data; // This will be a ExternalByteArray, backed by C allocated data. |
| int start; |
| - int length; |
| + int end; |
| + int size; |
| } |
| abstract class _SecureFilter { |
| external factory _SecureFilter(); |
| + external static SendPort _newServicePort(); |
| + |
| void connect(String hostName, |
| int port, |
| bool is_server, |
| @@ -977,6 +1094,7 @@ abstract class _SecureFilter { |
| int processBuffer(int bufferIndex); |
| void registerBadCertificateCallback(Function callback); |
| void registerHandshakeCompleteCallback(Function handshakeCompleteHandler); |
| + int _pointer(); |
| List<_ExternalBuffer> get buffers; |
| } |