OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/data_pipe.h" | 5 #include "mojo/edk/system/data_pipe.h" |
6 | 6 |
7 #include <string.h> | 7 #include <string.h> |
8 | 8 |
9 #include <algorithm> | 9 #include <algorithm> |
10 #include <limits> | 10 #include <limits> |
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
87 return MOJO_RESULT_INVALID_ARGUMENT; | 87 return MOJO_RESULT_INVALID_ARGUMENT; |
88 if (reader.options().capacity_num_bytes > | 88 if (reader.options().capacity_num_bytes > |
89 GetConfiguration().max_data_pipe_capacity_bytes) | 89 GetConfiguration().max_data_pipe_capacity_bytes) |
90 return MOJO_RESULT_RESOURCE_EXHAUSTED; | 90 return MOJO_RESULT_RESOURCE_EXHAUSTED; |
91 out_options->capacity_num_bytes = reader.options().capacity_num_bytes; | 91 out_options->capacity_num_bytes = reader.options().capacity_num_bytes; |
92 | 92 |
93 return MOJO_RESULT_OK; | 93 return MOJO_RESULT_OK; |
94 } | 94 } |
95 | 95 |
96 // static | 96 // static |
97 DataPipe* DataPipe::CreateLocal( | 97 RefPtr<DataPipe> DataPipe::CreateLocal( |
98 const MojoCreateDataPipeOptions& validated_options) { | 98 const MojoCreateDataPipeOptions& validated_options) { |
99 return new DataPipe(true, true, validated_options, | 99 return AdoptRef(new DataPipe(true, true, validated_options, |
100 util::MakeUnique<LocalDataPipeImpl>()); | 100 util::MakeUnique<LocalDataPipeImpl>())); |
101 } | 101 } |
102 | 102 |
103 // static | 103 // static |
104 DataPipe* DataPipe::CreateRemoteProducerFromExisting( | 104 RefPtr<DataPipe> DataPipe::CreateRemoteProducerFromExisting( |
105 const MojoCreateDataPipeOptions& validated_options, | 105 const MojoCreateDataPipeOptions& validated_options, |
106 MessageInTransitQueue* message_queue, | 106 MessageInTransitQueue* message_queue, |
107 RefPtr<ChannelEndpoint>&& channel_endpoint) { | 107 RefPtr<ChannelEndpoint>&& channel_endpoint) { |
108 std::unique_ptr<char, base::AlignedFreeDeleter> buffer; | 108 std::unique_ptr<char, base::AlignedFreeDeleter> buffer; |
109 size_t buffer_num_bytes = 0; | 109 size_t buffer_num_bytes = 0; |
110 if (!RemoteProducerDataPipeImpl::ProcessMessagesFromIncomingEndpoint( | 110 if (!RemoteProducerDataPipeImpl::ProcessMessagesFromIncomingEndpoint( |
111 validated_options, message_queue, &buffer, &buffer_num_bytes)) | 111 validated_options, message_queue, &buffer, &buffer_num_bytes)) |
112 return nullptr; | 112 return nullptr; |
113 | 113 |
114 // Important: This is called under |IncomingEndpoint|'s (which is a | 114 // Important: This is called under |IncomingEndpoint|'s (which is a |
115 // |ChannelEndpointClient|) lock, in particular from | 115 // |ChannelEndpointClient|) lock, in particular from |
116 // |IncomingEndpoint::ConvertToDataPipeConsumer()|. Before releasing that | 116 // |IncomingEndpoint::ConvertToDataPipeConsumer()|. Before releasing that |
117 // lock, it will reset its |endpoint_| member, which makes any later or | 117 // lock, it will reset its |endpoint_| member, which makes any later or |
118 // ongoing call to |IncomingEndpoint::OnReadMessage()| return false. This will | 118 // ongoing call to |IncomingEndpoint::OnReadMessage()| return false. This will |
119 // make |ChannelEndpoint::OnReadMessage()| retry, until its |ReplaceClient()| | 119 // make |ChannelEndpoint::OnReadMessage()| retry, until its |ReplaceClient()| |
120 // is called. | 120 // is called. |
121 DataPipe* data_pipe = new DataPipe( | 121 RefPtr<DataPipe> data_pipe = AdoptRef(new DataPipe( |
122 false, true, validated_options, | 122 false, true, validated_options, |
123 util::MakeUnique<RemoteProducerDataPipeImpl>( | 123 util::MakeUnique<RemoteProducerDataPipeImpl>( |
124 channel_endpoint.Clone(), std::move(buffer), 0, buffer_num_bytes)); | 124 channel_endpoint.Clone(), std::move(buffer), 0, buffer_num_bytes))); |
125 if (channel_endpoint) { | 125 if (channel_endpoint) { |
126 if (!channel_endpoint->ReplaceClient(data_pipe, 0)) | 126 if (!channel_endpoint->ReplaceClient(data_pipe.Clone(), 0)) |
127 data_pipe->OnDetachFromChannel(0); | 127 data_pipe->OnDetachFromChannel(0); |
128 } else { | 128 } else { |
129 data_pipe->SetProducerClosed(); | 129 data_pipe->SetProducerClosed(); |
130 } | 130 } |
131 return data_pipe; | 131 return data_pipe; |
132 } | 132 } |
133 | 133 |
134 // static | 134 // static |
135 DataPipe* DataPipe::CreateRemoteConsumerFromExisting( | 135 RefPtr<DataPipe> DataPipe::CreateRemoteConsumerFromExisting( |
136 const MojoCreateDataPipeOptions& validated_options, | 136 const MojoCreateDataPipeOptions& validated_options, |
137 size_t consumer_num_bytes, | 137 size_t consumer_num_bytes, |
138 MessageInTransitQueue* message_queue, | 138 MessageInTransitQueue* message_queue, |
139 RefPtr<ChannelEndpoint>&& channel_endpoint) { | 139 RefPtr<ChannelEndpoint>&& channel_endpoint) { |
140 if (!RemoteConsumerDataPipeImpl::ProcessMessagesFromIncomingEndpoint( | 140 if (!RemoteConsumerDataPipeImpl::ProcessMessagesFromIncomingEndpoint( |
141 validated_options, &consumer_num_bytes, message_queue)) | 141 validated_options, &consumer_num_bytes, message_queue)) |
142 return nullptr; | 142 return nullptr; |
143 | 143 |
144 // Important: This is called under |IncomingEndpoint|'s (which is a | 144 // Important: This is called under |IncomingEndpoint|'s (which is a |
145 // |ChannelEndpointClient|) lock, in particular from | 145 // |ChannelEndpointClient|) lock, in particular from |
146 // |IncomingEndpoint::ConvertToDataPipeProducer()|. Before releasing that | 146 // |IncomingEndpoint::ConvertToDataPipeProducer()|. Before releasing that |
147 // lock, it will reset its |endpoint_| member, which makes any later or | 147 // lock, it will reset its |endpoint_| member, which makes any later or |
148 // ongoing call to |IncomingEndpoint::OnReadMessage()| return false. This will | 148 // ongoing call to |IncomingEndpoint::OnReadMessage()| return false. This will |
149 // make |ChannelEndpoint::OnReadMessage()| retry, until its |ReplaceClient()| | 149 // make |ChannelEndpoint::OnReadMessage()| retry, until its |ReplaceClient()| |
150 // is called. | 150 // is called. |
151 DataPipe* data_pipe = new DataPipe( | 151 RefPtr<DataPipe> data_pipe = AdoptRef(new DataPipe( |
152 true, false, validated_options, | 152 true, false, validated_options, |
153 util::MakeUnique<RemoteConsumerDataPipeImpl>( | 153 util::MakeUnique<RemoteConsumerDataPipeImpl>( |
154 channel_endpoint.Clone(), consumer_num_bytes, nullptr, 0)); | 154 channel_endpoint.Clone(), consumer_num_bytes, nullptr, 0))); |
155 if (channel_endpoint) { | 155 if (channel_endpoint) { |
156 if (!channel_endpoint->ReplaceClient(data_pipe, 0)) | 156 if (!channel_endpoint->ReplaceClient(data_pipe.Clone(), 0)) |
157 data_pipe->OnDetachFromChannel(0); | 157 data_pipe->OnDetachFromChannel(0); |
158 } else { | 158 } else { |
159 data_pipe->SetConsumerClosed(); | 159 data_pipe->SetConsumerClosed(); |
160 } | 160 } |
161 return data_pipe; | 161 return data_pipe; |
162 } | 162 } |
163 | 163 |
164 // static | 164 // static |
165 bool DataPipe::ProducerDeserialize(Channel* channel, | 165 bool DataPipe::ProducerDeserialize(Channel* channel, |
166 const void* source, | 166 const void* source, |
167 size_t size, | 167 size_t size, |
168 scoped_refptr<DataPipe>* data_pipe) { | 168 RefPtr<DataPipe>* data_pipe) { |
169 DCHECK(!*data_pipe); // Not technically wrong, but unlikely. | 169 DCHECK(!*data_pipe); // Not technically wrong, but unlikely. |
170 | 170 |
171 bool consumer_open = false; | 171 bool consumer_open = false; |
172 if (size == sizeof(SerializedDataPipeProducerDispatcher)) { | 172 if (size == sizeof(SerializedDataPipeProducerDispatcher)) { |
173 consumer_open = false; | 173 consumer_open = false; |
174 } else if (size == | 174 } else if (size == |
175 sizeof(SerializedDataPipeProducerDispatcher) + | 175 sizeof(SerializedDataPipeProducerDispatcher) + |
176 channel->GetSerializedEndpointSize()) { | 176 channel->GetSerializedEndpointSize()) { |
177 consumer_open = true; | 177 consumer_open = true; |
178 } else { | 178 } else { |
(...skipping 10 matching lines...) Expand all Loading... |
189 return false; | 189 return false; |
190 } | 190 } |
191 | 191 |
192 if (!consumer_open) { | 192 if (!consumer_open) { |
193 if (s->consumer_num_bytes != static_cast<size_t>(-1)) { | 193 if (s->consumer_num_bytes != static_cast<size_t>(-1)) { |
194 LOG(ERROR) | 194 LOG(ERROR) |
195 << "Invalid serialized data pipe producer (bad consumer_num_bytes)"; | 195 << "Invalid serialized data pipe producer (bad consumer_num_bytes)"; |
196 return false; | 196 return false; |
197 } | 197 } |
198 | 198 |
199 *data_pipe = new DataPipe( | 199 *data_pipe = AdoptRef(new DataPipe( |
200 true, false, revalidated_options, | 200 true, false, revalidated_options, |
201 util::MakeUnique<RemoteConsumerDataPipeImpl>(nullptr, 0, nullptr, 0)); | 201 util::MakeUnique<RemoteConsumerDataPipeImpl>(nullptr, 0, nullptr, 0))); |
202 (*data_pipe)->SetConsumerClosed(); | 202 (*data_pipe)->SetConsumerClosed(); |
203 | 203 |
204 return true; | 204 return true; |
205 } | 205 } |
206 | 206 |
207 if (s->consumer_num_bytes > revalidated_options.capacity_num_bytes || | 207 if (s->consumer_num_bytes > revalidated_options.capacity_num_bytes || |
208 s->consumer_num_bytes % revalidated_options.element_num_bytes != 0) { | 208 s->consumer_num_bytes % revalidated_options.element_num_bytes != 0) { |
209 LOG(ERROR) | 209 LOG(ERROR) |
210 << "Invalid serialized data pipe producer (bad consumer_num_bytes)"; | 210 << "Invalid serialized data pipe producer (bad consumer_num_bytes)"; |
211 return false; | 211 return false; |
212 } | 212 } |
213 | 213 |
214 const void* endpoint_source = static_cast<const char*>(source) + | 214 const void* endpoint_source = static_cast<const char*>(source) + |
215 sizeof(SerializedDataPipeProducerDispatcher); | 215 sizeof(SerializedDataPipeProducerDispatcher); |
216 scoped_refptr<IncomingEndpoint> incoming_endpoint = | 216 RefPtr<IncomingEndpoint> incoming_endpoint = |
217 channel->DeserializeEndpoint(endpoint_source); | 217 channel->DeserializeEndpoint(endpoint_source); |
218 if (!incoming_endpoint) | 218 if (!incoming_endpoint) |
219 return false; | 219 return false; |
220 | 220 |
221 *data_pipe = incoming_endpoint->ConvertToDataPipeProducer( | 221 *data_pipe = incoming_endpoint->ConvertToDataPipeProducer( |
222 revalidated_options, s->consumer_num_bytes); | 222 revalidated_options, s->consumer_num_bytes); |
223 if (!*data_pipe) | 223 if (!*data_pipe) |
224 return false; | 224 return false; |
225 | 225 |
226 return true; | 226 return true; |
227 } | 227 } |
228 | 228 |
229 // static | 229 // static |
230 bool DataPipe::ConsumerDeserialize(Channel* channel, | 230 bool DataPipe::ConsumerDeserialize(Channel* channel, |
231 const void* source, | 231 const void* source, |
232 size_t size, | 232 size_t size, |
233 scoped_refptr<DataPipe>* data_pipe) { | 233 RefPtr<DataPipe>* data_pipe) { |
234 DCHECK(!*data_pipe); // Not technically wrong, but unlikely. | 234 DCHECK(!*data_pipe); // Not technically wrong, but unlikely. |
235 | 235 |
236 if (size != | 236 if (size != |
237 sizeof(SerializedDataPipeConsumerDispatcher) + | 237 sizeof(SerializedDataPipeConsumerDispatcher) + |
238 channel->GetSerializedEndpointSize()) { | 238 channel->GetSerializedEndpointSize()) { |
239 LOG(ERROR) << "Invalid serialized data pipe consumer"; | 239 LOG(ERROR) << "Invalid serialized data pipe consumer"; |
240 return false; | 240 return false; |
241 } | 241 } |
242 | 242 |
243 const SerializedDataPipeConsumerDispatcher* s = | 243 const SerializedDataPipeConsumerDispatcher* s = |
244 static_cast<const SerializedDataPipeConsumerDispatcher*>(source); | 244 static_cast<const SerializedDataPipeConsumerDispatcher*>(source); |
245 MojoCreateDataPipeOptions revalidated_options = {}; | 245 MojoCreateDataPipeOptions revalidated_options = {}; |
246 if (ValidateCreateOptions(MakeUserPointer(&s->validated_options), | 246 if (ValidateCreateOptions(MakeUserPointer(&s->validated_options), |
247 &revalidated_options) != MOJO_RESULT_OK) { | 247 &revalidated_options) != MOJO_RESULT_OK) { |
248 LOG(ERROR) << "Invalid serialized data pipe consumer (bad options)"; | 248 LOG(ERROR) << "Invalid serialized data pipe consumer (bad options)"; |
249 return false; | 249 return false; |
250 } | 250 } |
251 | 251 |
252 const void* endpoint_source = static_cast<const char*>(source) + | 252 const void* endpoint_source = static_cast<const char*>(source) + |
253 sizeof(SerializedDataPipeConsumerDispatcher); | 253 sizeof(SerializedDataPipeConsumerDispatcher); |
254 scoped_refptr<IncomingEndpoint> incoming_endpoint = | 254 RefPtr<IncomingEndpoint> incoming_endpoint = |
255 channel->DeserializeEndpoint(endpoint_source); | 255 channel->DeserializeEndpoint(endpoint_source); |
256 if (!incoming_endpoint) | 256 if (!incoming_endpoint) |
257 return false; | 257 return false; |
258 | 258 |
259 *data_pipe = | 259 *data_pipe = |
260 incoming_endpoint->ConvertToDataPipeConsumer(revalidated_options); | 260 incoming_endpoint->ConvertToDataPipeConsumer(revalidated_options); |
261 if (!*data_pipe) | 261 if (!*data_pipe) |
262 return false; | 262 return false; |
263 | 263 |
264 return true; | 264 return true; |
(...skipping 538 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
803 SetProducerClosedNoLock(); | 803 SetProducerClosedNoLock(); |
804 } | 804 } |
805 | 805 |
806 void DataPipe::SetConsumerClosed() { | 806 void DataPipe::SetConsumerClosed() { |
807 MutexLocker locker(&mutex_); | 807 MutexLocker locker(&mutex_); |
808 SetConsumerClosedNoLock(); | 808 SetConsumerClosedNoLock(); |
809 } | 809 } |
810 | 810 |
811 } // namespace system | 811 } // namespace system |
812 } // namespace mojo | 812 } // namespace mojo |
OLD | NEW |