Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(369)

Side by Side Diff: remoting/jingle_glue/jingle_channel.cc

Issue 3167047: Jingle_glue bugfixes. (Closed)
Patch Set: - Created 10 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « remoting/jingle_glue/jingle_channel.h ('k') | remoting/jingle_glue/jingle_channel_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2010 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "remoting/jingle_glue/jingle_channel.h" 5 #include "remoting/jingle_glue/jingle_channel.h"
6 6
7 #include "base/lock.h" 7 #include "base/lock.h"
8 #include "base/logging.h" 8 #include "base/logging.h"
9 #include "base/message_loop.h" 9 #include "base/message_loop.h"
10 #include "base/waitable_event.h" 10 #include "base/waitable_event.h"
11 #include "media/base/data_buffer.h" 11 #include "media/base/data_buffer.h"
12 #include "remoting/jingle_glue/jingle_thread.h" 12 #include "remoting/jingle_glue/jingle_thread.h"
13 #include "third_party/libjingle/source/talk/base/stream.h" 13 #include "third_party/libjingle/source/talk/base/stream.h"
14 14
15 using media::DataBuffer; 15 using media::DataBuffer;
16 16
17 namespace remoting { 17 namespace remoting {
18 18
19 // Size of a read buffer chunk in bytes. 19 // Size of a read buffer chunk in bytes.
20 const size_t kReadBufferSize = 4096; 20 const size_t kReadBufferSize = 4096;
21 21
22 JingleChannel::JingleChannel(Callback* callback) 22 JingleChannel::JingleChannel(Callback* callback)
23 : state_(INITIALIZING), 23 : state_(INITIALIZING),
24 callback_(callback), 24 callback_(callback),
25 closed_(false),
25 event_handler_(this), 26 event_handler_(this),
26 write_buffer_size_(0), 27 write_buffer_size_(0),
27 current_write_buf_pos_(0) { 28 current_write_buf_pos_(0) {
28 DCHECK(callback_ != NULL); 29 DCHECK(callback_ != NULL);
29 } 30 }
30 31
31 // This constructor is only used in unit test. 32 // This constructor is only used in unit test.
32 JingleChannel::JingleChannel() 33 JingleChannel::JingleChannel()
33 : state_(CLOSED), 34 : state_(CLOSED),
35 closed_(false),
34 write_buffer_size_(0), 36 write_buffer_size_(0),
35 current_write_buf_pos_(0) { 37 current_write_buf_pos_(0) {
36 } 38 }
37 39
38 JingleChannel::~JingleChannel() { 40 JingleChannel::~JingleChannel() {
39 DCHECK_EQ(CLOSED, state_); 41 DCHECK(closed_ || stream_ == NULL);
40 } 42 }
41 43
42 void JingleChannel::Init(JingleThread* thread, 44 void JingleChannel::Init(JingleThread* thread,
43 talk_base::StreamInterface* stream, 45 talk_base::StreamInterface* stream,
44 const std::string& jid) { 46 const std::string& jid) {
45 thread_ = thread; 47 thread_ = thread;
46 stream_.reset(stream); 48 stream_.reset(stream);
47 stream_->SignalEvent.connect(&event_handler_, &EventHandler::OnStreamEvent); 49 stream_->SignalEvent.connect(&event_handler_, &EventHandler::OnStreamEvent);
48 jid_ = jid; 50 jid_ = jid;
49 51
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
94 scoped_refptr<DataBuffer> buffer( 96 scoped_refptr<DataBuffer> buffer(
95 new DataBuffer(new uint8[bytes_to_read], kReadBufferSize)); 97 new DataBuffer(new uint8[bytes_to_read], kReadBufferSize));
96 size_t bytes_read; 98 size_t bytes_read;
97 int error; 99 int error;
98 talk_base::StreamResult result = stream_->Read( 100 talk_base::StreamResult result = stream_->Read(
99 buffer->GetWritableData(), bytes_to_read, &bytes_read, &error); 101 buffer->GetWritableData(), bytes_to_read, &bytes_read, &error);
100 switch (result) { 102 switch (result) {
101 case talk_base::SR_SUCCESS: { 103 case talk_base::SR_SUCCESS: {
102 DCHECK_GT(bytes_read, 0U); 104 DCHECK_GT(bytes_read, 0U);
103 buffer->SetDataSize(bytes_read); 105 buffer->SetDataSize(bytes_read);
104 callback_->OnPacketReceived(this, buffer); 106 {
107 AutoLock auto_lock(state_lock_);
108 // Drop received data if the channel is already closed.
109 if (!closed_)
110 callback_->OnPacketReceived(this, buffer);
111 }
105 break; 112 break;
106 } 113 }
107 case talk_base::SR_BLOCK: { 114 case talk_base::SR_BLOCK: {
108 return; 115 return;
109 } 116 }
110 case talk_base::SR_EOS: { 117 case talk_base::SR_EOS: {
111 SetState(CLOSED); 118 SetState(CLOSED);
112 return; 119 return;
113 } 120 }
114 case talk_base::SR_ERROR: { 121 case talk_base::SR_ERROR: {
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
170 if (state_ == OPEN && (events & talk_base::SE_WRITE)) 177 if (state_ == OPEN && (events & talk_base::SE_WRITE))
171 DoWrite(); 178 DoWrite();
172 179
173 if (state_ == OPEN && (events & talk_base::SE_READ)) 180 if (state_ == OPEN && (events & talk_base::SE_READ))
174 DoRead(); 181 DoRead();
175 182
176 if (events & talk_base::SE_CLOSE) 183 if (events & talk_base::SE_CLOSE)
177 SetState(CLOSED); 184 SetState(CLOSED);
178 } 185 }
179 186
180 void JingleChannel::SetState(State state) { 187 void JingleChannel::SetState(State new_state) {
181 if (state == state_) 188 if (new_state != state_) {
182 return; 189 state_ = new_state;
183 state_ = state; 190 {
184 callback_->OnStateChange(this, state); 191 AutoLock auto_lock(state_lock_);
192 if (!closed_)
193 callback_->OnStateChange(this, new_state);
194 }
195 }
185 } 196 }
186 197
187 void JingleChannel::Close() { 198 void JingleChannel::Close() {
188 base::WaitableEvent event(true, false); 199 Close(NULL);
189 thread_->message_loop()->PostTask(
190 FROM_HERE, NewRunnableMethod(this, &JingleChannel::DoClose, &event));
191 event.Wait();
192 } 200 }
193 201
194 void JingleChannel::DoClose(base::WaitableEvent* done_event) { 202 void JingleChannel::Close(Task* closed_task) {
195 if (stream_.get()) 203 {
196 stream_->Close(); 204 AutoLock auto_lock(state_lock_);
197 SetState(CLOSED); 205 if (closed_) {
198 done_event->Signal(); 206 // We are already closed.
207 if (closed_task)
208 thread_->message_loop()->PostTask(FROM_HERE, closed_task);
209 return;
210 }
211 closed_ = true;
212 if (closed_task)
213 closed_task_.reset(closed_task);
214 }
215
216 thread_->message_loop()->PostTask(
217 FROM_HERE, NewRunnableMethod(this, &JingleChannel::DoClose));
218 }
219
220
221 void JingleChannel::DoClose() {
222 DCHECK(closed_);
223 stream_->Close();
224 stream_.reset();
225
226 // TODO(sergeyu): Even though we have called Close() for the stream, it
227 // doesn't mean that the p2p sessions has been closed. I.e. |closed_task_|
228 // is called too early. If the client is closed right after that the other
229 // side will not receive notification that the channel was closed.
230 if (closed_task_.get()) {
231 closed_task_->Run();
232 closed_task_.reset();
233 }
199 } 234 }
200 235
201 size_t JingleChannel::write_buffer_size() { 236 size_t JingleChannel::write_buffer_size() {
202 AutoLock auto_lock(write_lock_); 237 AutoLock auto_lock(write_lock_);
203 return write_buffer_size_; 238 return write_buffer_size_;
204 } 239 }
205 240
206 } // namespace remoting 241 } // namespace remoting
OLDNEW
« no previous file with comments | « remoting/jingle_glue/jingle_channel.h ('k') | remoting/jingle_glue/jingle_channel_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698