Index: content/renderer/media/rtc_data_channel_handler.cc |
diff --git a/content/renderer/media/rtc_data_channel_handler.cc b/content/renderer/media/rtc_data_channel_handler.cc |
index 004cc46deb68c1a671c59eecdf2b8813aa01bd0d..084a02018cb9942a5c03e27193caade5a8351076 100644 |
--- a/content/renderer/media/rtc_data_channel_handler.cc |
+++ b/content/renderer/media/rtc_data_channel_handler.cc |
@@ -7,9 +7,13 @@ |
#include <limits> |
#include <string> |
+#include "base/bind.h" |
+#include "base/location.h" |
#include "base/logging.h" |
+#include "base/message_loop/message_loop.h" |
#include "base/metrics/histogram.h" |
#include "base/strings/utf_string_conversions.h" |
+#include "base/thread_task_runner_handle.h" |
namespace content { |
@@ -32,101 +36,197 @@ void IncrementCounter(DataChannelCounters counter) { |
} // namespace |
+// Implementation of DataChannelObserver that receives events on libjingle's |
+// signaling thread and forwards them over to the main thread for handling. |
+// Since the handler's lifetime is scoped potentially narrower than what |
+// the callbacks allow for, we use reference counting here to make sure |
+// all callbacks have a valid pointer but won't do anything if the handler |
+// has gone away. |
+RtcDataChannelHandler::Observer::Observer( |
+ RtcDataChannelHandler* handler, |
+ const scoped_refptr<base::SingleThreadTaskRunner>& main_thread, |
+ webrtc::DataChannelInterface* channel) |
+ : handler_(handler), main_thread_(main_thread), channel_(channel) { |
+ channel_->RegisterObserver(this); |
+} |
+ |
+RtcDataChannelHandler::Observer::~Observer() {} |
+ |
+const scoped_refptr<base::SingleThreadTaskRunner>& |
+RtcDataChannelHandler::Observer::main_thread() const { |
+ return main_thread_; |
+} |
+ |
+const scoped_refptr<webrtc::DataChannelInterface>& |
+RtcDataChannelHandler::Observer::channel() const { |
+ return channel_; |
+} |
+ |
+void RtcDataChannelHandler::Observer::ClearHandler() { |
+ DCHECK(main_thread_->BelongsToCurrentThread()); |
+ handler_ = nullptr; |
+} |
+ |
+void RtcDataChannelHandler::Observer::OnStateChange() { |
+ main_thread_->PostTask(FROM_HERE, base::Bind( |
+ &RtcDataChannelHandler::Observer::OnStateChangeImpl, this, |
+ channel_->state())); |
+} |
+ |
+void RtcDataChannelHandler::Observer::OnMessage( |
+ const webrtc::DataBuffer& buffer) { |
+ // TODO(tommi): Figure out a way to transfer ownership of the buffer without |
+ // having to create a copy. See webrtc bug 3967. |
+ scoped_ptr<webrtc::DataBuffer> new_buffer(new webrtc::DataBuffer(buffer)); |
+ main_thread_->PostTask(FROM_HERE, |
+ base::Bind(&RtcDataChannelHandler::Observer::OnMessageImpl, this, |
+ base::Passed(&new_buffer))); |
+} |
+ |
+void RtcDataChannelHandler::Observer::OnStateChangeImpl( |
+ webrtc::DataChannelInterface::DataState state) { |
+ DCHECK(main_thread_->BelongsToCurrentThread()); |
+ if (handler_) |
+ handler_->OnStateChange(state); |
+} |
+ |
+void RtcDataChannelHandler::Observer::OnMessageImpl( |
+ scoped_ptr<webrtc::DataBuffer> buffer) { |
+ DCHECK(main_thread_->BelongsToCurrentThread()); |
+ if (handler_) |
+ handler_->OnMessage(buffer.Pass()); |
+} |
+ |
RtcDataChannelHandler::RtcDataChannelHandler( |
+ const scoped_refptr<base::SingleThreadTaskRunner>& main_thread, |
webrtc::DataChannelInterface* channel) |
- : channel_(channel), |
+ : observer_(new Observer(this, main_thread, channel)), |
webkit_client_(NULL) { |
- DVLOG(1) << "::ctor"; |
- channel_->RegisterObserver(this); |
+ DVLOG(1) << "RtcDataChannelHandler " << channel->label(); |
+ |
+ // Detach from the ctor thread since we can be constructed on either the main |
+ // or signaling threads. |
+ thread_checker_.DetachFromThread(); |
IncrementCounter(CHANNEL_CREATED); |
- if (isReliable()) |
+ if (channel->reliable()) |
IncrementCounter(CHANNEL_RELIABLE); |
- if (ordered()) |
+ if (channel->ordered()) |
IncrementCounter(CHANNEL_ORDERED); |
- if (negotiated()) |
+ if (channel->negotiated()) |
IncrementCounter(CHANNEL_NEGOTIATED); |
UMA_HISTOGRAM_CUSTOM_COUNTS("WebRTC.DataChannelMaxRetransmits", |
- maxRetransmits(), 0, |
+ channel->maxRetransmits(), 0, |
std::numeric_limits<unsigned short>::max(), 50); |
UMA_HISTOGRAM_CUSTOM_COUNTS("WebRTC.DataChannelMaxRetransmitTime", |
- maxRetransmitTime(), 0, |
+ channel->maxRetransmitTime(), 0, |
std::numeric_limits<unsigned short>::max(), 50); |
} |
RtcDataChannelHandler::~RtcDataChannelHandler() { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
DVLOG(1) << "::dtor"; |
- channel_->UnregisterObserver(); |
+ observer_->ClearHandler(); |
} |
void RtcDataChannelHandler::setClient( |
blink::WebRTCDataChannelHandlerClient* client) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
webkit_client_ = client; |
} |
blink::WebString RtcDataChannelHandler::label() { |
- return base::UTF8ToUTF16(channel_->label()); |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ return base::UTF8ToUTF16(channel()->label()); |
} |
bool RtcDataChannelHandler::isReliable() { |
- return channel_->reliable(); |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ return channel()->reliable(); |
} |
bool RtcDataChannelHandler::ordered() const { |
- return channel_->ordered(); |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ return channel()->ordered(); |
} |
unsigned short RtcDataChannelHandler::maxRetransmitTime() const { |
- return channel_->maxRetransmitTime(); |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ return channel()->maxRetransmitTime(); |
} |
unsigned short RtcDataChannelHandler::maxRetransmits() const { |
- return channel_->maxRetransmits(); |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ return channel()->maxRetransmits(); |
} |
blink::WebString RtcDataChannelHandler::protocol() const { |
- return base::UTF8ToUTF16(channel_->protocol()); |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ return base::UTF8ToUTF16(channel()->protocol()); |
} |
bool RtcDataChannelHandler::negotiated() const { |
- return channel_->negotiated(); |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ return channel()->negotiated(); |
} |
unsigned short RtcDataChannelHandler::id() const { |
- return channel_->id(); |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ return channel()->id(); |
} |
unsigned long RtcDataChannelHandler::bufferedAmount() { |
- return channel_->buffered_amount(); |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ return channel()->buffered_amount(); |
} |
bool RtcDataChannelHandler::sendStringData(const blink::WebString& data) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
std::string utf8_buffer = base::UTF16ToUTF8(data); |
rtc::Buffer buffer(utf8_buffer.c_str(), utf8_buffer.length()); |
webrtc::DataBuffer data_buffer(buffer, false); |
RecordMessageSent(data_buffer.size()); |
- return channel_->Send(data_buffer); |
+ return channel()->Send(data_buffer); |
} |
bool RtcDataChannelHandler::sendRawData(const char* data, size_t length) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
rtc::Buffer buffer(data, length); |
webrtc::DataBuffer data_buffer(buffer, true); |
RecordMessageSent(data_buffer.size()); |
- return channel_->Send(data_buffer); |
+ return channel()->Send(data_buffer); |
} |
void RtcDataChannelHandler::close() { |
- channel_->Close(); |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ channel()->Close(); |
+ // Note that even though Close() will run synchronously, the readyState has |
+ // not changed yet since the state changes that occured on the signaling |
+ // thread have been posted to this thread and will be delivered later. |
+ // To work around this, we could have a nested loop here and deliver the |
+ // callbacks before running from this function, but doing so can cause |
+ // undesired side effects in webkit, so we don't, and instead rely on the |
+ // user of the API handling readyState notifications. |
+} |
+ |
+const scoped_refptr<webrtc::DataChannelInterface>& |
+RtcDataChannelHandler::channel() const { |
+ return observer_->channel(); |
} |
-void RtcDataChannelHandler::OnStateChange() { |
+void RtcDataChannelHandler::OnStateChange( |
+ webrtc::DataChannelInterface::DataState state) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ DVLOG(1) << "OnStateChange " << state; |
+ |
if (!webkit_client_) { |
- LOG(ERROR) << "WebRTCDataChannelHandlerClient not set."; |
+ // If this happens, the web application will not get notified of changes. |
+ NOTREACHED() << "WebRTCDataChannelHandlerClient not set."; |
return; |
} |
- DVLOG(1) << "OnStateChange " << channel_->state(); |
- switch (channel_->state()) { |
+ |
+ switch (state) { |
case webrtc::DataChannelInterface::kConnecting: |
webkit_client_->didChangeReadyState( |
blink::WebRTCDataChannelHandlerClient::ReadyStateConnecting); |
@@ -150,17 +250,21 @@ void RtcDataChannelHandler::OnStateChange() { |
} |
} |
-void RtcDataChannelHandler::OnMessage(const webrtc::DataBuffer& buffer) { |
+void RtcDataChannelHandler::OnMessage(scoped_ptr<webrtc::DataBuffer> buffer) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
if (!webkit_client_) { |
- LOG(ERROR) << "WebRTCDataChannelHandlerClient not set."; |
+ // If this happens, the web application will not get notified of changes. |
+ NOTREACHED() << "WebRTCDataChannelHandlerClient not set."; |
return; |
} |
- if (buffer.binary) { |
- webkit_client_->didReceiveRawData(buffer.data.data(), buffer.data.length()); |
+ if (buffer->binary) { |
+ webkit_client_->didReceiveRawData(buffer->data.data(), |
+ buffer->data.length()); |
} else { |
base::string16 utf16; |
- if (!base::UTF8ToUTF16(buffer.data.data(), buffer.data.length(), &utf16)) { |
+ if (!base::UTF8ToUTF16(buffer->data.data(), buffer->data.length(), |
+ &utf16)) { |
LOG(ERROR) << "Failed convert received data to UTF16"; |
return; |
} |
@@ -181,7 +285,7 @@ void RtcDataChannelHandler::RecordMessageSent(size_t num_bytes) { |
const int kMaxBucketSize = 100 * 1024 * 1024; |
const int kNumBuckets = 50; |
- if (isReliable()) { |
+ if (channel()->reliable()) { |
UMA_HISTOGRAM_CUSTOM_COUNTS("WebRTC.ReliableDataChannelMessageSize", |
num_bytes, |
1, kMaxBucketSize, kNumBuckets); |