Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(934)

Side by Side Diff: ipc/ipc_channel_mojo_unittest.cc

Issue 2301123004: Mojo Channel: Fix deferred proxy dispatch; support paused channels (Closed)
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_channel_mojo.h" 5 #include "ipc/ipc_channel_mojo.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <memory> 10 #include <memory>
(...skipping 721 matching lines...) Expand 10 before | Expand all | Expand 10 after
732 base::WaitableEvent::InitialState::NOT_SIGNALED) { 732 base::WaitableEvent::InitialState::NOT_SIGNALED) {
733 } 733 }
734 734
735 void CreateProxy(IPC::Listener* listener) { 735 void CreateProxy(IPC::Listener* listener) {
736 io_thread_.StartWithOptions( 736 io_thread_.StartWithOptions(
737 base::Thread::Options(base::MessageLoop::TYPE_IO, 0)); 737 base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
738 proxy_ = IPC::SyncChannel::Create( 738 proxy_ = IPC::SyncChannel::Create(
739 listener, io_thread_.task_runner(), &never_signaled_); 739 listener, io_thread_.task_runner(), &never_signaled_);
740 } 740 }
741 741
742 void RunProxy() { 742 void RunProxy(bool create_paused) {
743 std::unique_ptr<IPC::ChannelFactory> factory; 743 std::unique_ptr<IPC::ChannelFactory> factory;
744 if (for_server_) { 744 if (for_server_) {
745 factory = IPC::ChannelMojo::CreateServerFactory( 745 factory = IPC::ChannelMojo::CreateServerFactory(
746 std::move(handle_), io_thread_.task_runner()); 746 std::move(handle_), io_thread_.task_runner());
747 } else { 747 } else {
748 factory = IPC::ChannelMojo::CreateClientFactory( 748 factory = IPC::ChannelMojo::CreateClientFactory(
749 std::move(handle_), io_thread_.task_runner()); 749 std::move(handle_), io_thread_.task_runner());
750 } 750 }
751 proxy_->Init(std::move(factory), true); 751 proxy_->Init(std::move(factory), true, create_paused);
752 } 752 }
753 753
754 IPC::ChannelProxy* proxy() { return proxy_.get(); } 754 IPC::ChannelProxy* proxy() { return proxy_.get(); }
755 755
756 private: 756 private:
757 const bool for_server_; 757 const bool for_server_;
758 758
759 mojo::ScopedMessagePipeHandle handle_; 759 mojo::ScopedMessagePipeHandle handle_;
760 base::Thread io_thread_; 760 base::Thread io_thread_;
761 std::unique_ptr<IPC::ChannelProxy> proxy_; 761 std::unique_ptr<IPC::ChannelProxy> proxy_;
762 base::WaitableEvent never_signaled_; 762 base::WaitableEvent never_signaled_;
763 763
764 DISALLOW_COPY_AND_ASSIGN(ChannelProxyRunner); 764 DISALLOW_COPY_AND_ASSIGN(ChannelProxyRunner);
765 }; 765 };
766 766
767 class IPCChannelProxyMojoTest : public IPCChannelMojoTestBase { 767 class IPCChannelProxyMojoTest : public IPCChannelMojoTestBase {
768 public: 768 public:
769 void InitWithMojo(const std::string& client_name) { 769 void InitWithMojo(const std::string& client_name) {
770 IPCChannelMojoTestBase::InitWithMojo(client_name); 770 IPCChannelMojoTestBase::InitWithMojo(client_name);
771 runner_.reset(new ChannelProxyRunner(TakeHandle(), true)); 771 runner_.reset(new ChannelProxyRunner(TakeHandle(), true));
772 } 772 }
773 void CreateProxy(IPC::Listener* listener) { runner_->CreateProxy(listener); } 773 void CreateProxy(IPC::Listener* listener) { runner_->CreateProxy(listener); }
774 void RunProxy() { runner_->RunProxy(); } 774 void RunProxy(bool create_paused = false) {
775 runner_->RunProxy(create_paused);
776 }
775 void DestroyProxy() { 777 void DestroyProxy() {
776 runner_.reset(); 778 runner_.reset();
777 base::RunLoop().RunUntilIdle(); 779 base::RunLoop().RunUntilIdle();
778 } 780 }
779 781
780 IPC::ChannelProxy* proxy() { return runner_->proxy(); } 782 IPC::ChannelProxy* proxy() { return runner_->proxy(); }
781 783
782 private: 784 private:
783 base::MessageLoop message_loop_; 785 base::MessageLoop message_loop_;
784 std::unique_ptr<ChannelProxyRunner> runner_; 786 std::unique_ptr<ChannelProxyRunner> runner_;
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after
869 } 871 }
870 872
871 class ChannelProxyClient { 873 class ChannelProxyClient {
872 public: 874 public:
873 void Init(mojo::ScopedMessagePipeHandle handle) { 875 void Init(mojo::ScopedMessagePipeHandle handle) {
874 runner_.reset(new ChannelProxyRunner(std::move(handle), false)); 876 runner_.reset(new ChannelProxyRunner(std::move(handle), false));
875 } 877 }
876 878
877 void CreateProxy(IPC::Listener* listener) { runner_->CreateProxy(listener); } 879 void CreateProxy(IPC::Listener* listener) { runner_->CreateProxy(listener); }
878 880
879 void RunProxy() { runner_->RunProxy(); } 881 void RunProxy() { runner_->RunProxy(false); }
880 882
881 void DestroyProxy() { 883 void DestroyProxy() {
882 runner_.reset(); 884 runner_.reset();
883 base::RunLoop().RunUntilIdle(); 885 base::RunLoop().RunUntilIdle();
884 } 886 }
885 887
886 void RequestQuitAndWaitForAck(IPC::mojom::SimpleTestDriver* driver) { 888 void RequestQuitAndWaitForAck(IPC::mojom::SimpleTestDriver* driver) {
887 base::RunLoop loop; 889 base::RunLoop loop;
888 driver->RequestQuit(loop.QuitClosure()); 890 driver->RequestQuit(loop.QuitClosure());
889 loop.Run(); 891 loop.Run();
(...skipping 26 matching lines...) Expand all
916 ++i) { 918 ++i) {
917 driver->ExpectValue(i); 919 driver->ExpectValue(i);
918 SendValue(proxy(), i); 920 SendValue(proxy(), i);
919 } 921 }
920 driver->RequestQuit(base::MessageLoop::QuitWhenIdleClosure()); 922 driver->RequestQuit(base::MessageLoop::QuitWhenIdleClosure());
921 base::RunLoop().Run(); 923 base::RunLoop().Run();
922 924
923 DestroyProxy(); 925 DestroyProxy();
924 } 926 }
925 927
928 class ListenerWithIndirectProxyAssociatedInterface
929 : public IPC::Listener,
930 public IPC::mojom::IndirectTestDriver,
931 public IPC::mojom::PingReceiver {
932 public:
933 ListenerWithIndirectProxyAssociatedInterface()
934 : driver_binding_(this), ping_receiver_binding_(this) {}
935 ~ListenerWithIndirectProxyAssociatedInterface() override {}
936
937 bool OnMessageReceived(const IPC::Message& message) override { return true; }
938
939 void RegisterInterfaceFactory(IPC::ChannelProxy* proxy) {
940 proxy->AddAssociatedInterface(
941 base::Bind(&ListenerWithIndirectProxyAssociatedInterface::BindRequest,
942 base::Unretained(this)));
943 }
944
945 void set_ping_handler(const base::Closure& handler) {
946 ping_handler_ = handler;
947 }
948
949 private:
950 // IPC::mojom::IndirectTestDriver:
951 void GetPingReceiver(
952 IPC::mojom::PingReceiverAssociatedRequest request) override {
953 ping_receiver_binding_.Bind(std::move(request));
954 }
955
956 // IPC::mojom::PingReceiver:
957 void Ping(const PingCallback& callback) override {
958 callback.Run();
959 ping_handler_.Run();
960 }
961
962 void BindRequest(IPC::mojom::IndirectTestDriverAssociatedRequest request) {
963 DCHECK(!driver_binding_.is_bound());
964 driver_binding_.Bind(std::move(request));
965 }
966
967 mojo::AssociatedBinding<IPC::mojom::IndirectTestDriver> driver_binding_;
968 mojo::AssociatedBinding<IPC::mojom::PingReceiver> ping_receiver_binding_;
969
970 base::Closure ping_handler_;
971 };
972
973 TEST_F(IPCChannelProxyMojoTest, ProxyThreadAssociatedInterfaceIndirect) {
974 // Tests that we can pipeline interface requests and subsequent messages
975 // targeting proxy thread bindings, and the channel will still dispatch
976 // messages appropriately.
977
978 InitWithMojo("ProxyThreadAssociatedInterfaceIndirectClient");
979
980 ListenerWithIndirectProxyAssociatedInterface listener;
981 CreateProxy(&listener);
982 listener.RegisterInterfaceFactory(proxy());
983 RunProxy();
984
985 base::RunLoop loop;
986 listener.set_ping_handler(loop.QuitClosure());
987 loop.Run();
988
989 EXPECT_TRUE(WaitForClientShutdown());
990
991 DestroyProxy();
992 }
993
994 DEFINE_IPC_CHANNEL_MOJO_TEST_CLIENT(
995 ProxyThreadAssociatedInterfaceIndirectClient,
996 ChannelProxyClient) {
997 DummyListener listener;
998 CreateProxy(&listener);
999 RunProxy();
1000
1001 // Use an interface requested via another interface. On the remote end both
1002 // interfaces are bound on the proxy thread. This ensures that the Ping
1003 // message we send will still be dispatched properly even though the remote
1004 // endpoint may not have been bound yet by the time the message is initially
1005 // processed on the IO thread.
1006 IPC::mojom::IndirectTestDriverAssociatedPtr driver;
1007 IPC::mojom::PingReceiverAssociatedPtr ping_receiver;
1008 proxy()->GetRemoteAssociatedInterface(&driver);
1009 driver->GetPingReceiver(
1010 mojo::GetProxy(&ping_receiver, driver.associated_group()));
1011
1012 base::RunLoop loop;
1013 ping_receiver->Ping(loop.QuitClosure());
1014 loop.Run();
1015
1016 DestroyProxy();
1017 }
1018
926 class ListenerWithSyncAssociatedInterface 1019 class ListenerWithSyncAssociatedInterface
927 : public IPC::Listener, 1020 : public IPC::Listener,
928 public IPC::mojom::SimpleTestDriver { 1021 public IPC::mojom::SimpleTestDriver {
929 public: 1022 public:
930 ListenerWithSyncAssociatedInterface() : binding_(this) {} 1023 ListenerWithSyncAssociatedInterface() : binding_(this) {}
931 ~ListenerWithSyncAssociatedInterface() override {} 1024 ~ListenerWithSyncAssociatedInterface() override {}
932 1025
933 void set_sync_sender(IPC::Sender* sync_sender) { sync_sender_ = sync_sender; } 1026 void set_sync_sender(IPC::Sender* sync_sender) { sync_sender_ = sync_sender; }
934 1027
935 void RegisterInterfaceFactory(IPC::ChannelProxy* proxy) { 1028 void RegisterInterfaceFactory(IPC::ChannelProxy* proxy) {
(...skipping 216 matching lines...) Expand 10 before | Expand all | Expand 10 after
1152 client_impl.WaitForValueRequest(); 1245 client_impl.WaitForValueRequest();
1153 1246
1154 // Wait for the test driver to perform a classical sync IPC request, with our 1247 // Wait for the test driver to perform a classical sync IPC request, with our
1155 // own sync associated interface message nested inside. 1248 // own sync associated interface message nested inside.
1156 client_impl.UseSyncSenderForRequest(false); 1249 client_impl.UseSyncSenderForRequest(false);
1157 client_impl.WaitForValueRequest(); 1250 client_impl.WaitForValueRequest();
1158 1251
1159 DestroyProxy(); 1252 DestroyProxy();
1160 } 1253 }
1161 1254
1255 TEST_F(IPCChannelProxyMojoTest, CreatePaused) {
1256 // Ensures that creating a paused channel elicits the expected behavior when
1257 // sending messages, unpausing, sending more messages, and then manually
1258 // flushing. Specifically a sequence like:
1259 //
1260 // Connect()
1261 // Send(A)
1262 // Send(B)
1263 // Unpause(false)
1264 // Send(C)
1265 // Flush()
1266 //
1267 // must result in the other end receiving messages C, A, and then B, in that
1268 // order.
1269 //
1270 // This behavior is required by some consumers of IPC::Channel, and it is not
1271 // sufficient to leave this up to the consumer to implement since associated
1272 // interface requests and messages also need to be queued according to the
1273 // same policy.
1274 InitWithMojo("CreatePausedClient");
1275
1276 DummyListener listener;
1277 CreateProxy(&listener);
1278 RunProxy(true /* create_paused */);
1279
1280 // These messages must be queued internally since the channel is paused.
1281 SendValue(proxy(), 1);
1282 SendValue(proxy(), 2);
1283
1284 proxy()->Unpause(false /* flush */);
1285
1286 // These messages must be sent immediately since the channel is unpaused.
1287 SendValue(proxy(), 3);
1288 SendValue(proxy(), 4);
1289
1290 // Now we flush the previously queued messages.
1291 proxy()->Flush();
1292
1293 EXPECT_TRUE(WaitForClientShutdown());
1294 DestroyProxy();
1295 }
1296
1297 class ExpectValueSequenceListener : public IPC::Listener {
1298 public:
1299 explicit ExpectValueSequenceListener(std::queue<int32_t>* expected_values)
1300 : expected_values_(expected_values) {}
1301 ~ExpectValueSequenceListener() override {}
1302
1303 // IPC::Listener:
1304 bool OnMessageReceived(const IPC::Message& message) override {
1305 DCHECK(!expected_values_->empty());
1306 base::PickleIterator iter(message);
1307 int32_t should_be_expected;
1308 EXPECT_TRUE(iter.ReadInt(&should_be_expected));
1309 EXPECT_EQ(expected_values_->front(), should_be_expected);
1310 expected_values_->pop();
1311 if (expected_values_->empty())
1312 base::MessageLoop::current()->QuitWhenIdle();
1313 return true;
1314 }
1315
1316 private:
1317 std::queue<int32_t>* expected_values_;
1318
1319 DISALLOW_COPY_AND_ASSIGN(ExpectValueSequenceListener);
1320 };
1321
1322 DEFINE_IPC_CHANNEL_MOJO_TEST_CLIENT(CreatePausedClient, ChannelProxyClient) {
1323 std::queue<int32_t> expected_values;
1324 ExpectValueSequenceListener listener(&expected_values);
1325 CreateProxy(&listener);
1326 expected_values.push(3);
1327 expected_values.push(4);
1328 expected_values.push(1);
1329 expected_values.push(2);
1330 RunProxy();
1331 base::RunLoop().Run();
1332 EXPECT_TRUE(expected_values.empty());
1333 DestroyProxy();
1334 }
1335
1162 #if defined(OS_POSIX) 1336 #if defined(OS_POSIX)
1163 1337
1164 class ListenerThatExpectsFile : public IPC::Listener { 1338 class ListenerThatExpectsFile : public IPC::Listener {
1165 public: 1339 public:
1166 ListenerThatExpectsFile() : sender_(NULL) {} 1340 ListenerThatExpectsFile() : sender_(NULL) {}
1167 1341
1168 ~ListenerThatExpectsFile() override {} 1342 ~ListenerThatExpectsFile() override {}
1169 1343
1170 bool OnMessageReceived(const IPC::Message& message) override { 1344 bool OnMessageReceived(const IPC::Message& message) override {
1171 base::PickleIterator iter(message); 1345 base::PickleIterator iter(message);
(...skipping 142 matching lines...) Expand 10 before | Expand all | Expand 10 after
1314 Connect(&listener); 1488 Connect(&listener);
1315 1489
1316 base::RunLoop().Run(); 1490 base::RunLoop().Run();
1317 1491
1318 Close(); 1492 Close();
1319 } 1493 }
1320 1494
1321 #endif // OS_LINUX 1495 #endif // OS_LINUX
1322 1496
1323 } // namespace 1497 } // namespace
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698