| OLD | NEW |
| 1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "remoting/jingle_glue/jingle_channel.h" | 5 #include "remoting/jingle_glue/jingle_channel.h" |
| 6 | 6 |
| 7 #include "base/lock.h" | 7 #include "base/lock.h" |
| 8 #include "base/logging.h" | 8 #include "base/logging.h" |
| 9 #include "base/message_loop.h" | 9 #include "base/message_loop.h" |
| 10 #include "base/waitable_event.h" | 10 #include "base/waitable_event.h" |
| 11 #include "media/base/data_buffer.h" | 11 #include "media/base/data_buffer.h" |
| 12 #include "remoting/jingle_glue/jingle_thread.h" | 12 #include "remoting/jingle_glue/jingle_thread.h" |
| 13 #include "third_party/libjingle/source/talk/base/stream.h" | 13 #include "third_party/libjingle/source/talk/base/stream.h" |
| 14 | 14 |
| 15 using media::DataBuffer; | 15 using media::DataBuffer; |
| 16 | 16 |
| 17 namespace remoting { | 17 namespace remoting { |
| 18 | 18 |
| 19 // Size of a read buffer chunk in bytes. | 19 // Size of a read buffer chunk in bytes. |
| 20 const size_t kReadBufferSize = 4096; | 20 const size_t kReadBufferSize = 4096; |
| 21 | 21 |
| 22 JingleChannel::JingleChannel(Callback* callback) | 22 JingleChannel::JingleChannel(Callback* callback) |
| 23 : state_(INITIALIZING), | 23 : state_(INITIALIZING), |
| 24 callback_(callback), | 24 callback_(callback), |
| 25 closed_(false), |
| 25 event_handler_(this), | 26 event_handler_(this), |
| 26 write_buffer_size_(0), | 27 write_buffer_size_(0), |
| 27 current_write_buf_pos_(0) { | 28 current_write_buf_pos_(0) { |
| 28 DCHECK(callback_ != NULL); | 29 DCHECK(callback_ != NULL); |
| 29 } | 30 } |
| 30 | 31 |
| 31 // This constructor is only used in unit test. | 32 // This constructor is only used in unit test. |
| 32 JingleChannel::JingleChannel() | 33 JingleChannel::JingleChannel() |
| 33 : state_(CLOSED), | 34 : state_(CLOSED), |
| 35 closed_(false), |
| 34 write_buffer_size_(0), | 36 write_buffer_size_(0), |
| 35 current_write_buf_pos_(0) { | 37 current_write_buf_pos_(0) { |
| 36 } | 38 } |
| 37 | 39 |
| 38 JingleChannel::~JingleChannel() { | 40 JingleChannel::~JingleChannel() { |
| 39 DCHECK_EQ(CLOSED, state_); | 41 DCHECK(closed_ || stream_ == NULL); |
| 40 } | 42 } |
| 41 | 43 |
| 42 void JingleChannel::Init(JingleThread* thread, | 44 void JingleChannel::Init(JingleThread* thread, |
| 43 talk_base::StreamInterface* stream, | 45 talk_base::StreamInterface* stream, |
| 44 const std::string& jid) { | 46 const std::string& jid) { |
| 45 thread_ = thread; | 47 thread_ = thread; |
| 46 stream_.reset(stream); | 48 stream_.reset(stream); |
| 47 stream_->SignalEvent.connect(&event_handler_, &EventHandler::OnStreamEvent); | 49 stream_->SignalEvent.connect(&event_handler_, &EventHandler::OnStreamEvent); |
| 48 jid_ = jid; | 50 jid_ = jid; |
| 49 | 51 |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 94 scoped_refptr<DataBuffer> buffer( | 96 scoped_refptr<DataBuffer> buffer( |
| 95 new DataBuffer(new uint8[bytes_to_read], kReadBufferSize)); | 97 new DataBuffer(new uint8[bytes_to_read], kReadBufferSize)); |
| 96 size_t bytes_read; | 98 size_t bytes_read; |
| 97 int error; | 99 int error; |
| 98 talk_base::StreamResult result = stream_->Read( | 100 talk_base::StreamResult result = stream_->Read( |
| 99 buffer->GetWritableData(), bytes_to_read, &bytes_read, &error); | 101 buffer->GetWritableData(), bytes_to_read, &bytes_read, &error); |
| 100 switch (result) { | 102 switch (result) { |
| 101 case talk_base::SR_SUCCESS: { | 103 case talk_base::SR_SUCCESS: { |
| 102 DCHECK_GT(bytes_read, 0U); | 104 DCHECK_GT(bytes_read, 0U); |
| 103 buffer->SetDataSize(bytes_read); | 105 buffer->SetDataSize(bytes_read); |
| 104 callback_->OnPacketReceived(this, buffer); | 106 { |
| 107 AutoLock auto_lock(state_lock_); |
| 108 // Drop received data if the channel is already closed. |
| 109 if (!closed_) |
| 110 callback_->OnPacketReceived(this, buffer); |
| 111 } |
| 105 break; | 112 break; |
| 106 } | 113 } |
| 107 case talk_base::SR_BLOCK: { | 114 case talk_base::SR_BLOCK: { |
| 108 return; | 115 return; |
| 109 } | 116 } |
| 110 case talk_base::SR_EOS: { | 117 case talk_base::SR_EOS: { |
| 111 SetState(CLOSED); | 118 SetState(CLOSED); |
| 112 return; | 119 return; |
| 113 } | 120 } |
| 114 case talk_base::SR_ERROR: { | 121 case talk_base::SR_ERROR: { |
| (...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 170 if (state_ == OPEN && (events & talk_base::SE_WRITE)) | 177 if (state_ == OPEN && (events & talk_base::SE_WRITE)) |
| 171 DoWrite(); | 178 DoWrite(); |
| 172 | 179 |
| 173 if (state_ == OPEN && (events & talk_base::SE_READ)) | 180 if (state_ == OPEN && (events & talk_base::SE_READ)) |
| 174 DoRead(); | 181 DoRead(); |
| 175 | 182 |
| 176 if (events & talk_base::SE_CLOSE) | 183 if (events & talk_base::SE_CLOSE) |
| 177 SetState(CLOSED); | 184 SetState(CLOSED); |
| 178 } | 185 } |
| 179 | 186 |
| 180 void JingleChannel::SetState(State state) { | 187 void JingleChannel::SetState(State new_state) { |
| 181 if (state == state_) | 188 if (new_state != state_) { |
| 182 return; | 189 state_ = new_state; |
| 183 state_ = state; | 190 { |
| 184 callback_->OnStateChange(this, state); | 191 AutoLock auto_lock(state_lock_); |
| 192 if (!closed_) |
| 193 callback_->OnStateChange(this, new_state); |
| 194 } |
| 195 } |
| 185 } | 196 } |
| 186 | 197 |
| 187 void JingleChannel::Close() { | 198 void JingleChannel::Close() { |
| 188 base::WaitableEvent event(true, false); | 199 Close(NULL); |
| 189 thread_->message_loop()->PostTask( | |
| 190 FROM_HERE, NewRunnableMethod(this, &JingleChannel::DoClose, &event)); | |
| 191 event.Wait(); | |
| 192 } | 200 } |
| 193 | 201 |
| 194 void JingleChannel::DoClose(base::WaitableEvent* done_event) { | 202 void JingleChannel::Close(Task* closed_task) { |
| 195 if (stream_.get()) | 203 { |
| 196 stream_->Close(); | 204 AutoLock auto_lock(state_lock_); |
| 197 SetState(CLOSED); | 205 if (closed_) { |
| 198 done_event->Signal(); | 206 // We are already closed. |
| 207 if (closed_task) |
| 208 thread_->message_loop()->PostTask(FROM_HERE, closed_task); |
| 209 return; |
| 210 } |
| 211 closed_ = true; |
| 212 if (closed_task) |
| 213 closed_task_.reset(closed_task); |
| 214 } |
| 215 |
| 216 thread_->message_loop()->PostTask( |
| 217 FROM_HERE, NewRunnableMethod(this, &JingleChannel::DoClose)); |
| 218 } |
| 219 |
| 220 |
| 221 void JingleChannel::DoClose() { |
| 222 DCHECK(closed_); |
| 223 stream_->Close(); |
| 224 stream_.reset(); |
| 225 |
| 226 // TODO(sergeyu): Even though we have called Close() for the stream, it |
| 227 // doesn't mean that the p2p sessions has been closed. I.e. |closed_task_| |
| 228 // is called too early. If the client is closed right after that the other |
| 229 // side will not receive notification that the channel was closed. |
| 230 if (closed_task_.get()) { |
| 231 closed_task_->Run(); |
| 232 closed_task_.reset(); |
| 233 } |
| 199 } | 234 } |
| 200 | 235 |
| 201 size_t JingleChannel::write_buffer_size() { | 236 size_t JingleChannel::write_buffer_size() { |
| 202 AutoLock auto_lock(write_lock_); | 237 AutoLock auto_lock(write_lock_); |
| 203 return write_buffer_size_; | 238 return write_buffer_size_; |
| 204 } | 239 } |
| 205 | 240 |
| 206 } // namespace remoting | 241 } // namespace remoting |
| OLD | NEW |