Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(318)

Side by Side Diff: remoting/protocol/message_reader.cc

Issue 1655433002: Remove done notifications from incoming message handlers. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « remoting/protocol/message_reader.h ('k') | remoting/protocol/message_reader_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "remoting/protocol/message_reader.h" 5 #include "remoting/protocol/message_reader.h"
6 6
7 #include <utility> 7 #include <utility>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/callback.h" 10 #include "base/callback.h"
11 #include "base/compiler_specific.h" 11 #include "base/compiler_specific.h"
12 #include "base/location.h" 12 #include "base/location.h"
13 #include "base/thread_task_runner_handle.h" 13 #include "base/thread_task_runner_handle.h"
14 #include "base/single_thread_task_runner.h" 14 #include "base/single_thread_task_runner.h"
15 #include "net/base/io_buffer.h" 15 #include "net/base/io_buffer.h"
16 #include "net/base/net_errors.h" 16 #include "net/base/net_errors.h"
17 #include "remoting/base/compound_buffer.h" 17 #include "remoting/base/compound_buffer.h"
18 #include "remoting/proto/internal.pb.h" 18 #include "remoting/proto/internal.pb.h"
19 #include "remoting/protocol/p2p_stream_socket.h" 19 #include "remoting/protocol/p2p_stream_socket.h"
20 20
21 namespace remoting { 21 namespace remoting {
22 namespace protocol { 22 namespace protocol {
23 23
24 static const int kReadBufferSize = 4096; 24 static const int kReadBufferSize = 4096;
25 25
26 MessageReader::MessageReader() 26 MessageReader::MessageReader() : weak_factory_(this) {}
27 : socket_(nullptr), 27 MessageReader::~MessageReader() {}
28 read_pending_(false),
29 pending_messages_(0),
30 closed_(false),
31 weak_factory_(this) {
32 }
33
34 MessageReader::~MessageReader() {
35 }
36 28
37 void MessageReader::SetMessageReceivedCallback( 29 void MessageReader::SetMessageReceivedCallback(
38 const MessageReceivedCallback& callback) { 30 const MessageReceivedCallback& callback) {
39 DCHECK(CalledOnValidThread()); 31 DCHECK(CalledOnValidThread());
40 message_received_callback_ = callback; 32 message_received_callback_ = callback;
41 } 33 }
42 34
43 void MessageReader::StartReading( 35 void MessageReader::StartReading(
44 P2PStreamSocket* socket, 36 P2PStreamSocket* socket,
45 const ReadFailedCallback& read_failed_callback) { 37 const ReadFailedCallback& read_failed_callback) {
46 DCHECK(CalledOnValidThread()); 38 DCHECK(CalledOnValidThread());
47 DCHECK(socket); 39 DCHECK(socket);
48 DCHECK(!read_failed_callback.is_null()); 40 DCHECK(!read_failed_callback.is_null());
49 41
50 socket_ = socket; 42 socket_ = socket;
51 read_failed_callback_ = read_failed_callback; 43 read_failed_callback_ = read_failed_callback;
52 DoRead(); 44 DoRead();
53 } 45 }
54 46
55 void MessageReader::DoRead() { 47 void MessageReader::DoRead() {
56 DCHECK(CalledOnValidThread()); 48 DCHECK(CalledOnValidThread());
57 // Don't try to read again if there is another read pending or we 49 // Don't try to read again if there is another read pending or we
58 // have messages that we haven't finished processing yet. 50 // have messages that we haven't finished processing yet.
59 bool read_succeeded = true; 51 bool read_succeeded = true;
60 while (read_succeeded && !closed_ && !read_pending_ && 52 while (read_succeeded && !closed_ && !read_pending_) {
61 pending_messages_ == 0) {
62 read_buffer_ = new net::IOBuffer(kReadBufferSize); 53 read_buffer_ = new net::IOBuffer(kReadBufferSize);
63 int result = socket_->Read( 54 int result = socket_->Read(
64 read_buffer_.get(), 55 read_buffer_.get(),
65 kReadBufferSize, 56 kReadBufferSize,
66 base::Bind(&MessageReader::OnRead, weak_factory_.GetWeakPtr())); 57 base::Bind(&MessageReader::OnRead, weak_factory_.GetWeakPtr()));
67 58
68 HandleReadResult(result, &read_succeeded); 59 HandleReadResult(result, &read_succeeded);
69 } 60 }
70 } 61 }
71 62
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
109 void MessageReader::OnDataReceived(net::IOBuffer* data, int data_size) { 100 void MessageReader::OnDataReceived(net::IOBuffer* data, int data_size) {
110 DCHECK(CalledOnValidThread()); 101 DCHECK(CalledOnValidThread());
111 message_decoder_.AddData(data, data_size); 102 message_decoder_.AddData(data, data_size);
112 103
113 // Get list of all new messages first, and then call the callback 104 // Get list of all new messages first, and then call the callback
114 // for all of them. 105 // for all of them.
115 while (true) { 106 while (true) {
116 CompoundBuffer* buffer = message_decoder_.GetNextMessage(); 107 CompoundBuffer* buffer = message_decoder_.GetNextMessage();
117 if (!buffer) 108 if (!buffer)
118 break; 109 break;
119 pending_messages_++;
120 base::ThreadTaskRunnerHandle::Get()->PostTask( 110 base::ThreadTaskRunnerHandle::Get()->PostTask(
121 FROM_HERE, 111 FROM_HERE,
122 base::Bind(&MessageReader::RunCallback, 112 base::Bind(&MessageReader::RunCallback, weak_factory_.GetWeakPtr(),
123 weak_factory_.GetWeakPtr(),
124 base::Passed(make_scoped_ptr(buffer)))); 113 base::Passed(make_scoped_ptr(buffer))));
125 } 114 }
126 } 115 }
127 116
128 void MessageReader::RunCallback(scoped_ptr<CompoundBuffer> message) { 117 void MessageReader::RunCallback(scoped_ptr<CompoundBuffer> message) {
129 if (!message_received_callback_.is_null()){ 118 if (!message_received_callback_.is_null())
130 message_received_callback_.Run( 119 message_received_callback_.Run(std::move(message));
131 std::move(message),
132 base::Bind(&MessageReader::OnMessageDone, weak_factory_.GetWeakPtr()));
133 }
134 }
135
136 void MessageReader::OnMessageDone() {
137 DCHECK(CalledOnValidThread());
138 pending_messages_--;
139 DCHECK_GE(pending_messages_, 0);
140
141 // Start next read if necessary.
142 DoRead();
143 } 120 }
144 121
145 } // namespace protocol 122 } // namespace protocol
146 } // namespace remoting 123 } // namespace remoting
OLDNEW
« no previous file with comments | « remoting/protocol/message_reader.h ('k') | remoting/protocol/message_reader_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698