Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(60)

Side by Side Diff: mojo/services/network/public/cpp/web_socket_read_queue.cc

Issue 1148913002: Fix WebSocket{Read,Write}Queue. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "network/public/cpp/web_socket_read_queue.h" 5 #include "network/public/cpp/web_socket_read_queue.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "third_party/mojo/src/mojo/public/cpp/environment/logging.h"
8 9
9 namespace mojo { 10 namespace mojo {
10 11
11 struct WebSocketReadQueue::Operation { 12 struct WebSocketReadQueue::Operation {
12 uint32_t num_bytes_; 13 uint32_t num_bytes_;
13 base::Callback<void(const char*)> callback_; 14 base::Callback<void(const char*)> callback_;
14 }; 15 };
15 16
16 WebSocketReadQueue::WebSocketReadQueue(DataPipeConsumerHandle handle) 17 WebSocketReadQueue::WebSocketReadQueue(DataPipeConsumerHandle handle)
17 : handle_(handle), is_waiting_(false) { 18 : handle_(handle), is_busy_(false), weak_factory_(this) {
18 } 19 }
19 20
20 WebSocketReadQueue::~WebSocketReadQueue() { 21 WebSocketReadQueue::~WebSocketReadQueue() {
21 } 22 }
22 23
23 void WebSocketReadQueue::Read(uint32_t num_bytes, 24 void WebSocketReadQueue::Read(uint32_t num_bytes,
24 base::Callback<void(const char*)> callback) { 25 base::Callback<void(const char*)> callback) {
25 Operation* op = new Operation; 26 Operation* op = new Operation;
26 op->num_bytes_ = num_bytes; 27 op->num_bytes_ = num_bytes;
27 op->callback_ = callback; 28 op->callback_ = callback;
28 queue_.push_back(op); 29 queue_.push_back(op);
29 30
30 if (!is_waiting_) 31 if (is_busy_)
31 TryToRead(); 32 return;
33
34 is_busy_ = true;
35 TryToRead();
32 } 36 }
33 37
34 void WebSocketReadQueue::TryToRead() { 38 void WebSocketReadQueue::TryToRead() {
35 Operation* op = queue_[0]; 39 MOJO_DCHECK(is_busy_);
jam 2015/05/20 22:02:09 why MOJO_DCHECK instead of just DCHECK? I see many
yzshen1 2015/05/20 22:52:56 Yeah, DCHECK is from base/ and if the client lib i
36 const void* buffer = NULL; 40 MOJO_DCHECK(!queue_.empty());
37 uint32_t bytes_read = op->num_bytes_; 41 do {
38 MojoResult result = BeginReadDataRaw( 42 Operation* op = queue_[0];
39 handle_, &buffer, &bytes_read, MOJO_READ_DATA_FLAG_ALL_OR_NONE); 43 const void* buffer = NULL;
40 if (result == MOJO_RESULT_SHOULD_WAIT) { 44 uint32_t bytes_read = op->num_bytes_;
41 EndReadDataRaw(handle_, bytes_read); 45 MojoResult result = BeginReadDataRaw(
42 Wait(); 46 handle_, &buffer, &bytes_read, MOJO_READ_DATA_FLAG_ALL_OR_NONE);
43 return; 47 if (result == MOJO_RESULT_SHOULD_WAIT) {
44 } 48 Wait();
49 return;
50 }
45 51
46 // Ensure |op| is deleted, whether or not |this| goes away. 52 // Ensure |op| is deleted, whether or not |this| goes away.
47 scoped_ptr<Operation> op_deleter(op); 53 scoped_ptr<Operation> op_deleter(op);
48 queue_.weak_erase(queue_.begin()); 54 queue_.weak_erase(queue_.begin());
49 if (result != MOJO_RESULT_OK) 55
50 return; 56 // http://crbug.com/490193 This should run callback as well. May need to
51 DataPipeConsumerHandle handle = handle_; 57 // change the callback signature.
52 op->callback_.Run(static_cast<const char*>(buffer)); // may delete |this| 58 if (result != MOJO_RESULT_OK)
53 EndReadDataRaw(handle, bytes_read); 59 return;
60
61 uint32_t num_bytes = op_deleter->num_bytes_;
62 MOJO_DCHECK(num_bytes <= bytes_read);
63 DataPipeConsumerHandle handle = handle_;
64
65 base::WeakPtr<WebSocketReadQueue> self(weak_factory_.GetWeakPtr());
66
67 // This call may delete |this|. In that case, |self| will be invalidated.
68 // It may re-enter Read() too. Because |is_busy_| is true during the whole
69 // process, TryToRead() won't be re-entered.
70 op->callback_.Run(static_cast<const char*>(buffer));
71
72 EndReadDataRaw(handle, num_bytes);
73
74 if (!self)
75 return;
76 } while (!queue_.empty());
77 is_busy_ = false;
54 } 78 }
55 79
56 void WebSocketReadQueue::Wait() { 80 void WebSocketReadQueue::Wait() {
57 is_waiting_ = true; 81 MOJO_DCHECK(is_busy_);
58 handle_watcher_.Start( 82 handle_watcher_.Start(
59 handle_, 83 handle_,
60 MOJO_HANDLE_SIGNAL_READABLE, 84 MOJO_HANDLE_SIGNAL_READABLE,
61 MOJO_DEADLINE_INDEFINITE, 85 MOJO_DEADLINE_INDEFINITE,
62 base::Bind(&WebSocketReadQueue::OnHandleReady, base::Unretained(this))); 86 base::Bind(&WebSocketReadQueue::OnHandleReady, base::Unretained(this)));
63 } 87 }
64 88
65 void WebSocketReadQueue::OnHandleReady(MojoResult result) { 89 void WebSocketReadQueue::OnHandleReady(MojoResult result) {
66 is_waiting_ = false; 90 MOJO_DCHECK(is_busy_);
67 TryToRead(); 91 TryToRead();
68 } 92 }
69 93
70 } // namespace mojo 94 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/services/network/public/cpp/web_socket_read_queue.h ('k') | mojo/services/network/public/cpp/web_socket_write_queue.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698