OLD | NEW |
| (Empty) |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "mojo/edk/system/local_message_pipe_endpoint.h" | |
6 | |
7 #include <string.h> | |
8 | |
9 #include "base/logging.h" | |
10 #include "mojo/edk/system/dispatcher.h" | |
11 #include "mojo/edk/system/message_in_transit.h" | |
12 | |
13 namespace mojo { | |
14 namespace system { | |
15 | |
16 LocalMessagePipeEndpoint::LocalMessagePipeEndpoint() | |
17 : is_open_(true), is_peer_open_(true) { | |
18 } | |
19 | |
20 LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() { | |
21 DCHECK(!is_open_); | |
22 DCHECK(message_queue_.IsEmpty()); // Should be implied by not being open. | |
23 } | |
24 | |
25 MessagePipeEndpoint::Type LocalMessagePipeEndpoint::GetType() const { | |
26 return kTypeLocal; | |
27 } | |
28 | |
29 bool LocalMessagePipeEndpoint::OnPeerClose() { | |
30 DCHECK(is_open_); | |
31 DCHECK(is_peer_open_); | |
32 | |
33 HandleSignalsState old_state = GetHandleSignalsState(); | |
34 is_peer_open_ = false; | |
35 HandleSignalsState new_state = GetHandleSignalsState(); | |
36 | |
37 if (!new_state.equals(old_state)) | |
38 waiter_list_.AwakeWaitersForStateChange(new_state); | |
39 | |
40 return true; | |
41 } | |
42 | |
43 void LocalMessagePipeEndpoint::EnqueueMessage( | |
44 scoped_ptr<MessageInTransit> message) { | |
45 DCHECK(is_open_); | |
46 DCHECK(is_peer_open_); | |
47 | |
48 bool was_empty = message_queue_.IsEmpty(); | |
49 message_queue_.AddMessage(message.Pass()); | |
50 if (was_empty) | |
51 waiter_list_.AwakeWaitersForStateChange(GetHandleSignalsState()); | |
52 } | |
53 | |
54 void LocalMessagePipeEndpoint::Close() { | |
55 DCHECK(is_open_); | |
56 is_open_ = false; | |
57 message_queue_.Clear(); | |
58 } | |
59 | |
60 void LocalMessagePipeEndpoint::CancelAllWaiters() { | |
61 DCHECK(is_open_); | |
62 waiter_list_.CancelAllWaiters(); | |
63 } | |
64 | |
65 MojoResult LocalMessagePipeEndpoint::ReadMessage( | |
66 UserPointer<void> bytes, | |
67 UserPointer<uint32_t> num_bytes, | |
68 DispatcherVector* dispatchers, | |
69 uint32_t* num_dispatchers, | |
70 MojoReadMessageFlags flags) { | |
71 DCHECK(is_open_); | |
72 DCHECK(!dispatchers || dispatchers->empty()); | |
73 | |
74 const uint32_t max_bytes = num_bytes.IsNull() ? 0 : num_bytes.Get(); | |
75 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; | |
76 | |
77 if (message_queue_.IsEmpty()) { | |
78 return is_peer_open_ ? MOJO_RESULT_SHOULD_WAIT | |
79 : MOJO_RESULT_FAILED_PRECONDITION; | |
80 } | |
81 | |
82 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop | |
83 // and release the lock immediately. | |
84 bool enough_space = true; | |
85 MessageInTransit* message = message_queue_.PeekMessage(); | |
86 if (!num_bytes.IsNull()) | |
87 num_bytes.Put(message->num_bytes()); | |
88 if (message->num_bytes() <= max_bytes) | |
89 bytes.PutArray(message->bytes(), message->num_bytes()); | |
90 else | |
91 enough_space = false; | |
92 | |
93 if (DispatcherVector* queued_dispatchers = message->dispatchers()) { | |
94 if (num_dispatchers) | |
95 *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size()); | |
96 if (enough_space) { | |
97 if (queued_dispatchers->empty()) { | |
98 // Nothing to do. | |
99 } else if (queued_dispatchers->size() <= max_num_dispatchers) { | |
100 DCHECK(dispatchers); | |
101 dispatchers->swap(*queued_dispatchers); | |
102 } else { | |
103 enough_space = false; | |
104 } | |
105 } | |
106 } else { | |
107 if (num_dispatchers) | |
108 *num_dispatchers = 0; | |
109 } | |
110 | |
111 message = nullptr; | |
112 | |
113 if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) { | |
114 message_queue_.DiscardMessage(); | |
115 | |
116 // Now it's empty, thus no longer readable. | |
117 if (message_queue_.IsEmpty()) { | |
118 // It's currently not possible to wait for non-readability, but we should | |
119 // do the state change anyway. | |
120 waiter_list_.AwakeWaitersForStateChange(GetHandleSignalsState()); | |
121 } | |
122 } | |
123 | |
124 if (!enough_space) | |
125 return MOJO_RESULT_RESOURCE_EXHAUSTED; | |
126 | |
127 return MOJO_RESULT_OK; | |
128 } | |
129 | |
130 HandleSignalsState LocalMessagePipeEndpoint::GetHandleSignalsState() const { | |
131 HandleSignalsState rv; | |
132 if (!message_queue_.IsEmpty()) { | |
133 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; | |
134 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; | |
135 } | |
136 if (is_peer_open_) { | |
137 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | |
138 rv.satisfiable_signals |= | |
139 MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE; | |
140 } | |
141 return rv; | |
142 } | |
143 | |
144 MojoResult LocalMessagePipeEndpoint::AddWaiter( | |
145 Waiter* waiter, | |
146 MojoHandleSignals signals, | |
147 uint32_t context, | |
148 HandleSignalsState* signals_state) { | |
149 DCHECK(is_open_); | |
150 | |
151 HandleSignalsState state = GetHandleSignalsState(); | |
152 if (state.satisfies(signals)) { | |
153 if (signals_state) | |
154 *signals_state = state; | |
155 return MOJO_RESULT_ALREADY_EXISTS; | |
156 } | |
157 if (!state.can_satisfy(signals)) { | |
158 if (signals_state) | |
159 *signals_state = state; | |
160 return MOJO_RESULT_FAILED_PRECONDITION; | |
161 } | |
162 | |
163 waiter_list_.AddWaiter(waiter, signals, context); | |
164 return MOJO_RESULT_OK; | |
165 } | |
166 | |
167 void LocalMessagePipeEndpoint::RemoveWaiter(Waiter* waiter, | |
168 HandleSignalsState* signals_state) { | |
169 DCHECK(is_open_); | |
170 waiter_list_.RemoveWaiter(waiter); | |
171 if (signals_state) | |
172 *signals_state = GetHandleSignalsState(); | |
173 } | |
174 | |
175 } // namespace system | |
176 } // namespace mojo | |
OLD | NEW |