Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(805)

Side by Side Diff: mojo/services/network/public/cpp/web_socket_write_queue.cc

Issue 1148913002: Fix WebSocket{Read,Write}Queue. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "network/public/cpp/web_socket_write_queue.h" 5 #include "network/public/cpp/web_socket_write_queue.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "third_party/mojo/src/mojo/public/cpp/environment/logging.h"
8 9
9 namespace mojo { 10 namespace mojo {
10 11
11 struct WebSocketWriteQueue::Operation { 12 struct WebSocketWriteQueue::Operation {
12 uint32_t num_bytes_; 13 uint32_t num_bytes_;
13 base::Callback<void(const char*)> callback_; 14 base::Callback<void(const char*)> callback_;
14 15
15 const char* data_; 16 const char* data_;
16 // Only initialized if the initial Write fails. This saves a copy in 17 // Only initialized if the initial Write fails. This saves a copy in
17 // the common case. 18 // the common case.
18 std::vector<char> data_copy_; 19 std::vector<char> data_copy_;
19 }; 20 };
20 21
21 WebSocketWriteQueue::WebSocketWriteQueue(DataPipeProducerHandle handle) 22 WebSocketWriteQueue::WebSocketWriteQueue(DataPipeProducerHandle handle)
22 : handle_(handle), is_waiting_(false) { 23 : handle_(handle), is_busy_(false), destructed_(nullptr) {
23 } 24 }
24 25
25 WebSocketWriteQueue::~WebSocketWriteQueue() { 26 WebSocketWriteQueue::~WebSocketWriteQueue() {
27 if (destructed_)
28 *destructed_ = true;
26 } 29 }
27 30
28 void WebSocketWriteQueue::Write(const char* data, 31 void WebSocketWriteQueue::Write(const char* data,
29 uint32_t num_bytes, 32 uint32_t num_bytes,
30 base::Callback<void(const char*)> callback) { 33 base::Callback<void(const char*)> callback) {
31 Operation* op = new Operation; 34 Operation* op = new Operation;
32 op->num_bytes_ = num_bytes; 35 op->num_bytes_ = num_bytes;
33 op->callback_ = callback; 36 op->callback_ = callback;
34 op->data_ = data; 37 op->data_ = data;
35 queue_.push_back(op); 38 queue_.push_back(op);
36 39
37 MojoResult result = MOJO_RESULT_SHOULD_WAIT; 40 if (!is_busy_) {
38 if (!is_waiting_) 41 is_busy_ = true;
39 result = TryToWrite(); 42 // This call may reset |is_busy_| to false.
43 TryToWrite();
44 }
40 45
41 // If we have to wait, make a local copy of the data so we know it will 46 if (is_busy_) {
42 // live until we need it. 47 // If we have to wait, make a local copy of the data so we know it will
43 if (result == MOJO_RESULT_SHOULD_WAIT) { 48 // live until we need it.
44 op->data_copy_.resize(num_bytes); 49 op->data_copy_.resize(num_bytes);
45 memcpy(&op->data_copy_[0], data, num_bytes); 50 memcpy(&op->data_copy_[0], data, num_bytes);
46 op->data_ = &op->data_copy_[0]; 51 op->data_ = &op->data_copy_[0];
47 } 52 }
48 } 53 }
49 54
50 MojoResult WebSocketWriteQueue::TryToWrite() { 55 void WebSocketWriteQueue::TryToWrite() {
51 Operation* op = queue_[0]; 56 MOJO_DCHECK(is_busy_);
52 uint32_t bytes_written = op->num_bytes_; 57 MOJO_DCHECK(!queue_.empty());
53 MojoResult result = WriteDataRaw( 58 do {
54 handle_, op->data_, &bytes_written, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE); 59 Operation* op = queue_[0];
55 if (result == MOJO_RESULT_SHOULD_WAIT) { 60 uint32_t bytes_written = op->num_bytes_;
56 Wait(); 61 MojoResult result = WriteDataRaw(
57 return result; 62 handle_, op->data_, &bytes_written, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE);
58 } 63 if (result == MOJO_RESULT_SHOULD_WAIT) {
64 Wait();
65 return;
66 }
59 67
60 // Ensure |op| is deleted, whether or not |this| goes away. 68 // Ensure |op| is deleted, whether or not |this| goes away.
61 scoped_ptr<Operation> op_deleter(op); 69 scoped_ptr<Operation> op_deleter(op);
62 queue_.weak_erase(queue_.begin()); 70 queue_.weak_erase(queue_.begin());
63 if (result != MOJO_RESULT_OK)
64 return result;
65 71
66 op->callback_.Run(op->data_); // may delete |this| 72 // http://crbug.com/490193 This should run callback as well. May need to
67 return result; 73 // change the callback signature.
74 if (result != MOJO_RESULT_OK)
75 return;
76
77 bool destructed = false;
78 destructed_ = &destructed;
79
80 // May delete |this|. In that case, |destructed| will be set to true.
81 // Because |is_busy_| is true during the whole process, even if Write() is
82 // called by the callback, TryToWrite() won't be re-entered. So we don't
83 // have to worry about |destructed_| being re-written by nested calls.
84 op->callback_.Run(op->data_);
85
86 destructed_ = nullptr;
87 if (destructed)
88 return;
89 } while (!queue_.empty());
90 is_busy_ = false;
68 } 91 }
69 92
70 void WebSocketWriteQueue::Wait() { 93 void WebSocketWriteQueue::Wait() {
71 is_waiting_ = true; 94 MOJO_DCHECK(is_busy_);
72 handle_watcher_.Start(handle_, 95 handle_watcher_.Start(handle_,
73 MOJO_HANDLE_SIGNAL_WRITABLE, 96 MOJO_HANDLE_SIGNAL_WRITABLE,
74 MOJO_DEADLINE_INDEFINITE, 97 MOJO_DEADLINE_INDEFINITE,
75 base::Bind(&WebSocketWriteQueue::OnHandleReady, 98 base::Bind(&WebSocketWriteQueue::OnHandleReady,
76 base::Unretained(this))); 99 base::Unretained(this)));
77 } 100 }
78 101
79 void WebSocketWriteQueue::OnHandleReady(MojoResult result) { 102 void WebSocketWriteQueue::OnHandleReady(MojoResult result) {
80 is_waiting_ = false; 103 MOJO_DCHECK(is_busy_);
81 TryToWrite(); 104 TryToWrite();
82 } 105 }
83 106
84 } // namespace mojo 107 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698