OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "google_apis/gcm/engine/connection_handler.h" | 5 #include "google_apis/gcm/engine/connection_handler_impl.h" |
6 | 6 |
7 #include "base/message_loop/message_loop.h" | 7 #include "base/message_loop/message_loop.h" |
8 #include "google/protobuf/io/coded_stream.h" | 8 #include "google/protobuf/io/coded_stream.h" |
9 #include "google_apis/gcm/base/mcs_util.h" | 9 #include "google_apis/gcm/base/mcs_util.h" |
10 #include "google_apis/gcm/base/socket_stream.h" | 10 #include "google_apis/gcm/base/socket_stream.h" |
| 11 #include "google_apis/gcm/protocol/mcs.pb.h" |
11 #include "net/base/net_errors.h" | 12 #include "net/base/net_errors.h" |
12 #include "net/socket/stream_socket.h" | 13 #include "net/socket/stream_socket.h" |
13 | 14 |
14 using namespace google::protobuf::io; | 15 using namespace google::protobuf::io; |
15 | 16 |
16 namespace gcm { | 17 namespace gcm { |
17 | 18 |
18 namespace { | 19 namespace { |
19 | 20 |
20 // # of bytes a MCS version packet consumes. | 21 // # of bytes a MCS version packet consumes. |
21 const int kVersionPacketLen = 1; | 22 const int kVersionPacketLen = 1; |
22 // # of bytes a tag packet consumes. | 23 // # of bytes a tag packet consumes. |
23 const int kTagPacketLen = 1; | 24 const int kTagPacketLen = 1; |
24 // Max # of bytes a length packet consumes. | 25 // Max # of bytes a length packet consumes. |
25 const int kSizePacketLenMin = 1; | 26 const int kSizePacketLenMin = 1; |
26 const int kSizePacketLenMax = 2; | 27 const int kSizePacketLenMax = 2; |
27 | 28 |
28 // The current MCS protocol version. | 29 // The current MCS protocol version. |
| 30 // TODO(zea): bump to 41 once the server supports it. |
29 const int kMCSVersion = 38; | 31 const int kMCSVersion = 38; |
30 | 32 |
31 } // namespace | 33 } // namespace |
32 | 34 |
33 ConnectionHandler::ConnectionHandler(base::TimeDelta read_timeout) | 35 ConnectionHandlerImpl::ConnectionHandlerImpl( |
| 36 base::TimeDelta read_timeout, |
| 37 const ProtoReceivedCallback& read_callback, |
| 38 const ProtoSentCallback& write_callback, |
| 39 const ConnectionChangedCallback& connection_callback) |
34 : read_timeout_(read_timeout), | 40 : read_timeout_(read_timeout), |
35 handshake_complete_(false), | 41 handshake_complete_(false), |
36 message_tag_(0), | 42 message_tag_(0), |
37 message_size_(0), | 43 message_size_(0), |
| 44 read_callback_(read_callback), |
| 45 write_callback_(write_callback), |
| 46 connection_callback_(connection_callback), |
38 weak_ptr_factory_(this) { | 47 weak_ptr_factory_(this) { |
39 } | 48 } |
40 | 49 |
41 ConnectionHandler::~ConnectionHandler() { | 50 ConnectionHandlerImpl::~ConnectionHandlerImpl() { |
42 } | 51 } |
43 | 52 |
44 void ConnectionHandler::Init( | 53 void ConnectionHandlerImpl::Init( |
45 scoped_ptr<net::StreamSocket> socket, | 54 const mcs_proto::LoginRequest& login_request, |
46 const google::protobuf::MessageLite& login_request, | 55 scoped_ptr<net::StreamSocket> socket) { |
47 const ProtoReceivedCallback& read_callback, | 56 DCHECK(!read_callback_.is_null()); |
48 const ProtoSentCallback& write_callback, | 57 DCHECK(!write_callback_.is_null()); |
49 const ConnectionChangedCallback& connection_callback) { | 58 DCHECK(!connection_callback_.is_null()); |
50 DCHECK(!read_callback.is_null()); | |
51 DCHECK(!write_callback.is_null()); | |
52 DCHECK(!connection_callback.is_null()); | |
53 | 59 |
54 // Invalidate any previously outstanding reads. | 60 // Invalidate any previously outstanding reads. |
55 weak_ptr_factory_.InvalidateWeakPtrs(); | 61 weak_ptr_factory_.InvalidateWeakPtrs(); |
56 | 62 |
57 handshake_complete_ = false; | 63 handshake_complete_ = false; |
58 message_tag_ = 0; | 64 message_tag_ = 0; |
59 message_size_ = 0; | 65 message_size_ = 0; |
60 socket_ = socket.Pass(); | 66 socket_ = socket.Pass(); |
61 input_stream_.reset(new SocketInputStream(socket_.get())); | 67 input_stream_.reset(new SocketInputStream(socket_.get())); |
62 output_stream_.reset(new SocketOutputStream(socket_.get())); | 68 output_stream_.reset(new SocketOutputStream(socket_.get())); |
63 read_callback_ = read_callback; | |
64 write_callback_ = write_callback; | |
65 connection_callback_ = connection_callback; | |
66 | 69 |
67 Login(login_request); | 70 Login(login_request); |
68 } | 71 } |
69 | 72 |
70 bool ConnectionHandler::CanSendMessage() const { | 73 bool ConnectionHandlerImpl::CanSendMessage() const { |
71 return handshake_complete_ && output_stream_.get() && | 74 return handshake_complete_ && output_stream_.get() && |
72 output_stream_->GetState() == SocketOutputStream::EMPTY; | 75 output_stream_->GetState() == SocketOutputStream::EMPTY; |
73 } | 76 } |
74 | 77 |
75 void ConnectionHandler::SendMessage( | 78 void ConnectionHandlerImpl::SendMessage( |
76 const google::protobuf::MessageLite& message) { | 79 const google::protobuf::MessageLite& message) { |
77 DCHECK_EQ(output_stream_->GetState(), SocketOutputStream::EMPTY); | 80 DCHECK_EQ(output_stream_->GetState(), SocketOutputStream::EMPTY); |
78 DCHECK(handshake_complete_); | 81 DCHECK(handshake_complete_); |
79 | 82 |
80 { | 83 { |
81 CodedOutputStream coded_output_stream(output_stream_.get()); | 84 CodedOutputStream coded_output_stream(output_stream_.get()); |
82 DVLOG(1) << "Writing proto of size " << message.ByteSize(); | 85 DVLOG(1) << "Writing proto of size " << message.ByteSize(); |
83 int tag = GetMCSProtoTag(message); | 86 int tag = GetMCSProtoTag(message); |
84 DCHECK_NE(tag, -1); | 87 DCHECK_NE(tag, -1); |
85 coded_output_stream.WriteRaw(&tag, 1); | 88 coded_output_stream.WriteRaw(&tag, 1); |
86 coded_output_stream.WriteVarint32(message.ByteSize()); | 89 coded_output_stream.WriteVarint32(message.ByteSize()); |
87 message.SerializeToCodedStream(&coded_output_stream); | 90 message.SerializeToCodedStream(&coded_output_stream); |
88 } | 91 } |
89 | 92 |
90 if (output_stream_->Flush( | 93 if (output_stream_->Flush( |
91 base::Bind(&ConnectionHandler::OnMessageSent, | 94 base::Bind(&ConnectionHandlerImpl::OnMessageSent, |
92 weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) { | 95 weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) { |
93 OnMessageSent(); | 96 OnMessageSent(); |
94 } | 97 } |
95 } | 98 } |
96 | 99 |
97 void ConnectionHandler::Login( | 100 void ConnectionHandlerImpl::Login( |
98 const google::protobuf::MessageLite& login_request) { | 101 const google::protobuf::MessageLite& login_request) { |
99 DCHECK_EQ(output_stream_->GetState(), SocketOutputStream::EMPTY); | 102 DCHECK_EQ(output_stream_->GetState(), SocketOutputStream::EMPTY); |
100 | 103 |
101 const char version_byte[1] = {kMCSVersion}; | 104 const char version_byte[1] = {kMCSVersion}; |
102 const char login_request_tag[1] = {kLoginRequestTag}; | 105 const char login_request_tag[1] = {kLoginRequestTag}; |
103 { | 106 { |
104 CodedOutputStream coded_output_stream(output_stream_.get()); | 107 CodedOutputStream coded_output_stream(output_stream_.get()); |
105 coded_output_stream.WriteRaw(version_byte, 1); | 108 coded_output_stream.WriteRaw(version_byte, 1); |
106 coded_output_stream.WriteRaw(login_request_tag, 1); | 109 coded_output_stream.WriteRaw(login_request_tag, 1); |
107 coded_output_stream.WriteVarint32(login_request.ByteSize()); | 110 coded_output_stream.WriteVarint32(login_request.ByteSize()); |
108 login_request.SerializeToCodedStream(&coded_output_stream); | 111 login_request.SerializeToCodedStream(&coded_output_stream); |
109 } | 112 } |
110 | 113 |
111 if (output_stream_->Flush( | 114 if (output_stream_->Flush( |
112 base::Bind(&ConnectionHandler::OnMessageSent, | 115 base::Bind(&ConnectionHandlerImpl::OnMessageSent, |
113 weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) { | 116 weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) { |
114 base::MessageLoop::current()->PostTask( | 117 base::MessageLoop::current()->PostTask( |
115 FROM_HERE, | 118 FROM_HERE, |
116 base::Bind(&ConnectionHandler::OnMessageSent, | 119 base::Bind(&ConnectionHandlerImpl::OnMessageSent, |
117 weak_ptr_factory_.GetWeakPtr())); | 120 weak_ptr_factory_.GetWeakPtr())); |
118 } | 121 } |
119 | 122 |
120 read_timeout_timer_.Start(FROM_HERE, | 123 read_timeout_timer_.Start(FROM_HERE, |
121 read_timeout_, | 124 read_timeout_, |
122 base::Bind(&ConnectionHandler::OnTimeout, | 125 base::Bind(&ConnectionHandlerImpl::OnTimeout, |
123 weak_ptr_factory_.GetWeakPtr())); | 126 weak_ptr_factory_.GetWeakPtr())); |
124 WaitForData(MCS_VERSION_TAG_AND_SIZE); | 127 WaitForData(MCS_VERSION_TAG_AND_SIZE); |
125 } | 128 } |
126 | 129 |
127 void ConnectionHandler::OnMessageSent() { | 130 void ConnectionHandlerImpl::OnMessageSent() { |
128 if (!output_stream_.get()) { | 131 if (!output_stream_.get()) { |
129 // The connection has already been closed. Just return. | 132 // The connection has already been closed. Just return. |
130 DCHECK(!input_stream_.get()); | 133 DCHECK(!input_stream_.get()); |
131 DCHECK(!read_timeout_timer_.IsRunning()); | 134 DCHECK(!read_timeout_timer_.IsRunning()); |
132 return; | 135 return; |
133 } | 136 } |
134 | 137 |
135 if (output_stream_->GetState() != SocketOutputStream::EMPTY) { | 138 if (output_stream_->GetState() != SocketOutputStream::EMPTY) { |
136 int last_error = output_stream_->last_error(); | 139 int last_error = output_stream_->last_error(); |
137 CloseConnection(); | 140 CloseConnection(); |
138 // If the socket stream had an error, plumb it up, else plumb up FAILED. | 141 // If the socket stream had an error, plumb it up, else plumb up FAILED. |
139 if (last_error == net::OK) | 142 if (last_error == net::OK) |
140 last_error = net::ERR_FAILED; | 143 last_error = net::ERR_FAILED; |
141 connection_callback_.Run(last_error); | 144 connection_callback_.Run(last_error); |
142 return; | 145 return; |
143 } | 146 } |
144 | 147 |
145 write_callback_.Run(); | 148 write_callback_.Run(); |
146 } | 149 } |
147 | 150 |
148 void ConnectionHandler::GetNextMessage() { | 151 void ConnectionHandlerImpl::GetNextMessage() { |
149 DCHECK(SocketInputStream::EMPTY == input_stream_->GetState() || | 152 DCHECK(SocketInputStream::EMPTY == input_stream_->GetState() || |
150 SocketInputStream::READY == input_stream_->GetState()); | 153 SocketInputStream::READY == input_stream_->GetState()); |
151 message_tag_ = 0; | 154 message_tag_ = 0; |
152 message_size_ = 0; | 155 message_size_ = 0; |
153 | 156 |
154 WaitForData(MCS_TAG_AND_SIZE); | 157 WaitForData(MCS_TAG_AND_SIZE); |
155 } | 158 } |
156 | 159 |
157 void ConnectionHandler::WaitForData(ProcessingState state) { | 160 void ConnectionHandlerImpl::WaitForData(ProcessingState state) { |
158 DVLOG(1) << "Waiting for MCS data: state == " << state; | 161 DVLOG(1) << "Waiting for MCS data: state == " << state; |
159 | 162 |
160 if (!input_stream_) { | 163 if (!input_stream_) { |
161 // The connection has already been closed. Just return. | 164 // The connection has already been closed. Just return. |
162 DCHECK(!output_stream_.get()); | 165 DCHECK(!output_stream_.get()); |
163 DCHECK(!read_timeout_timer_.IsRunning()); | 166 DCHECK(!read_timeout_timer_.IsRunning()); |
164 return; | 167 return; |
165 } | 168 } |
166 | 169 |
167 if (input_stream_->GetState() != SocketInputStream::EMPTY && | 170 if (input_stream_->GetState() != SocketInputStream::EMPTY && |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
203 max_bytes_needed = message_size_; | 206 max_bytes_needed = message_size_; |
204 break; | 207 break; |
205 default: | 208 default: |
206 NOTREACHED(); | 209 NOTREACHED(); |
207 } | 210 } |
208 DCHECK_GE(max_bytes_needed, min_bytes_needed); | 211 DCHECK_GE(max_bytes_needed, min_bytes_needed); |
209 | 212 |
210 int byte_count = input_stream_->UnreadByteCount(); | 213 int byte_count = input_stream_->UnreadByteCount(); |
211 if (min_bytes_needed - byte_count > 0 && | 214 if (min_bytes_needed - byte_count > 0 && |
212 input_stream_->Refresh( | 215 input_stream_->Refresh( |
213 base::Bind(&ConnectionHandler::WaitForData, | 216 base::Bind(&ConnectionHandlerImpl::WaitForData, |
214 weak_ptr_factory_.GetWeakPtr(), | 217 weak_ptr_factory_.GetWeakPtr(), |
215 state), | 218 state), |
216 max_bytes_needed - byte_count) == net::ERR_IO_PENDING) { | 219 max_bytes_needed - byte_count) == net::ERR_IO_PENDING) { |
217 return; | 220 return; |
218 } | 221 } |
219 | 222 |
220 // Check for refresh errors. | 223 // Check for refresh errors. |
221 if (input_stream_->GetState() != SocketInputStream::READY) { | 224 if (input_stream_->GetState() != SocketInputStream::READY) { |
222 // An error occurred. | 225 // An error occurred. |
223 int last_error = output_stream_->last_error(); | 226 int last_error = output_stream_->last_error(); |
(...skipping 18 matching lines...) Expand all Loading... |
242 OnGotMessageSize(); | 245 OnGotMessageSize(); |
243 break; | 246 break; |
244 case MCS_PROTO_BYTES: | 247 case MCS_PROTO_BYTES: |
245 OnGotMessageBytes(); | 248 OnGotMessageBytes(); |
246 break; | 249 break; |
247 default: | 250 default: |
248 NOTREACHED(); | 251 NOTREACHED(); |
249 } | 252 } |
250 } | 253 } |
251 | 254 |
252 void ConnectionHandler::OnGotVersion() { | 255 void ConnectionHandlerImpl::OnGotVersion() { |
253 uint8 version = 0; | 256 uint8 version = 0; |
254 { | 257 { |
255 CodedInputStream coded_input_stream(input_stream_.get()); | 258 CodedInputStream coded_input_stream(input_stream_.get()); |
256 coded_input_stream.ReadRaw(&version, 1); | 259 coded_input_stream.ReadRaw(&version, 1); |
257 } | 260 } |
258 if (version < kMCSVersion) { | 261 if (version < kMCSVersion) { |
259 LOG(ERROR) << "Invalid GCM version response: " << static_cast<int>(version); | 262 LOG(ERROR) << "Invalid GCM version response: " << static_cast<int>(version); |
260 connection_callback_.Run(net::ERR_FAILED); | 263 connection_callback_.Run(net::ERR_FAILED); |
261 return; | 264 return; |
262 } | 265 } |
263 | 266 |
264 input_stream_->RebuildBuffer(); | 267 input_stream_->RebuildBuffer(); |
265 | 268 |
266 // Process the LoginResponse message tag. | 269 // Process the LoginResponse message tag. |
267 OnGotMessageTag(); | 270 OnGotMessageTag(); |
268 } | 271 } |
269 | 272 |
270 void ConnectionHandler::OnGotMessageTag() { | 273 void ConnectionHandlerImpl::OnGotMessageTag() { |
271 if (input_stream_->GetState() != SocketInputStream::READY) { | 274 if (input_stream_->GetState() != SocketInputStream::READY) { |
272 LOG(ERROR) << "Failed to receive protobuf tag."; | 275 LOG(ERROR) << "Failed to receive protobuf tag."; |
273 read_callback_.Run(scoped_ptr<google::protobuf::MessageLite>()); | 276 read_callback_.Run(scoped_ptr<google::protobuf::MessageLite>()); |
274 return; | 277 return; |
275 } | 278 } |
276 | 279 |
277 { | 280 { |
278 CodedInputStream coded_input_stream(input_stream_.get()); | 281 CodedInputStream coded_input_stream(input_stream_.get()); |
279 coded_input_stream.ReadRaw(&message_tag_, 1); | 282 coded_input_stream.ReadRaw(&message_tag_, 1); |
280 } | 283 } |
281 | 284 |
282 DVLOG(1) << "Received proto of type " | 285 DVLOG(1) << "Received proto of type " |
283 << static_cast<unsigned int>(message_tag_); | 286 << static_cast<unsigned int>(message_tag_); |
284 | 287 |
285 if (!read_timeout_timer_.IsRunning()) { | 288 if (!read_timeout_timer_.IsRunning()) { |
286 read_timeout_timer_.Start(FROM_HERE, | 289 read_timeout_timer_.Start(FROM_HERE, |
287 read_timeout_, | 290 read_timeout_, |
288 base::Bind(&ConnectionHandler::OnTimeout, | 291 base::Bind(&ConnectionHandlerImpl::OnTimeout, |
289 weak_ptr_factory_.GetWeakPtr())); | 292 weak_ptr_factory_.GetWeakPtr())); |
290 } | 293 } |
291 OnGotMessageSize(); | 294 OnGotMessageSize(); |
292 } | 295 } |
293 | 296 |
294 void ConnectionHandler::OnGotMessageSize() { | 297 void ConnectionHandlerImpl::OnGotMessageSize() { |
295 if (input_stream_->GetState() != SocketInputStream::READY) { | 298 if (input_stream_->GetState() != SocketInputStream::READY) { |
296 LOG(ERROR) << "Failed to receive message size."; | 299 LOG(ERROR) << "Failed to receive message size."; |
297 read_callback_.Run(scoped_ptr<google::protobuf::MessageLite>()); | 300 read_callback_.Run(scoped_ptr<google::protobuf::MessageLite>()); |
298 return; | 301 return; |
299 } | 302 } |
300 | 303 |
301 bool need_another_byte = false; | 304 bool need_another_byte = false; |
302 int prev_byte_count = input_stream_->ByteCount(); | 305 int prev_byte_count = input_stream_->ByteCount(); |
303 { | 306 { |
304 CodedInputStream coded_input_stream(input_stream_.get()); | 307 CodedInputStream coded_input_stream(input_stream_.get()); |
(...skipping 18 matching lines...) Expand all Loading... |
323 } | 326 } |
324 | 327 |
325 DVLOG(1) << "Proto size: " << message_size_; | 328 DVLOG(1) << "Proto size: " << message_size_; |
326 | 329 |
327 if (message_size_ > 0) | 330 if (message_size_ > 0) |
328 WaitForData(MCS_PROTO_BYTES); | 331 WaitForData(MCS_PROTO_BYTES); |
329 else | 332 else |
330 OnGotMessageBytes(); | 333 OnGotMessageBytes(); |
331 } | 334 } |
332 | 335 |
333 void ConnectionHandler::OnGotMessageBytes() { | 336 void ConnectionHandlerImpl::OnGotMessageBytes() { |
334 read_timeout_timer_.Stop(); | 337 read_timeout_timer_.Stop(); |
335 scoped_ptr<google::protobuf::MessageLite> protobuf( | 338 scoped_ptr<google::protobuf::MessageLite> protobuf( |
336 BuildProtobufFromTag(message_tag_)); | 339 BuildProtobufFromTag(message_tag_)); |
337 // Messages with no content are valid; just use the default protobuf for | 340 // Messages with no content are valid; just use the default protobuf for |
338 // that tag. | 341 // that tag. |
339 if (protobuf.get() && message_size_ == 0) { | 342 if (protobuf.get() && message_size_ == 0) { |
340 base::MessageLoop::current()->PostTask( | 343 base::MessageLoop::current()->PostTask( |
341 FROM_HERE, | 344 FROM_HERE, |
342 base::Bind(&ConnectionHandler::GetNextMessage, | 345 base::Bind(&ConnectionHandlerImpl::GetNextMessage, |
343 weak_ptr_factory_.GetWeakPtr())); | 346 weak_ptr_factory_.GetWeakPtr())); |
344 read_callback_.Run(protobuf.Pass()); | 347 read_callback_.Run(protobuf.Pass()); |
345 return; | 348 return; |
346 } | 349 } |
347 | 350 |
348 if (!protobuf.get() || | 351 if (!protobuf.get() || |
349 input_stream_->GetState() != SocketInputStream::READY) { | 352 input_stream_->GetState() != SocketInputStream::READY) { |
350 LOG(ERROR) << "Failed to extract protobuf bytes of type " | 353 LOG(ERROR) << "Failed to extract protobuf bytes of type " |
351 << static_cast<unsigned int>(message_tag_); | 354 << static_cast<unsigned int>(message_tag_); |
352 protobuf.reset(); // Return a null pointer to denote an error. | 355 protobuf.reset(); // Return a null pointer to denote an error. |
353 read_callback_.Run(protobuf.Pass()); | 356 read_callback_.Run(protobuf.Pass()); |
354 return; | 357 return; |
355 } | 358 } |
356 | 359 |
357 { | 360 { |
358 CodedInputStream coded_input_stream(input_stream_.get()); | 361 CodedInputStream coded_input_stream(input_stream_.get()); |
359 if (!protobuf->ParsePartialFromCodedStream(&coded_input_stream)) { | 362 if (!protobuf->ParsePartialFromCodedStream(&coded_input_stream)) { |
360 NOTREACHED() << "Unable to parse GCM message of type " | 363 NOTREACHED() << "Unable to parse GCM message of type " |
361 << static_cast<unsigned int>(message_tag_); | 364 << static_cast<unsigned int>(message_tag_); |
362 protobuf.reset(); // Return a null pointer to denote an error. | 365 protobuf.reset(); // Return a null pointer to denote an error. |
363 read_callback_.Run(protobuf.Pass()); | 366 read_callback_.Run(protobuf.Pass()); |
364 return; | 367 return; |
365 } | 368 } |
366 } | 369 } |
367 | 370 |
368 input_stream_->RebuildBuffer(); | 371 input_stream_->RebuildBuffer(); |
369 base::MessageLoop::current()->PostTask( | 372 base::MessageLoop::current()->PostTask( |
370 FROM_HERE, | 373 FROM_HERE, |
371 base::Bind(&ConnectionHandler::GetNextMessage, | 374 base::Bind(&ConnectionHandlerImpl::GetNextMessage, |
372 weak_ptr_factory_.GetWeakPtr())); | 375 weak_ptr_factory_.GetWeakPtr())); |
373 if (message_tag_ == kLoginResponseTag) { | 376 if (message_tag_ == kLoginResponseTag) { |
374 if (handshake_complete_) { | 377 if (handshake_complete_) { |
375 LOG(ERROR) << "Unexpected login response."; | 378 LOG(ERROR) << "Unexpected login response."; |
376 } else { | 379 } else { |
377 handshake_complete_ = true; | 380 handshake_complete_ = true; |
378 DVLOG(1) << "GCM Handshake complete."; | 381 DVLOG(1) << "GCM Handshake complete."; |
379 } | 382 } |
380 } | 383 } |
381 read_callback_.Run(protobuf.Pass()); | 384 read_callback_.Run(protobuf.Pass()); |
382 } | 385 } |
383 | 386 |
384 void ConnectionHandler::OnTimeout() { | 387 void ConnectionHandlerImpl::OnTimeout() { |
385 LOG(ERROR) << "Timed out waiting for GCM Protocol buffer."; | 388 LOG(ERROR) << "Timed out waiting for GCM Protocol buffer."; |
386 CloseConnection(); | 389 CloseConnection(); |
387 connection_callback_.Run(net::ERR_TIMED_OUT); | 390 connection_callback_.Run(net::ERR_TIMED_OUT); |
388 } | 391 } |
389 | 392 |
390 void ConnectionHandler::CloseConnection() { | 393 void ConnectionHandlerImpl::CloseConnection() { |
391 DVLOG(1) << "Closing connection."; | 394 DVLOG(1) << "Closing connection."; |
392 read_callback_.Reset(); | 395 read_callback_.Reset(); |
393 write_callback_.Reset(); | 396 write_callback_.Reset(); |
394 read_timeout_timer_.Stop(); | 397 read_timeout_timer_.Stop(); |
395 socket_->Disconnect(); | 398 socket_->Disconnect(); |
396 input_stream_.reset(); | 399 input_stream_.reset(); |
397 output_stream_.reset(); | 400 output_stream_.reset(); |
398 weak_ptr_factory_.InvalidateWeakPtrs(); | 401 weak_ptr_factory_.InvalidateWeakPtrs(); |
399 } | 402 } |
400 | 403 |
401 } // namespace gcm | 404 } // namespace gcm |
OLD | NEW |