Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1437)

Unified Diff: sdk/lib/io/secure_socket.dart

Issue 16858011: dart:io | Enable multithreaded secure networking encryption. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address all comments Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/_internal/lib/io_patch.dart ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/io/secure_socket.dart
diff --git a/sdk/lib/io/secure_socket.dart b/sdk/lib/io/secure_socket.dart
index d11b1028789782e5b438b6add5b5d5a57e79b987..a8b0eaafacf7ff0ba81e3d648d944444af1adcc9 100644
--- a/sdk/lib/io/secure_socket.dart
+++ b/sdk/lib/io/secure_socket.dart
@@ -358,10 +358,23 @@ class X509Certificate {
}
+class _FilterStatus {
+ bool progress = false; // The filter read or wrote data to the buffers.
+ bool readEmpty = true; // The read buffers and decryption filter are empty.
+ bool writeEmpty = true; // The write buffers and encryption filter are empty.
+ // These are set if a buffer changes state from empty or full.
+ bool readPlaintextNoLongerEmpty = false;
+ bool writePlaintextNoLongerFull = false;
+ bool readEncryptedNoLongerFull = false;
+ bool writeEncryptedNoLongerEmpty = false;
+
+ _FilterStatus();
+}
+
+
class _RawSecureSocket extends Stream<RawSocketEvent>
implements RawSecureSocket {
// Status states
- static final int NOT_CONNECTED = 200;
static final int HANDSHAKE = 201;
static final int CONNECTED = 202;
static final int CLOSED = 203;
@@ -374,6 +387,9 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
static final int WRITE_ENCRYPTED = 3;
static final int NUM_BUFFERS = 4;
+ // Is a buffer identifier for an encrypted buffer?
+ static bool _isBufferEncrypted(int identifier) => identifier >= READ_ENCRYPTED;
+
RawSocket _socket;
final Completer<_RawSecureSocket> _handshakeComplete =
new Completer<_RawSecureSocket>();
@@ -390,17 +406,23 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
final bool sendClientCertificate;
final Function onBadCertificate;
- var _status = NOT_CONNECTED;
+ var _status = HANDSHAKE;
bool _writeEventsEnabled = true;
bool _readEventsEnabled = true;
+ int _pauseCount = 0;
+ bool _pendingReadEvent = false;
bool _socketClosedRead = false; // The network socket is closed for reading.
bool _socketClosedWrite = false; // The network socket is closed for writing.
bool _closedRead = false; // The secure socket has fired an onClosed event.
bool _closedWrite = false; // The secure socket has been closed for writing.
- bool _filterReadEmpty = true; // There is no buffered data to read.
- bool _filterWriteEmpty = true; // There is no buffered data to be written.
+ _FilterStatus _filterStatus = new _FilterStatus();
bool _connectPending = false;
+ bool _filterPending = false;
+ bool _filterActive = false;
+
_SecureFilter _secureFilter = new _SecureFilter();
+ int _filterPointer;
+ SendPort _filterService;
static Future<_RawSecureSocket> connect(
host,
@@ -464,7 +486,7 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
// errors will be reported through the future or the stream.
_verifyFields();
_secureFilter.init();
- if (_bufferedData != null) _readFromBuffered();
+ _filterPointer = _secureFilter._pointer();
_secureFilter.registerHandshakeCompleteCallback(
_secureHandshakeCompleteHandler);
if (onBadCertificate != null) {
@@ -501,7 +523,6 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
requireClientCertificate,
requireClientCertificate,
sendClientCertificate);
- _status = HANDSHAKE;
_secureHandshake();
})
.catchError((error) {
@@ -514,10 +535,7 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
{void onError(error),
void onDone(),
bool cancelOnError}) {
- if (_writeEventsEnabled) {
- _writeEventsEnabled = false;
- _controller.add(RawSocketEvent.WRITE);
- }
+ _sendWriteEvent();
return _stream.listen(onData,
onError: onError,
onDone: onDone,
@@ -559,7 +577,6 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
int available() {
if (_status != CONNECTED) return 0;
- _readEncryptedData();
return _secureFilter.buffers[READ_PLAINTEXT].length;
}
@@ -575,7 +592,7 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
}
_socketClosedWrite = true;
_socketClosedRead = true;
- if (_secureFilter != null) {
+ if (!_filterActive && _secureFilter != null) {
_secureFilter.destroy();
_secureFilter = null;
}
@@ -590,8 +607,7 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
if (direction == SocketDirection.SEND ||
direction == SocketDirection.BOTH) {
_closedWrite = true;
- _writeEncryptedData();
- if (_filterWriteEmpty) {
+ if (_filterStatus.writeEmpty) {
_socket.shutdown(SocketDirection.SEND);
_socketClosedWrite = true;
if (_closedRead) {
@@ -613,99 +629,60 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
bool get writeEventsEnabled => _writeEventsEnabled;
void set writeEventsEnabled(bool value) {
- if (value &&
- _controller.hasListener &&
- _secureFilter != null &&
- _secureFilter.buffers[WRITE_PLAINTEXT].free > 0) {
- Timer.run(() => _controller.add(RawSocketEvent.WRITE));
- } else {
- _writeEventsEnabled = value;
+ _writeEventsEnabled = value;
+ if (value) {
+ Timer.run(() => _sendWriteEvent());
}
}
bool get readEventsEnabled => _readEventsEnabled;
void set readEventsEnabled(bool value) {
- _readEventsEnabled = value;
- if (value &&
- ((_secureFilter != null &&
- _secureFilter.buffers[READ_PLAINTEXT].length > 0) ||
- _socketClosedRead)) {
- // We might not have no underlying socket to set off read events.
- Timer.run(_readHandler);
- }
+ _readEventsEnabled = value;
+ _scheduleReadEvent();
}
- List<int> read([int len]) {
+ List<int> read([int length]) {
+ if (length != null && (length is! int || length < 0)) {
+ throw new ArgumentError(
+ "Invalid length parameter in SecureSocket.read (length: $length)");
+ }
if (_closedRead) {
throw new SocketException("Reading from a closed socket");
}
if (_status != CONNECTED) {
return null;
}
- var buffer = _secureFilter.buffers[READ_PLAINTEXT];
- _readEncryptedData();
- int toRead = buffer.length;
- if (len != null) {
- if (len is! int || len < 0) {
- throw new ArgumentError(
- "Invalid len parameter in SecureSocket.read (len: $len)");
- }
- if (len < toRead) {
- toRead = len;
- }
- }
- List<int> result = (toRead == 0) ? null :
- buffer.data.sublist(buffer.start, buffer.start + toRead);
- buffer.advanceStart(toRead);
-
- // Set up a read event if the filter still has data.
- if (!_filterReadEmpty) {
- Timer.run(_readHandler);
- }
-
- if (_socketClosedRead) { // An onClose event is pending.
- // _closedRead is false, since we are in a read call.
- if (!_filterReadEmpty) {
- // _filterReadEmpty may be out of date since read empties
- // the plaintext buffer after calling _readEncryptedData.
- // TODO(whesse): Fix this as part of fixing read.
- _readEncryptedData();
- }
- if (_filterReadEmpty) {
- // This can't be an else clause: the value of _filterReadEmpty changes.
- // This must be asynchronous, because we are in a read call.
- Timer.run(_closeHandler);
- }
- }
-
+ var result = _secureFilter.buffers[READ_PLAINTEXT].read(length);
+ _scheduleFilter();
return result;
}
- // Write the data to the socket, and flush it as much as possible
- // until it would block. If the write would block, _writeEncryptedData sets
- // up handlers to flush the pipeline when possible.
+ // Write the data to the socket, and schedule the filter to encrypt it.
int write(List<int> data, [int offset, int bytes]) {
+ if (bytes != null && (bytes is! int || bytes < 0)) {
+ throw new ArgumentError(
+ "Invalid bytes parameter in SecureSocket.read (bytes: $bytes)");
+ }
+ if (offset != null && (offset is! int || offset < 0)) {
+ throw new ArgumentError(
+ "Invalid offset parameter in SecureSocket.read (offset: $offset)");
+ }
if (_closedWrite) {
_controller.addError(new SocketException("Writing to a closed socket"));
return 0;
}
if (_status != CONNECTED) return 0;
-
if (offset == null) offset = 0;
if (bytes == null) bytes = data.length - offset;
- var buffer = _secureFilter.buffers[WRITE_PLAINTEXT];
- if (bytes > buffer.free) {
- bytes = buffer.free;
- }
- if (bytes > 0) {
- int startIndex = buffer.start + buffer.length;
- buffer.data.setRange(startIndex, startIndex + bytes, data, offset);
- buffer.length += bytes;
+ int written =
+ _secureFilter.buffers[WRITE_PLAINTEXT].write(data, offset, bytes);
+ if (written > 0) {
+ _filterStatus.writeEmpty = false;
}
- _writeEncryptedData(); // Tries to flush all pipeline stages.
- return bytes;
+ _scheduleFilter();
+ return written;
}
X509Certificate get peerCertificate => _secureFilter.peerCertificate;
@@ -715,28 +692,6 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
return _socket.setOption(option, enabled);
}
- void _writeHandler() {
- if (_status == CLOSED) return;
- _writeEncryptedData();
- if (_filterWriteEmpty && _closedWrite && !_socketClosedWrite) {
- // Close _socket for write, by calling shutdown(), to avoid cloning the
- // socket closing code in shutdown().
- shutdown(SocketDirection.SEND);
- }
- if (_status == HANDSHAKE) {
- try {
- _secureHandshake();
- } catch (e) { _reportError(e, "RawSecureSocket error"); }
- } else if (_status == CONNECTED &&
- _controller.hasListener &&
- _writeEventsEnabled &&
- _secureFilter.buffers[WRITE_PLAINTEXT].free > 0) {
- // Reset the one-shot handler.
- _writeEventsEnabled = false;
- _controller.add(RawSocketEvent.WRITE);
- }
- }
-
void _eventDispatcher(RawSocketEvent event) {
if (event == RawSocketEvent.READ) {
_readHandler();
@@ -747,56 +702,18 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
}
}
- void _readFromBuffered() {
- assert(_bufferedData != null);
- var encrypted = _secureFilter.buffers[READ_ENCRYPTED];
- var bytes = _bufferedData.length - _bufferedDataIndex;
- int startIndex = encrypted.start + encrypted.length;
- encrypted.data.setRange(startIndex,
- startIndex + bytes,
- _bufferedData,
- _bufferedDataIndex);
- encrypted.length += bytes;
- _bufferedDataIndex += bytes;
- if (_bufferedData.length == _bufferedDataIndex) {
- _bufferedData = null;
- }
+ void _readHandler() {
+ _readSocket();
+ _scheduleFilter();
}
- void _readHandler() {
- if (_status == CLOSED) {
- return;
- } else if (_status == HANDSHAKE) {
- try {
- _secureHandshake();
- if (_status != HANDSHAKE) _readHandler();
- } catch (e) { _reportError(e, "RawSecureSocket error"); }
- } else {
- if (_status != CONNECTED) {
- // Cannot happen.
- throw new SocketException("Internal SocketIO Error");
- }
- try {
- _readEncryptedData();
- } catch (e) { _reportError(e, "RawSecureSocket error"); }
- if (!_filterReadEmpty) {
- if (_readEventsEnabled) {
- if (_secureFilter.buffers[READ_PLAINTEXT].length > 0) {
- _controller.add(RawSocketEvent.READ);
- }
- if (_socketClosedRead) {
- // Keep firing read events until we are paused or buffer is empty.
- Timer.run(_readHandler);
- }
- }
- } else if (_socketClosedRead) {
- _closeHandler();
- }
- }
+ void _writeHandler() {
+ _writeSocket();
+ _scheduleFilter();
}
void _doneHandler() {
- if (_filterReadEmpty) {
+ if (_filterStatus.readEmpty) {
_close();
}
}
@@ -826,38 +743,59 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
if (_status == CONNECTED) {
if (_closedRead) return;
_socketClosedRead = true;
- if (_filterReadEmpty) {
+ if (_filterStatus.readEmpty) {
_closedRead = true;
_controller.add(RawSocketEvent.READ_CLOSED);
if (_socketClosedWrite) {
_close();
}
+ } else {
+ _scheduleFilter();
}
} else if (_status == HANDSHAKE) {
+ _socketClosedRead = true;
+ if (_filterStatus.readEmpty) {
_reportError(
new SocketException('Connection terminated during handshake'),
- 'handshake error');
+ 'RawSecureSocket error');
+ } else {
+ _secureHandshake();
+ }
}
}
void _secureHandshake() {
- _readEncryptedData();
- _secureFilter.handshake();
- _writeEncryptedData();
+ try {
+ _secureFilter.handshake();
+ _filterStatus.writeEmpty = false;
+ _readSocket();
+ _writeSocket();
+ _scheduleFilter();
+ } catch (e) {
+ _reportError(e, "RawSecureSocket error");
+ }
}
void _secureHandshakeCompleteHandler() {
_status = CONNECTED;
if (_connectPending) {
_connectPending = false;
- // If we complete the future synchronously, user code will run here,
- // and modify the state of the RawSecureSocket. For example, it
- // could close the socket, and set _filter to null.
+ // We don't want user code to run synchronously in this callback.
Timer.run(() => _handshakeComplete.complete(this));
}
}
void _onPauseStateChange() {
+ if (_controller.isPaused) {
+ _pauseCount++;
+ } else {
+ _pauseCount--;
+ if (_pauseCount == 0) {
+ _scheduleReadEvent();
+ _sendWriteEvent(); // Can send event synchronously.
+ }
+ }
+
if (!_socketClosedRead || !_socketClosedWrite) {
if (_controller.isPaused) {
_socketSubscription.pause();
@@ -873,120 +811,337 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
}
}
- void _readEncryptedData() {
- // Read from the socket, and push it through the filter as far as
- // possible.
- var encrypted = _secureFilter.buffers[READ_ENCRYPTED];
- var plaintext = _secureFilter.buffers[READ_PLAINTEXT];
- bool progress = true;
- while (progress) {
- progress = false;
- // Do not try to read plaintext from the filter while handshaking.
- if ((_status == CONNECTED) && plaintext.free > 0) {
- int bytes = _secureFilter.processBuffer(READ_PLAINTEXT);
- if (bytes > 0) {
- plaintext.length += bytes;
- progress = true;
+ void _scheduleFilter() {
+ _filterPending = true;
+ _tryFilter();
+ }
+
+ void _tryFilter() {
+ if (_status == CLOSED) return;
+ if (_filterPending && !_filterActive) {
+ _filterActive = true;
+ _filterPending = false;
+ _pushAllFilterStages().then((status) {
+ _filterStatus = status;
+ _filterActive = false;
+ if (_status == CLOSED) {
+ _secureFilter.destroy();
+ _secureFilter = null;
+ return;
}
- }
- if (encrypted.length > 0) {
- int bytes = _secureFilter.processBuffer(READ_ENCRYPTED);
- if (bytes > 0) {
- encrypted.advanceStart(bytes);
- progress = true;
+ if (_filterStatus.writeEmpty && _closedWrite && !_socketClosedWrite) {
+ // Checks for and handles all cases of partially closed sockets.
+ shutdown(SocketDirection.SEND);
+ if (_status == CLOSED) return;
}
- }
- if (!_socketClosedRead && encrypted.free > 0) {
- if (_bufferedData != null) {
- _readFromBuffered();
- progress = true;
- } else {
- List<int> data = _socket.read(encrypted.free);
- if (data != null) {
- int bytes = data.length;
- int startIndex = encrypted.start + encrypted.length;
- encrypted.data.setRange(startIndex, startIndex + bytes, data);
- encrypted.length += bytes;
- progress = true;
+ if (_filterStatus.readEmpty && _socketClosedRead && !_closedRead) {
+ if (_status == HANDSHAKE) {
+ _secureFilter.handshake();
+ if (_status == HANDSHAKE) {
+ _reportError(
+ new SocketException('Connection terminated during handshake'),
+ 'RawSecureSocket error');
+ }
}
+ _closeHandler();
}
+ if (_status == CLOSED) return;
+ if (_filterStatus.progress) {
+ _filterPending = true;
+ if (_filterStatus.writePlaintextNoLongerFull) _sendWriteEvent();
+ if (_filterStatus.readEncryptedNoLongerFull) _readSocket();
+ if (_filterStatus.writeEncryptedNoLongerEmpty) _writeSocket();
+ if (_filterStatus.readPlaintextNoLongerEmpty) _scheduleReadEvent();
+ if (_status == HANDSHAKE) _secureHandshake();
+ }
+ _tryFilter();
+ });
+ }
+ }
+
+ List<int> _readSocketOrBufferedData(int bytes) {
+ if (_bufferedData != null) {
+ if (bytes > _bufferedData.length - _bufferedDataIndex) {
+ bytes = _bufferedData.length - _bufferedDataIndex;
+ }
+ var result = _bufferedData.sublist(_bufferedDataIndex,
+ _bufferedDataIndex + bytes);
+ _bufferedDataIndex += bytes;
+ if (_bufferedData.length == _bufferedDataIndex) {
+ _bufferedData = null;
+ }
+ return result;
+ } else if (!_socketClosedRead) {
+ try {
+ return _socket.read(bytes);
+ } catch (e) {
+ _reportError(e, "RawSecureSocket error reading encrypted socket");
+ return null;
}
+ } else {
+ return null;
}
- // If there is any data in any stages of the filter, there should
- // be data in the plaintext buffer after this process.
- // TODO(whesse): Verify that this is true, and there can be no
- // partial encrypted block stuck in the secureFilter.
- _filterReadEmpty = (plaintext.length == 0);
}
- void _writeEncryptedData() {
+ void _readSocket() {
+ if (_status == CLOSED) return;
+ var buffer = _secureFilter.buffers[READ_ENCRYPTED];
+ if (buffer.writeFromSource(_readSocketOrBufferedData) > 0) {
+ _filterStatus.readEmpty = false;
+ }
+ }
+
+ void _writeSocket() {
if (_socketClosedWrite) return;
- var encrypted = _secureFilter.buffers[WRITE_ENCRYPTED];
- var plaintext = _secureFilter.buffers[WRITE_PLAINTEXT];
- while (true) {
- if (encrypted.length > 0) {
- // Write from the filter to the socket.
- int bytes = _socket.write(encrypted.data,
- encrypted.start,
- encrypted.length);
- encrypted.advanceStart(bytes);
- if (encrypted.length > 0) {
- // The socket has blocked while we have data to write.
- // We must be notified when it becomes unblocked.
- _socket.writeEventsEnabled = true;
- _filterWriteEmpty = false;
- break;
+ var buffer = _secureFilter.buffers[WRITE_ENCRYPTED];
+ if (buffer.readToSocket(_socket)) { // Returns true if blocked
+ _socket.writeEventsEnabled = true;
+ }
+ }
+
+ // If a read event should be sent, add it to the controller.
+ _scheduleReadEvent() {
+ if (!_pendingReadEvent &&
+ _readEventsEnabled &&
+ _pauseCount == 0 &&
+ _secureFilter != null &&
+ !_secureFilter.buffers[READ_PLAINTEXT].isEmpty) {
+ _pendingReadEvent = true;
+ Timer.run(_sendReadEvent);
+ }
+ }
+
+ _sendReadEvent() {
+ _pendingReadEvent = false;
+ if (_readEventsEnabled &&
+ _pauseCount == 0 &&
+ _secureFilter != null &&
+ !_secureFilter.buffers[READ_PLAINTEXT].isEmpty) {
+ _controller.add(RawSocketEvent.READ);
+ _scheduleReadEvent();
+ }
+ }
+
+ // If a write event should be sent, add it to the controller.
+ _sendWriteEvent() {
+ if (!_closedWrite &&
+ _writeEventsEnabled &&
+ _pauseCount == 0 &&
+ _secureFilter != null &&
+ _secureFilter.buffers[WRITE_PLAINTEXT].free > 0) {
+ _writeEventsEnabled = false;
+ _controller.add(RawSocketEvent.WRITE);
+ }
+ }
+
+ Future<_FilterStatus> _pushAllFilterStages() {
+ if (_filterService == null) {
+ _filterService = _SecureFilter._newServicePort();
+ }
+ List args = [_filterPointer, _status != CONNECTED];
+ var bufs = _secureFilter.buffers;
+ for (var i = 0; i < NUM_BUFFERS; ++i) {
+ args.add(bufs[i].start);
+ args.add(bufs[i].end);
+ }
+
+ return _filterService.call(args).then((response) {
+ bool wasInHandshake = response[1];
+ int start(int index) => response[2 * index + 2];
+ int end(int index) => response[2 * index + 3];
+
+ _FilterStatus status = new _FilterStatus();
+ // Compute writeEmpty as "write plaintext buffer and write encrypted
+ // buffer were empty when we started and are empty now".
+ status.writeEmpty = bufs[WRITE_PLAINTEXT].isEmpty &&
+ start(WRITE_ENCRYPTED) == end(WRITE_ENCRYPTED);
+ // If we were in handshake when this started, _writeEmpty may be false
+ // because the handshake wrote data after we checked.
+ if (wasInHandshake) status.writeEmpty = false;
+
+ // Compute readEmpty as "both read buffers were empty when we started
+ // and are empty now".
+ status.readEmpty = bufs[READ_ENCRYPTED].isEmpty &&
+ start(READ_PLAINTEXT) == end(READ_PLAINTEXT);
+
+ _ExternalBuffer buffer = bufs[WRITE_PLAINTEXT];
+ int new_start = start(WRITE_PLAINTEXT);
+ if (new_start != buffer.start) {
+ status.progress = true;
+ if (buffer.free == 0) {
+ status.writePlaintextNoLongerFull = true;
}
- } else {
- var plaintext = _secureFilter.buffers[WRITE_PLAINTEXT];
- if (plaintext.length > 0) {
- int plaintext_bytes = _secureFilter.processBuffer(WRITE_PLAINTEXT);
- plaintext.advanceStart(plaintext_bytes);
+ buffer.start = new_start;
+ }
+ buffer = bufs[READ_ENCRYPTED];
+ new_start = start(READ_ENCRYPTED);
+ if (new_start != buffer.start) {
+ status.progress = true;
+ if (buffer.free == 0) {
+ status.readEncryptedNoLongerFull = true;
}
- int bytes = _secureFilter.processBuffer(WRITE_ENCRYPTED);
- if (bytes <= 0) {
- // We know the WRITE_ENCRYPTED buffer is empty, and the
- // filter wrote zero bytes to it, so the filter must be empty.
- // Also, the WRITE_PLAINTEXT buffer must have been empty, or
- // it would have written to the filter.
- // TODO(whesse): Verify that the filter works this way.
- _filterWriteEmpty = true;
- break;
+ buffer.start = new_start;
+ }
+ buffer = bufs[WRITE_ENCRYPTED];
+ int new_end = end(WRITE_ENCRYPTED);
+ if (new_end != buffer.end) {
+ status.progress = true;
+ if (buffer.length == 0) {
+ status.writeEncryptedNoLongerEmpty = true;
}
- encrypted.length += bytes;
+ buffer.end = new_end;
}
- }
+ buffer = bufs[READ_PLAINTEXT];
+ new_end = end(READ_PLAINTEXT);
+ if (new_end != buffer.end) {
+ status.progress = true;
+ if (buffer.length == 0) {
+ status.readPlaintextNoLongerEmpty = true;
+ }
+ buffer.end = new_end;
+ }
+ return status;
+ });
}
}
+/**
+ * A circular buffer backed by an external byte array. Accessed from
+ * both C++ and Dart code in an unsynchronized way, with one reading
+ * and one writing. All updates to start and end are done by Dart code.
+ */
class _ExternalBuffer {
- // Performance is improved if a full buffer of plaintext fits
- // in the encrypted buffer, when encrypted.
- static final int SIZE = 8 * 1024;
- static final int ENCRYPTED_SIZE = 10 * 1024;
- _ExternalBuffer() : start = 0, length = 0;
+ _ExternalBuffer(this.size) {
+ start = size~/2;
+ end = size~/2;
+ }
+
+ void advanceStart(int bytes) {
+ assert(start > end || start + bytes <= end);
+ start += bytes;
+ if (start >= size) {
+ start -= size;
+ assert(start <= end);
+ assert(start < size);
+ }
+ }
+
+ void advanceEnd(int bytes) {
+ assert(start <= end || start > end + bytes);
+ end += bytes;
+ if (end >= size) {
+ end -= size;
+ assert(end < start);
+ assert(end < size);
+ }
+ }
+
+ bool get isEmpty => end == start;
+
+ int get length {
+ if (start > end) return size + end - start;
+ return end - start;
+ }
+
+ int get linearLength {
+ if (start > end) return size - start;
+ return end - start;
+ }
+
+ int get free {
+ if (start > end) return start - end - 1;
+ return size + start - end - 1;
+ }
+
+ int get linearFree {
+ if (start > end) return start - end - 1;
+ if (start == 0) return size - end - 1;
+ return size - end;
+ }
+
+ List<int> read(int bytes) {
+ if (bytes == null) {
+ bytes = length;
+ } else {
+ bytes = min(bytes, length);
+ }
+ if (bytes == 0) return null;
+ List<int> result = new Uint8List(bytes);
+ int bytesRead = 0;
+ // Loop over zero, one, or two linear data ranges.
+ while (bytesRead < bytes) {
+ int toRead = linearLength;
+ result.setRange(bytesRead,
+ bytesRead + toRead,
+ data,
+ start);
+ advanceStart(toRead);
+ bytesRead += toRead;
+ }
+ return result;
+ }
- // TODO(whesse): Consider making this a circular buffer. Only if it helps.
- void advanceStart(int numBytes) {
- start += numBytes;
- length -= numBytes;
- if (length == 0) {
- start = 0;
+ int write(List<int> inputData, int offset, int bytes) {
+ if (bytes > free) {
+ bytes = free;
}
+ int written = 0;
+ int toWrite = min(bytes, linearFree);
+ // Loop over zero, one, or two linear data ranges.
+ while (toWrite > 0) {
+ data.setRange(end, end + toWrite, inputData, offset);
+ advanceEnd(toWrite);
+ offset += toWrite;
+ written += toWrite;
+ toWrite = min(bytes - written, linearFree);
+ }
+ return written;
}
- int get free => data.length - (start + length);
+ int writeFromSource(List<int> getData(int requested)) {
+ int written = 0;
+ int toWrite = linearFree;
+ // Loop over zero, one, or two linear data ranges.
+ while (toWrite > 0) {
+ // Source returns at most toWrite bytes, and it returns null when empty.
+ var inputData = getData(toWrite);
+ if (inputData == null) break;
+ var len = inputData.length;
+ data.setRange(end, end + len, inputData);
+ advanceEnd(len);
+ written += len;
+ toWrite = linearFree;
+ }
+ return written;
+ }
+
+ bool readToSocket(RawSocket socket) {
+ // Loop over zero, one, or two linear data ranges.
+ while (true) {
+ var toWrite = linearLength;
+ if (toWrite == 0) return false;
+ int bytes = socket.write(data, start, toWrite);
+ advanceStart(bytes);
+ if (bytes < toWrite) {
+ // The socket has blocked while we have data to write.
+ return true;
+ }
+ }
+ }
List data; // This will be a ExternalByteArray, backed by C allocated data.
int start;
- int length;
+ int end;
+ final size;
}
abstract class _SecureFilter {
external factory _SecureFilter();
+ external static SendPort _newServicePort();
+
void connect(String hostName,
Uint8List addr,
int port,
@@ -1002,6 +1157,7 @@ abstract class _SecureFilter {
int processBuffer(int bufferIndex);
void registerBadCertificateCallback(Function callback);
void registerHandshakeCompleteCallback(Function handshakeCompleteHandler);
+ int _pointer();
List<_ExternalBuffer> get buffers;
}
« no previous file with comments | « sdk/lib/_internal/lib/io_patch.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698