Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(34)

Side by Side Diff: remoting/protocol/message_reader.cc

Issue 1655433002: Remove done notifications from incoming message handlers. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "remoting/protocol/message_reader.h" 5 #include "remoting/protocol/message_reader.h"
6 6
7 #include <utility> 7 #include <utility>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/callback.h" 10 #include "base/callback.h"
11 #include "base/compiler_specific.h" 11 #include "base/compiler_specific.h"
12 #include "base/location.h" 12 #include "base/location.h"
13 #include "base/thread_task_runner_handle.h" 13 #include "base/thread_task_runner_handle.h"
14 #include "base/single_thread_task_runner.h" 14 #include "base/single_thread_task_runner.h"
15 #include "net/base/io_buffer.h" 15 #include "net/base/io_buffer.h"
16 #include "net/base/net_errors.h" 16 #include "net/base/net_errors.h"
17 #include "remoting/base/compound_buffer.h" 17 #include "remoting/base/compound_buffer.h"
18 #include "remoting/proto/internal.pb.h" 18 #include "remoting/proto/internal.pb.h"
19 #include "remoting/protocol/p2p_stream_socket.h" 19 #include "remoting/protocol/p2p_stream_socket.h"
20 20
21 namespace remoting { 21 namespace remoting {
22 namespace protocol { 22 namespace protocol {
23 23
24 static const int kReadBufferSize = 4096; 24 static const int kReadBufferSize = 4096;
25 25
26 MessageReader::MessageReader() 26 MessageReader::MessageReader()
27 : socket_(nullptr), 27 : socket_(nullptr),
28 read_pending_(false), 28 read_pending_(false),
29 pending_messages_(0),
30 closed_(false), 29 closed_(false),
Jamie 2016/01/29 23:48:12 Optional: Since you're modifying this anyway, cons
Sergey Ulanov 2016/01/30 00:14:47 Done.
31 weak_factory_(this) { 30 weak_factory_(this) {}
32 }
33 31
34 MessageReader::~MessageReader() { 32 MessageReader::~MessageReader() {}
35 }
36 33
37 void MessageReader::SetMessageReceivedCallback( 34 void MessageReader::SetMessageReceivedCallback(
38 const MessageReceivedCallback& callback) { 35 const MessageReceivedCallback& callback) {
39 DCHECK(CalledOnValidThread()); 36 DCHECK(CalledOnValidThread());
40 message_received_callback_ = callback; 37 message_received_callback_ = callback;
41 } 38 }
42 39
43 void MessageReader::StartReading( 40 void MessageReader::StartReading(
44 P2PStreamSocket* socket, 41 P2PStreamSocket* socket,
45 const ReadFailedCallback& read_failed_callback) { 42 const ReadFailedCallback& read_failed_callback) {
46 DCHECK(CalledOnValidThread()); 43 DCHECK(CalledOnValidThread());
47 DCHECK(socket); 44 DCHECK(socket);
48 DCHECK(!read_failed_callback.is_null()); 45 DCHECK(!read_failed_callback.is_null());
49 46
50 socket_ = socket; 47 socket_ = socket;
51 read_failed_callback_ = read_failed_callback; 48 read_failed_callback_ = read_failed_callback;
52 DoRead(); 49 DoRead();
53 } 50 }
54 51
55 void MessageReader::DoRead() { 52 void MessageReader::DoRead() {
56 DCHECK(CalledOnValidThread()); 53 DCHECK(CalledOnValidThread());
57 // Don't try to read again if there is another read pending or we 54 // Don't try to read again if there is another read pending or we
58 // have messages that we haven't finished processing yet. 55 // have messages that we haven't finished processing yet.
59 bool read_succeeded = true; 56 bool read_succeeded = true;
60 while (read_succeeded && !closed_ && !read_pending_ && 57 while (read_succeeded && !closed_ && !read_pending_) {
61 pending_messages_ == 0) {
62 read_buffer_ = new net::IOBuffer(kReadBufferSize); 58 read_buffer_ = new net::IOBuffer(kReadBufferSize);
63 int result = socket_->Read( 59 int result = socket_->Read(
64 read_buffer_.get(), 60 read_buffer_.get(),
65 kReadBufferSize, 61 kReadBufferSize,
66 base::Bind(&MessageReader::OnRead, weak_factory_.GetWeakPtr())); 62 base::Bind(&MessageReader::OnRead, weak_factory_.GetWeakPtr()));
67 63
68 HandleReadResult(result, &read_succeeded); 64 HandleReadResult(result, &read_succeeded);
69 } 65 }
70 } 66 }
71 67
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
109 void MessageReader::OnDataReceived(net::IOBuffer* data, int data_size) { 105 void MessageReader::OnDataReceived(net::IOBuffer* data, int data_size) {
110 DCHECK(CalledOnValidThread()); 106 DCHECK(CalledOnValidThread());
111 message_decoder_.AddData(data, data_size); 107 message_decoder_.AddData(data, data_size);
112 108
113 // Get list of all new messages first, and then call the callback 109 // Get list of all new messages first, and then call the callback
114 // for all of them. 110 // for all of them.
115 while (true) { 111 while (true) {
116 CompoundBuffer* buffer = message_decoder_.GetNextMessage(); 112 CompoundBuffer* buffer = message_decoder_.GetNextMessage();
117 if (!buffer) 113 if (!buffer)
118 break; 114 break;
119 pending_messages_++;
120 base::ThreadTaskRunnerHandle::Get()->PostTask( 115 base::ThreadTaskRunnerHandle::Get()->PostTask(
121 FROM_HERE, 116 FROM_HERE,
122 base::Bind(&MessageReader::RunCallback, 117 base::Bind(&MessageReader::RunCallback, weak_factory_.GetWeakPtr(),
123 weak_factory_.GetWeakPtr(),
124 base::Passed(make_scoped_ptr(buffer)))); 118 base::Passed(make_scoped_ptr(buffer))));
125 } 119 }
126 } 120 }
127 121
128 void MessageReader::RunCallback(scoped_ptr<CompoundBuffer> message) { 122 void MessageReader::RunCallback(scoped_ptr<CompoundBuffer> message) {
129 if (!message_received_callback_.is_null()){ 123 if (!message_received_callback_.is_null())
130 message_received_callback_.Run( 124 message_received_callback_.Run(std::move(message));
131 std::move(message),
132 base::Bind(&MessageReader::OnMessageDone, weak_factory_.GetWeakPtr()));
133 }
134 }
135
136 void MessageReader::OnMessageDone() {
137 DCHECK(CalledOnValidThread());
138 pending_messages_--;
139 DCHECK_GE(pending_messages_, 0);
140
141 // Start next read if necessary.
142 DoRead();
143 } 125 }
144 126
145 } // namespace protocol 127 } // namespace protocol
146 } // namespace remoting 128 } // namespace remoting
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698