Index: sdk/lib/io/directory_impl.dart |
diff --git a/sdk/lib/io/directory_impl.dart b/sdk/lib/io/directory_impl.dart |
index cdb6df479a6914240df87f22c7736de07ca9bb02..f63ab8593ce574a03702c966f9f0b2b9783a0603 100644 |
--- a/sdk/lib/io/directory_impl.dart |
+++ b/sdk/lib/io/directory_impl.dart |
@@ -9,8 +9,10 @@ class _Directory implements Directory { |
static const DELETE_REQUEST = 1; |
static const EXISTS_REQUEST = 2; |
static const CREATE_TEMP_REQUEST = 3; |
- static const LIST_REQUEST = 4; |
- static const RENAME_REQUEST = 5; |
+ static const LIST_START_REQUEST = 4; |
+ static const LIST_NEXT_REQUEST = 5; |
+ static const LIST_STOP_REQUEST = 6; |
+ static const RENAME_REQUEST = 7; |
_Directory(String this._path); |
_Directory.fromPath(Path path) : this(path.toNativePath()); |
@@ -251,55 +253,113 @@ class _Directory implements Directory { |
const int RESPONSE_COMPLETE = 1; |
const int RESPONSE_ERROR = 2; |
- var controller = new StreamController<FileSystemEntity>(sync: true); |
- |
- List request = [ _Directory.LIST_REQUEST, path, recursive, followLinks ]; |
- ReceivePort responsePort = new ReceivePort(); |
- // Use a separate directory service port for each listing as |
- // listing operations on the same directory can run in parallel. |
- _Directory._newServicePort().send(request, responsePort.toSendPort()); |
- responsePort.receive((message, replyTo) { |
- if (message is !List || message[RESPONSE_TYPE] is !int) { |
- responsePort.close(); |
- controller.addError(new DirectoryException("Internal error")); |
+ int id; |
+ |
+ Function next; |
+ Function close; |
+ |
+ void onResume() { |
+ if (id != null) next(); |
+ } |
+ |
+ bool nextRunning = false; |
+ bool canceled = false; |
+ |
+ void onCancel() { |
+ canceled = true; |
+ // If we are active, but not requesting, close. |
+ if (id != null && !nextRunning) { |
+ close(); |
+ } |
+ } |
+ |
+ var controller = new StreamController<FileSystemEntity>(sync: true, |
+ onResume: onResume, |
+ onCancel: onCancel); |
+ |
+ bool closed = false; |
+ close = () { |
+ if (closed) return; |
+ closed = true; |
+ _Directory._newServicePort().call([_Directory.LIST_STOP_REQUEST, id]) |
+ .then((_) { |
+ controller.close(); |
+ }); |
+ }; |
+ |
+ void error(message) { |
+ var errorType = |
+ message[RESPONSE_ERROR][_ERROR_RESPONSE_ERROR_TYPE]; |
+ if (errorType == _ILLEGAL_ARGUMENT_RESPONSE) { |
+ controller.addError(new ArgumentError()); |
+ } else if (errorType == _OSERROR_RESPONSE) { |
+ var responseError = message[RESPONSE_ERROR]; |
+ var err = new OSError( |
+ responseError[_OSERROR_RESPONSE_MESSAGE], |
+ responseError[_OSERROR_RESPONSE_ERROR_CODE]); |
+ var errorPath = message[RESPONSE_PATH]; |
+ if (errorPath == null) errorPath = path; |
+ controller.addError( |
+ new DirectoryException("Directory listing failed", |
+ errorPath, |
+ err)); |
+ } else { |
+ controller.addError( |
+ new DirectoryException("Internal error")); |
+ } |
+ } |
+ |
+ next = () { |
+ if (canceled) { |
+ close(); |
return; |
} |
- switch (message[RESPONSE_TYPE]) { |
- case LIST_FILE: |
- controller.add(new File(message[RESPONSE_PATH])); |
- break; |
- case LIST_DIRECTORY: |
- controller.add(new Directory(message[RESPONSE_PATH])); |
- break; |
- case LIST_LINK: |
- controller.add(new Link(message[RESPONSE_PATH])); |
- break; |
- case LIST_ERROR: |
- var errorType = |
- message[RESPONSE_ERROR][_ERROR_RESPONSE_ERROR_TYPE]; |
- if (errorType == _ILLEGAL_ARGUMENT_RESPONSE) { |
- controller.addError(new ArgumentError()); |
- } else if (errorType == _OSERROR_RESPONSE) { |
- var responseError = message[RESPONSE_ERROR]; |
- var err = new OSError( |
- responseError[_OSERROR_RESPONSE_MESSAGE], |
- responseError[_OSERROR_RESPONSE_ERROR_CODE]); |
- var errorPath = message[RESPONSE_PATH]; |
- if (errorPath == null) errorPath = path; |
- controller.addError( |
- new DirectoryException("Directory listing failed", |
- errorPath, |
- err)); |
+ if (nextRunning) return; |
+ nextRunning = true; |
+ _Directory._newServicePort().call([_Directory.LIST_NEXT_REQUEST, id]) |
+ .then((result) { |
+ if (result is List) { |
+ for (var message in result) { |
+ if (message == null) break; |
+ switch (message[RESPONSE_TYPE]) { |
+ case LIST_FILE: |
+ controller.add(new File(message[RESPONSE_PATH])); |
+ break; |
+ case LIST_DIRECTORY: |
+ controller.add(new Directory(message[RESPONSE_PATH])); |
+ break; |
+ case LIST_LINK: |
+ controller.add(new Link(message[RESPONSE_PATH])); |
+ break; |
+ case LIST_ERROR: |
+ error(message); |
+ break; |
+ case LIST_DONE: |
+ close(); |
+ return; |
+ } |
+ } |
+ } else { |
+ controller.addError(new DirectoryException("Internal error")); |
+ } |
+ nextRunning = false; |
+ if (!controller.isPaused) { |
+ next(); |
+ } |
+ }); |
+ }; |
+ |
+ var request = [_Directory.LIST_START_REQUEST, path, recursive, followLinks]; |
+ _Directory._newServicePort().call(request) |
+ .then((response) { |
+ if (response is int) { |
+ id = response; |
+ next(); |
} else { |
- controller.addError(new DirectoryException("Internal error")); |
+ error(response); |
+ controller.close(); |
} |
- break; |
- case LIST_DONE: |
- responsePort.close(); |
- controller.close(); |
- break; |
- } |
- }); |
+ }); |
return controller.stream; |
} |