OLD | NEW |
1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "remoting/jingle_glue/jingle_channel.h" | 5 #include "remoting/jingle_glue/jingle_channel.h" |
6 | 6 |
7 #include "base/lock.h" | 7 #include "base/lock.h" |
8 #include "base/logging.h" | 8 #include "base/logging.h" |
9 #include "base/message_loop.h" | 9 #include "base/message_loop.h" |
10 #include "base/waitable_event.h" | 10 #include "base/waitable_event.h" |
11 #include "media/base/data_buffer.h" | 11 #include "media/base/data_buffer.h" |
12 #include "remoting/jingle_glue/jingle_thread.h" | 12 #include "remoting/jingle_glue/jingle_thread.h" |
13 #include "third_party/libjingle/source/talk/base/stream.h" | 13 #include "third_party/libjingle/source/talk/base/stream.h" |
14 | 14 |
15 using media::DataBuffer; | 15 using media::DataBuffer; |
16 | 16 |
17 namespace remoting { | 17 namespace remoting { |
18 | 18 |
19 // Size of a read buffer chunk in bytes. | 19 // Size of a read buffer chunk in bytes. |
20 const size_t kReadBufferSize = 4096; | 20 const size_t kReadBufferSize = 4096; |
21 | 21 |
22 JingleChannel::JingleChannel(Callback* callback) | 22 JingleChannel::JingleChannel(Callback* callback) |
23 : state_(INITIALIZING), | 23 : state_(INITIALIZING), |
24 callback_(callback), | 24 callback_(callback), |
| 25 closed_(false), |
25 event_handler_(this), | 26 event_handler_(this), |
26 write_buffer_size_(0), | 27 write_buffer_size_(0), |
27 current_write_buf_pos_(0) { | 28 current_write_buf_pos_(0) { |
28 DCHECK(callback_ != NULL); | 29 DCHECK(callback_ != NULL); |
29 } | 30 } |
30 | 31 |
31 // This constructor is only used in unit test. | 32 // This constructor is only used in unit test. |
32 JingleChannel::JingleChannel() | 33 JingleChannel::JingleChannel() |
33 : state_(CLOSED), | 34 : state_(CLOSED), |
| 35 closed_(false), |
34 write_buffer_size_(0), | 36 write_buffer_size_(0), |
35 current_write_buf_pos_(0) { | 37 current_write_buf_pos_(0) { |
36 } | 38 } |
37 | 39 |
38 JingleChannel::~JingleChannel() { | 40 JingleChannel::~JingleChannel() { |
39 DCHECK_EQ(CLOSED, state_); | 41 DCHECK(closed_ || stream_ == NULL); |
40 } | 42 } |
41 | 43 |
42 void JingleChannel::Init(JingleThread* thread, | 44 void JingleChannel::Init(JingleThread* thread, |
43 talk_base::StreamInterface* stream, | 45 talk_base::StreamInterface* stream, |
44 const std::string& jid) { | 46 const std::string& jid) { |
45 thread_ = thread; | 47 thread_ = thread; |
46 stream_.reset(stream); | 48 stream_.reset(stream); |
47 stream_->SignalEvent.connect(&event_handler_, &EventHandler::OnStreamEvent); | 49 stream_->SignalEvent.connect(&event_handler_, &EventHandler::OnStreamEvent); |
48 jid_ = jid; | 50 jid_ = jid; |
49 | 51 |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
94 scoped_refptr<DataBuffer> buffer( | 96 scoped_refptr<DataBuffer> buffer( |
95 new DataBuffer(new uint8[bytes_to_read], kReadBufferSize)); | 97 new DataBuffer(new uint8[bytes_to_read], kReadBufferSize)); |
96 size_t bytes_read; | 98 size_t bytes_read; |
97 int error; | 99 int error; |
98 talk_base::StreamResult result = stream_->Read( | 100 talk_base::StreamResult result = stream_->Read( |
99 buffer->GetWritableData(), bytes_to_read, &bytes_read, &error); | 101 buffer->GetWritableData(), bytes_to_read, &bytes_read, &error); |
100 switch (result) { | 102 switch (result) { |
101 case talk_base::SR_SUCCESS: { | 103 case talk_base::SR_SUCCESS: { |
102 DCHECK_GT(bytes_read, 0U); | 104 DCHECK_GT(bytes_read, 0U); |
103 buffer->SetDataSize(bytes_read); | 105 buffer->SetDataSize(bytes_read); |
104 callback_->OnPacketReceived(this, buffer); | 106 { |
| 107 AutoLock auto_lock(state_lock_); |
| 108 // Drop received data if the channel is already closed. |
| 109 if (!closed_) |
| 110 callback_->OnPacketReceived(this, buffer); |
| 111 } |
105 break; | 112 break; |
106 } | 113 } |
107 case talk_base::SR_BLOCK: { | 114 case talk_base::SR_BLOCK: { |
108 return; | 115 return; |
109 } | 116 } |
110 case talk_base::SR_EOS: { | 117 case talk_base::SR_EOS: { |
111 SetState(CLOSED); | 118 SetState(CLOSED); |
112 return; | 119 return; |
113 } | 120 } |
114 case talk_base::SR_ERROR: { | 121 case talk_base::SR_ERROR: { |
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
170 if (state_ == OPEN && (events & talk_base::SE_WRITE)) | 177 if (state_ == OPEN && (events & talk_base::SE_WRITE)) |
171 DoWrite(); | 178 DoWrite(); |
172 | 179 |
173 if (state_ == OPEN && (events & talk_base::SE_READ)) | 180 if (state_ == OPEN && (events & talk_base::SE_READ)) |
174 DoRead(); | 181 DoRead(); |
175 | 182 |
176 if (events & talk_base::SE_CLOSE) | 183 if (events & talk_base::SE_CLOSE) |
177 SetState(CLOSED); | 184 SetState(CLOSED); |
178 } | 185 } |
179 | 186 |
180 void JingleChannel::SetState(State state) { | 187 void JingleChannel::SetState(State new_state) { |
181 if (state == state_) | 188 if (new_state != state_) { |
182 return; | 189 state_ = new_state; |
183 state_ = state; | 190 { |
184 callback_->OnStateChange(this, state); | 191 AutoLock auto_lock(state_lock_); |
| 192 if (!closed_) |
| 193 callback_->OnStateChange(this, new_state); |
| 194 } |
| 195 } |
185 } | 196 } |
186 | 197 |
187 void JingleChannel::Close() { | 198 void JingleChannel::Close() { |
188 base::WaitableEvent event(true, false); | 199 Close(NULL); |
189 thread_->message_loop()->PostTask( | |
190 FROM_HERE, NewRunnableMethod(this, &JingleChannel::DoClose, &event)); | |
191 event.Wait(); | |
192 } | 200 } |
193 | 201 |
194 void JingleChannel::DoClose(base::WaitableEvent* done_event) { | 202 void JingleChannel::Close(Task* closed_task) { |
195 if (stream_.get()) | 203 { |
196 stream_->Close(); | 204 AutoLock auto_lock(state_lock_); |
197 SetState(CLOSED); | 205 if (closed_) { |
198 done_event->Signal(); | 206 // We are already closed. |
| 207 if (closed_task) |
| 208 thread_->message_loop()->PostTask(FROM_HERE, closed_task); |
| 209 return; |
| 210 } |
| 211 closed_ = true; |
| 212 if (closed_task) |
| 213 closed_task_.reset(closed_task); |
| 214 } |
| 215 |
| 216 thread_->message_loop()->PostTask( |
| 217 FROM_HERE, NewRunnableMethod(this, &JingleChannel::DoClose)); |
| 218 } |
| 219 |
| 220 |
| 221 void JingleChannel::DoClose() { |
| 222 DCHECK(closed_); |
| 223 stream_->Close(); |
| 224 stream_.reset(); |
| 225 |
| 226 // TODO(sergeyu): Even though we have called Close() for the stream, it |
| 227 // doesn't mean that the p2p sessions has been closed. I.e. |closed_task_| |
| 228 // is called too early. If the client is closed right after that the other |
| 229 // side will not receive notification that the channel was closed. |
| 230 if (closed_task_.get()) { |
| 231 closed_task_->Run(); |
| 232 closed_task_.reset(); |
| 233 } |
199 } | 234 } |
200 | 235 |
201 size_t JingleChannel::write_buffer_size() { | 236 size_t JingleChannel::write_buffer_size() { |
202 AutoLock auto_lock(write_lock_); | 237 AutoLock auto_lock(write_lock_); |
203 return write_buffer_size_; | 238 return write_buffer_size_; |
204 } | 239 } |
205 | 240 |
206 } // namespace remoting | 241 } // namespace remoting |
OLD | NEW |