| OLD | NEW |
| (Empty) |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "blimp/net/stream_packet_writer.h" | |
| 6 | |
| 7 #include <iostream> | |
| 8 | |
| 9 #include "base/callback_helpers.h" | |
| 10 #include "base/location.h" | |
| 11 #include "base/logging.h" | |
| 12 #include "base/memory/ref_counted.h" | |
| 13 #include "base/single_thread_task_runner.h" | |
| 14 #include "base/sys_byteorder.h" | |
| 15 #include "base/threading/thread_task_runner_handle.h" | |
| 16 #include "blimp/common/proto/blimp_message.pb.h" | |
| 17 #include "blimp/net/blimp_stats.h" | |
| 18 #include "blimp/net/common.h" | |
| 19 #include "net/base/io_buffer.h" | |
| 20 #include "net/base/net_errors.h" | |
| 21 #include "net/socket/stream_socket.h" | |
| 22 | |
| 23 namespace blimp { | |
| 24 | |
| 25 std::ostream& operator<<(std::ostream& out, | |
| 26 const StreamPacketWriter::WriteState state) { | |
| 27 switch (state) { | |
| 28 case StreamPacketWriter::WriteState::IDLE: | |
| 29 out << "IDLE"; | |
| 30 break; | |
| 31 case StreamPacketWriter::WriteState::HEADER: | |
| 32 out << "HEADER"; | |
| 33 break; | |
| 34 case StreamPacketWriter::WriteState::PAYLOAD: | |
| 35 out << "PAYLOAD"; | |
| 36 break; | |
| 37 } | |
| 38 return out; | |
| 39 } | |
| 40 | |
| 41 StreamPacketWriter::StreamPacketWriter(net::StreamSocket* socket) | |
| 42 : write_state_(WriteState::IDLE), | |
| 43 socket_(socket), | |
| 44 header_buffer_( | |
| 45 new net::DrainableIOBuffer(new net::IOBuffer(kPacketHeaderSizeBytes), | |
| 46 kPacketHeaderSizeBytes)), | |
| 47 weak_factory_(this) { | |
| 48 DCHECK(socket_); | |
| 49 } | |
| 50 | |
| 51 StreamPacketWriter::~StreamPacketWriter() {} | |
| 52 | |
| 53 void StreamPacketWriter::WritePacket( | |
| 54 const scoped_refptr<net::DrainableIOBuffer>& data, | |
| 55 const net::CompletionCallback& callback) { | |
| 56 DCHECK_EQ(WriteState::IDLE, write_state_); | |
| 57 DCHECK(data); | |
| 58 CHECK(data->BytesRemaining()); | |
| 59 | |
| 60 write_state_ = WriteState::HEADER; | |
| 61 header_buffer_->SetOffset(0); | |
| 62 *reinterpret_cast<uint32_t*>(header_buffer_->data()) = | |
| 63 base::HostToNet32(data->BytesRemaining()); | |
| 64 payload_buffer_ = data; | |
| 65 | |
| 66 BlimpStats::GetInstance()->Add(BlimpStats::BYTES_SENT, | |
| 67 payload_buffer_->BytesRemaining()); | |
| 68 int result = DoWriteLoop(net::OK); | |
| 69 if (result != net::ERR_IO_PENDING) { | |
| 70 // Release the payload buffer, since the write operation has completed | |
| 71 // synchronously. | |
| 72 payload_buffer_ = nullptr; | |
| 73 | |
| 74 // Adapt synchronous completion to an asynchronous style. | |
| 75 base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE, | |
| 76 base::Bind(callback, result)); | |
| 77 } else { | |
| 78 callback_ = callback; | |
| 79 } | |
| 80 } | |
| 81 | |
| 82 int StreamPacketWriter::DoWriteLoop(int result) { | |
| 83 DCHECK_NE(net::ERR_IO_PENDING, result); | |
| 84 DCHECK_GE(result, 0); | |
| 85 DCHECK_NE(WriteState::IDLE, write_state_); | |
| 86 | |
| 87 while (result >= 0 && write_state_ != WriteState::IDLE) { | |
| 88 VLOG(2) << "DoWriteLoop (state=" << write_state_ << ", result=" << result | |
| 89 << ")"; | |
| 90 | |
| 91 switch (write_state_) { | |
| 92 case WriteState::HEADER: | |
| 93 result = DoWriteHeader(result); | |
| 94 break; | |
| 95 case WriteState::PAYLOAD: | |
| 96 result = DoWritePayload(result); | |
| 97 break; | |
| 98 case WriteState::IDLE: | |
| 99 NOTREACHED(); | |
| 100 result = net::ERR_UNEXPECTED; | |
| 101 break; | |
| 102 } | |
| 103 } | |
| 104 | |
| 105 return result; | |
| 106 } | |
| 107 | |
| 108 int StreamPacketWriter::DoWriteHeader(int result) { | |
| 109 DCHECK_EQ(WriteState::HEADER, write_state_); | |
| 110 DCHECK_GE(result, 0); | |
| 111 | |
| 112 header_buffer_->DidConsume(result); | |
| 113 if (header_buffer_->BytesRemaining() > 0) { | |
| 114 return DoWrite(header_buffer_.get(), header_buffer_->BytesRemaining()); | |
| 115 } | |
| 116 | |
| 117 write_state_ = WriteState::PAYLOAD; | |
| 118 return net::OK; | |
| 119 } | |
| 120 | |
| 121 int StreamPacketWriter::DoWritePayload(int result) { | |
| 122 DCHECK_EQ(WriteState::PAYLOAD, write_state_); | |
| 123 DCHECK_GE(result, 0); | |
| 124 | |
| 125 payload_buffer_->DidConsume(result); | |
| 126 if (payload_buffer_->BytesRemaining() > 0) { | |
| 127 return DoWrite(payload_buffer_.get(), payload_buffer_->BytesRemaining()); | |
| 128 } | |
| 129 | |
| 130 write_state_ = WriteState::IDLE; | |
| 131 return net::OK; | |
| 132 } | |
| 133 | |
| 134 void StreamPacketWriter::OnWriteComplete(int result) { | |
| 135 DCHECK_NE(net::ERR_IO_PENDING, result); | |
| 136 | |
| 137 if (result == 0) { | |
| 138 // Convert EOF return value to ERR_CONNECTION_CLOSED. | |
| 139 result = net::ERR_CONNECTION_CLOSED; | |
| 140 } else if (result > 0) { | |
| 141 // Write was successful; get the next one started. | |
| 142 result = DoWriteLoop(result); | |
| 143 if (result == net::ERR_IO_PENDING) { | |
| 144 return; | |
| 145 } | |
| 146 } | |
| 147 | |
| 148 payload_buffer_ = nullptr; | |
| 149 base::ResetAndReturn(&callback_).Run(result); | |
| 150 } | |
| 151 | |
| 152 int StreamPacketWriter::DoWrite(net::IOBuffer* buf, int buf_len) { | |
| 153 int result = socket_->Write(buf, buf_len, | |
| 154 base::Bind(&StreamPacketWriter::OnWriteComplete, | |
| 155 weak_factory_.GetWeakPtr())); | |
| 156 return (result != 0 ? result : net::ERR_CONNECTION_CLOSED); | |
| 157 } | |
| 158 | |
| 159 } // namespace blimp | |
| OLD | NEW |