Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(119)

Side by Side Diff: remoting/base/buffered_socket_writer.cc

Issue 1582583003: Fix BufferedSocketWriter to buffer everything before it starts writing. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "remoting/base/buffered_socket_writer.h" 5 #include "remoting/base/buffered_socket_writer.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/callback_helpers.h"
8 #include "base/stl_util.h" 9 #include "base/stl_util.h"
9 #include "net/base/io_buffer.h" 10 #include "net/base/io_buffer.h"
10 #include "net/base/net_errors.h" 11 #include "net/base/net_errors.h"
11 #include "net/socket/socket.h" 12 #include "net/socket/socket.h"
12 13
13 namespace remoting { 14 namespace remoting {
14 15
15 namespace { 16 namespace {
16 17
17 int WriteNetSocket(net::Socket* socket, 18 int WriteNetSocket(net::Socket* socket,
(...skipping 14 matching lines...) Expand all
32 33
33 scoped_refptr<net::DrainableIOBuffer> data; 34 scoped_refptr<net::DrainableIOBuffer> data;
34 base::Closure done_task; 35 base::Closure done_task;
35 }; 36 };
36 37
37 // static 38 // static
38 scoped_ptr<BufferedSocketWriter> BufferedSocketWriter::CreateForSocket( 39 scoped_ptr<BufferedSocketWriter> BufferedSocketWriter::CreateForSocket(
39 net::Socket* socket, 40 net::Socket* socket,
40 const WriteFailedCallback& write_failed_callback) { 41 const WriteFailedCallback& write_failed_callback) {
41 scoped_ptr<BufferedSocketWriter> result(new BufferedSocketWriter()); 42 scoped_ptr<BufferedSocketWriter> result(new BufferedSocketWriter());
42 result->Init(base::Bind(&WriteNetSocket, socket), write_failed_callback); 43 result->Start(base::Bind(&WriteNetSocket, socket), write_failed_callback);
43 return result; 44 return result;
44 } 45 }
45 46
46 BufferedSocketWriter::BufferedSocketWriter() : weak_factory_(this) {} 47 BufferedSocketWriter::BufferedSocketWriter() : weak_factory_(this) {}
47 48
48 BufferedSocketWriter::~BufferedSocketWriter() { 49 BufferedSocketWriter::~BufferedSocketWriter() {
49 STLDeleteElements(&queue_); 50 STLDeleteElements(&queue_);
50 } 51 }
51 52
52 void BufferedSocketWriter::Init( 53 void BufferedSocketWriter::Start(
53 const WriteCallback& write_callback, 54 const WriteCallback& write_callback,
54 const WriteFailedCallback& write_failed_callback) { 55 const WriteFailedCallback& write_failed_callback) {
55 write_callback_ = write_callback; 56 write_callback_ = write_callback;
56 write_failed_callback_ = write_failed_callback; 57 write_failed_callback_ = write_failed_callback;
58 DoWrite();
57 } 59 }
58 60
59 void BufferedSocketWriter::Write( 61 void BufferedSocketWriter::Write(
60 const scoped_refptr<net::IOBufferWithSize>& data, 62 const scoped_refptr<net::IOBufferWithSize>& data,
61 const base::Closure& done_task) { 63 const base::Closure& done_task) {
62 DCHECK(thread_checker_.CalledOnValidThread()); 64 DCHECK(thread_checker_.CalledOnValidThread());
63 DCHECK(data.get()); 65 DCHECK(data.get());
64 66
65 // Don't write after error. 67 // Don't write after error.
66 if (is_closed()) 68 if (closed_)
67 return; 69 return;
68 70
69 queue_.push_back(new PendingPacket( 71 queue_.push_back(new PendingPacket(
70 new net::DrainableIOBuffer(data.get(), data->size()), done_task)); 72 new net::DrainableIOBuffer(data.get(), data->size()), done_task));
71 73
72 DoWrite(); 74 DoWrite();
73 } 75 }
74 76
75 bool BufferedSocketWriter::is_closed() {
76 return write_callback_.is_null();
77 }
78
79 void BufferedSocketWriter::DoWrite() { 77 void BufferedSocketWriter::DoWrite() {
80 DCHECK(thread_checker_.CalledOnValidThread()); 78 DCHECK(thread_checker_.CalledOnValidThread());
81 79
82 base::WeakPtr<BufferedSocketWriter> self = weak_factory_.GetWeakPtr(); 80 base::WeakPtr<BufferedSocketWriter> self = weak_factory_.GetWeakPtr();
83 while (self && !write_pending_ && !is_closed() && !queue_.empty()) { 81 while (self && !write_pending_ && !write_callback_.is_null() &&
82 !queue_.empty()) {
84 int result = write_callback_.Run( 83 int result = write_callback_.Run(
85 queue_.front()->data.get(), queue_.front()->data->BytesRemaining(), 84 queue_.front()->data.get(), queue_.front()->data->BytesRemaining(),
86 base::Bind(&BufferedSocketWriter::OnWritten, 85 base::Bind(&BufferedSocketWriter::OnWritten,
87 weak_factory_.GetWeakPtr())); 86 weak_factory_.GetWeakPtr()));
88 HandleWriteResult(result); 87 HandleWriteResult(result);
89 } 88 }
90 } 89 }
91 90
92 void BufferedSocketWriter::HandleWriteResult(int result) { 91 void BufferedSocketWriter::HandleWriteResult(int result) {
93 if (result < 0) { 92 if (result < 0) {
94 if (result == net::ERR_IO_PENDING) { 93 if (result == net::ERR_IO_PENDING) {
95 write_pending_ = true; 94 write_pending_ = true;
96 } else { 95 } else {
96 closed_ = true;
97 write_callback_.Reset(); 97 write_callback_.Reset();
98 if (!write_failed_callback_.is_null()) { 98 if (!write_failed_callback_.is_null())
99 WriteFailedCallback callback = write_failed_callback_; 99 base::ResetAndReturn(&write_failed_callback_).Run(result);
100 callback.Run(result);
101 }
102 } 100 }
103 return; 101 return;
104 } 102 }
105 103
106 DCHECK(!queue_.empty()); 104 DCHECK(!queue_.empty());
107 105
108 queue_.front()->data->DidConsume(result); 106 queue_.front()->data->DidConsume(result);
109 107
110 if (queue_.front()->data->BytesRemaining() == 0) { 108 if (queue_.front()->data->BytesRemaining() == 0) {
111 base::Closure done_task = queue_.front()->done_task; 109 base::Closure done_task = queue_.front()->done_task;
(...skipping 10 matching lines...) Expand all
122 DCHECK(write_pending_); 120 DCHECK(write_pending_);
123 write_pending_ = false; 121 write_pending_ = false;
124 122
125 base::WeakPtr<BufferedSocketWriter> self = weak_factory_.GetWeakPtr(); 123 base::WeakPtr<BufferedSocketWriter> self = weak_factory_.GetWeakPtr();
126 HandleWriteResult(result); 124 HandleWriteResult(result);
127 if (self) 125 if (self)
128 DoWrite(); 126 DoWrite();
129 } 127 }
130 128
131 } // namespace remoting 129 } // namespace remoting
OLDNEW
« no previous file with comments | « remoting/base/buffered_socket_writer.h ('k') | remoting/base/buffered_socket_writer_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698