Index: lib/src/client.dart |
diff --git a/lib/src/client.dart b/lib/src/client.dart |
index e75e7f38c0041f64bab8a4a90f29416c2a914ac3..5df21e438c0e8423b3ef0101bd77310f285eb379 100644 |
--- a/lib/src/client.dart |
+++ b/lib/src/client.dart |
@@ -5,9 +5,10 @@ |
import 'dart:async'; |
import 'package:stack_trace/stack_trace.dart'; |
+import 'package:stream_channel/stream_channel.dart'; |
+import 'channel_manager.dart'; |
import 'exception.dart'; |
-import 'two_way_stream.dart'; |
import 'utils.dart'; |
/// A JSON-RPC 2.0 client. |
@@ -16,7 +17,7 @@ import 'utils.dart'; |
/// those method calls. Methods can be called with [sendRequest], or with |
/// [sendNotification] if no response is expected. |
class Client { |
- final TwoWayStream _streams; |
+ final ChannelManager _manager; |
/// The next request id. |
var _id = 0; |
@@ -29,55 +30,53 @@ class Client { |
/// The map of request ids to pending requests. |
final _pendingRequests = new Map<int, _Request>(); |
- /// Returns a [Future] that completes when the connection is closed. |
+ /// Returns a [Future] that completes when the underlying connection is |
+ /// closed. |
/// |
- /// This is the same future that's returned by [listen]. |
- Future get done => _streams.done; |
+ /// This is the same future that's returned by [listen] and [close]. It may |
+ /// complete before [close] is called if the remote endpoint closes the |
+ /// connection. |
+ Future get done => _manager.done; |
- /// Whether the connection is closed. |
- bool get isClosed => _streams.isClosed; |
- |
- /// Creates a [Client] that writes requests to [requests] and reads responses |
- /// from [responses]. |
+ /// Whether the underlying connection is closed. |
/// |
- /// If [responses] is a [StreamSink] as well as a [Stream] (for example, a |
- /// `WebSocket`), [requests] may be omitted. |
+ /// Note that this will be `true` before [close] is called if the remote |
+ /// endpoint closes the connection. |
+ bool get isClosed => _manager.isClosed; |
+ |
+ /// Creates a [Client] that communicates over [channel]. |
/// |
/// Note that the client won't begin listening to [responses] until |
/// [Client.listen] is called. |
- Client(Stream<String> responses, [StreamSink<String> requests]) |
- : _streams = new TwoWayStream( |
- "Client", responses, "responses", requests, "requests"); |
+ Client(StreamChannel<String> channel) |
+ : this.withoutJson(channel |
+ .transform(jsonDocument) |
+ .transformStream(ignoreFormatExceptions)); |
- /// Creates a [Client] that writes decoded responses to [responses] and reads |
- /// decoded requests from [requests]. |
+ /// Creates a [Client] that communicates using decoded messages over |
+ /// [channel]. |
/// |
/// Unlike [new Client], this doesn't read or write JSON strings. Instead, it |
/// reads and writes decoded maps or lists. |
/// |
- /// If [responses] is a [StreamSink] as well as a [Stream], [requests] may be |
- /// omitted. |
- /// |
/// Note that the client won't begin listening to [responses] until |
/// [Client.listen] is called. |
- Client.withoutJson(Stream responses, [StreamSink requests]) |
- : _streams = new TwoWayStream.withoutJson( |
- "Client", responses, "responses", requests, "requests"); |
+ Client.withoutJson(StreamChannel channel) |
+ : _manager = new ChannelManager("Client", channel); |
/// Starts listening to the underlying stream. |
/// |
- /// Returns a [Future] that will complete when the stream is closed or when it |
- /// has an error. |
+ /// Returns a [Future] that will complete when the connection is closed or |
+ /// when it has an error. This is the same as [done]. |
/// |
/// [listen] may only be called once. |
- Future listen() => _streams.listen(_handleResponse); |
+ Future listen() => _manager.listen(_handleResponse); |
- /// Closes the server's request sink and response subscription. |
+ /// Closes the underlying connection. |
/// |
/// Returns a [Future] that completes when all resources have been released. |
- /// |
- /// A client can't be closed before [listen] has been called. |
- Future close() => _streams.close(); |
+ /// This is the same as [done]. |
+ Future close() => _manager.close(); |
/// Sends a JSON-RPC 2 request to invoke the given [method]. |
/// |
@@ -132,7 +131,7 @@ class Client { |
if (_batch != null) { |
_batch.add(message); |
} else { |
- _streams.add(message); |
+ _manager.add(message); |
} |
} |
@@ -153,7 +152,7 @@ class Client { |
_batch = []; |
return tryFinally(callback, () { |
- _streams.add(_batch); |
+ _manager.add(_batch); |
_batch = null; |
}); |
} |