Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(5)

Side by Side Diff: mojo/system/local_message_pipe_endpoint.cc

Issue 147983009: Mojo: Refactor some message pipe stuff. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 6 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « mojo/system/local_message_pipe_endpoint.h ('k') | mojo/system/message_pipe.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/system/local_message_pipe_endpoint.h" 5 #include "mojo/system/local_message_pipe_endpoint.h"
6 6
7 #include <string.h> 7 #include <string.h>
8 8
9 #include "base/logging.h" 9 #include "base/logging.h"
10 #include "mojo/system/dispatcher.h" 10 #include "mojo/system/dispatcher.h"
11 #include "mojo/system/message_in_transit.h" 11 #include "mojo/system/message_in_transit.h"
12 12
13 namespace mojo { 13 namespace mojo {
14 namespace system { 14 namespace system {
15 15
16 LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry() 16 LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry()
17 : message(NULL) { 17 : message_(NULL) {
18 } 18 }
19 19
20 // See comment in header file. 20 // See comment in header file.
21 LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry( 21 LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry(
22 const MessageQueueEntry& other) 22 const MessageQueueEntry& other)
23 : message(NULL) { 23 : message_(NULL) {
24 DCHECK(!other.message); 24 DCHECK(!other.message_);
25 DCHECK(other.dispatchers.empty()); 25 DCHECK(other.dispatchers_.empty());
26 } 26 }
27 27
28 LocalMessagePipeEndpoint::MessageQueueEntry::~MessageQueueEntry() { 28 LocalMessagePipeEndpoint::MessageQueueEntry::~MessageQueueEntry() {
29 if (message) 29 if (message_)
30 message->Destroy(); 30 message_->Destroy();
31 // Close all the dispatchers. 31 // Close all the dispatchers.
32 for (size_t i = 0; i < dispatchers.size(); i++) { 32 for (size_t i = 0; i < dispatchers_.size(); i++) {
33 // Note: Taking the |Dispatcher| locks is okay, since no one else should 33 // Note: Taking the |Dispatcher| locks is okay, since no one else should
34 // have a reference to the dispatchers (and the locks shouldn't be held). 34 // have a reference to the dispatchers (and the locks shouldn't be held).
35 DCHECK(dispatchers[i]->HasOneRef()); 35 DCHECK(dispatchers_[i]->HasOneRef());
36 dispatchers[i]->Close(); 36 dispatchers_[i]->Close();
37 } 37 }
38 } 38 }
39 39
40 void LocalMessagePipeEndpoint::MessageQueueEntry::Init(
41 MessageInTransit* message,
42 const std::vector<Dispatcher*>* dispatchers) {
43 DCHECK(message);
44 DCHECK(!dispatchers || !dispatchers->empty());
45 DCHECK(!message_);
46 DCHECK(dispatchers_.empty());
47
48 message_ = message;
49 if (dispatchers) {
50 dispatchers_.reserve(dispatchers->size());
51 for (size_t i = 0; i < dispatchers->size(); i++) {
52 dispatchers_.push_back(
53 (*dispatchers)[i]->CreateEquivalentDispatcherAndCloseNoLock());
54
55 #ifndef NDEBUG
56 // It's important that we have "ownership" of these dispatchers. In
57 // particular, they must not be in the global handle table (i.e., have
58 // live handles referring to them). If we need to destroy any queued
59 // messages, we need to know that any handles in them should be closed.
60 DCHECK(dispatchers_[i]->HasOneRef());
61 #endif
62 }
63 }
64 }
65
40 LocalMessagePipeEndpoint::LocalMessagePipeEndpoint() 66 LocalMessagePipeEndpoint::LocalMessagePipeEndpoint()
41 : is_open_(true), 67 : is_open_(true),
42 is_peer_open_(true) { 68 is_peer_open_(true) {
43 } 69 }
44 70
45 LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() { 71 LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() {
46 DCHECK(!is_open_); 72 DCHECK(!is_open_);
47 } 73 }
48 74
49 void LocalMessagePipeEndpoint::Close() { 75 void LocalMessagePipeEndpoint::Close() {
(...skipping 12 matching lines...) Expand all
62 MojoWaitFlags new_satisfied_flags = SatisfiedFlags(); 88 MojoWaitFlags new_satisfied_flags = SatisfiedFlags();
63 MojoWaitFlags new_satisfiable_flags = SatisfiableFlags(); 89 MojoWaitFlags new_satisfiable_flags = SatisfiableFlags();
64 90
65 if (new_satisfied_flags != old_satisfied_flags || 91 if (new_satisfied_flags != old_satisfied_flags ||
66 new_satisfiable_flags != old_satisfiable_flags) { 92 new_satisfiable_flags != old_satisfiable_flags) {
67 waiter_list_.AwakeWaitersForStateChange(new_satisfied_flags, 93 waiter_list_.AwakeWaitersForStateChange(new_satisfied_flags,
68 new_satisfiable_flags); 94 new_satisfiable_flags);
69 } 95 }
70 } 96 }
71 97
72 MojoResult LocalMessagePipeEndpoint::CanEnqueueMessage( 98 MojoResult LocalMessagePipeEndpoint::EnqueueMessage(
73 const MessageInTransit* /*message*/,
74 const std::vector<Dispatcher*>* /*dispatchers*/) {
75 return MOJO_RESULT_OK;
76 }
77
78 void LocalMessagePipeEndpoint::EnqueueMessage(
79 MessageInTransit* message, 99 MessageInTransit* message,
80 std::vector<scoped_refptr<Dispatcher> >* dispatchers) { 100 const std::vector<Dispatcher*>* dispatchers) {
81 DCHECK(is_open_); 101 DCHECK(is_open_);
82 DCHECK(is_peer_open_); 102 DCHECK(is_peer_open_);
103 DCHECK(!dispatchers || !dispatchers->empty());
83 104
84 bool was_empty = message_queue_.empty(); 105 bool was_empty = message_queue_.empty();
106 // TODO(vtl): Use |emplace_back()| (and a suitable constructor, instead of
107 // |Init()|) when that becomes available.
85 message_queue_.push_back(MessageQueueEntry()); 108 message_queue_.push_back(MessageQueueEntry());
86 message_queue_.back().message = message; 109 message_queue_.back().Init(message, dispatchers);
87 if (dispatchers) {
88 #ifndef NDEBUG
89 // It's important that we're taking "ownership" of the dispatchers. In
90 // particular, they must not be in the global handle table (i.e., have live
91 // handles referring to them). If we need to destroy any queued messages, we
92 // need to know that any handles in them should be closed.
93 for (size_t i = 0; i < dispatchers->size(); i++)
94 DCHECK((*dispatchers)[i]->HasOneRef());
95 #endif
96 message_queue_.back().dispatchers.swap(*dispatchers);
97 }
98 if (was_empty) { 110 if (was_empty) {
99 waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(), 111 waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(),
100 SatisfiableFlags()); 112 SatisfiableFlags());
101 } 113 }
114
115 return MOJO_RESULT_OK;
102 } 116 }
103 117
104 void LocalMessagePipeEndpoint::CancelAllWaiters() { 118 void LocalMessagePipeEndpoint::CancelAllWaiters() {
105 DCHECK(is_open_); 119 DCHECK(is_open_);
106 waiter_list_.CancelAllWaiters(); 120 waiter_list_.CancelAllWaiters();
107 } 121 }
108 122
109 MojoResult LocalMessagePipeEndpoint::ReadMessage( 123 MojoResult LocalMessagePipeEndpoint::ReadMessage(
110 void* bytes, uint32_t* num_bytes, 124 void* bytes, uint32_t* num_bytes,
111 std::vector<scoped_refptr<Dispatcher> >* dispatchers, 125 std::vector<scoped_refptr<Dispatcher> >* dispatchers,
112 uint32_t* num_dispatchers, 126 uint32_t* num_dispatchers,
113 MojoReadMessageFlags flags) { 127 MojoReadMessageFlags flags) {
114 DCHECK(is_open_); 128 DCHECK(is_open_);
115 DCHECK(!dispatchers || dispatchers->empty()); 129 DCHECK(!dispatchers || dispatchers->empty());
116 130
117 const uint32_t max_bytes = num_bytes ? *num_bytes : 0; 131 const uint32_t max_bytes = num_bytes ? *num_bytes : 0;
118 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; 132 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0;
119 133
120 if (message_queue_.empty()) { 134 if (message_queue_.empty()) {
121 return is_peer_open_ ? MOJO_RESULT_SHOULD_WAIT : 135 return is_peer_open_ ? MOJO_RESULT_SHOULD_WAIT :
122 MOJO_RESULT_FAILED_PRECONDITION; 136 MOJO_RESULT_FAILED_PRECONDITION;
123 } 137 }
124 138
125 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop 139 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
126 // and release the lock immediately. 140 // and release the lock immediately.
127 bool enough_space = true; 141 bool enough_space = true;
128 const MessageInTransit* queued_message = message_queue_.front().message; 142 const MessageInTransit* queued_message = message_queue_.front().message();
129 if (num_bytes) 143 if (num_bytes)
130 *num_bytes = queued_message->data_size(); 144 *num_bytes = queued_message->data_size();
131 if (queued_message->data_size() <= max_bytes) 145 if (queued_message->data_size() <= max_bytes)
132 memcpy(bytes, queued_message->data(), queued_message->data_size()); 146 memcpy(bytes, queued_message->data(), queued_message->data_size());
133 else 147 else
134 enough_space = false; 148 enough_space = false;
135 149
136 std::vector<scoped_refptr<Dispatcher> >* queued_dispatchers = 150 std::vector<scoped_refptr<Dispatcher> >* queued_dispatchers =
137 &message_queue_.front().dispatchers; 151 message_queue_.front().dispatchers();
138 if (num_dispatchers) 152 if (num_dispatchers)
139 *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size()); 153 *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size());
140 if (enough_space) { 154 if (enough_space) {
141 if (queued_dispatchers->empty()) { 155 if (queued_dispatchers->empty()) {
142 // Nothing to do. 156 // Nothing to do.
143 } else if (queued_dispatchers->size() <= max_num_dispatchers) { 157 } else if (queued_dispatchers->size() <= max_num_dispatchers) {
144 DCHECK(dispatchers); 158 DCHECK(dispatchers);
145 dispatchers->swap(*queued_dispatchers); 159 dispatchers->swap(*queued_dispatchers);
146 } else { 160 } else {
147 enough_space = false; 161 enough_space = false;
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
198 MojoWaitFlags satisfiable_flags = 0; 212 MojoWaitFlags satisfiable_flags = 0;
199 if (!message_queue_.empty() || is_peer_open_) 213 if (!message_queue_.empty() || is_peer_open_)
200 satisfiable_flags |= MOJO_WAIT_FLAG_READABLE; 214 satisfiable_flags |= MOJO_WAIT_FLAG_READABLE;
201 if (is_peer_open_) 215 if (is_peer_open_)
202 satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE; 216 satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE;
203 return satisfiable_flags; 217 return satisfiable_flags;
204 } 218 }
205 219
206 } // namespace system 220 } // namespace system
207 } // namespace mojo 221 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/system/local_message_pipe_endpoint.h ('k') | mojo/system/message_pipe.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698