| Index: dart/pkg/mime/lib/src/bound_multipart_stream.dart
|
| diff --git a/dart/pkg/mime/lib/src/bound_multipart_stream.dart b/dart/pkg/mime/lib/src/bound_multipart_stream.dart
|
| index a049b1d11e9859f7d45ff3dc6a6112b6838b74ab..7d16b965178875c4fc0c175dbdf6817edbff8df0 100644
|
| --- a/dart/pkg/mime/lib/src/bound_multipart_stream.dart
|
| +++ b/dart/pkg/mime/lib/src/bound_multipart_stream.dart
|
| @@ -74,6 +74,16 @@ class BoundMultipartStream {
|
| final List<int> _headerField = [];
|
| final List<int> _headerValue = [];
|
|
|
| + // The following states belong to `_controller`, state changes will not be
|
| + // immediately acted upon but rather only after the current
|
| + // `_multipartController` is done.
|
| + static const int _CONTROLLER_STATE_IDLE = 0;
|
| + static const int _CONTROLLER_STATE_ACTIVE = 1;
|
| + static const int _CONTROLLER_STATE_PAUSED = 2;
|
| + static const int _CONTROLLER_STATE_CANCELED = 3;
|
| +
|
| + int _controllerState = _CONTROLLER_STATE_IDLE;
|
| +
|
| StreamController _controller;
|
|
|
| Stream<MimeMultipart> get stream => _controller.stream;
|
| @@ -97,13 +107,15 @@ class BoundMultipartStream {
|
| onPause: _pauseStream,
|
| onResume:_resumeStream,
|
| onCancel: () {
|
| - _subscription.cancel();
|
| + _controllerState = _CONTROLLER_STATE_CANCELED;
|
| + _tryPropagateControllerState();
|
| },
|
| onListen: () {
|
| + _controllerState = _CONTROLLER_STATE_ACTIVE;
|
| _subscription = stream.listen(
|
| (data) {
|
| assert(_buffer == null);
|
| - _pauseStream();
|
| + _subscription.pause();
|
| _buffer = data;
|
| _index = 0;
|
| _parse();
|
| @@ -120,13 +132,33 @@ class BoundMultipartStream {
|
| }
|
|
|
| void _resumeStream() {
|
| - _subscription.resume();
|
| + assert (_controllerState == _CONTROLLER_STATE_PAUSED);
|
| + _controllerState = _CONTROLLER_STATE_ACTIVE;
|
| + _tryPropagateControllerState();
|
| }
|
|
|
| void _pauseStream() {
|
| - _subscription.pause();
|
| + _controllerState = _CONTROLLER_STATE_PAUSED;
|
| + _tryPropagateControllerState();
|
| }
|
|
|
| + void _tryPropagateControllerState() {
|
| + if (_multipartController == null) {
|
| + switch (_controllerState) {
|
| + case _CONTROLLER_STATE_ACTIVE:
|
| + if (_subscription.isPaused) _subscription.resume();
|
| + break;
|
| + case _CONTROLLER_STATE_PAUSED:
|
| + if (!_subscription.isPaused) _subscription.pause();
|
| + break;
|
| + case _CONTROLLER_STATE_CANCELED:
|
| + _subscription.cancel();
|
| + break;
|
| + default:
|
| + throw new StateError("This code should never be reached.");
|
| + }
|
| + }
|
| + }
|
|
|
| void _parse() {
|
| // Number of boundary bytes to artificially place before the supplied data.
|
| @@ -210,6 +242,7 @@ class BoundMultipartStream {
|
| if (_multipartController != null) {
|
| _multipartController.close();
|
| _multipartController = null;
|
| + _tryPropagateControllerState();
|
| }
|
| _state = _HEADER_START;
|
| break;
|
| @@ -282,11 +315,9 @@ class BoundMultipartStream {
|
| _expectByteValue(byte, CharCode.LF);
|
| _multipartController = new StreamController(
|
| sync: true,
|
| - onPause: () {
|
| - _pauseStream();
|
| - },
|
| + onPause: _subscription.pause,
|
| onResume: () {
|
| - _resumeStream();
|
| + _subscription.resume();
|
| _parse();
|
| });
|
| _controller.add(
|
| @@ -306,6 +337,8 @@ class BoundMultipartStream {
|
| _index--;
|
| }
|
| _multipartController.close();
|
| + _multipartController = null;
|
| + _tryPropagateControllerState();
|
| _boundaryIndex = 0;
|
| _state = _BOUNDARY_ENDING;
|
| }
|
| @@ -335,6 +368,7 @@ class BoundMultipartStream {
|
| if (_multipartController != null) {
|
| _multipartController.close();
|
| _multipartController = null;
|
| + _tryPropagateControllerState();
|
| }
|
| _state = _DONE;
|
| break;
|
| @@ -358,7 +392,7 @@ class BoundMultipartStream {
|
| if (_index == _buffer.length) {
|
| _buffer = null;
|
| _index = null;
|
| - _resumeStream();
|
| + _subscription.resume();
|
| }
|
| }
|
| }
|
|
|