Index: dart/pkg/mime/lib/src/bound_multipart_stream.dart |
diff --git a/dart/pkg/mime/lib/src/bound_multipart_stream.dart b/dart/pkg/mime/lib/src/bound_multipart_stream.dart |
index a049b1d11e9859f7d45ff3dc6a6112b6838b74ab..7d16b965178875c4fc0c175dbdf6817edbff8df0 100644 |
--- a/dart/pkg/mime/lib/src/bound_multipart_stream.dart |
+++ b/dart/pkg/mime/lib/src/bound_multipart_stream.dart |
@@ -74,6 +74,16 @@ class BoundMultipartStream { |
final List<int> _headerField = []; |
final List<int> _headerValue = []; |
+ // The following states belong to `_controller`, state changes will not be |
+ // immediately acted upon but rather only after the current |
+ // `_multipartController` is done. |
+ static const int _CONTROLLER_STATE_IDLE = 0; |
+ static const int _CONTROLLER_STATE_ACTIVE = 1; |
+ static const int _CONTROLLER_STATE_PAUSED = 2; |
+ static const int _CONTROLLER_STATE_CANCELED = 3; |
+ |
+ int _controllerState = _CONTROLLER_STATE_IDLE; |
+ |
StreamController _controller; |
Stream<MimeMultipart> get stream => _controller.stream; |
@@ -97,13 +107,15 @@ class BoundMultipartStream { |
onPause: _pauseStream, |
onResume:_resumeStream, |
onCancel: () { |
- _subscription.cancel(); |
+ _controllerState = _CONTROLLER_STATE_CANCELED; |
+ _tryPropagateControllerState(); |
}, |
onListen: () { |
+ _controllerState = _CONTROLLER_STATE_ACTIVE; |
_subscription = stream.listen( |
(data) { |
assert(_buffer == null); |
- _pauseStream(); |
+ _subscription.pause(); |
_buffer = data; |
_index = 0; |
_parse(); |
@@ -120,13 +132,33 @@ class BoundMultipartStream { |
} |
void _resumeStream() { |
- _subscription.resume(); |
+ assert (_controllerState == _CONTROLLER_STATE_PAUSED); |
+ _controllerState = _CONTROLLER_STATE_ACTIVE; |
+ _tryPropagateControllerState(); |
} |
void _pauseStream() { |
- _subscription.pause(); |
+ _controllerState = _CONTROLLER_STATE_PAUSED; |
+ _tryPropagateControllerState(); |
} |
+ void _tryPropagateControllerState() { |
+ if (_multipartController == null) { |
+ switch (_controllerState) { |
+ case _CONTROLLER_STATE_ACTIVE: |
+ if (_subscription.isPaused) _subscription.resume(); |
+ break; |
+ case _CONTROLLER_STATE_PAUSED: |
+ if (!_subscription.isPaused) _subscription.pause(); |
+ break; |
+ case _CONTROLLER_STATE_CANCELED: |
+ _subscription.cancel(); |
+ break; |
+ default: |
+ throw new StateError("This code should never be reached."); |
+ } |
+ } |
+ } |
void _parse() { |
// Number of boundary bytes to artificially place before the supplied data. |
@@ -210,6 +242,7 @@ class BoundMultipartStream { |
if (_multipartController != null) { |
_multipartController.close(); |
_multipartController = null; |
+ _tryPropagateControllerState(); |
} |
_state = _HEADER_START; |
break; |
@@ -282,11 +315,9 @@ class BoundMultipartStream { |
_expectByteValue(byte, CharCode.LF); |
_multipartController = new StreamController( |
sync: true, |
- onPause: () { |
- _pauseStream(); |
- }, |
+ onPause: _subscription.pause, |
onResume: () { |
- _resumeStream(); |
+ _subscription.resume(); |
_parse(); |
}); |
_controller.add( |
@@ -306,6 +337,8 @@ class BoundMultipartStream { |
_index--; |
} |
_multipartController.close(); |
+ _multipartController = null; |
+ _tryPropagateControllerState(); |
_boundaryIndex = 0; |
_state = _BOUNDARY_ENDING; |
} |
@@ -335,6 +368,7 @@ class BoundMultipartStream { |
if (_multipartController != null) { |
_multipartController.close(); |
_multipartController = null; |
+ _tryPropagateControllerState(); |
} |
_state = _DONE; |
break; |
@@ -358,7 +392,7 @@ class BoundMultipartStream { |
if (_index == _buffer.length) { |
_buffer = null; |
_index = null; |
- _resumeStream(); |
+ _subscription.resume(); |
} |
} |
} |