Index: sdk/lib/io/secure_socket.dart |
diff --git a/sdk/lib/io/secure_socket.dart b/sdk/lib/io/secure_socket.dart |
index d11b1028789782e5b438b6add5b5d5a57e79b987..a8b0eaafacf7ff0ba81e3d648d944444af1adcc9 100644 |
--- a/sdk/lib/io/secure_socket.dart |
+++ b/sdk/lib/io/secure_socket.dart |
@@ -358,10 +358,23 @@ class X509Certificate { |
} |
+class _FilterStatus { |
+ bool progress = false; // The filter read or wrote data to the buffers. |
+ bool readEmpty = true; // The read buffers and decryption filter are empty. |
+ bool writeEmpty = true; // The write buffers and encryption filter are empty. |
+ // These are set if a buffer changes state from empty or full. |
+ bool readPlaintextNoLongerEmpty = false; |
+ bool writePlaintextNoLongerFull = false; |
+ bool readEncryptedNoLongerFull = false; |
+ bool writeEncryptedNoLongerEmpty = false; |
+ |
+ _FilterStatus(); |
+} |
+ |
+ |
class _RawSecureSocket extends Stream<RawSocketEvent> |
implements RawSecureSocket { |
// Status states |
- static final int NOT_CONNECTED = 200; |
static final int HANDSHAKE = 201; |
static final int CONNECTED = 202; |
static final int CLOSED = 203; |
@@ -374,6 +387,9 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
static final int WRITE_ENCRYPTED = 3; |
static final int NUM_BUFFERS = 4; |
+ // Is a buffer identifier for an encrypted buffer? |
+ static bool _isBufferEncrypted(int identifier) => identifier >= READ_ENCRYPTED; |
+ |
RawSocket _socket; |
final Completer<_RawSecureSocket> _handshakeComplete = |
new Completer<_RawSecureSocket>(); |
@@ -390,17 +406,23 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
final bool sendClientCertificate; |
final Function onBadCertificate; |
- var _status = NOT_CONNECTED; |
+ var _status = HANDSHAKE; |
bool _writeEventsEnabled = true; |
bool _readEventsEnabled = true; |
+ int _pauseCount = 0; |
+ bool _pendingReadEvent = false; |
bool _socketClosedRead = false; // The network socket is closed for reading. |
bool _socketClosedWrite = false; // The network socket is closed for writing. |
bool _closedRead = false; // The secure socket has fired an onClosed event. |
bool _closedWrite = false; // The secure socket has been closed for writing. |
- bool _filterReadEmpty = true; // There is no buffered data to read. |
- bool _filterWriteEmpty = true; // There is no buffered data to be written. |
+ _FilterStatus _filterStatus = new _FilterStatus(); |
bool _connectPending = false; |
+ bool _filterPending = false; |
+ bool _filterActive = false; |
+ |
_SecureFilter _secureFilter = new _SecureFilter(); |
+ int _filterPointer; |
+ SendPort _filterService; |
static Future<_RawSecureSocket> connect( |
host, |
@@ -464,7 +486,7 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
// errors will be reported through the future or the stream. |
_verifyFields(); |
_secureFilter.init(); |
- if (_bufferedData != null) _readFromBuffered(); |
+ _filterPointer = _secureFilter._pointer(); |
_secureFilter.registerHandshakeCompleteCallback( |
_secureHandshakeCompleteHandler); |
if (onBadCertificate != null) { |
@@ -501,7 +523,6 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
requireClientCertificate, |
requireClientCertificate, |
sendClientCertificate); |
- _status = HANDSHAKE; |
_secureHandshake(); |
}) |
.catchError((error) { |
@@ -514,10 +535,7 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
{void onError(error), |
void onDone(), |
bool cancelOnError}) { |
- if (_writeEventsEnabled) { |
- _writeEventsEnabled = false; |
- _controller.add(RawSocketEvent.WRITE); |
- } |
+ _sendWriteEvent(); |
return _stream.listen(onData, |
onError: onError, |
onDone: onDone, |
@@ -559,7 +577,6 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
int available() { |
if (_status != CONNECTED) return 0; |
- _readEncryptedData(); |
return _secureFilter.buffers[READ_PLAINTEXT].length; |
} |
@@ -575,7 +592,7 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
} |
_socketClosedWrite = true; |
_socketClosedRead = true; |
- if (_secureFilter != null) { |
+ if (!_filterActive && _secureFilter != null) { |
_secureFilter.destroy(); |
_secureFilter = null; |
} |
@@ -590,8 +607,7 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
if (direction == SocketDirection.SEND || |
direction == SocketDirection.BOTH) { |
_closedWrite = true; |
- _writeEncryptedData(); |
- if (_filterWriteEmpty) { |
+ if (_filterStatus.writeEmpty) { |
_socket.shutdown(SocketDirection.SEND); |
_socketClosedWrite = true; |
if (_closedRead) { |
@@ -613,99 +629,60 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
bool get writeEventsEnabled => _writeEventsEnabled; |
void set writeEventsEnabled(bool value) { |
- if (value && |
- _controller.hasListener && |
- _secureFilter != null && |
- _secureFilter.buffers[WRITE_PLAINTEXT].free > 0) { |
- Timer.run(() => _controller.add(RawSocketEvent.WRITE)); |
- } else { |
- _writeEventsEnabled = value; |
+ _writeEventsEnabled = value; |
+ if (value) { |
+ Timer.run(() => _sendWriteEvent()); |
} |
} |
bool get readEventsEnabled => _readEventsEnabled; |
void set readEventsEnabled(bool value) { |
- _readEventsEnabled = value; |
- if (value && |
- ((_secureFilter != null && |
- _secureFilter.buffers[READ_PLAINTEXT].length > 0) || |
- _socketClosedRead)) { |
- // We might not have no underlying socket to set off read events. |
- Timer.run(_readHandler); |
- } |
+ _readEventsEnabled = value; |
+ _scheduleReadEvent(); |
} |
- List<int> read([int len]) { |
+ List<int> read([int length]) { |
+ if (length != null && (length is! int || length < 0)) { |
+ throw new ArgumentError( |
+ "Invalid length parameter in SecureSocket.read (length: $length)"); |
+ } |
if (_closedRead) { |
throw new SocketException("Reading from a closed socket"); |
} |
if (_status != CONNECTED) { |
return null; |
} |
- var buffer = _secureFilter.buffers[READ_PLAINTEXT]; |
- _readEncryptedData(); |
- int toRead = buffer.length; |
- if (len != null) { |
- if (len is! int || len < 0) { |
- throw new ArgumentError( |
- "Invalid len parameter in SecureSocket.read (len: $len)"); |
- } |
- if (len < toRead) { |
- toRead = len; |
- } |
- } |
- List<int> result = (toRead == 0) ? null : |
- buffer.data.sublist(buffer.start, buffer.start + toRead); |
- buffer.advanceStart(toRead); |
- |
- // Set up a read event if the filter still has data. |
- if (!_filterReadEmpty) { |
- Timer.run(_readHandler); |
- } |
- |
- if (_socketClosedRead) { // An onClose event is pending. |
- // _closedRead is false, since we are in a read call. |
- if (!_filterReadEmpty) { |
- // _filterReadEmpty may be out of date since read empties |
- // the plaintext buffer after calling _readEncryptedData. |
- // TODO(whesse): Fix this as part of fixing read. |
- _readEncryptedData(); |
- } |
- if (_filterReadEmpty) { |
- // This can't be an else clause: the value of _filterReadEmpty changes. |
- // This must be asynchronous, because we are in a read call. |
- Timer.run(_closeHandler); |
- } |
- } |
- |
+ var result = _secureFilter.buffers[READ_PLAINTEXT].read(length); |
+ _scheduleFilter(); |
return result; |
} |
- // Write the data to the socket, and flush it as much as possible |
- // until it would block. If the write would block, _writeEncryptedData sets |
- // up handlers to flush the pipeline when possible. |
+ // Write the data to the socket, and schedule the filter to encrypt it. |
int write(List<int> data, [int offset, int bytes]) { |
+ if (bytes != null && (bytes is! int || bytes < 0)) { |
+ throw new ArgumentError( |
+ "Invalid bytes parameter in SecureSocket.read (bytes: $bytes)"); |
+ } |
+ if (offset != null && (offset is! int || offset < 0)) { |
+ throw new ArgumentError( |
+ "Invalid offset parameter in SecureSocket.read (offset: $offset)"); |
+ } |
if (_closedWrite) { |
_controller.addError(new SocketException("Writing to a closed socket")); |
return 0; |
} |
if (_status != CONNECTED) return 0; |
- |
if (offset == null) offset = 0; |
if (bytes == null) bytes = data.length - offset; |
- var buffer = _secureFilter.buffers[WRITE_PLAINTEXT]; |
- if (bytes > buffer.free) { |
- bytes = buffer.free; |
- } |
- if (bytes > 0) { |
- int startIndex = buffer.start + buffer.length; |
- buffer.data.setRange(startIndex, startIndex + bytes, data, offset); |
- buffer.length += bytes; |
+ int written = |
+ _secureFilter.buffers[WRITE_PLAINTEXT].write(data, offset, bytes); |
+ if (written > 0) { |
+ _filterStatus.writeEmpty = false; |
} |
- _writeEncryptedData(); // Tries to flush all pipeline stages. |
- return bytes; |
+ _scheduleFilter(); |
+ return written; |
} |
X509Certificate get peerCertificate => _secureFilter.peerCertificate; |
@@ -715,28 +692,6 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
return _socket.setOption(option, enabled); |
} |
- void _writeHandler() { |
- if (_status == CLOSED) return; |
- _writeEncryptedData(); |
- if (_filterWriteEmpty && _closedWrite && !_socketClosedWrite) { |
- // Close _socket for write, by calling shutdown(), to avoid cloning the |
- // socket closing code in shutdown(). |
- shutdown(SocketDirection.SEND); |
- } |
- if (_status == HANDSHAKE) { |
- try { |
- _secureHandshake(); |
- } catch (e) { _reportError(e, "RawSecureSocket error"); } |
- } else if (_status == CONNECTED && |
- _controller.hasListener && |
- _writeEventsEnabled && |
- _secureFilter.buffers[WRITE_PLAINTEXT].free > 0) { |
- // Reset the one-shot handler. |
- _writeEventsEnabled = false; |
- _controller.add(RawSocketEvent.WRITE); |
- } |
- } |
- |
void _eventDispatcher(RawSocketEvent event) { |
if (event == RawSocketEvent.READ) { |
_readHandler(); |
@@ -747,56 +702,18 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
} |
} |
- void _readFromBuffered() { |
- assert(_bufferedData != null); |
- var encrypted = _secureFilter.buffers[READ_ENCRYPTED]; |
- var bytes = _bufferedData.length - _bufferedDataIndex; |
- int startIndex = encrypted.start + encrypted.length; |
- encrypted.data.setRange(startIndex, |
- startIndex + bytes, |
- _bufferedData, |
- _bufferedDataIndex); |
- encrypted.length += bytes; |
- _bufferedDataIndex += bytes; |
- if (_bufferedData.length == _bufferedDataIndex) { |
- _bufferedData = null; |
- } |
+ void _readHandler() { |
+ _readSocket(); |
+ _scheduleFilter(); |
} |
- void _readHandler() { |
- if (_status == CLOSED) { |
- return; |
- } else if (_status == HANDSHAKE) { |
- try { |
- _secureHandshake(); |
- if (_status != HANDSHAKE) _readHandler(); |
- } catch (e) { _reportError(e, "RawSecureSocket error"); } |
- } else { |
- if (_status != CONNECTED) { |
- // Cannot happen. |
- throw new SocketException("Internal SocketIO Error"); |
- } |
- try { |
- _readEncryptedData(); |
- } catch (e) { _reportError(e, "RawSecureSocket error"); } |
- if (!_filterReadEmpty) { |
- if (_readEventsEnabled) { |
- if (_secureFilter.buffers[READ_PLAINTEXT].length > 0) { |
- _controller.add(RawSocketEvent.READ); |
- } |
- if (_socketClosedRead) { |
- // Keep firing read events until we are paused or buffer is empty. |
- Timer.run(_readHandler); |
- } |
- } |
- } else if (_socketClosedRead) { |
- _closeHandler(); |
- } |
- } |
+ void _writeHandler() { |
+ _writeSocket(); |
+ _scheduleFilter(); |
} |
void _doneHandler() { |
- if (_filterReadEmpty) { |
+ if (_filterStatus.readEmpty) { |
_close(); |
} |
} |
@@ -826,38 +743,59 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
if (_status == CONNECTED) { |
if (_closedRead) return; |
_socketClosedRead = true; |
- if (_filterReadEmpty) { |
+ if (_filterStatus.readEmpty) { |
_closedRead = true; |
_controller.add(RawSocketEvent.READ_CLOSED); |
if (_socketClosedWrite) { |
_close(); |
} |
+ } else { |
+ _scheduleFilter(); |
} |
} else if (_status == HANDSHAKE) { |
+ _socketClosedRead = true; |
+ if (_filterStatus.readEmpty) { |
_reportError( |
new SocketException('Connection terminated during handshake'), |
- 'handshake error'); |
+ 'RawSecureSocket error'); |
+ } else { |
+ _secureHandshake(); |
+ } |
} |
} |
void _secureHandshake() { |
- _readEncryptedData(); |
- _secureFilter.handshake(); |
- _writeEncryptedData(); |
+ try { |
+ _secureFilter.handshake(); |
+ _filterStatus.writeEmpty = false; |
+ _readSocket(); |
+ _writeSocket(); |
+ _scheduleFilter(); |
+ } catch (e) { |
+ _reportError(e, "RawSecureSocket error"); |
+ } |
} |
void _secureHandshakeCompleteHandler() { |
_status = CONNECTED; |
if (_connectPending) { |
_connectPending = false; |
- // If we complete the future synchronously, user code will run here, |
- // and modify the state of the RawSecureSocket. For example, it |
- // could close the socket, and set _filter to null. |
+ // We don't want user code to run synchronously in this callback. |
Timer.run(() => _handshakeComplete.complete(this)); |
} |
} |
void _onPauseStateChange() { |
+ if (_controller.isPaused) { |
+ _pauseCount++; |
+ } else { |
+ _pauseCount--; |
+ if (_pauseCount == 0) { |
+ _scheduleReadEvent(); |
+ _sendWriteEvent(); // Can send event synchronously. |
+ } |
+ } |
+ |
if (!_socketClosedRead || !_socketClosedWrite) { |
if (_controller.isPaused) { |
_socketSubscription.pause(); |
@@ -873,120 +811,337 @@ class _RawSecureSocket extends Stream<RawSocketEvent> |
} |
} |
- void _readEncryptedData() { |
- // Read from the socket, and push it through the filter as far as |
- // possible. |
- var encrypted = _secureFilter.buffers[READ_ENCRYPTED]; |
- var plaintext = _secureFilter.buffers[READ_PLAINTEXT]; |
- bool progress = true; |
- while (progress) { |
- progress = false; |
- // Do not try to read plaintext from the filter while handshaking. |
- if ((_status == CONNECTED) && plaintext.free > 0) { |
- int bytes = _secureFilter.processBuffer(READ_PLAINTEXT); |
- if (bytes > 0) { |
- plaintext.length += bytes; |
- progress = true; |
+ void _scheduleFilter() { |
+ _filterPending = true; |
+ _tryFilter(); |
+ } |
+ |
+ void _tryFilter() { |
+ if (_status == CLOSED) return; |
+ if (_filterPending && !_filterActive) { |
+ _filterActive = true; |
+ _filterPending = false; |
+ _pushAllFilterStages().then((status) { |
+ _filterStatus = status; |
+ _filterActive = false; |
+ if (_status == CLOSED) { |
+ _secureFilter.destroy(); |
+ _secureFilter = null; |
+ return; |
} |
- } |
- if (encrypted.length > 0) { |
- int bytes = _secureFilter.processBuffer(READ_ENCRYPTED); |
- if (bytes > 0) { |
- encrypted.advanceStart(bytes); |
- progress = true; |
+ if (_filterStatus.writeEmpty && _closedWrite && !_socketClosedWrite) { |
+ // Checks for and handles all cases of partially closed sockets. |
+ shutdown(SocketDirection.SEND); |
+ if (_status == CLOSED) return; |
} |
- } |
- if (!_socketClosedRead && encrypted.free > 0) { |
- if (_bufferedData != null) { |
- _readFromBuffered(); |
- progress = true; |
- } else { |
- List<int> data = _socket.read(encrypted.free); |
- if (data != null) { |
- int bytes = data.length; |
- int startIndex = encrypted.start + encrypted.length; |
- encrypted.data.setRange(startIndex, startIndex + bytes, data); |
- encrypted.length += bytes; |
- progress = true; |
+ if (_filterStatus.readEmpty && _socketClosedRead && !_closedRead) { |
+ if (_status == HANDSHAKE) { |
+ _secureFilter.handshake(); |
+ if (_status == HANDSHAKE) { |
+ _reportError( |
+ new SocketException('Connection terminated during handshake'), |
+ 'RawSecureSocket error'); |
+ } |
} |
+ _closeHandler(); |
} |
+ if (_status == CLOSED) return; |
+ if (_filterStatus.progress) { |
+ _filterPending = true; |
+ if (_filterStatus.writePlaintextNoLongerFull) _sendWriteEvent(); |
+ if (_filterStatus.readEncryptedNoLongerFull) _readSocket(); |
+ if (_filterStatus.writeEncryptedNoLongerEmpty) _writeSocket(); |
+ if (_filterStatus.readPlaintextNoLongerEmpty) _scheduleReadEvent(); |
+ if (_status == HANDSHAKE) _secureHandshake(); |
+ } |
+ _tryFilter(); |
+ }); |
+ } |
+ } |
+ |
+ List<int> _readSocketOrBufferedData(int bytes) { |
+ if (_bufferedData != null) { |
+ if (bytes > _bufferedData.length - _bufferedDataIndex) { |
+ bytes = _bufferedData.length - _bufferedDataIndex; |
+ } |
+ var result = _bufferedData.sublist(_bufferedDataIndex, |
+ _bufferedDataIndex + bytes); |
+ _bufferedDataIndex += bytes; |
+ if (_bufferedData.length == _bufferedDataIndex) { |
+ _bufferedData = null; |
+ } |
+ return result; |
+ } else if (!_socketClosedRead) { |
+ try { |
+ return _socket.read(bytes); |
+ } catch (e) { |
+ _reportError(e, "RawSecureSocket error reading encrypted socket"); |
+ return null; |
} |
+ } else { |
+ return null; |
} |
- // If there is any data in any stages of the filter, there should |
- // be data in the plaintext buffer after this process. |
- // TODO(whesse): Verify that this is true, and there can be no |
- // partial encrypted block stuck in the secureFilter. |
- _filterReadEmpty = (plaintext.length == 0); |
} |
- void _writeEncryptedData() { |
+ void _readSocket() { |
+ if (_status == CLOSED) return; |
+ var buffer = _secureFilter.buffers[READ_ENCRYPTED]; |
+ if (buffer.writeFromSource(_readSocketOrBufferedData) > 0) { |
+ _filterStatus.readEmpty = false; |
+ } |
+ } |
+ |
+ void _writeSocket() { |
if (_socketClosedWrite) return; |
- var encrypted = _secureFilter.buffers[WRITE_ENCRYPTED]; |
- var plaintext = _secureFilter.buffers[WRITE_PLAINTEXT]; |
- while (true) { |
- if (encrypted.length > 0) { |
- // Write from the filter to the socket. |
- int bytes = _socket.write(encrypted.data, |
- encrypted.start, |
- encrypted.length); |
- encrypted.advanceStart(bytes); |
- if (encrypted.length > 0) { |
- // The socket has blocked while we have data to write. |
- // We must be notified when it becomes unblocked. |
- _socket.writeEventsEnabled = true; |
- _filterWriteEmpty = false; |
- break; |
+ var buffer = _secureFilter.buffers[WRITE_ENCRYPTED]; |
+ if (buffer.readToSocket(_socket)) { // Returns true if blocked |
+ _socket.writeEventsEnabled = true; |
+ } |
+ } |
+ |
+ // If a read event should be sent, add it to the controller. |
+ _scheduleReadEvent() { |
+ if (!_pendingReadEvent && |
+ _readEventsEnabled && |
+ _pauseCount == 0 && |
+ _secureFilter != null && |
+ !_secureFilter.buffers[READ_PLAINTEXT].isEmpty) { |
+ _pendingReadEvent = true; |
+ Timer.run(_sendReadEvent); |
+ } |
+ } |
+ |
+ _sendReadEvent() { |
+ _pendingReadEvent = false; |
+ if (_readEventsEnabled && |
+ _pauseCount == 0 && |
+ _secureFilter != null && |
+ !_secureFilter.buffers[READ_PLAINTEXT].isEmpty) { |
+ _controller.add(RawSocketEvent.READ); |
+ _scheduleReadEvent(); |
+ } |
+ } |
+ |
+ // If a write event should be sent, add it to the controller. |
+ _sendWriteEvent() { |
+ if (!_closedWrite && |
+ _writeEventsEnabled && |
+ _pauseCount == 0 && |
+ _secureFilter != null && |
+ _secureFilter.buffers[WRITE_PLAINTEXT].free > 0) { |
+ _writeEventsEnabled = false; |
+ _controller.add(RawSocketEvent.WRITE); |
+ } |
+ } |
+ |
+ Future<_FilterStatus> _pushAllFilterStages() { |
+ if (_filterService == null) { |
+ _filterService = _SecureFilter._newServicePort(); |
+ } |
+ List args = [_filterPointer, _status != CONNECTED]; |
+ var bufs = _secureFilter.buffers; |
+ for (var i = 0; i < NUM_BUFFERS; ++i) { |
+ args.add(bufs[i].start); |
+ args.add(bufs[i].end); |
+ } |
+ |
+ return _filterService.call(args).then((response) { |
+ bool wasInHandshake = response[1]; |
+ int start(int index) => response[2 * index + 2]; |
+ int end(int index) => response[2 * index + 3]; |
+ |
+ _FilterStatus status = new _FilterStatus(); |
+ // Compute writeEmpty as "write plaintext buffer and write encrypted |
+ // buffer were empty when we started and are empty now". |
+ status.writeEmpty = bufs[WRITE_PLAINTEXT].isEmpty && |
+ start(WRITE_ENCRYPTED) == end(WRITE_ENCRYPTED); |
+ // If we were in handshake when this started, _writeEmpty may be false |
+ // because the handshake wrote data after we checked. |
+ if (wasInHandshake) status.writeEmpty = false; |
+ |
+ // Compute readEmpty as "both read buffers were empty when we started |
+ // and are empty now". |
+ status.readEmpty = bufs[READ_ENCRYPTED].isEmpty && |
+ start(READ_PLAINTEXT) == end(READ_PLAINTEXT); |
+ |
+ _ExternalBuffer buffer = bufs[WRITE_PLAINTEXT]; |
+ int new_start = start(WRITE_PLAINTEXT); |
+ if (new_start != buffer.start) { |
+ status.progress = true; |
+ if (buffer.free == 0) { |
+ status.writePlaintextNoLongerFull = true; |
} |
- } else { |
- var plaintext = _secureFilter.buffers[WRITE_PLAINTEXT]; |
- if (plaintext.length > 0) { |
- int plaintext_bytes = _secureFilter.processBuffer(WRITE_PLAINTEXT); |
- plaintext.advanceStart(plaintext_bytes); |
+ buffer.start = new_start; |
+ } |
+ buffer = bufs[READ_ENCRYPTED]; |
+ new_start = start(READ_ENCRYPTED); |
+ if (new_start != buffer.start) { |
+ status.progress = true; |
+ if (buffer.free == 0) { |
+ status.readEncryptedNoLongerFull = true; |
} |
- int bytes = _secureFilter.processBuffer(WRITE_ENCRYPTED); |
- if (bytes <= 0) { |
- // We know the WRITE_ENCRYPTED buffer is empty, and the |
- // filter wrote zero bytes to it, so the filter must be empty. |
- // Also, the WRITE_PLAINTEXT buffer must have been empty, or |
- // it would have written to the filter. |
- // TODO(whesse): Verify that the filter works this way. |
- _filterWriteEmpty = true; |
- break; |
+ buffer.start = new_start; |
+ } |
+ buffer = bufs[WRITE_ENCRYPTED]; |
+ int new_end = end(WRITE_ENCRYPTED); |
+ if (new_end != buffer.end) { |
+ status.progress = true; |
+ if (buffer.length == 0) { |
+ status.writeEncryptedNoLongerEmpty = true; |
} |
- encrypted.length += bytes; |
+ buffer.end = new_end; |
} |
- } |
+ buffer = bufs[READ_PLAINTEXT]; |
+ new_end = end(READ_PLAINTEXT); |
+ if (new_end != buffer.end) { |
+ status.progress = true; |
+ if (buffer.length == 0) { |
+ status.readPlaintextNoLongerEmpty = true; |
+ } |
+ buffer.end = new_end; |
+ } |
+ return status; |
+ }); |
} |
} |
+/** |
+ * A circular buffer backed by an external byte array. Accessed from |
+ * both C++ and Dart code in an unsynchronized way, with one reading |
+ * and one writing. All updates to start and end are done by Dart code. |
+ */ |
class _ExternalBuffer { |
- // Performance is improved if a full buffer of plaintext fits |
- // in the encrypted buffer, when encrypted. |
- static final int SIZE = 8 * 1024; |
- static final int ENCRYPTED_SIZE = 10 * 1024; |
- _ExternalBuffer() : start = 0, length = 0; |
+ _ExternalBuffer(this.size) { |
+ start = size~/2; |
+ end = size~/2; |
+ } |
+ |
+ void advanceStart(int bytes) { |
+ assert(start > end || start + bytes <= end); |
+ start += bytes; |
+ if (start >= size) { |
+ start -= size; |
+ assert(start <= end); |
+ assert(start < size); |
+ } |
+ } |
+ |
+ void advanceEnd(int bytes) { |
+ assert(start <= end || start > end + bytes); |
+ end += bytes; |
+ if (end >= size) { |
+ end -= size; |
+ assert(end < start); |
+ assert(end < size); |
+ } |
+ } |
+ |
+ bool get isEmpty => end == start; |
+ |
+ int get length { |
+ if (start > end) return size + end - start; |
+ return end - start; |
+ } |
+ |
+ int get linearLength { |
+ if (start > end) return size - start; |
+ return end - start; |
+ } |
+ |
+ int get free { |
+ if (start > end) return start - end - 1; |
+ return size + start - end - 1; |
+ } |
+ |
+ int get linearFree { |
+ if (start > end) return start - end - 1; |
+ if (start == 0) return size - end - 1; |
+ return size - end; |
+ } |
+ |
+ List<int> read(int bytes) { |
+ if (bytes == null) { |
+ bytes = length; |
+ } else { |
+ bytes = min(bytes, length); |
+ } |
+ if (bytes == 0) return null; |
+ List<int> result = new Uint8List(bytes); |
+ int bytesRead = 0; |
+ // Loop over zero, one, or two linear data ranges. |
+ while (bytesRead < bytes) { |
+ int toRead = linearLength; |
+ result.setRange(bytesRead, |
+ bytesRead + toRead, |
+ data, |
+ start); |
+ advanceStart(toRead); |
+ bytesRead += toRead; |
+ } |
+ return result; |
+ } |
- // TODO(whesse): Consider making this a circular buffer. Only if it helps. |
- void advanceStart(int numBytes) { |
- start += numBytes; |
- length -= numBytes; |
- if (length == 0) { |
- start = 0; |
+ int write(List<int> inputData, int offset, int bytes) { |
+ if (bytes > free) { |
+ bytes = free; |
} |
+ int written = 0; |
+ int toWrite = min(bytes, linearFree); |
+ // Loop over zero, one, or two linear data ranges. |
+ while (toWrite > 0) { |
+ data.setRange(end, end + toWrite, inputData, offset); |
+ advanceEnd(toWrite); |
+ offset += toWrite; |
+ written += toWrite; |
+ toWrite = min(bytes - written, linearFree); |
+ } |
+ return written; |
} |
- int get free => data.length - (start + length); |
+ int writeFromSource(List<int> getData(int requested)) { |
+ int written = 0; |
+ int toWrite = linearFree; |
+ // Loop over zero, one, or two linear data ranges. |
+ while (toWrite > 0) { |
+ // Source returns at most toWrite bytes, and it returns null when empty. |
+ var inputData = getData(toWrite); |
+ if (inputData == null) break; |
+ var len = inputData.length; |
+ data.setRange(end, end + len, inputData); |
+ advanceEnd(len); |
+ written += len; |
+ toWrite = linearFree; |
+ } |
+ return written; |
+ } |
+ |
+ bool readToSocket(RawSocket socket) { |
+ // Loop over zero, one, or two linear data ranges. |
+ while (true) { |
+ var toWrite = linearLength; |
+ if (toWrite == 0) return false; |
+ int bytes = socket.write(data, start, toWrite); |
+ advanceStart(bytes); |
+ if (bytes < toWrite) { |
+ // The socket has blocked while we have data to write. |
+ return true; |
+ } |
+ } |
+ } |
List data; // This will be a ExternalByteArray, backed by C allocated data. |
int start; |
- int length; |
+ int end; |
+ final size; |
} |
abstract class _SecureFilter { |
external factory _SecureFilter(); |
+ external static SendPort _newServicePort(); |
+ |
void connect(String hostName, |
Uint8List addr, |
int port, |
@@ -1002,6 +1157,7 @@ abstract class _SecureFilter { |
int processBuffer(int bufferIndex); |
void registerBadCertificateCallback(Function callback); |
void registerHandshakeCompleteCallback(Function handshakeCompleteHandler); |
+ int _pointer(); |
List<_ExternalBuffer> get buffers; |
} |