Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(889)

Side by Side Diff: sdk/lib/io/directory_impl.dart

Issue 18915008: Let StreamSubscription.cancel return a Future. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Rebase Created 7 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 class _Directory implements Directory { 7 class _Directory implements Directory {
8 static const CREATE_REQUEST = 0; 8 static const CREATE_REQUEST = 0;
9 static const DELETE_REQUEST = 1; 9 static const DELETE_REQUEST = 1;
10 static const EXISTS_REQUEST = 2; 10 static const EXISTS_REQUEST = 2;
(...skipping 304 matching lines...) Expand 10 before | Expand all | Expand 10 after
315 final String path; 315 final String path;
316 final bool recursive; 316 final bool recursive;
317 final bool followLinks; 317 final bool followLinks;
318 318
319 StreamController controller; 319 StreamController controller;
320 int id; 320 int id;
321 bool canceled = false; 321 bool canceled = false;
322 bool nextRunning = false; 322 bool nextRunning = false;
323 bool closed = false; 323 bool closed = false;
324 324
325 var pending;
326 var cancelCompleter;
327
325 _AsyncDirectoryLister(String this.path, 328 _AsyncDirectoryLister(String this.path,
326 bool this.recursive, 329 bool this.recursive,
327 bool this.followLinks) { 330 bool this.followLinks) {
328 controller = new StreamController(onListen: onListen, 331 controller = new StreamController(onListen: onListen,
329 onResume: onResume, 332 onResume: onResume,
330 onCancel: onCancel, 333 onCancel: onCancel,
331 sync: true); 334 sync: true);
332 } 335 }
333 336
334 Stream get stream => controller.stream; 337 Stream get stream => controller.stream;
335 338
336 void onListen() { 339 void onListen() {
337 var request = [_Directory.LIST_START_REQUEST, path, recursive, followLinks]; 340 var request = [_Directory.LIST_START_REQUEST, path, recursive, followLinks];
338 _Directory._newServicePort().call(request) 341 _Directory._newServicePort().call(request)
339 .then((response) { 342 .then((response) {
340 if (response is int) { 343 if (response is int) {
341 id = response; 344 id = response;
342 next(); 345 next();
343 } else { 346 } else {
344 error(response); 347 error(response);
345 controller.close(); 348 controller.close();
346 } 349 }
347 }); 350 });
348 } 351 }
349 352
350 void onResume() { 353 void onResume() {
351 if (!nextRunning) next(); 354 handlePending();
352 } 355 }
353 356
354 void onCancel() { 357 onCancel() {
355 canceled = true; 358 canceled = true;
356 // If we are active, but not requesting, close. 359 // If we are active, but not requesting, close.
357 if (!nextRunning) { 360 cancelCompleter = new Completer();
358 close(); 361 close();
359 } 362 return cancelCompleter.future;
360 } 363 }
361 364
362 void next() { 365 void next() {
363 if (canceled) { 366 if (canceled) {
364 close(); 367 close();
365 return; 368 return;
366 } 369 }
370 if (nextRunning) return;
367 if (id == null) return; 371 if (id == null) return;
368 if (controller.isPaused) return; 372 if (pending != null) return;
369 assert(!nextRunning);
370 nextRunning = true; 373 nextRunning = true;
371 _Directory._newServicePort().call([_Directory.LIST_NEXT_REQUEST, id]) 374 _Directory._newServicePort().call([_Directory.LIST_NEXT_REQUEST, id])
372 .then((result) { 375 .then((result) {
373 if (result is List) {
374 assert(result.length % 2 == 0);
375 for (int i = 0; i < result.length; i++) {
376 assert(i % 2 == 0);
377 switch (result[i++]) {
378 case LIST_FILE:
379 controller.add(new File(result[i]));
380 break;
381 case LIST_DIRECTORY:
382 controller.add(new Directory(result[i]));
383 break;
384 case LIST_LINK:
385 controller.add(new Link(result[i]));
386 break;
387 case LIST_ERROR:
388 error(result[i]);
389 break;
390 case LIST_DONE:
391 close();
392 return;
393 }
394 }
395 } else {
396 controller.addError(new DirectoryException("Internal error"));
397 }
398 nextRunning = false; 376 nextRunning = false;
399 next(); 377 pending = result;
378 handlePending();
400 }); 379 });
401 } 380 }
402 381
382 void handlePending() {
383 if (canceled) {
384 close();
385 return;
386 }
387 if (pending != null) {
388 if (controller.isPaused) return;
389 var result = pending;
390 pending = null;
391 next();
392 if (result is List) {
393 assert(result.length % 2 == 0);
394 for (int i = 0; i < result.length; i++) {
395 assert(i % 2 == 0);
396 switch (result[i++]) {
397 case LIST_FILE:
398 controller.add(new File(result[i]));
399 break;
400 case LIST_DIRECTORY:
401 controller.add(new Directory(result[i]));
402 break;
403 case LIST_LINK:
404 controller.add(new Link(result[i]));
405 break;
406 case LIST_ERROR:
407 error(result[i]);
408 break;
409 case LIST_DONE:
410 close();
411 return;
412 }
413 }
414 } else {
415 controller.addError(new DirectoryException("Internal error"));
416 }
417 } else {
418 next();
419 }
420 }
421
403 void close() { 422 void close() {
423 if (nextRunning) return; // Wait for it to complete.
404 if (closed) return; 424 if (closed) return;
405 if (id == null) return; 425 if (id == null) return;
406 closed = true; 426 closed = true;
407 _Directory._newServicePort().call([_Directory.LIST_STOP_REQUEST, id]) 427 _Directory._newServicePort().call([_Directory.LIST_STOP_REQUEST, id])
408 .then((_) { 428 .then((_) {
409 controller.close(); 429 if (canceled) {
430 cancelCompleter.complete();
431 } else {
432 controller.close();
433 }
410 }); 434 });
411 } 435 }
412 436
413 void error(message) { 437 void error(message) {
414 var errorType = 438 var errorType =
415 message[RESPONSE_ERROR][_ERROR_RESPONSE_ERROR_TYPE]; 439 message[RESPONSE_ERROR][_ERROR_RESPONSE_ERROR_TYPE];
416 if (errorType == _ILLEGAL_ARGUMENT_RESPONSE) { 440 if (errorType == _ILLEGAL_ARGUMENT_RESPONSE) {
417 controller.addError(new ArgumentError()); 441 controller.addError(new ArgumentError());
418 } else if (errorType == _OSERROR_RESPONSE) { 442 } else if (errorType == _OSERROR_RESPONSE) {
419 var responseError = message[RESPONSE_ERROR]; 443 var responseError = message[RESPONSE_ERROR];
420 var err = new OSError( 444 var err = new OSError(
421 responseError[_OSERROR_RESPONSE_MESSAGE], 445 responseError[_OSERROR_RESPONSE_MESSAGE],
422 responseError[_OSERROR_RESPONSE_ERROR_CODE]); 446 responseError[_OSERROR_RESPONSE_ERROR_CODE]);
423 var errorPath = message[RESPONSE_PATH]; 447 var errorPath = message[RESPONSE_PATH];
424 if (errorPath == null) errorPath = path; 448 if (errorPath == null) errorPath = path;
425 controller.addError( 449 controller.addError(
426 new DirectoryException("Directory listing failed", 450 new DirectoryException("Directory listing failed",
427 errorPath, 451 errorPath,
428 err)); 452 err));
429 } else { 453 } else {
430 controller.addError( 454 controller.addError(
431 new DirectoryException("Internal error")); 455 new DirectoryException("Internal error"));
432 } 456 }
433 } 457 }
434 } 458 }
OLDNEW
« sdk/lib/async/stream_impl.dart ('K') | « sdk/lib/async/stream_impl.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698