Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(265)

Side by Side Diff: remoting/protocol/message_reader.cc

Issue 2911893003: Deprecate NonThreadSafe in remoting in favor of SequenceChecker. (Closed)
Patch Set: Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « remoting/protocol/message_reader.h ('k') | remoting/protocol/pseudotcp_adapter.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "remoting/protocol/message_reader.h" 5 #include "remoting/protocol/message_reader.h"
6 6
7 #include <utility> 7 #include <utility>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/callback.h" 10 #include "base/callback.h"
11 #include "base/compiler_specific.h" 11 #include "base/compiler_specific.h"
12 #include "base/location.h" 12 #include "base/location.h"
13 #include "base/memory/ptr_util.h" 13 #include "base/memory/ptr_util.h"
14 #include "base/single_thread_task_runner.h" 14 #include "base/single_thread_task_runner.h"
15 #include "base/threading/thread_task_runner_handle.h" 15 #include "base/threading/thread_task_runner_handle.h"
16 #include "net/base/io_buffer.h" 16 #include "net/base/io_buffer.h"
17 #include "net/base/net_errors.h" 17 #include "net/base/net_errors.h"
18 #include "remoting/base/compound_buffer.h" 18 #include "remoting/base/compound_buffer.h"
19 #include "remoting/proto/internal.pb.h" 19 #include "remoting/proto/internal.pb.h"
20 #include "remoting/protocol/p2p_stream_socket.h" 20 #include "remoting/protocol/p2p_stream_socket.h"
21 21
22 namespace remoting { 22 namespace remoting {
23 namespace protocol { 23 namespace protocol {
24 24
25 static const int kReadBufferSize = 4096; 25 static const int kReadBufferSize = 4096;
26 26
27 MessageReader::MessageReader() : weak_factory_(this) {} 27 MessageReader::MessageReader() : weak_factory_(this) {}
28 MessageReader::~MessageReader() {} 28 MessageReader::~MessageReader() {
29 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
30 }
29 31
30 void MessageReader::StartReading( 32 void MessageReader::StartReading(
31 P2PStreamSocket* socket, 33 P2PStreamSocket* socket,
32 const MessageReceivedCallback& message_received_callback, 34 const MessageReceivedCallback& message_received_callback,
33 const ReadFailedCallback& read_failed_callback) { 35 const ReadFailedCallback& read_failed_callback) {
34 DCHECK(CalledOnValidThread()); 36 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
35 DCHECK(!socket_); 37 DCHECK(!socket_);
36 DCHECK(socket); 38 DCHECK(socket);
37 DCHECK(!message_received_callback.is_null()); 39 DCHECK(!message_received_callback.is_null());
38 DCHECK(!read_failed_callback.is_null()); 40 DCHECK(!read_failed_callback.is_null());
39 41
40 socket_ = socket; 42 socket_ = socket;
41 message_received_callback_ = message_received_callback; 43 message_received_callback_ = message_received_callback;
42 read_failed_callback_ = read_failed_callback; 44 read_failed_callback_ = read_failed_callback;
43 DoRead(); 45 DoRead();
44 } 46 }
45 47
46 void MessageReader::DoRead() { 48 void MessageReader::DoRead() {
47 DCHECK(CalledOnValidThread()); 49 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
48 // Don't try to read again if there is another read pending or we 50 // Don't try to read again if there is another read pending or we
49 // have messages that we haven't finished processing yet. 51 // have messages that we haven't finished processing yet.
50 bool read_succeeded = true; 52 bool read_succeeded = true;
51 while (read_succeeded && !closed_ && !read_pending_) { 53 while (read_succeeded && !closed_ && !read_pending_) {
52 read_buffer_ = new net::IOBuffer(kReadBufferSize); 54 read_buffer_ = new net::IOBuffer(kReadBufferSize);
53 int result = socket_->Read( 55 int result = socket_->Read(
54 read_buffer_.get(), 56 read_buffer_.get(),
55 kReadBufferSize, 57 kReadBufferSize,
56 base::Bind(&MessageReader::OnRead, weak_factory_.GetWeakPtr())); 58 base::Bind(&MessageReader::OnRead, weak_factory_.GetWeakPtr()));
57 59
58 HandleReadResult(result, &read_succeeded); 60 HandleReadResult(result, &read_succeeded);
59 } 61 }
60 } 62 }
61 63
62 void MessageReader::OnRead(int result) { 64 void MessageReader::OnRead(int result) {
63 DCHECK(CalledOnValidThread()); 65 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
64 DCHECK(read_pending_); 66 DCHECK(read_pending_);
65 read_pending_ = false; 67 read_pending_ = false;
66 68
67 if (!closed_) { 69 if (!closed_) {
68 bool read_succeeded; 70 bool read_succeeded;
69 HandleReadResult(result, &read_succeeded); 71 HandleReadResult(result, &read_succeeded);
70 if (read_succeeded) 72 if (read_succeeded)
71 DoRead(); 73 DoRead();
72 } 74 }
73 } 75 }
74 76
75 void MessageReader::HandleReadResult(int result, bool* read_succeeded) { 77 void MessageReader::HandleReadResult(int result, bool* read_succeeded) {
76 DCHECK(CalledOnValidThread()); 78 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
77 if (closed_) 79 if (closed_)
78 return; 80 return;
79 81
80 *read_succeeded = true; 82 *read_succeeded = true;
81 83
82 if (result > 0) { 84 if (result > 0) {
83 OnDataReceived(read_buffer_.get(), result); 85 OnDataReceived(read_buffer_.get(), result);
84 *read_succeeded = true; 86 *read_succeeded = true;
85 } else if (result == net::ERR_IO_PENDING) { 87 } else if (result == net::ERR_IO_PENDING) {
86 read_pending_ = true; 88 read_pending_ = true;
87 } else { 89 } else {
88 // Stop reading after any error. 90 // Stop reading after any error.
89 closed_ = true; 91 closed_ = true;
90 *read_succeeded = false; 92 *read_succeeded = false;
91 93
92 LOG(ERROR) << "Read() returned error " << result; 94 LOG(ERROR) << "Read() returned error " << result;
93 read_failed_callback_.Run(result); 95 read_failed_callback_.Run(result);
94 } 96 }
95 } 97 }
96 98
97 void MessageReader::OnDataReceived(net::IOBuffer* data, int data_size) { 99 void MessageReader::OnDataReceived(net::IOBuffer* data, int data_size) {
98 DCHECK(CalledOnValidThread()); 100 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
99 message_decoder_.AddData(data, data_size); 101 message_decoder_.AddData(data, data_size);
100 102
101 // Get list of all new messages first, and then call the callback 103 // Get list of all new messages first, and then call the callback
102 // for all of them. 104 // for all of them.
103 while (true) { 105 while (true) {
104 CompoundBuffer* buffer = message_decoder_.GetNextMessage(); 106 CompoundBuffer* buffer = message_decoder_.GetNextMessage();
105 if (!buffer) 107 if (!buffer)
106 break; 108 break;
107 base::ThreadTaskRunnerHandle::Get()->PostTask( 109 base::ThreadTaskRunnerHandle::Get()->PostTask(
108 FROM_HERE, 110 FROM_HERE,
109 base::Bind(&MessageReader::RunCallback, weak_factory_.GetWeakPtr(), 111 base::Bind(&MessageReader::RunCallback, weak_factory_.GetWeakPtr(),
110 base::Passed(base::WrapUnique(buffer)))); 112 base::Passed(base::WrapUnique(buffer))));
111 } 113 }
112 } 114 }
113 115
114 void MessageReader::RunCallback(std::unique_ptr<CompoundBuffer> message) { 116 void MessageReader::RunCallback(std::unique_ptr<CompoundBuffer> message) {
115 if (!message_received_callback_.is_null()) { 117 if (!message_received_callback_.is_null()) {
116 message_received_callback_.Run(std::move(message)); 118 message_received_callback_.Run(std::move(message));
117 } 119 }
118 } 120 }
119 121
120 } // namespace protocol 122 } // namespace protocol
121 } // namespace remoting 123 } // namespace remoting
OLDNEW
« no previous file with comments | « remoting/protocol/message_reader.h ('k') | remoting/protocol/pseudotcp_adapter.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698