Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(829)

Unified Diff: runtime/bin/file_impl.dart

Issue 8431028: Implement async file API on top of isolates. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Fix type mismatch. Created 9 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « runtime/bin/file.dart ('k') | runtime/lib/array.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: runtime/bin/file_impl.dart
diff --git a/runtime/bin/file_impl.dart b/runtime/bin/file_impl.dart
index 5e9d877834c0177de4f18b02c99febc4e2f695db..abf87c82d49c2132967195387099604f01d2d83a 100644
--- a/runtime/bin/file_impl.dart
+++ b/runtime/bin/file_impl.dart
@@ -114,187 +114,559 @@ class _FileOutputStream implements FileOutputStream {
}
+class _FileOperationIsolate extends Isolate {
+ static int EXISTS = 0;
+ static int OPEN = 1;
+ static int CLOSE = 2;
+ static int READ_BYTE = 3;
+ static int READ_LIST = 4;
+ static int WRITE_BYTE = 5;
+ static int WRITE_LIST = 6;
+ static int WRITE_STRING = 7;
+ static int POSITION = 8;
+ static int LENGTH = 9;
+ static int FLUSH = 10;
+ static int EXIT = 11;
+
+ _FileOperationIsolate() : super.heavy();
+
+ void handleOperation(Map message, SendPort ignored) {
+ switch (message["type"]) {
+ case EXISTS:
+ message["reply"].send(_File._exists(message["name"]),
+ port.toSendPort());
+ break;
+ case OPEN:
+ var name = message["name"];
+ var writable = message["writable"];
+ message["reply"].send(_File._open(name, writable),
+ port.toSendPort());
+ break;
+ case CLOSE:
+ message["reply"].send(_File._close(message["id"]),
+ port.toSendPort());
+ break;
+ case READ_BYTE:
+ message["reply"].send(_File._readByte(message["id"]),
+ port.toSendPort());
+ break;
+ case READ_LIST:
+ var replyPort = message["reply"];
+ var bytes = message["bytes"];
+ var offset = message["offset"];
+ var length = message["length"];
+ var id = message["id"];
+ if (bytes == 0) {
+ replyPort.send(0, port.toSendPort());
+ return;
+ }
+ int index = _File._checkReadWriteListArguments(length, offset, bytes);
+ if (index != 0) {
+ replyPort.send("index out of range in readList: $index",
+ port.toSendPort());
+ return;
+ }
+ var buffer = new List(bytes);
+ var result = { "read": _File._readList(id, buffer, 0, bytes),
+ "buffer": buffer };
+ replyPort.send(result, port.toSendPort());
+ break;
+ case WRITE_BYTE:
+ message["reply"].send(_File._writeByte(message["id"], message["value"]),
+ port.toSendPort());
+ break;
+ case WRITE_LIST:
+ var replyPort = message["reply"];
+ var buffer = message["buffer"];
+ var bytes = message["bytes"];
+ var offset = message["offset"];
+ var id = message["id"];
+ if (bytes == 0) {
+ replyPort.send(0, port.toSendPort());
+ return;
+ }
+ int index =
+ _File._checkReadWriteListArguments(buffer.length, offset, bytes);
+ if (index != 0) {
+ replyPort.send("index out of range in writeList: $index",
+ port.toSendPort());
+ return;
+ }
+ var result = _File._writeList(id, buffer, offset, bytes);
+ replyPort.send(result, port.toSendPort());
+ break;
+ case WRITE_STRING:
+ var id = message["id"];
+ var string = message["string"];
+ message["reply"].send(_File._writeString(id, string),
+ port.toSendPort());
+ break;
+ case POSITION:
+ message["reply"].send(_File._position(message["id"]),
+ port.toSendPort());
+ break;
+ case LENGTH:
+ message["reply"].send(_File._length(message["id"]),
+ port.toSendPort());
+ break;
+ case FLUSH:
+ message["reply"].send(_File._flush(message["id"]),
+ port.toSendPort());
+ break;
+ case EXIT:
+ port.close();
+ return;
+ }
+ port.receive(handleOperation);
+ }
+
+ void main() {
+ port.receive(handleOperation);
+ }
+}
+
+
+class _FileOperationScheduler {
+ _FileOperationScheduler() : _queue = new Queue();
+
+ void schedule(SendPort port) {
+ assert(_isolate != null);
+ if (_queue.isEmpty()) {
+ port.send({ "type": _FileOperationIsolate.EXIT });
+ _isolate = null;
+ } else {
+ port.send(_queue.removeFirst());
+ }
+ }
+
+ void scheduleWrap(void callback(result, ignored)) {
+ return (result, replyTo) {
+ callback(result, replyTo);
+ schedule(replyTo);
+ };
+ }
+
+ void enqueue(Map params, void callback(result, ignored)) {
+ ReceivePort replyPort = new ReceivePort.singleShot();
+ replyPort.receive(scheduleWrap(callback));
+ params["reply"] = replyPort.toSendPort();
+ _queue.addLast(params);
+ if (_isolate == null) {
+ _isolate = new _FileOperationIsolate();
+ _isolate.spawn().then((port) {
+ schedule(port);
+ });
+ }
+ }
+
+ bool noPendingWrite() {
+ int queuedWrites = 0;
+ _queue.forEach((map) {
+ if (_isWriteOperation(map["type"])) {
+ queuedWrites++;
+ }
+ });
+ return queuedWrites == 0;
+ }
+
+ bool _isWriteOperation(int type) {
+ return (type == _FileOperationIsolate.WRITE_BYTE) ||
+ (type == _FileOperationIsolate.WRITE_LIST) ||
+ (type == _FileOperationIsolate.WRITE_STRING);
+ }
+
+ Queue<Map> _queue;
+ _FileOperationIsolate _isolate;
+}
+
+
// Class for encapsulating the native implementation of files.
class _File implements File {
// Constructor for file.
- _File(String this._name);
+ _File(String this._name)
+ : _scheduler = new _FileOperationScheduler(),
+ _asyncUsed = false;
+
+ static bool _exists(String name) native "File_Exists";
+ static int _open(String name, bool writable) native "File_Open";
+ static int _close(int id) native "File_Close";
+ static int _readByte(int id) native "File_ReadByte";
+ static int _readList(int id, List<int> buffer, int offset, int bytes)
+ native "File_ReadList";
+ static int _writeByte(int id, int value) native "File_WriteByte";
+ static int _writeList(int id, List<int> buffer, int offset, int bytes)
+ native "File_WriteList";
+ static int _writeString(int id, String string) native "File_WriteString";
+ static int _position(int id) native "File_Position";
+ static int _length(int id) native "File_Length";
+ static int _flush(int id) native "File_Flush";
+
+ static int _checkReadWriteListArguments(int length, int offset, int bytes) {
+ if (offset < 0) return offset;
+ if (bytes < 0) return bytes;
+ if ((offset + bytes) > length) return offset + bytes;
+ return 0;
+ }
void exists() {
- throw "Unimplemented";
+ _asyncUsed = true;
+ var handler =
+ (_existsHandler != null) ? _existsHandler : (result) => null;
+ Map params = {
+ "type": _FileOperationIsolate.EXISTS,
+ "name": _name
+ };
+ _scheduler.enqueue(params, (result, ignored) { _existsHandler(result); });
}
bool existsSync() {
- return _fileExists(_name);
+ if (_asyncUsed) {
+ throw new FileIOException(
+ "Mixed use of synchronous and asynchronous API");
+ }
+ return _exists(_name);
}
- bool _fileExists(String name) native "File_Exists";
-
void create() {
+ _asyncUsed = true;
throw "Unimplemented";
}
void createSync() {
+ if (_asyncUsed) {
+ throw new FileIOException(
+ "Mixed use of synchronous and asynchronous API");
+ }
throw "Unimplemented";
}
void open([bool writable = false]) {
- throw "Unimplemented";
+ _asyncUsed = true;
+ var handler = (_openHandler != null) ? _openHandler : () => null;
+ var handleOpenResult = (result, ignored) {
+ if (result != 0) {
+ _id = result;
+ handler();
+ } else if (_errorHandler != null) {
+ _errorHandler("Cannot open file: $_name");
+ }
+ };
+ Map params = {
+ "type": _FileOperationIsolate.OPEN,
+ "name": _name,
+ "writable": writable
+ };
+ _scheduler.enqueue(params, handleOpenResult);
}
void openSync([bool writable = false]) {
- if (!_openFile(_name, writable)) {
+ if (_asyncUsed) {
+ throw new FileIOException(
+ "Mixed use of synchronous and asynchronous API");
+ }
+ _id = _open(_name, writable);
+ if (_id == 0) {
throw new FileIOException("Cannot open file: $_name");
}
}
- bool _openFile(String name, bool writable) native "File_OpenFile";
-
void close() {
- throw "Unimplemented";
+ _asyncUsed = true;
+ var handler = (_closeHandler != null) ? _closeHandler : () => null;
+ var handleOpenResult = (result, ignored) {
+ if (result != -1) {
+ _id = result;
+ handler();
+ } else if (_errorHandler != null) {
+ _errorHandler("Cannot open file: $_name");
+ }
+ };
+ Map params = {
+ "type": _FileOperationIsolate.CLOSE,
+ "id": _id
+ };
+ _scheduler.enqueue(params, handleOpenResult);
}
void closeSync() {
- _close();
+ if (_asyncUsed) {
+ throw new FileIOException(
+ "Mixed use of synchronous and asynchronous API");
+ }
+ _id = _close(_id);
+ if (_id == -1) {
+ throw new FileIOException("Cannot close file: $_name");
+ }
}
- int _close() native "File_Close";
-
void readByte() {
- throw "Unimplemented";
+ _asyncUsed = true;
+ var handler =
+ (_readByteHandler != null) ? _readByteHandler : (byte) => null;
+ var handleReadByteResult = (result, ignored) {
+ if (result != -1) {
+ handler(result);
+ } else if (_errorHandler != null) {
+ _errorHandler("readByte failed");
+ }
+ };
+ Map params = {
+ "type": _FileOperationIsolate.READ_BYTE,
+ "id": _id
+ };
+ _scheduler.enqueue(params, handleReadByteResult);
}
int readByteSync() {
- int result = _readByte();
+ if (_asyncUsed) {
+ throw new FileIOException(
+ "Mixed use of synchronous and asynchronous API");
+ }
+ int result = _readByte(_id);
if (result == -1) {
- throw new FileIOException("Error: readByte failed");
+ throw new FileIOException("readByte failed");
}
return result;
}
- int _readByte() native "File_ReadByte";
-
void readList(List<int> buffer, int offset, int bytes) {
- throw "Unimplemented";
+ _asyncUsed = true;
+ var handler =
+ (_readListHandler != null) ? _readListHandler : (result) => null;
+ var handleReadListResult = (result, ignored) {
+ if (result is Map && result["read"] != -1) {
+ var read = result["read"];
+ buffer.setRange(offset, read, result["buffer"]);
+ handler(read);
+ return;
+ }
+ if (_errorHandler != null) {
+ _errorHandler(result is String ? result : "readList failed");
+ }
+ };
+ Map params = {
+ "type": _FileOperationIsolate.READ_LIST,
+ "length": buffer.length,
+ "offset": offset,
+ "bytes": bytes,
+ "id": _id
+ };
+ _scheduler.enqueue(params, handleReadListResult);
}
int readListSync(List<int> buffer, int offset, int bytes) {
- if (bytes == 0) {
- return 0;
- }
- if (offset < 0) {
- throw new IndexOutOfRangeException(offset);
- }
- if (bytes < 0) {
- throw new IndexOutOfRangeException(bytes);
+ if (_asyncUsed) {
+ throw new FileIOException(
+ "Mixed use of synchronous and asynchronous API");
}
- if ((offset + bytes) > buffer.length) {
- throw new IndexOutOfRangeException(offset + bytes);
+ if (bytes == 0) return 0;
+ int index = _checkReadWriteListArguments(buffer.length, offset, bytes);
+ if (index != 0) {
+ throw new IndexOutOfRangeException(index);
}
- int result = _readList(buffer, offset, bytes);
+ int result = _readList(_id, buffer, offset, bytes);
if (result == -1) {
- throw new FileIOException("Error: readList failed");
+ throw new FileIOException("readList failed");
}
return result;
}
- int _readList(List<int> buffer, int offset, int bytes) native "File_ReadList";
+ void _checkPendingWrites() {
+ if (_scheduler.noPendingWrite() && _noPendingWriteHandler != null) {
+ _noPendingWriteHandler();
+ }
+ }
void writeByte(int value) {
- throw "Unimplemented";
+ _asyncUsed = true;
+ var handleReadByteResult = (result, ignored) {
+ if (result == -1 &&_errorHandler != null) {
+ _errorHandler("writeByte failed");
+ return;
+ }
+ _checkPendingWrites();
+ };
+ Map params = {
+ "type": _FileOperationIsolate.WRITE_BYTE,
+ "value": value,
+ "id": _id
+ };
+ _scheduler.enqueue(params, handleReadByteResult);
}
int writeByteSync(int value) {
- int result = _writeByte(value);
+ if (_asyncUsed) {
+ throw new FileIOException(
+ "Mixed use of synchronous and asynchronous API");
+ }
+ int result = _writeByte(_id, value);
if (result == -1) {
- throw new FileIOException("Error: writeByte failed");
+ throw new FileIOException("writeByte failed");
}
return result;
}
- int _writeByte(int value) native "File_WriteByte";
-
- int writeList(List<int> buffer, int offset, int bytes) {
- throw "Unimplemented";
+ void writeList(List<int> buffer, int offset, int bytes) {
+ _asyncUsed = true;
+ var handleWriteListResult = (result, ignored) {
+ if (result is !String && result != -1) {
+ if (result < bytes) {
+ writeList(buffer, offset + result, bytes - result);
+ } else {
+ _checkPendingWrites();
+ }
+ return;
+ }
+ if (_errorHandler != null) {
+ _errorHandler(result is String ? result : "writeList failed");
+ }
+ };
+ Map params = {
+ "type": _FileOperationIsolate.WRITE_LIST,
+ "buffer": buffer,
+ "offset": offset,
+ "bytes": bytes,
+ "id": _id
+ };
+ _scheduler.enqueue(params, handleWriteListResult);
}
int writeListSync(List<int> buffer, int offset, int bytes) {
- if (bytes == 0) {
- return 0;
- }
- if (offset < 0) {
- throw new IndexOutOfRangeException(offset);
- }
- if (bytes < 0) {
- throw new IndexOutOfRangeException(bytes);
+ if (_asyncUsed) {
+ throw new FileIOException(
+ "Mixed use of synchronous and asynchronous API");
}
- if ((offset + bytes) > buffer.length) {
- throw new IndexOutOfRangeException(offset + bytes);
+ if (bytes == 0) return 0;
+ int index = _checkReadWriteListArguments(buffer.length, offset, bytes);
+ if (index != 0) {
+ throw new IndexOutOfRangeException(index);
}
- int result = _writeList(buffer, offset, bytes);
+ int result = _writeList(_id, buffer, offset, bytes);
if (result == -1) {
- throw new FileIOException("Error: writeList failed");
+ throw new FileIOException("writeList failed");
}
return result;
}
- int _writeList(List<int> buffer, int offset, int bytes)
- native "File_WriteList";
-
- int writeString(String string) {
- throw "Unimplemented";
+ void writeString(String string) {
+ _asyncUsed = true;
+ var handleWriteStringResult = (result, ignored) {
+ if (result == -1 &&_errorHandler != null) {
+ _errorHandler("writeString failed");
+ return;
+ }
+ if (result < string.length) {
+ writeString(string.substring(result));
+ } else {
+ _checkPendingWrites();
+ }
+ };
+ Map params = {
+ "type": _FileOperationIsolate.WRITE_STRING,
+ "string": string,
+ "id": _id
+ };
+ _scheduler.enqueue(params, handleWriteStringResult);
}
int writeStringSync(String string) {
- int result = _writeString(string);
+ if (_asyncUsed) {
+ throw new FileIOException(
+ "Mixed use of synchronous and asynchronous API");
+ }
+ int result = _writeString(_id, string);
if (result == -1) {
throw new FileIOException("Error: writeString failed");
}
return result;
}
- int _writeString(String string) native "File_WriteString";
-
- int position() {
- throw "Unimplemented";
+ void position() {
+ _asyncUsed = true;
+ var handler = (_positionHandler != null) ? _positionHandler : (pos) => null;
+ var handlePositionResult = (result, ignored) {
+ if (result == -1 && _errorHandler != null) {
+ _errorHandler("position failed");
+ return;
+ }
+ handler(result);
+ };
+ Map params = {
+ "type": _FileOperationIsolate.POSITION,
+ "id": _id
+ };
+ _scheduler.enqueue(params, handlePositionResult);
}
int positionSync() {
- int result = _position;
+ if (_asyncUsed) {
+ throw new FileIOException(
+ "Mixed use of synchronous and asynchronous API");
+ }
+ int result = _position(_id);
if (result == -1) {
- throw new FileIOException("Error: get position failed");
+ throw new FileIOException("position failed");
}
return result;
}
- int get _position() native "File_Position";
-
- int length() {
- throw "Unimplemented";
+ void length() {
+ _asyncUsed = true;
+ var handler = (_lengthHandler != null) ? _lengthHandler : (pos) => null;
+ var handleLengthResult = (result, ignored) {
+ if (result == -1 && _errorHandler != null) {
+ _errorHandler("length failed");
+ return;
+ }
+ handler(result);
+ };
+ Map params = {
+ "type": _FileOperationIsolate.LENGTH,
+ "id": _id
+ };
+ _scheduler.enqueue(params, handleLengthResult);
}
int lengthSync() {
- int result = _length;
+ if (_asyncUsed) {
+ throw new FileIOException(
+ "Mixed use of synchronous and asynchronous API");
+ }
+ int result = _length(_id);
if (result == -1) {
- throw new FileIOException("Error: get length failed");
+ throw new FileIOException("length failed");
}
return result;
}
- int get _length() native "File_Length";
-
void flush() {
- throw "Unimplemented";
+ _asyncUsed = true;
+ var handler = (_flushHandler != null) ? _flushHandler : (pos) => null;
+ var handleFlushResult = (result, ignored) {
+ if (result == -1 && _errorHandler != null) {
+ _errorHandler("flush failed");
+ return;
+ }
+ handler();
+ };
+ Map params = {
+ "type": _FileOperationIsolate.FLUSH,
+ "id": _id
+ };
+ _scheduler.enqueue(params, handleFlushResult);
}
void flushSync() {
- int result = _flush();
+ if (_asyncUsed) {
+ throw new FileIOException(
+ "Mixed use of synchronous and asynchronous API");
+ }
+ int result = _flush(_id);
if (result == -1) {
- throw new FileIOException("Error: flush failed");
+ throw new FileIOException("flush failed");
}
}
- int _flush() native "File_Flush";
-
InputStream openInputStream() {
return new _FileInputStream(this);
}
@@ -335,12 +707,27 @@ class _File implements File {
_noPendingWriteHandler = handler;
}
+ void set positionHandler(void handler(int pos)) {
+ _positionHandler = handler;
+ }
+
+ void set lengthHandler(void handler(int length)) {
+ _lengthHandler = handler;
+ }
+
+ void set flushHandler(void handler()) {
+ _flushHandler = handler;
+ }
+
void set errorHandler(void handler(String error)) {
_errorHandler = handler;
}
String _name;
int _id;
+ bool _asyncUsed;
+
+ _FileOperationScheduler _scheduler;
var _existsHandler;
var _createHandler;
@@ -349,5 +736,8 @@ class _File implements File {
var _readByteHandler;
var _readListHandler;
var _noPendingWriteHandler;
+ var _positionHandler;
+ var _lengthHandler;
+ var _flushHandler;
var _errorHandler;
}
« no previous file with comments | « runtime/bin/file.dart ('k') | runtime/lib/array.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698