Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(703)

Side by Side Diff: mojo/edk/system/data_pipe_consumer_dispatcher.h

Issue 1585493002: [mojo] Ports EDK (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #ifndef MOJO_EDK_SYSTEM_DATA_PIPE_CONSUMER_DISPATCHER_H_ 5 #ifndef MOJO_EDK_SYSTEM_DATA_PIPE_CONSUMER_DISPATCHER_H_
6 #define MOJO_EDK_SYSTEM_DATA_PIPE_CONSUMER_DISPATCHER_H_ 6 #define MOJO_EDK_SYSTEM_DATA_PIPE_CONSUMER_DISPATCHER_H_
7 7
8 #include <stddef.h> 8 #include <stddef.h>
9 #include <stdint.h> 9 #include <stdint.h>
10 10
11 #include "base/macros.h"
11 #include "base/memory/ref_counted.h" 12 #include "base/memory/ref_counted.h"
13 #include "base/memory/scoped_ptr.h"
14 #include "base/synchronization/lock.h"
15 #include "mojo/edk/embedder/platform_handle_vector.h"
16 #include "mojo/edk/embedder/platform_shared_buffer.h"
17 #include "mojo/edk/embedder/scoped_platform_handle.h"
12 #include "mojo/edk/system/awakable_list.h" 18 #include "mojo/edk/system/awakable_list.h"
13 #include "mojo/edk/system/dispatcher.h" 19 #include "mojo/edk/system/dispatcher.h"
14 #include "mojo/edk/system/raw_channel.h" 20 #include "mojo/edk/system/ports/port_ref.h"
15 #include "mojo/edk/system/system_impl_export.h" 21 #include "mojo/edk/system/system_impl_export.h"
16 #include "mojo/public/cpp/system/macros.h"
17 22
18 namespace mojo { 23 namespace mojo {
19 namespace edk { 24 namespace edk {
20 25
21 // This is the |Dispatcher| implementation for the consumer handle for data 26 struct DataPipeControlMessage;
22 // pipes (created by the Mojo primitive |MojoCreateDataPipe()|). This class is 27 class NodeController;
28
29 // This is the Dispatcher implementation for the consumer handle for data
30 // pipes created by the Mojo primitive MojoCreateDataPipe(). This class is
23 // thread-safe. 31 // thread-safe.
24 class MOJO_SYSTEM_IMPL_EXPORT DataPipeConsumerDispatcher final 32 class MOJO_SYSTEM_IMPL_EXPORT DataPipeConsumerDispatcher final
25 : public Dispatcher, public RawChannel::Delegate { 33 : public Dispatcher {
26 public: 34 public:
27 static scoped_refptr<DataPipeConsumerDispatcher> Create( 35 DataPipeConsumerDispatcher(
28 const MojoCreateDataPipeOptions& options) { 36 NodeController* node_controller,
29 return make_scoped_refptr(new DataPipeConsumerDispatcher(options)); 37 const ports::PortRef& control_port,
30 } 38 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
39 const MojoCreateDataPipeOptions& options,
40 bool initialized,
41 uint64_t pipe_id);
31 42
32 // Must be called before any other methods. 43 // Dispatcher:
33 void Init(ScopedPlatformHandle message_pipe, 44 Type GetType() const override;
34 char* serialized_read_buffer, size_t serialized_read_buffer_size); 45 MojoResult Close() override;
46 MojoResult ReadData(void* elements,
47 uint32_t* num_bytes,
48 MojoReadDataFlags flags) override;
49 MojoResult BeginReadData(const void** buffer,
50 uint32_t* buffer_num_bytes,
51 MojoReadDataFlags flags) override;
52 MojoResult EndReadData(uint32_t num_bytes_read) override;
53 HandleSignalsState GetHandleSignalsState() const override;
54 MojoResult AddAwakable(Awakable* awakable,
55 MojoHandleSignals signals,
56 uintptr_t context,
57 HandleSignalsState* signals_state) override;
58 void RemoveAwakable(Awakable* awakable,
59 HandleSignalsState* signals_state) override;
60 void StartSerialize(uint32_t* num_bytes,
61 uint32_t* num_ports,
62 uint32_t* num_handles) override;
63 bool EndSerialize(void* destination,
64 ports::PortName* ports,
65 PlatformHandle* handles) override;
66 bool BeginTransit() override;
67 void CompleteTransitAndClose() override;
68 void CancelTransit() override;
35 69
36 // |Dispatcher| public methods:
37 Type GetType() const override;
38
39 // The "opposite" of |SerializeAndClose()|. (Typically this is called by
40 // |Dispatcher::Deserialize()|.)
41 static scoped_refptr<DataPipeConsumerDispatcher> 70 static scoped_refptr<DataPipeConsumerDispatcher>
42 Deserialize(const void* source, 71 Deserialize(const void* data,
43 size_t size, 72 size_t num_bytes,
44 PlatformHandleVector* platform_handles); 73 const ports::PortName* ports,
74 size_t num_ports,
75 PlatformHandle* handles,
76 size_t num_handles);
45 77
46 private: 78 private:
47 DataPipeConsumerDispatcher(const MojoCreateDataPipeOptions& options); 79 class PortObserverThunk;
80 friend class PortObserverThunk;
81
48 ~DataPipeConsumerDispatcher() override; 82 ~DataPipeConsumerDispatcher() override;
49 83
50 void InitOnIO(); 84 void InitializeNoLock();
51 void CloseOnIO(); 85 MojoResult CloseNoLock();
86 HandleSignalsState GetHandleSignalsStateNoLock() const;
87 void NotifyRead(uint32_t num_bytes);
88 void OnPortStatusChanged();
89 void UpdateSignalsStateNoLock();
52 90
53 // |Dispatcher| protected methods: 91 const MojoCreateDataPipeOptions options_;
54 void CancelAllAwakablesNoLock() override; 92 NodeController* const node_controller_;
55 void CloseImplNoLock() override; 93 const ports::PortRef control_port_;
56 scoped_refptr<Dispatcher> CreateEquivalentDispatcherAndCloseImplNoLock() 94 const uint64_t pipe_id_;
57 override;
58 MojoResult ReadDataImplNoLock(void* elements,
59 uint32_t* num_bytes,
60 MojoReadDataFlags flags) override;
61 MojoResult BeginReadDataImplNoLock(const void** buffer,
62 uint32_t* buffer_num_bytes,
63 MojoReadDataFlags flags) override;
64 MojoResult EndReadDataImplNoLock(uint32_t num_bytes_read) override;
65 HandleSignalsState GetHandleSignalsStateImplNoLock() const override;
66 MojoResult AddAwakableImplNoLock(Awakable* awakable,
67 MojoHandleSignals signals,
68 uintptr_t context,
69 HandleSignalsState* signals_state) override;
70 void RemoveAwakableImplNoLock(Awakable* awakable,
71 HandleSignalsState* signals_state) override;
72 void StartSerializeImplNoLock(size_t* max_size,
73 size_t* max_platform_handles) override;
74 bool EndSerializeAndCloseImplNoLock(
75 void* destination,
76 size_t* actual_size,
77 PlatformHandleVector* platform_handles) override;
78 void TransportStarted() override;
79 void TransportEnded() override;
80 bool IsBusyNoLock() const override;
81 95
82 // |RawChannel::Delegate methods: 96 // Guards access to the fields below.
83 void OnReadMessage( 97 mutable base::Lock lock_;
84 const MessageInTransit::View& message_view,
85 ScopedPlatformHandleVectorPtr platform_handles) override;
86 void OnError(Error error) override;
87 98
88 // See comment in MessagePipeDispatcher for this method.
89 void SerializeInternal();
90
91 MojoCreateDataPipeOptions options_;
92
93 // Protected by |lock()|:
94 RawChannel* channel_; // This will be null if closed.
95
96 // Queue of incoming messages.
97 std::vector<char> data_;
98 AwakableList awakable_list_; 99 AwakableList awakable_list_;
99 100
100 // If DispatcherTransport is created. Must be set before lock() is called to 101 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer_;
101 // avoid deadlocks with RawChannel calling us. 102 scoped_ptr<PlatformSharedBufferMapping> ring_buffer_mapping_;
102 base::Lock started_transport_; 103 ScopedPlatformHandle buffer_handle_for_transit_;
103 104
104 bool calling_init_; 105 bool in_two_phase_read_ = false;
106 uint32_t two_phase_max_bytes_read_ = 0;
105 107
106 bool in_two_phase_read_; 108 bool in_transit_ = false;
107 uint32_t two_phase_max_bytes_read_; 109 bool is_closed_ = false;
108 // If we get data from the channel while we're in two-phase read, we can't 110 bool peer_closed_ = false;
109 // resize data_ since it's being used. So instead we store it temporarly. 111 bool transferred_ = false;
110 std::vector<char> data_received_during_two_phase_read_;
111 112
112 bool error_; 113 uint32_t read_offset_ = 0;
114 uint32_t bytes_available_ = 0;
113 115
114 bool serialized_; 116 DISALLOW_COPY_AND_ASSIGN(DataPipeConsumerDispatcher);
115 std::vector<char> serialized_read_buffer_;
116 ScopedPlatformHandle serialized_platform_handle_;
117
118 MOJO_DISALLOW_COPY_AND_ASSIGN(DataPipeConsumerDispatcher);
119 }; 117 };
120 118
121 } // namespace edk 119 } // namespace edk
122 } // namespace mojo 120 } // namespace mojo
123 121
124 #endif // MOJO_EDK_SYSTEM_DATA_PIPE_CONSUMER_DISPATCHER_H_ 122 #endif // MOJO_EDK_SYSTEM_DATA_PIPE_CONSUMER_DISPATCHER_H_
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698