OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 /** | 7 /** |
8 * A high-level class for communicating securely over a TCP socket, using | 8 * A high-level class for communicating securely over a TCP socket, using |
9 * TLS and SSL. The [SecureSocket] exposes both a [Stream] and an | 9 * TLS and SSL. The [SecureSocket] exposes both a [Stream] and an |
10 * [IOSink] interface, making it ideal for using together with | 10 * [IOSink] interface, making it ideal for using together with |
(...skipping 322 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
333 this.issuer, | 333 this.issuer, |
334 this.startValidity, | 334 this.startValidity, |
335 this.endValidity); | 335 this.endValidity); |
336 final String subject; | 336 final String subject; |
337 final String issuer; | 337 final String issuer; |
338 final DateTime startValidity; | 338 final DateTime startValidity; |
339 final DateTime endValidity; | 339 final DateTime endValidity; |
340 } | 340 } |
341 | 341 |
342 | 342 |
343 class _FilterStatus { | |
344 bool progress = false; // The filter read or wrote data to the buffers. | |
345 bool readEmpty = true; // The read buffers and decryption filter are empty. | |
346 bool writeEmpty = true; // The write buffers and encryption filter are empty. | |
347 // These are set if a buffer changes state from empty or full. | |
348 bool readPlaintextNoLongerEmpty = false; | |
349 bool writePlaintextNoLongerFull = false; | |
350 bool readEncryptedNoLongerFull = false; | |
351 bool writeEncryptedNoLongerEmpty = false; | |
352 | |
353 _FilterStatus(); | |
354 } | |
355 | |
356 | |
343 class _RawSecureSocket extends Stream<RawSocketEvent> | 357 class _RawSecureSocket extends Stream<RawSocketEvent> |
344 implements RawSecureSocket { | 358 implements RawSecureSocket { |
345 // Status states | 359 // Status states |
346 static final int NOT_CONNECTED = 200; | |
347 static final int HANDSHAKE = 201; | 360 static final int HANDSHAKE = 201; |
348 static final int CONNECTED = 202; | 361 static final int CONNECTED = 202; |
349 static final int CLOSED = 203; | 362 static final int CLOSED = 203; |
350 | 363 |
351 // Buffer identifiers. | 364 // Buffer identifiers. |
352 // These must agree with those in the native C++ implementation. | 365 // These must agree with those in the native C++ implementation. |
353 static final int READ_PLAINTEXT = 0; | 366 static final int READ_PLAINTEXT = 0; |
354 static final int WRITE_PLAINTEXT = 1; | 367 static final int WRITE_PLAINTEXT = 1; |
355 static final int READ_ENCRYPTED = 2; | 368 static final int READ_ENCRYPTED = 2; |
356 static final int WRITE_ENCRYPTED = 3; | 369 static final int WRITE_ENCRYPTED = 3; |
357 static final int NUM_BUFFERS = 4; | 370 static final int NUM_BUFFERS = 4; |
358 | 371 |
372 // Is a buffer identifier for an encrypted buffer? | |
373 static bool _isBufferEncrypted(int identifier) => identifier >= READ_ENCRYPTED ; | |
374 | |
359 RawSocket _socket; | 375 RawSocket _socket; |
360 final Completer<_RawSecureSocket> _handshakeComplete = | 376 final Completer<_RawSecureSocket> _handshakeComplete = |
361 new Completer<_RawSecureSocket>(); | 377 new Completer<_RawSecureSocket>(); |
362 StreamController<RawSocketEvent> _controller; | 378 StreamController<RawSocketEvent> _controller; |
363 Stream<RawSocketEvent> _stream; | 379 Stream<RawSocketEvent> _stream; |
364 StreamSubscription<RawSocketEvent> _socketSubscription; | 380 StreamSubscription<RawSocketEvent> _socketSubscription; |
365 List<int> _bufferedData; | 381 List<int> _bufferedData; |
366 int _bufferedDataIndex = 0; | 382 int _bufferedDataIndex = 0; |
367 final InternetAddress address; | 383 final InternetAddress address; |
368 final bool is_server; | 384 final bool is_server; |
369 final String certificateName; | 385 final String certificateName; |
370 final bool requestClientCertificate; | 386 final bool requestClientCertificate; |
371 final bool requireClientCertificate; | 387 final bool requireClientCertificate; |
372 final bool sendClientCertificate; | 388 final bool sendClientCertificate; |
373 final Function onBadCertificate; | 389 final Function onBadCertificate; |
374 | 390 |
375 var _status = NOT_CONNECTED; | 391 var _status = HANDSHAKE; |
376 bool _writeEventsEnabled = true; | 392 bool _writeEventsEnabled = true; |
377 bool _readEventsEnabled = true; | 393 bool _readEventsEnabled = true; |
394 int _pauseCount = 0; | |
395 bool _pendingReadEvent = false; | |
378 bool _socketClosedRead = false; // The network socket is closed for reading. | 396 bool _socketClosedRead = false; // The network socket is closed for reading. |
379 bool _socketClosedWrite = false; // The network socket is closed for writing. | 397 bool _socketClosedWrite = false; // The network socket is closed for writing. |
380 bool _closedRead = false; // The secure socket has fired an onClosed event. | 398 bool _closedRead = false; // The secure socket has fired an onClosed event. |
381 bool _closedWrite = false; // The secure socket has been closed for writing. | 399 bool _closedWrite = false; // The secure socket has been closed for writing. |
382 bool _filterReadEmpty = true; // There is no buffered data to read. | 400 _FilterStatus _filterStatus = new _FilterStatus(); |
383 bool _filterWriteEmpty = true; // There is no buffered data to be written. | |
384 bool _connectPending = false; | 401 bool _connectPending = false; |
402 bool _filterPending = false; | |
403 bool _filterActive = false; | |
404 | |
385 _SecureFilter _secureFilter = new _SecureFilter(); | 405 _SecureFilter _secureFilter = new _SecureFilter(); |
406 int _filterPointer; | |
407 SendPort _filterService; | |
386 | 408 |
387 static Future<_RawSecureSocket> connect( | 409 static Future<_RawSecureSocket> connect( |
388 host, | 410 host, |
389 int requestedPort, | 411 int requestedPort, |
390 String certificateName, | 412 String certificateName, |
391 {bool is_server, | 413 {bool is_server, |
392 RawSocket socket, | 414 RawSocket socket, |
393 StreamSubscription subscription, | 415 StreamSubscription subscription, |
394 List<int> bufferedData, | 416 List<int> bufferedData, |
395 bool requestClientCertificate: false, | 417 bool requestClientCertificate: false, |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
434 sync: true, | 456 sync: true, |
435 onListen: _onSubscriptionStateChange, | 457 onListen: _onSubscriptionStateChange, |
436 onPause: _onPauseStateChange, | 458 onPause: _onPauseStateChange, |
437 onResume: _onPauseStateChange, | 459 onResume: _onPauseStateChange, |
438 onCancel: _onSubscriptionStateChange); | 460 onCancel: _onSubscriptionStateChange); |
439 _stream = _controller.stream; | 461 _stream = _controller.stream; |
440 // Throw an ArgumentError if any field is invalid. After this, all | 462 // Throw an ArgumentError if any field is invalid. After this, all |
441 // errors will be reported through the future or the stream. | 463 // errors will be reported through the future or the stream. |
442 _verifyFields(); | 464 _verifyFields(); |
443 _secureFilter.init(); | 465 _secureFilter.init(); |
444 if (_bufferedData != null) _readFromBuffered(); | 466 _filterPointer = _secureFilter._pointer(); |
445 _secureFilter.registerHandshakeCompleteCallback( | 467 _secureFilter.registerHandshakeCompleteCallback( |
446 _secureHandshakeCompleteHandler); | 468 _secureHandshakeCompleteHandler); |
447 if (onBadCertificate != null) { | 469 if (onBadCertificate != null) { |
448 _secureFilter.registerBadCertificateCallback(onBadCertificate); | 470 _secureFilter.registerBadCertificateCallback(onBadCertificate); |
449 } | 471 } |
450 var futureSocket; | 472 var futureSocket; |
451 if (socket == null) { | 473 if (socket == null) { |
452 futureSocket = RawSocket.connect(address, requestedPort); | 474 futureSocket = RawSocket.connect(address, requestedPort); |
453 } else { | 475 } else { |
454 futureSocket = new Future.value(socket); | 476 futureSocket = new Future.value(socket); |
(...skipping 15 matching lines...) Expand all Loading... | |
470 } | 492 } |
471 _connectPending = true; | 493 _connectPending = true; |
472 _secureFilter.connect(address.host, | 494 _secureFilter.connect(address.host, |
473 port, | 495 port, |
474 is_server, | 496 is_server, |
475 certificateName, | 497 certificateName, |
476 requestClientCertificate || | 498 requestClientCertificate || |
477 requireClientCertificate, | 499 requireClientCertificate, |
478 requireClientCertificate, | 500 requireClientCertificate, |
479 sendClientCertificate); | 501 sendClientCertificate); |
480 _status = HANDSHAKE; | |
481 _secureHandshake(); | 502 _secureHandshake(); |
482 }) | 503 }) |
483 .catchError((error) { | 504 .catchError((error) { |
484 _handshakeComplete.completeError(error); | 505 _handshakeComplete.completeError(error); |
485 _close(); | 506 _close(); |
486 }); | 507 }); |
487 } | 508 } |
488 | 509 |
489 StreamSubscription listen(void onData(RawSocketEvent data), | 510 StreamSubscription listen(void onData(RawSocketEvent data), |
490 {void onError(error), | 511 {void onError(error), |
491 void onDone(), | 512 void onDone(), |
492 bool cancelOnError}) { | 513 bool cancelOnError}) { |
493 if (_writeEventsEnabled) { | 514 _sendWriteEvent(); |
494 _writeEventsEnabled = false; | |
495 _controller.add(RawSocketEvent.WRITE); | |
496 } | |
497 return _stream.listen(onData, | 515 return _stream.listen(onData, |
498 onError: onError, | 516 onError: onError, |
499 onDone: onDone, | 517 onDone: onDone, |
500 cancelOnError: cancelOnError); | 518 cancelOnError: cancelOnError); |
501 } | 519 } |
502 | 520 |
503 void _verifyFields() { | 521 void _verifyFields() { |
504 assert(is_server is bool); | 522 assert(is_server is bool); |
505 assert(_socket == null || _socket is RawSocket); | 523 assert(_socket == null || _socket is RawSocket); |
506 if (address is! InternetAddress) { | 524 if (address is! InternetAddress) { |
(...skipping 21 matching lines...) Expand all Loading... | |
528 } | 546 } |
529 | 547 |
530 int get port => _socket.port; | 548 int get port => _socket.port; |
531 | 549 |
532 String get remoteHost => _socket.remoteHost; | 550 String get remoteHost => _socket.remoteHost; |
533 | 551 |
534 int get remotePort => _socket.remotePort; | 552 int get remotePort => _socket.remotePort; |
535 | 553 |
536 int available() { | 554 int available() { |
537 if (_status != CONNECTED) return 0; | 555 if (_status != CONNECTED) return 0; |
538 _readEncryptedData(); | |
539 return _secureFilter.buffers[READ_PLAINTEXT].length; | 556 return _secureFilter.buffers[READ_PLAINTEXT].length; |
540 } | 557 } |
541 | 558 |
542 void close() { | 559 void close() { |
543 shutdown(SocketDirection.BOTH); | 560 shutdown(SocketDirection.BOTH); |
544 } | 561 } |
545 | 562 |
546 void _close() { | 563 void _close() { |
547 _closedWrite = true; | 564 _closedWrite = true; |
548 _closedRead = true; | 565 _closedRead = true; |
549 if (_socket != null) { | 566 if (_socket != null) { |
550 _socket.close(); | 567 _socket.close(); |
551 } | 568 } |
552 _socketClosedWrite = true; | 569 _socketClosedWrite = true; |
553 _socketClosedRead = true; | 570 _socketClosedRead = true; |
554 if (_secureFilter != null) { | 571 if (!_filterActive && _secureFilter != null) { |
555 _secureFilter.destroy(); | 572 _secureFilter.destroy(); |
556 _secureFilter = null; | 573 _secureFilter = null; |
557 } | 574 } |
558 if (_socketSubscription != null) { | 575 if (_socketSubscription != null) { |
559 _socketSubscription.cancel(); | 576 _socketSubscription.cancel(); |
560 } | 577 } |
561 _controller.close(); | 578 _controller.close(); |
562 _status = CLOSED; | 579 _status = CLOSED; |
563 } | 580 } |
564 | 581 |
565 void shutdown(SocketDirection direction) { | 582 void shutdown(SocketDirection direction) { |
566 if (direction == SocketDirection.SEND || | 583 if (direction == SocketDirection.SEND || |
567 direction == SocketDirection.BOTH) { | 584 direction == SocketDirection.BOTH) { |
568 _closedWrite = true; | 585 _closedWrite = true; |
569 _writeEncryptedData(); | 586 if (_filterStatus.writeEmpty) { |
570 if (_filterWriteEmpty) { | |
571 _socket.shutdown(SocketDirection.SEND); | 587 _socket.shutdown(SocketDirection.SEND); |
572 _socketClosedWrite = true; | 588 _socketClosedWrite = true; |
573 if (_closedRead) { | 589 if (_closedRead) { |
574 _close(); | 590 _close(); |
575 } | 591 } |
576 } | 592 } |
577 } | 593 } |
578 if (direction == SocketDirection.RECEIVE || | 594 if (direction == SocketDirection.RECEIVE || |
579 direction == SocketDirection.BOTH) { | 595 direction == SocketDirection.BOTH) { |
580 _closedRead = true; | 596 _closedRead = true; |
581 _socketClosedRead = true; | 597 _socketClosedRead = true; |
582 _socket.shutdown(SocketDirection.RECEIVE); | 598 _socket.shutdown(SocketDirection.RECEIVE); |
583 if (_socketClosedWrite) { | 599 if (_socketClosedWrite) { |
584 _close(); | 600 _close(); |
585 } | 601 } |
586 } | 602 } |
587 } | 603 } |
588 | 604 |
589 bool get writeEventsEnabled => _writeEventsEnabled; | 605 bool get writeEventsEnabled => _writeEventsEnabled; |
590 | 606 |
591 void set writeEventsEnabled(bool value) { | 607 void set writeEventsEnabled(bool value) { |
592 if (value && | 608 _writeEventsEnabled = value; |
593 _controller.hasListener && | 609 if (value) { |
594 _secureFilter != null && | 610 Timer.run(() => _sendWriteEvent()); |
595 _secureFilter.buffers[WRITE_PLAINTEXT].free > 0) { | |
596 Timer.run(() => _controller.add(RawSocketEvent.WRITE)); | |
597 } else { | |
598 _writeEventsEnabled = value; | |
599 } | 611 } |
600 } | 612 } |
601 | 613 |
602 bool get readEventsEnabled => _readEventsEnabled; | 614 bool get readEventsEnabled => _readEventsEnabled; |
603 | 615 |
604 void set readEventsEnabled(bool value) { | 616 void set readEventsEnabled(bool value) { |
605 _readEventsEnabled = value; | 617 _readEventsEnabled = value; |
606 if (value && | 618 _scheduleReadEvent(); |
607 ((_secureFilter != null && | |
608 _secureFilter.buffers[READ_PLAINTEXT].length > 0) || | |
609 _socketClosedRead)) { | |
610 // We might not have no underlying socket to set off read events. | |
611 Timer.run(_readHandler); | |
612 } | |
613 } | 619 } |
614 | 620 |
615 List<int> read([int len]) { | 621 List<int> read([int length]) { |
622 if (length != null && (length is! int || length < 0)) { | |
623 throw new ArgumentError( | |
624 "Invalid length parameter in SecureSocket.read (length: $length)"); | |
625 } | |
616 if (_closedRead) { | 626 if (_closedRead) { |
617 throw new SocketException("Reading from a closed socket"); | 627 throw new SocketException("Reading from a closed socket"); |
618 } | 628 } |
619 if (_status != CONNECTED) { | 629 if (_status != CONNECTED) { |
620 return null; | 630 return null; |
621 } | 631 } |
622 var buffer = _secureFilter.buffers[READ_PLAINTEXT]; | 632 var result = _secureFilter.buffers[READ_PLAINTEXT].read(length); |
623 _readEncryptedData(); | 633 _scheduleFilter(); |
624 int toRead = buffer.length; | |
625 if (len != null) { | |
626 if (len is! int || len < 0) { | |
627 throw new ArgumentError( | |
628 "Invalid len parameter in SecureSocket.read (len: $len)"); | |
629 } | |
630 if (len < toRead) { | |
631 toRead = len; | |
632 } | |
633 } | |
634 List<int> result = (toRead == 0) ? null : | |
635 buffer.data.sublist(buffer.start, buffer.start + toRead); | |
636 buffer.advanceStart(toRead); | |
637 | |
638 // Set up a read event if the filter still has data. | |
639 if (!_filterReadEmpty) { | |
640 Timer.run(_readHandler); | |
641 } | |
642 | |
643 if (_socketClosedRead) { // An onClose event is pending. | |
644 // _closedRead is false, since we are in a read call. | |
645 if (!_filterReadEmpty) { | |
646 // _filterReadEmpty may be out of date since read empties | |
647 // the plaintext buffer after calling _readEncryptedData. | |
648 // TODO(whesse): Fix this as part of fixing read. | |
649 _readEncryptedData(); | |
650 } | |
651 if (_filterReadEmpty) { | |
652 // This can't be an else clause: the value of _filterReadEmpty changes. | |
653 // This must be asynchronous, because we are in a read call. | |
654 Timer.run(_closeHandler); | |
655 } | |
656 } | |
657 | |
658 return result; | 634 return result; |
659 } | 635 } |
660 | 636 |
661 // Write the data to the socket, and flush it as much as possible | 637 // Write the data to the socket, and schedule the filter to encrypt it. |
662 // until it would block. If the write would block, _writeEncryptedData sets | |
663 // up handlers to flush the pipeline when possible. | |
664 int write(List<int> data, [int offset, int bytes]) { | 638 int write(List<int> data, [int offset, int bytes]) { |
639 if (bytes != null && (bytes is! int || bytes < 0)) { | |
640 throw new ArgumentError( | |
641 "Invalid bytes parameter in SecureSocket.read (bytes: $bytes)"); | |
642 } | |
643 if (offset != null && (offset is! int || offset < 0)) { | |
644 throw new ArgumentError( | |
645 "Invalid offset parameter in SecureSocket.read (offset: $offset)"); | |
646 } | |
665 if (_closedWrite) { | 647 if (_closedWrite) { |
666 _controller.addError(new SocketException("Writing to a closed socket")); | 648 _controller.addError(new SocketException("Writing to a closed socket")); |
667 return 0; | 649 return 0; |
668 } | 650 } |
669 if (_status != CONNECTED) return 0; | 651 if (_status != CONNECTED) return 0; |
670 | |
671 if (offset == null) offset = 0; | 652 if (offset == null) offset = 0; |
672 if (bytes == null) bytes = data.length - offset; | 653 if (bytes == null) bytes = data.length - offset; |
673 | 654 |
674 var buffer = _secureFilter.buffers[WRITE_PLAINTEXT]; | 655 int written = |
675 if (bytes > buffer.free) { | 656 _secureFilter.buffers[WRITE_PLAINTEXT].write(data, offset, bytes); |
676 bytes = buffer.free; | 657 if (written > 0) { |
658 _filterStatus.writeEmpty = false; | |
677 } | 659 } |
678 if (bytes > 0) { | 660 _scheduleFilter(); |
679 int startIndex = buffer.start + buffer.length; | 661 return written; |
680 buffer.data.setRange(startIndex, startIndex + bytes, data, offset); | |
681 buffer.length += bytes; | |
682 } | |
683 _writeEncryptedData(); // Tries to flush all pipeline stages. | |
684 return bytes; | |
685 } | 662 } |
686 | 663 |
687 X509Certificate get peerCertificate => _secureFilter.peerCertificate; | 664 X509Certificate get peerCertificate => _secureFilter.peerCertificate; |
688 | 665 |
689 bool setOption(SocketOption option, bool enabled) { | 666 bool setOption(SocketOption option, bool enabled) { |
690 if (_socket == null) return false; | 667 if (_socket == null) return false; |
691 return _socket.setOption(option, enabled); | 668 return _socket.setOption(option, enabled); |
692 } | 669 } |
693 | 670 |
694 void _writeHandler() { | |
695 if (_status == CLOSED) return; | |
696 _writeEncryptedData(); | |
697 if (_filterWriteEmpty && _closedWrite && !_socketClosedWrite) { | |
698 // Close _socket for write, by calling shutdown(), to avoid cloning the | |
699 // socket closing code in shutdown(). | |
700 shutdown(SocketDirection.SEND); | |
701 } | |
702 if (_status == HANDSHAKE) { | |
703 try { | |
704 _secureHandshake(); | |
705 } catch (e) { _reportError(e, "RawSecureSocket error"); } | |
706 } else if (_status == CONNECTED && | |
707 _controller.hasListener && | |
708 _writeEventsEnabled && | |
709 _secureFilter.buffers[WRITE_PLAINTEXT].free > 0) { | |
710 // Reset the one-shot handler. | |
711 _writeEventsEnabled = false; | |
712 _controller.add(RawSocketEvent.WRITE); | |
713 } | |
714 } | |
715 | |
716 void _eventDispatcher(RawSocketEvent event) { | 671 void _eventDispatcher(RawSocketEvent event) { |
717 if (event == RawSocketEvent.READ) { | 672 if (event == RawSocketEvent.READ) { |
718 _readHandler(); | 673 _readHandler(); |
719 } else if (event == RawSocketEvent.WRITE) { | 674 } else if (event == RawSocketEvent.WRITE) { |
720 _writeHandler(); | 675 _writeHandler(); |
721 } else if (event == RawSocketEvent.READ_CLOSED) { | 676 } else if (event == RawSocketEvent.READ_CLOSED) { |
722 _closeHandler(); | 677 _closeHandler(); |
723 } | 678 } |
724 } | 679 } |
725 | 680 |
726 void _readFromBuffered() { | |
727 assert(_bufferedData != null); | |
728 var encrypted = _secureFilter.buffers[READ_ENCRYPTED]; | |
729 var bytes = _bufferedData.length - _bufferedDataIndex; | |
730 int startIndex = encrypted.start + encrypted.length; | |
731 encrypted.data.setRange(startIndex, | |
732 startIndex + bytes, | |
733 _bufferedData, | |
734 _bufferedDataIndex); | |
735 encrypted.length += bytes; | |
736 _bufferedDataIndex += bytes; | |
737 if (_bufferedData.length == _bufferedDataIndex) { | |
738 _bufferedData = null; | |
739 } | |
740 } | |
741 | |
742 void _readHandler() { | 681 void _readHandler() { |
743 if (_status == CLOSED) { | 682 _readSocket(); |
744 return; | 683 _scheduleFilter(); |
745 } else if (_status == HANDSHAKE) { | 684 } |
746 try { | 685 |
747 _secureHandshake(); | 686 void _writeHandler() { |
748 if (_status != HANDSHAKE) _readHandler(); | 687 _writeSocket(); |
749 } catch (e) { _reportError(e, "RawSecureSocket error"); } | 688 _scheduleFilter(); |
750 } else { | |
751 if (_status != CONNECTED) { | |
752 // Cannot happen. | |
753 throw new SocketException("Internal SocketIO Error"); | |
754 } | |
755 try { | |
756 _readEncryptedData(); | |
757 } catch (e) { _reportError(e, "RawSecureSocket error"); } | |
758 if (!_filterReadEmpty) { | |
759 if (_readEventsEnabled) { | |
760 if (_secureFilter.buffers[READ_PLAINTEXT].length > 0) { | |
761 _controller.add(RawSocketEvent.READ); | |
762 } | |
763 if (_socketClosedRead) { | |
764 // Keep firing read events until we are paused or buffer is empty. | |
765 Timer.run(_readHandler); | |
766 } | |
767 } | |
768 } else if (_socketClosedRead) { | |
769 _closeHandler(); | |
770 } | |
771 } | |
772 } | 689 } |
773 | 690 |
774 void _doneHandler() { | 691 void _doneHandler() { |
775 if (_filterReadEmpty) { | 692 if (_filterStatus.readEmpty) { |
776 _close(); | 693 _close(); |
777 } | 694 } |
778 } | 695 } |
779 | 696 |
780 void _errorHandler(e) { | 697 void _errorHandler(e) { |
781 _reportError(e, 'Error on underlying RawSocket'); | 698 _reportError(e, 'Error on underlying RawSocket'); |
782 } | 699 } |
783 | 700 |
784 void _reportError(e, String message) { | 701 void _reportError(e, String message) { |
785 // TODO(whesse): Call _reportError from all internal functions that throw. | 702 // TODO(whesse): Call _reportError from all internal functions that throw. |
786 if (e is SocketException) { | 703 if (e is SocketException) { |
787 e = new SocketException('$message (${e.message})', e.osError); | 704 e = new SocketException('$message (${e.message})', e.osError); |
788 } else if (e is OSError) { | 705 } else if (e is OSError) { |
789 e = new SocketException(message, e); | 706 e = new SocketException(message, e); |
790 } else { | 707 } else { |
791 e = new SocketException('$message (${e.toString()})', null); | 708 e = new SocketException('$message (${e.toString()})', null); |
792 } | 709 } |
793 if (_connectPending) { | 710 if (_connectPending) { |
794 _handshakeComplete.completeError(e); | 711 _handshakeComplete.completeError(e); |
795 } else { | 712 } else { |
796 _controller.addError(e); | 713 _controller.addError(e); |
797 } | 714 } |
798 _close(); | 715 _close(); |
799 } | 716 } |
800 | 717 |
801 void _closeHandler() { | 718 void _closeHandler() { |
802 if (_status == CONNECTED) { | 719 if (_status == CONNECTED) { |
803 if (_closedRead) return; | 720 if (_closedRead) return; |
804 _socketClosedRead = true; | 721 _socketClosedRead = true; |
805 if (_filterReadEmpty) { | 722 if (_filterStatus.readEmpty) { |
806 _closedRead = true; | 723 _closedRead = true; |
807 _controller.add(RawSocketEvent.READ_CLOSED); | 724 _controller.add(RawSocketEvent.READ_CLOSED); |
808 if (_socketClosedWrite) { | 725 if (_socketClosedWrite) { |
809 _close(); | 726 _close(); |
810 } | 727 } |
728 } else { | |
729 _scheduleFilter(); | |
811 } | 730 } |
812 } else if (_status == HANDSHAKE) { | 731 } else if (_status == HANDSHAKE) { |
732 _socketClosedRead = true; | |
733 if (_filterStatus.readEmpty) { | |
813 _reportError( | 734 _reportError( |
814 new SocketException('Connection terminated during handshake'), | 735 new SocketException('Connection terminated during handshake'), |
815 'handshake error'); | 736 'RawSecureSocket error'); |
737 } else { | |
738 _secureHandshake(); | |
739 } | |
816 } | 740 } |
817 } | 741 } |
818 | 742 |
819 void _secureHandshake() { | 743 void _secureHandshake() { |
820 _readEncryptedData(); | 744 try { |
821 _secureFilter.handshake(); | 745 _secureFilter.handshake(); |
822 _writeEncryptedData(); | 746 _filterStatus.writeEmpty = false; |
747 _readSocket(); | |
748 _writeSocket(); | |
749 _scheduleFilter(); | |
750 } catch (e) { | |
751 _reportError(e, "RawSecureSocket error"); | |
752 } | |
823 } | 753 } |
824 | 754 |
825 void _secureHandshakeCompleteHandler() { | 755 void _secureHandshakeCompleteHandler() { |
826 _status = CONNECTED; | 756 _status = CONNECTED; |
827 if (_connectPending) { | 757 if (_connectPending) { |
828 _connectPending = false; | 758 _connectPending = false; |
829 // If we complete the future synchronously, user code will run here, | 759 // We don't want user code to run synchronously in this callback. |
830 // and modify the state of the RawSecureSocket. For example, it | |
831 // could close the socket, and set _filter to null. | |
832 Timer.run(() => _handshakeComplete.complete(this)); | 760 Timer.run(() => _handshakeComplete.complete(this)); |
833 } | 761 } |
834 } | 762 } |
835 | 763 |
836 void _onPauseStateChange() { | 764 void _onPauseStateChange() { |
765 if (_controller.isPaused) { | |
766 _pauseCount++; | |
767 } else { | |
768 _pauseCount--; | |
769 if (_pauseCount == 0) { | |
770 _scheduleReadEvent(); | |
771 _sendWriteEvent(); // Can send event synchronously. | |
772 } | |
773 } | |
774 | |
837 if (!_socketClosedRead || !_socketClosedWrite) { | 775 if (!_socketClosedRead || !_socketClosedWrite) { |
838 if (_controller.isPaused) { | 776 if (_controller.isPaused) { |
839 _socketSubscription.pause(); | 777 _socketSubscription.pause(); |
840 } else { | 778 } else { |
841 _socketSubscription.resume(); | 779 _socketSubscription.resume(); |
842 } | 780 } |
843 } | 781 } |
844 } | 782 } |
845 | 783 |
846 void _onSubscriptionStateChange() { | 784 void _onSubscriptionStateChange() { |
847 if (_controller.hasListener) { | 785 if (_controller.hasListener) { |
848 // TODO(ajohnsen): Do something here? | 786 // TODO(ajohnsen): Do something here? |
849 } | 787 } |
850 } | 788 } |
851 | 789 |
852 void _readEncryptedData() { | 790 void _scheduleFilter() { |
853 // Read from the socket, and push it through the filter as far as | 791 _filterPending = true; |
854 // possible. | 792 _tryFilter(); |
855 var encrypted = _secureFilter.buffers[READ_ENCRYPTED]; | 793 } |
856 var plaintext = _secureFilter.buffers[READ_PLAINTEXT]; | 794 |
857 bool progress = true; | 795 void _tryFilter() { |
858 while (progress) { | 796 if (_status == CLOSED) return; |
859 progress = false; | 797 if (_filterPending && !_filterActive) { |
860 // Do not try to read plaintext from the filter while handshaking. | 798 _filterActive = true; |
861 if ((_status == CONNECTED) && plaintext.free > 0) { | 799 _filterPending = false; |
862 int bytes = _secureFilter.processBuffer(READ_PLAINTEXT); | 800 _pushAllFilterStages().then((status) { |
863 if (bytes > 0) { | 801 _filterStatus = status; |
864 plaintext.length += bytes; | 802 _filterActive = false; |
865 progress = true; | 803 if (_status == CLOSED) { |
866 } | 804 _secureFilter.destroy(); |
867 } | 805 _secureFilter = null; |
868 if (encrypted.length > 0) { | 806 return; |
869 int bytes = _secureFilter.processBuffer(READ_ENCRYPTED); | 807 } |
870 if (bytes > 0) { | 808 if (_filterStatus.writeEmpty && _closedWrite && !_socketClosedWrite) { |
871 encrypted.advanceStart(bytes); | 809 // Checks for and handles all cases of partially closed sockets. |
872 progress = true; | 810 shutdown(SocketDirection.SEND); |
873 } | 811 if (_status == CLOSED) return; |
874 } | 812 } |
875 if (!_socketClosedRead && encrypted.free > 0) { | 813 if (_filterStatus.readEmpty && _socketClosedRead && !_closedRead) { |
876 if (_bufferedData != null) { | 814 if (_status == HANDSHAKE) { |
877 _readFromBuffered(); | 815 _secureFilter.handshake(); |
878 progress = true; | 816 if (_status == HANDSHAKE) { |
879 } else { | 817 _reportError( |
880 List<int> data = _socket.read(encrypted.free); | 818 new SocketException('Connection terminated during handshake'), |
881 if (data != null) { | 819 'RawSecureSocket error'); |
882 int bytes = data.length; | 820 } |
883 int startIndex = encrypted.start + encrypted.length; | |
884 encrypted.data.setRange(startIndex, startIndex + bytes, data); | |
885 encrypted.length += bytes; | |
886 progress = true; | |
887 } | 821 } |
888 } | 822 _closeHandler(); |
889 } | 823 } |
890 } | 824 if (_status == CLOSED) return; |
891 // If there is any data in any stages of the filter, there should | 825 if (_filterStatus.progress) { |
892 // be data in the plaintext buffer after this process. | 826 _filterPending = true; |
893 // TODO(whesse): Verify that this is true, and there can be no | 827 if (_filterStatus.writePlaintextNoLongerFull) _sendWriteEvent(); |
894 // partial encrypted block stuck in the secureFilter. | 828 if (_filterStatus.readEncryptedNoLongerFull) _readSocket(); |
895 _filterReadEmpty = (plaintext.length == 0); | 829 if (_filterStatus.writeEncryptedNoLongerEmpty) _writeSocket(); |
896 } | 830 if (_filterStatus.readPlaintextNoLongerEmpty) _scheduleReadEvent(); |
897 | 831 if (_status == HANDSHAKE) _secureHandshake(); |
898 void _writeEncryptedData() { | 832 } |
833 _tryFilter(); | |
834 }); | |
835 } | |
836 } | |
837 | |
838 List<int> _readSocketOrBufferedData(int bytes) { | |
839 if (_bufferedData != null) { | |
840 if (bytes > _bufferedData.length - _bufferedDataIndex) { | |
841 bytes = _bufferedData.length - _bufferedDataIndex; | |
842 } | |
843 var result = _bufferedData.sublist(_bufferedDataIndex, | |
844 _bufferedDataIndex + bytes); | |
845 _bufferedDataIndex += bytes; | |
846 if (_bufferedData.length == _bufferedDataIndex) { | |
847 _bufferedData = null; | |
848 } | |
849 return result; | |
850 } else if (!_socketClosedRead) { | |
851 try { | |
852 return _socket.read(bytes); | |
853 } catch (e) { | |
854 _reportError(e, "RawSecureSocket error reading encrypted socket"); | |
855 return null; | |
856 } | |
857 } else { | |
858 return null; | |
859 } | |
860 } | |
861 | |
862 void _readSocket() { | |
863 if (_status == CLOSED) return; | |
864 var buffer = _secureFilter.buffers[READ_ENCRYPTED]; | |
865 if (buffer.writeFromSource(_readSocketOrBufferedData) > 0) { | |
866 _filterStatus.readEmpty = false; | |
867 } | |
868 } | |
869 | |
870 void _writeSocket() { | |
899 if (_socketClosedWrite) return; | 871 if (_socketClosedWrite) return; |
900 var encrypted = _secureFilter.buffers[WRITE_ENCRYPTED]; | 872 var buffer = _secureFilter.buffers[WRITE_ENCRYPTED]; |
901 var plaintext = _secureFilter.buffers[WRITE_PLAINTEXT]; | 873 if (buffer.readToSocket(_socket)) { // Returns true if blocked |
902 while (true) { | 874 _socket.writeEventsEnabled = true; |
903 if (encrypted.length > 0) { | 875 } |
904 // Write from the filter to the socket. | 876 } |
905 int bytes = _socket.write(encrypted.data, | 877 |
906 encrypted.start, | 878 // If a read event should be sent, add it to the controller. |
907 encrypted.length); | 879 _scheduleReadEvent() { |
908 encrypted.advanceStart(bytes); | 880 if (!_pendingReadEvent && |
909 if (encrypted.length > 0) { | 881 _readEventsEnabled && |
910 // The socket has blocked while we have data to write. | 882 _pauseCount == 0 && |
911 // We must be notified when it becomes unblocked. | 883 _secureFilter != null && |
912 _socket.writeEventsEnabled = true; | 884 !_secureFilter.buffers[READ_PLAINTEXT].isEmpty) { |
913 _filterWriteEmpty = false; | 885 _pendingReadEvent = true; |
914 break; | 886 Timer.run(_sendReadEvent); |
915 } | 887 } |
916 } else { | 888 } |
917 var plaintext = _secureFilter.buffers[WRITE_PLAINTEXT]; | 889 |
918 if (plaintext.length > 0) { | 890 _sendReadEvent() { |
919 int plaintext_bytes = _secureFilter.processBuffer(WRITE_PLAINTEXT); | 891 _pendingReadEvent = false; |
920 plaintext.advanceStart(plaintext_bytes); | 892 if (_readEventsEnabled && |
921 } | 893 _pauseCount == 0 && |
922 int bytes = _secureFilter.processBuffer(WRITE_ENCRYPTED); | 894 _secureFilter != null && |
923 if (bytes <= 0) { | 895 !_secureFilter.buffers[READ_PLAINTEXT].isEmpty) { |
924 // We know the WRITE_ENCRYPTED buffer is empty, and the | 896 _controller.add(RawSocketEvent.READ); |
925 // filter wrote zero bytes to it, so the filter must be empty. | 897 _scheduleReadEvent(); |
926 // Also, the WRITE_PLAINTEXT buffer must have been empty, or | 898 } |
927 // it would have written to the filter. | 899 } |
928 // TODO(whesse): Verify that the filter works this way. | 900 |
929 _filterWriteEmpty = true; | 901 // If a write event should be sent, add it to the controller. |
930 break; | 902 _sendWriteEvent() { |
931 } | 903 if (!_closedWrite && |
932 encrypted.length += bytes; | 904 _writeEventsEnabled && |
933 } | 905 _pauseCount == 0 && |
934 } | 906 _secureFilter != null && |
907 _secureFilter.buffers[WRITE_PLAINTEXT].free > 0) { | |
908 _writeEventsEnabled = false; | |
909 _controller.add(RawSocketEvent.WRITE); | |
910 } | |
911 } | |
912 | |
913 Future<_FilterStatus> _pushAllFilterStages() { | |
914 if (_filterService == null) { | |
915 _filterService = _SecureFilter._newServicePort(); | |
916 } | |
917 List args = [_filterPointer, _status != CONNECTED]; | |
918 var bufs = _secureFilter.buffers; | |
919 for (var i = 0; i < NUM_BUFFERS; ++i) { | |
920 args.add(bufs[i].start); | |
921 args.add(bufs[i].end); | |
922 } | |
923 | |
924 return _filterService.call(args).then((positions) { | |
925 int start(int index) => positions[2 * index + 2]; | |
926 int end(int index) => positions[2 * index + 3]; | |
927 | |
928 _FilterStatus status = new _FilterStatus(); | |
929 // Compute writeEmpty as "write plaintext buffer and write encrypted | |
930 // buffer were empty when we started and are empty now". | |
931 status.writeEmpty = bufs[WRITE_PLAINTEXT].isEmpty && | |
932 start(WRITE_ENCRYPTED) == end(WRITE_ENCRYPTED); | |
933 // If we were in handshake when this started, _writeEmpty may be false | |
934 // because the handshake wrote data after we checked. | |
Søren Gjesse
2013/06/17 07:31:07
Just checking the boolean position[1] looks a bit
Bill Hesse
2013/06/20 14:56:26
Done.
I made the function wasInHandshake(), wishe
| |
935 if (positions[1]) status.writeEmpty = false; | |
936 | |
937 // Compute readEmpty as "both read buffers were empty when we started | |
938 // and are empty now". | |
939 status.readEmpty = bufs[READ_ENCRYPTED].isEmpty && | |
940 start(READ_PLAINTEXT) == end(READ_PLAINTEXT); | |
941 | |
942 _ExternalBuffer buffer = bufs[WRITE_PLAINTEXT]; | |
943 int new_start = start(WRITE_PLAINTEXT); | |
944 if (new_start != buffer.start) { | |
945 status.progress = true; | |
946 if (buffer.free == 0) { | |
947 status.writePlaintextNoLongerFull = true; | |
948 } | |
949 buffer.start = new_start; | |
950 } | |
951 buffer = bufs[READ_ENCRYPTED]; | |
952 new_start = start(READ_ENCRYPTED); | |
953 if (new_start != buffer.start) { | |
954 status.progress = true; | |
955 if (buffer.free == 0) { | |
956 status.readEncryptedNoLongerFull = true; | |
957 } | |
958 buffer.start = new_start; | |
959 } | |
960 buffer = bufs[WRITE_ENCRYPTED]; | |
961 int new_end = end(WRITE_ENCRYPTED); | |
962 if (new_end != buffer.end) { | |
963 status.progress = true; | |
964 if (buffer.length == 0) { | |
965 status.writeEncryptedNoLongerEmpty = true; | |
966 } | |
967 buffer.end = new_end; | |
968 } | |
969 buffer = bufs[READ_PLAINTEXT]; | |
970 new_end = end(READ_PLAINTEXT); | |
971 if (new_end != buffer.end) { | |
972 status.progress = true; | |
973 if (buffer.length == 0) { | |
974 status.readPlaintextNoLongerEmpty = true; | |
975 } | |
976 buffer.end = new_end; | |
977 } | |
978 return status; | |
979 }); | |
935 } | 980 } |
936 } | 981 } |
937 | 982 |
938 | 983 |
984 /** | |
985 * A circular buffer backed by an external byte array. Accessed from | |
986 * both C++ and Dart code in an unsynchronized way, with one reading | |
987 * and one writing. All updates to start and end are done by Dart code. | |
988 */ | |
939 class _ExternalBuffer { | 989 class _ExternalBuffer { |
940 // Performance is improved if a full buffer of plaintext fits | 990 _ExternalBuffer(this.size) : start = 0, end = 0; |
941 // in the encrypted buffer, when encrypted. | 991 |
942 static final int SIZE = 8 * 1024; | 992 void advanceStart(int bytes) { |
943 static final int ENCRYPTED_SIZE = 10 * 1024; | 993 assert(start > end || start + bytes <= end); |
944 _ExternalBuffer() : start = 0, length = 0; | 994 start += bytes; |
945 | 995 if (start >= size) { |
946 // TODO(whesse): Consider making this a circular buffer. Only if it helps. | 996 start -= size; |
947 void advanceStart(int numBytes) { | 997 assert(start <= end); |
948 start += numBytes; | 998 assert(start < size); |
949 length -= numBytes; | 999 } |
950 if (length == 0) { | 1000 } |
951 start = 0; | 1001 |
952 } | 1002 void advanceEnd(int bytes) { |
953 } | 1003 assert(start <= end || start > end + bytes); |
954 | 1004 end += bytes; |
955 int get free => data.length - (start + length); | 1005 if (end >= size) { |
1006 end -= size; | |
1007 assert(end < start); | |
1008 assert(end < size); | |
1009 } | |
1010 } | |
1011 | |
1012 bool get isEmpty => end == start; | |
1013 | |
1014 int get length { | |
1015 if (start > end) return size + end - start; | |
1016 return end - start; | |
1017 } | |
1018 | |
1019 int get linearLength { | |
1020 if (start > end) return size - start; | |
1021 return end - start; | |
1022 } | |
1023 | |
1024 int get free { | |
1025 if (start > end) return start - end - 1; | |
1026 return size + start - end - 1; | |
1027 } | |
1028 | |
1029 int get linearFree { | |
1030 if (start > end) return start - end - 1; | |
1031 if (start == 0) return size - end - 1; | |
1032 return size - end; | |
1033 } | |
1034 | |
1035 List<int> read(int bytes) { | |
1036 if (bytes == null) { | |
1037 bytes = length; | |
1038 } else { | |
1039 bytes = min(bytes, length); | |
1040 } | |
1041 if (bytes == 0) return null; | |
1042 List<int> result = new Uint8List(bytes); | |
1043 int bytesRead = 0; | |
1044 while (bytesRead < bytes) { // Loop over two linear data ranges. | |
Søren Gjesse
2013/06/17 07:31:07
two -> one or two.
Bill Hesse
2013/06/20 14:56:26
Done.
| |
1045 int toRead = linearLength; | |
1046 result.setRange(bytesRead, | |
1047 bytesRead + toRead, | |
1048 data, | |
1049 start); | |
1050 advanceStart(toRead); | |
1051 bytesRead += toRead; | |
1052 } | |
1053 return result; | |
1054 } | |
1055 | |
1056 int write(List<int> inputData, int offset, int bytes) { | |
1057 if (bytes > free) { | |
1058 bytes = free; | |
1059 } | |
1060 int written = 0; | |
1061 int toWrite = min(bytes, linearFree); | |
1062 while (toWrite > 0) { // Loop over two linear free ranges. | |
Søren Gjesse
2013/06/17 07:31:07
Ditto.
| |
1063 data.setRange(end, end + toWrite, inputData, offset); | |
1064 advanceEnd(toWrite); | |
1065 offset += toWrite; | |
1066 written += toWrite; | |
1067 toWrite = min(bytes - written, linearFree); | |
1068 } | |
1069 return written; | |
1070 } | |
1071 | |
1072 int writeFromSource(List<int> getData(int requested)) { | |
1073 int written = 0; | |
1074 int toWrite = linearFree; | |
1075 while (toWrite > 0) { // Loop over two linear free ranges. | |
1076 // Source returns at most toWrite bytes, and it returns null when empty. | |
1077 var inputData = getData(toWrite); | |
1078 if (inputData == null) break; | |
1079 var len = inputData.length; | |
1080 data.setRange(end, end + len, inputData); | |
1081 advanceEnd(len); | |
1082 written += len; | |
1083 toWrite = linearFree; | |
1084 } | |
1085 return written; | |
1086 } | |
1087 | |
1088 bool readToSocket(RawSocket socket) { | |
1089 while (true) { // Loop over two linear data ranges. | |
1090 var toWrite = linearLength; | |
1091 if (toWrite == 0) return false; | |
1092 int bytes = socket.write(data, start, toWrite); | |
1093 advanceStart(bytes); | |
1094 if (bytes < toWrite) { | |
1095 // The socket has blocked while we have data to write. | |
1096 return true; | |
1097 } | |
1098 } | |
1099 } | |
956 | 1100 |
957 List data; // This will be a ExternalByteArray, backed by C allocated data. | 1101 List data; // This will be a ExternalByteArray, backed by C allocated data. |
958 int start; | 1102 int start; |
959 int length; | 1103 int end; |
Søren Gjesse
2013/06/17 07:31:07
size should be final.
Bill Hesse
2013/06/20 14:56:26
Done.
| |
1104 int size; | |
960 } | 1105 } |
961 | 1106 |
962 | 1107 |
963 abstract class _SecureFilter { | 1108 abstract class _SecureFilter { |
964 external factory _SecureFilter(); | 1109 external factory _SecureFilter(); |
965 | 1110 |
1111 external static SendPort _newServicePort(); | |
1112 | |
966 void connect(String hostName, | 1113 void connect(String hostName, |
967 int port, | 1114 int port, |
968 bool is_server, | 1115 bool is_server, |
969 String certificateName, | 1116 String certificateName, |
970 bool requestClientCertificate, | 1117 bool requestClientCertificate, |
971 bool requireClientCertificate, | 1118 bool requireClientCertificate, |
972 bool sendClientCertificate); | 1119 bool sendClientCertificate); |
973 void destroy(); | 1120 void destroy(); |
974 void handshake(); | 1121 void handshake(); |
975 void init(); | 1122 void init(); |
976 X509Certificate get peerCertificate; | 1123 X509Certificate get peerCertificate; |
977 int processBuffer(int bufferIndex); | 1124 int processBuffer(int bufferIndex); |
978 void registerBadCertificateCallback(Function callback); | 1125 void registerBadCertificateCallback(Function callback); |
979 void registerHandshakeCompleteCallback(Function handshakeCompleteHandler); | 1126 void registerHandshakeCompleteCallback(Function handshakeCompleteHandler); |
1127 int _pointer(); | |
980 | 1128 |
981 List<_ExternalBuffer> get buffers; | 1129 List<_ExternalBuffer> get buffers; |
982 } | 1130 } |
OLD | NEW |