OLD | NEW |
| (Empty) |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "blimp/net/stream_packet_writer.h" | |
6 | |
7 #include <iostream> | |
8 | |
9 #include "base/callback_helpers.h" | |
10 #include "base/location.h" | |
11 #include "base/logging.h" | |
12 #include "base/memory/ref_counted.h" | |
13 #include "base/single_thread_task_runner.h" | |
14 #include "base/sys_byteorder.h" | |
15 #include "base/threading/thread_task_runner_handle.h" | |
16 #include "blimp/common/proto/blimp_message.pb.h" | |
17 #include "blimp/net/blimp_stats.h" | |
18 #include "blimp/net/common.h" | |
19 #include "net/base/io_buffer.h" | |
20 #include "net/base/net_errors.h" | |
21 #include "net/socket/stream_socket.h" | |
22 | |
23 namespace blimp { | |
24 | |
25 std::ostream& operator<<(std::ostream& out, | |
26 const StreamPacketWriter::WriteState state) { | |
27 switch (state) { | |
28 case StreamPacketWriter::WriteState::IDLE: | |
29 out << "IDLE"; | |
30 break; | |
31 case StreamPacketWriter::WriteState::HEADER: | |
32 out << "HEADER"; | |
33 break; | |
34 case StreamPacketWriter::WriteState::PAYLOAD: | |
35 out << "PAYLOAD"; | |
36 break; | |
37 } | |
38 return out; | |
39 } | |
40 | |
41 StreamPacketWriter::StreamPacketWriter(net::StreamSocket* socket) | |
42 : write_state_(WriteState::IDLE), | |
43 socket_(socket), | |
44 header_buffer_( | |
45 new net::DrainableIOBuffer(new net::IOBuffer(kPacketHeaderSizeBytes), | |
46 kPacketHeaderSizeBytes)), | |
47 weak_factory_(this) { | |
48 DCHECK(socket_); | |
49 } | |
50 | |
51 StreamPacketWriter::~StreamPacketWriter() {} | |
52 | |
53 void StreamPacketWriter::WritePacket( | |
54 const scoped_refptr<net::DrainableIOBuffer>& data, | |
55 const net::CompletionCallback& callback) { | |
56 DCHECK_EQ(WriteState::IDLE, write_state_); | |
57 DCHECK(data); | |
58 CHECK(data->BytesRemaining()); | |
59 | |
60 write_state_ = WriteState::HEADER; | |
61 header_buffer_->SetOffset(0); | |
62 *reinterpret_cast<uint32_t*>(header_buffer_->data()) = | |
63 base::HostToNet32(data->BytesRemaining()); | |
64 payload_buffer_ = data; | |
65 | |
66 BlimpStats::GetInstance()->Add(BlimpStats::BYTES_SENT, | |
67 payload_buffer_->BytesRemaining()); | |
68 int result = DoWriteLoop(net::OK); | |
69 if (result != net::ERR_IO_PENDING) { | |
70 // Release the payload buffer, since the write operation has completed | |
71 // synchronously. | |
72 payload_buffer_ = nullptr; | |
73 | |
74 // Adapt synchronous completion to an asynchronous style. | |
75 base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE, | |
76 base::Bind(callback, result)); | |
77 } else { | |
78 callback_ = callback; | |
79 } | |
80 } | |
81 | |
82 int StreamPacketWriter::DoWriteLoop(int result) { | |
83 DCHECK_NE(net::ERR_IO_PENDING, result); | |
84 DCHECK_GE(result, 0); | |
85 DCHECK_NE(WriteState::IDLE, write_state_); | |
86 | |
87 while (result >= 0 && write_state_ != WriteState::IDLE) { | |
88 VLOG(2) << "DoWriteLoop (state=" << write_state_ << ", result=" << result | |
89 << ")"; | |
90 | |
91 switch (write_state_) { | |
92 case WriteState::HEADER: | |
93 result = DoWriteHeader(result); | |
94 break; | |
95 case WriteState::PAYLOAD: | |
96 result = DoWritePayload(result); | |
97 break; | |
98 case WriteState::IDLE: | |
99 NOTREACHED(); | |
100 result = net::ERR_UNEXPECTED; | |
101 break; | |
102 } | |
103 } | |
104 | |
105 return result; | |
106 } | |
107 | |
108 int StreamPacketWriter::DoWriteHeader(int result) { | |
109 DCHECK_EQ(WriteState::HEADER, write_state_); | |
110 DCHECK_GE(result, 0); | |
111 | |
112 header_buffer_->DidConsume(result); | |
113 if (header_buffer_->BytesRemaining() > 0) { | |
114 return DoWrite(header_buffer_.get(), header_buffer_->BytesRemaining()); | |
115 } | |
116 | |
117 write_state_ = WriteState::PAYLOAD; | |
118 return net::OK; | |
119 } | |
120 | |
121 int StreamPacketWriter::DoWritePayload(int result) { | |
122 DCHECK_EQ(WriteState::PAYLOAD, write_state_); | |
123 DCHECK_GE(result, 0); | |
124 | |
125 payload_buffer_->DidConsume(result); | |
126 if (payload_buffer_->BytesRemaining() > 0) { | |
127 return DoWrite(payload_buffer_.get(), payload_buffer_->BytesRemaining()); | |
128 } | |
129 | |
130 write_state_ = WriteState::IDLE; | |
131 return net::OK; | |
132 } | |
133 | |
134 void StreamPacketWriter::OnWriteComplete(int result) { | |
135 DCHECK_NE(net::ERR_IO_PENDING, result); | |
136 | |
137 if (result == 0) { | |
138 // Convert EOF return value to ERR_CONNECTION_CLOSED. | |
139 result = net::ERR_CONNECTION_CLOSED; | |
140 } else if (result > 0) { | |
141 // Write was successful; get the next one started. | |
142 result = DoWriteLoop(result); | |
143 if (result == net::ERR_IO_PENDING) { | |
144 return; | |
145 } | |
146 } | |
147 | |
148 payload_buffer_ = nullptr; | |
149 base::ResetAndReturn(&callback_).Run(result); | |
150 } | |
151 | |
152 int StreamPacketWriter::DoWrite(net::IOBuffer* buf, int buf_len) { | |
153 int result = socket_->Write(buf, buf_len, | |
154 base::Bind(&StreamPacketWriter::OnWriteComplete, | |
155 weak_factory_.GetWeakPtr())); | |
156 return (result != 0 ? result : net::ERR_CONNECTION_CLOSED); | |
157 } | |
158 | |
159 } // namespace blimp | |
OLD | NEW |