Index: remoting/host/client_connection.cc |
=================================================================== |
--- remoting/host/client_connection.cc (revision 0) |
+++ remoting/host/client_connection.cc (revision 0) |
@@ -0,0 +1,173 @@ |
+// Copyright (c) 2010 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "remoting/host/client_connection.h" |
+ |
+#include "google/protobuf/message.h" |
+#include "media/base/data_buffer.h" |
+#include "remoting/base/protocol_decoder.h" |
+#include "remoting/base/protocol_util.h" |
+ |
+using media::DataBuffer; |
+ |
+namespace remoting { |
+ |
+// Determine how many update streams we should count to find the size of |
+// average update stream. |
+static const int kAverageUpdateStream = 10; |
+ |
+ClientConnection::ClientConnection(MessageLoop* message_loop, |
+ ProtocolDecoder* decoder, |
+ EventHandler* handler) |
+ : loop_(message_loop), |
+ decoder_(decoder), |
+ size_in_queue_(0), |
+ update_stream_size_(0), |
+ handler_(handler) { |
+ DCHECK(loop_); |
+ DCHECK(decoder_.get()); |
+ DCHECK(handler_); |
+} |
+ |
+ClientConnection::~ClientConnection() { |
+ // TODO(hclam): When we shut down the viewer we may have to close the |
+ // jingle channel. |
+} |
+ |
+void ClientConnection::SendInitClientMessage(int width, int height) { |
+ DCHECK_EQ(loop_, MessageLoop::current()); |
+ DCHECK(!update_stream_size_); |
+ DCHECK(channel_.get()); |
+ |
+ chromotocol_pb::HostMessage msg; |
+ msg.mutable_init_client()->set_width(width); |
+ msg.mutable_init_client()->set_height(height); |
+ DCHECK(msg.IsInitialized()); |
+ channel_->Write(SerializeAndFrameMessage(msg)); |
+} |
+ |
+void ClientConnection::SendBeginUpdateStreamMessage() { |
+ DCHECK_EQ(loop_, MessageLoop::current()); |
+ DCHECK(channel_.get()); |
+ |
+ chromotocol_pb::HostMessage msg; |
+ msg.mutable_begin_update_stream(); |
+ DCHECK(msg.IsInitialized()); |
+ |
+ scoped_refptr<DataBuffer> data = SerializeAndFrameMessage(msg); |
+ DCHECK(!update_stream_size_); |
+ update_stream_size_ += data->GetDataSize(); |
+ channel_->Write(data); |
+} |
+ |
+void ClientConnection::SendUpdateStreamPacketMessage( |
+ chromotocol_pb::UpdateStreamPacketHeader* header, |
+ scoped_refptr<DataBuffer> data) { |
+ DCHECK_EQ(loop_, MessageLoop::current()); |
+ DCHECK(channel_.get()); |
+ |
+ chromotocol_pb::HostMessage msg; |
+ msg.mutable_update_stream_packet()->mutable_header()->CopyFrom(*header); |
+ // TODO(hclam): This introduce one memory copy. Eliminate it. |
+ msg.mutable_update_stream_packet()->set_data( |
+ data->GetData(), data->GetDataSize()); |
+ DCHECK(msg.IsInitialized()); |
+ |
+ scoped_refptr<DataBuffer> encoded_data = SerializeAndFrameMessage(msg); |
+ update_stream_size_ += data->GetDataSize(); |
+ channel_->Write(encoded_data); |
+} |
+ |
+void ClientConnection::SendEndUpdateStreamMessage() { |
+ DCHECK_EQ(loop_, MessageLoop::current()); |
+ DCHECK(channel_.get()); |
+ |
+ chromotocol_pb::HostMessage msg; |
+ msg.mutable_end_update_stream(); |
+ DCHECK(msg.IsInitialized()); |
+ |
+ scoped_refptr<DataBuffer> data = SerializeAndFrameMessage(msg); |
+ update_stream_size_ += data->GetDataSize(); |
+ channel_->Write(data); |
+ |
+ // Here's some logic to help finding the average update stream size. |
+ size_in_queue_ += update_stream_size_; |
+ size_queue_.push_back(update_stream_size_); |
+ if (size_queue_.size() > kAverageUpdateStream) { |
+ size_in_queue_ -= size_queue_.front(); |
+ size_queue_.pop_front(); |
+ DCHECK_GE(size_in_queue_, 0); |
+ } |
+ update_stream_size_ = 0; |
+} |
+ |
+int ClientConnection::GetPendingUpdateStreamMessages() { |
+ DCHECK_EQ(loop_, MessageLoop::current()); |
+ |
+ if (!size_queue_.size()) |
+ return 0; |
+ int average_size = size_in_queue_ / size_queue_.size(); |
+ if (!average_size) |
+ return 0; |
+ return channel_->write_buffer_size() / average_size; |
+} |
+ |
+void ClientConnection::Disconnect() { |
+ DCHECK_EQ(loop_, MessageLoop::current()); |
+ |
+ DCHECK(channel_.get()); |
+ channel_->Close(); |
+} |
+ |
+void ClientConnection::OnStateChange(JingleChannel* channel, |
+ JingleChannel::State state) { |
+ DCHECK(channel); |
+ loop_->PostTask(FROM_HERE, |
+ NewRunnableMethod(this, &ClientConnection::StateChangeTask, state)); |
+} |
+ |
+void ClientConnection::OnPacketReceived(JingleChannel* channel, |
+ scoped_refptr<DataBuffer> data) { |
+ DCHECK_EQ(channel_.get(), channel); |
+ loop_->PostTask(FROM_HERE, |
+ NewRunnableMethod(this, &ClientConnection::PacketReceivedTask, data)); |
+} |
+ |
+void ClientConnection::StateChangeTask(JingleChannel::State state) { |
+ DCHECK_EQ(loop_, MessageLoop::current()); |
+ |
+ DCHECK(handler_); |
+ switch(state) { |
+ case JingleChannel::CONNECTING: |
+ break; |
+ // Don't care about this message. |
+ case JingleChannel::OPEN: |
+ handler_->OnConnectionOpened(this); |
+ break; |
+ case JingleChannel::CLOSED: |
+ handler_->OnConnectionClosed(this); |
+ break; |
+ case JingleChannel::FAILED: |
+ handler_->OnConnectionFailed(this); |
+ break; |
+ default: |
+ // We shouldn't receive other states. |
+ NOTREACHED(); |
+ } |
+} |
+ |
+void ClientConnection::PacketReceivedTask(scoped_refptr<DataBuffer> data) { |
+ DCHECK_EQ(loop_, MessageLoop::current()); |
+ |
+ // Use the decoder to parse incoming data. |
+ DCHECK(decoder_.get()); |
+ ClientMessageList list; |
+ decoder_->ParseClientMessages(data, &list); |
+ |
+ // Then submit the messages to the handler. |
+ DCHECK(handler_); |
+ handler_->HandleMessages(this, &list); |
+} |
+ |
+} // namespace remoting |
Property changes on: remoting/host/client_connection.cc |
___________________________________________________________________ |
Added: svn:eol-style |
+ LF |