OLD | NEW |
| (Empty) |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "network/public/cpp/web_socket_write_queue.h" | |
6 | |
7 #include <stdint.h> | |
8 | |
9 #include "base/bind.h" | |
10 #include "base/logging.h" | |
11 | |
12 namespace mojo { | |
13 | |
14 struct WebSocketWriteQueue::Operation { | |
15 uint32_t num_bytes_; | |
16 base::Callback<void(const char*)> callback_; | |
17 | |
18 const char* data_; | |
19 // Only initialized if the initial Write fails. This saves a copy in | |
20 // the common case. | |
21 std::vector<char> data_copy_; | |
22 }; | |
23 | |
24 WebSocketWriteQueue::WebSocketWriteQueue(DataPipeProducerHandle handle) | |
25 : handle_(handle), is_busy_(false), weak_factory_(this) { | |
26 } | |
27 | |
28 WebSocketWriteQueue::~WebSocketWriteQueue() { | |
29 } | |
30 | |
31 void WebSocketWriteQueue::Write(const char* data, | |
32 uint32_t num_bytes, | |
33 base::Callback<void(const char*)> callback) { | |
34 Operation* op = new Operation; | |
35 op->num_bytes_ = num_bytes; | |
36 op->callback_ = callback; | |
37 op->data_ = data; | |
38 queue_.push_back(op); | |
39 | |
40 if (!is_busy_) { | |
41 is_busy_ = true; | |
42 // This call may reset |is_busy_| to false. | |
43 TryToWrite(); | |
44 } | |
45 | |
46 if (is_busy_) { | |
47 // If we have to wait, make a local copy of the data so we know it will | |
48 // live until we need it. | |
49 op->data_copy_.resize(num_bytes); | |
50 memcpy(&op->data_copy_[0], data, num_bytes); | |
51 op->data_ = &op->data_copy_[0]; | |
52 } | |
53 } | |
54 | |
55 void WebSocketWriteQueue::TryToWrite() { | |
56 DCHECK(is_busy_); | |
57 DCHECK(!queue_.empty()); | |
58 do { | |
59 Operation* op = queue_[0]; | |
60 uint32_t bytes_written = op->num_bytes_; | |
61 MojoResult result = WriteDataRaw( | |
62 handle_, op->data_, &bytes_written, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE); | |
63 if (result == MOJO_RESULT_SHOULD_WAIT) { | |
64 Wait(); | |
65 return; | |
66 } | |
67 | |
68 // Ensure |op| is deleted, whether or not |this| goes away. | |
69 scoped_ptr<Operation> op_deleter(op); | |
70 queue_.weak_erase(queue_.begin()); | |
71 | |
72 // http://crbug.com/490193 This should run callback as well. May need to | |
73 // change the callback signature. | |
74 if (result != MOJO_RESULT_OK) | |
75 return; | |
76 | |
77 base::WeakPtr<WebSocketWriteQueue> self(weak_factory_.GetWeakPtr()); | |
78 | |
79 // This call may delete |this|. In that case, |self| will be invalidated. | |
80 // It may re-enter Write() too. Because |is_busy_| is true during the whole | |
81 // process, TryToWrite() won't be re-entered. | |
82 op->callback_.Run(op->data_); | |
83 | |
84 if (!self) | |
85 return; | |
86 } while (!queue_.empty()); | |
87 is_busy_ = false; | |
88 } | |
89 | |
90 void WebSocketWriteQueue::Wait() { | |
91 DCHECK(is_busy_); | |
92 handle_watcher_.Start(handle_, | |
93 MOJO_HANDLE_SIGNAL_WRITABLE, | |
94 MOJO_DEADLINE_INDEFINITE, | |
95 base::Bind(&WebSocketWriteQueue::OnHandleReady, | |
96 base::Unretained(this))); | |
97 } | |
98 | |
99 void WebSocketWriteQueue::OnHandleReady(MojoResult result) { | |
100 DCHECK(is_busy_); | |
101 TryToWrite(); | |
102 } | |
103 | |
104 } // namespace mojo | |
OLD | NEW |