Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(245)

Side by Side Diff: mojo/system/local_message_pipe_endpoint.cc

Issue 60103005: Mojo: First stab at making MessagePipes work across OS pipes. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: rebased Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/system/local_message_pipe_endpoint.h" 5 #include "mojo/system/local_message_pipe_endpoint.h"
6 6
7 #include <string.h> 7 #include <string.h>
8 8
9 #include "base/logging.h" 9 #include "base/logging.h"
10 #include "mojo/system/message_in_transit.h" 10 #include "mojo/system/message_in_transit.h"
11 11
12 namespace mojo { 12 namespace mojo {
13 namespace system { 13 namespace system {
14 14
15 LocalMessagePipeEndpoint::LocalMessagePipeEndpoint() 15 LocalMessagePipeEndpoint::LocalMessagePipeEndpoint()
16 : is_open_(true), 16 : is_open_(true),
17 is_peer_open_(true) { 17 is_peer_open_(true) {
18 } 18 }
19 19
20 LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() { 20 LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() {
21 DCHECK(!is_open_); 21 DCHECK(!is_open_);
22 } 22 }
23 23
24 void LocalMessagePipeEndpoint::OnPeerClose() { 24 void LocalMessagePipeEndpoint::Close() {
25 DCHECK(is_open_);
26 is_open_ = false;
27 for (std::deque<MessageInTransit*>::iterator it = message_queue_.begin();
28 it != message_queue_.end();
29 ++it) {
30 (*it)->Destroy();
31 }
32 message_queue_.clear();
33 }
34
35 bool LocalMessagePipeEndpoint::OnPeerClose() {
25 DCHECK(is_open_); 36 DCHECK(is_open_);
26 DCHECK(is_peer_open_); 37 DCHECK(is_peer_open_);
27 38
28 MojoWaitFlags old_satisfied_flags = SatisfiedFlags(); 39 MojoWaitFlags old_satisfied_flags = SatisfiedFlags();
29 MojoWaitFlags old_satisfiable_flags = SatisfiableFlags(); 40 MojoWaitFlags old_satisfiable_flags = SatisfiableFlags();
30 is_peer_open_ = false; 41 is_peer_open_ = false;
31 MojoWaitFlags new_satisfied_flags = SatisfiedFlags(); 42 MojoWaitFlags new_satisfied_flags = SatisfiedFlags();
32 MojoWaitFlags new_satisfiable_flags = SatisfiableFlags(); 43 MojoWaitFlags new_satisfiable_flags = SatisfiableFlags();
33 44
34 if (new_satisfied_flags != old_satisfied_flags || 45 if (new_satisfied_flags != old_satisfied_flags ||
35 new_satisfiable_flags != old_satisfiable_flags) { 46 new_satisfiable_flags != old_satisfiable_flags) {
36 waiter_list_.AwakeWaitersForStateChange(new_satisfied_flags, 47 waiter_list_.AwakeWaitersForStateChange(new_satisfied_flags,
37 new_satisfiable_flags); 48 new_satisfiable_flags);
38 } 49 }
50
51 return true;
39 } 52 }
40 53
41 MojoResult LocalMessagePipeEndpoint::EnqueueMessage( 54 MojoResult LocalMessagePipeEndpoint::EnqueueMessage(MessageInTransit* message) {
42 const void* bytes, uint32_t num_bytes,
43 const MojoHandle* handles, uint32_t num_handles,
44 MojoWriteMessageFlags /*flags*/) {
45 DCHECK(is_open_); 55 DCHECK(is_open_);
46 DCHECK(is_peer_open_); 56 DCHECK(is_peer_open_);
47 57
48 bool was_empty = message_queue_.empty(); 58 bool was_empty = message_queue_.empty();
49 59 message_queue_.push_back(message);
50 // TODO(vtl): Eventually (with C++11), this should be an |emplace_back()|.
51 message_queue_.push_back(MessageInTransit::Create(bytes, num_bytes));
52 // TODO(vtl): Support sending handles.
53
54 if (was_empty) { 60 if (was_empty) {
55 waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(), 61 waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(),
56 SatisfiableFlags()); 62 SatisfiableFlags());
57 } 63 }
58 64
59 return MOJO_RESULT_OK; 65 return MOJO_RESULT_OK;
60 } 66 }
61 67
62 void LocalMessagePipeEndpoint::CancelAllWaiters() { 68 void LocalMessagePipeEndpoint::CancelAllWaiters() {
63 DCHECK(is_open_); 69 DCHECK(is_open_);
64 waiter_list_.CancelAllWaiters(); 70 waiter_list_.CancelAllWaiters();
65 } 71 }
66 72
67 void LocalMessagePipeEndpoint::Close() {
68 DCHECK(is_open_);
69 is_open_ = false;
70 for (std::deque<MessageInTransit*>::iterator it = message_queue_.begin();
71 it != message_queue_.end();
72 ++it) {
73 (*it)->Destroy();
74 }
75 message_queue_.clear();
76 }
77
78 MojoResult LocalMessagePipeEndpoint::ReadMessage( 73 MojoResult LocalMessagePipeEndpoint::ReadMessage(
79 void* bytes, uint32_t* num_bytes, 74 void* bytes, uint32_t* num_bytes,
80 MojoHandle* handles, uint32_t* num_handles, 75 MojoHandle* handles, uint32_t* num_handles,
81 MojoReadMessageFlags flags) { 76 MojoReadMessageFlags flags) {
82 DCHECK(is_open_); 77 DCHECK(is_open_);
83 78
84 const uint32_t max_bytes = num_bytes ? *num_bytes : 0; 79 const uint32_t max_bytes = num_bytes ? *num_bytes : 0;
85 // TODO(vtl): We'll need this later: 80 // TODO(vtl): We'll need this later:
86 // const uint32_t max_handles = num_handles ? *num_handles : 0; 81 // const uint32_t max_handles = num_handles ? *num_handles : 0;
87 82
88 if (message_queue_.empty()) 83 if (message_queue_.empty()) {
89 return MOJO_RESULT_NOT_FOUND; 84 return is_peer_open_ ? MOJO_RESULT_NOT_FOUND :
85 MOJO_RESULT_FAILED_PRECONDITION;
86 }
90 87
91 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop 88 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
92 // and release the lock immediately. 89 // and release the lock immediately.
93 bool not_enough_space = false; 90 bool not_enough_space = false;
94 MessageInTransit* const message = message_queue_.front(); 91 MessageInTransit* const message = message_queue_.front();
95 if (num_bytes) 92 if (num_bytes)
96 *num_bytes = message->data_size(); 93 *num_bytes = message->data_size();
97 if (message->data_size() <= max_bytes) 94 if (message->data_size() <= max_bytes)
98 memcpy(bytes, message->data(), message->data_size()); 95 memcpy(bytes, message->data(), message->data_size());
99 else 96 else
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
154 MojoWaitFlags satisfiable_flags = 0; 151 MojoWaitFlags satisfiable_flags = 0;
155 if (!message_queue_.empty() || is_peer_open_) 152 if (!message_queue_.empty() || is_peer_open_)
156 satisfiable_flags |= MOJO_WAIT_FLAG_READABLE; 153 satisfiable_flags |= MOJO_WAIT_FLAG_READABLE;
157 if (is_peer_open_) 154 if (is_peer_open_)
158 satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE; 155 satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE;
159 return satisfiable_flags; 156 return satisfiable_flags;
160 } 157 }
161 158
162 } // namespace system 159 } // namespace system
163 } // namespace mojo 160 } // namespace mojo
164
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698