OLD | NEW |
| (Empty) |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "mojo/services/public/cpp/network/web_socket_write_queue.h" | |
6 | |
7 #include "base/bind.h" | |
8 | |
9 namespace mojo { | |
10 | |
11 struct WebSocketWriteQueue::Operation { | |
12 uint32_t num_bytes_; | |
13 base::Callback<void(const char*)> callback_; | |
14 | |
15 const char* data_; | |
16 // Only initialized if the initial Write fails. This saves a copy in | |
17 // the common case. | |
18 std::vector<char> data_copy_; | |
19 }; | |
20 | |
21 WebSocketWriteQueue::WebSocketWriteQueue(DataPipeProducerHandle handle) | |
22 : handle_(handle), is_waiting_(false) { | |
23 } | |
24 | |
25 WebSocketWriteQueue::~WebSocketWriteQueue() { | |
26 } | |
27 | |
28 void WebSocketWriteQueue::Write(const char* data, | |
29 uint32_t num_bytes, | |
30 base::Callback<void(const char*)> callback) { | |
31 Operation* op = new Operation; | |
32 op->num_bytes_ = num_bytes; | |
33 op->callback_ = callback; | |
34 op->data_ = data; | |
35 queue_.push_back(op); | |
36 | |
37 MojoResult result = MOJO_RESULT_SHOULD_WAIT; | |
38 if (!is_waiting_) | |
39 result = TryToWrite(); | |
40 | |
41 // If we have to wait, make a local copy of the data so we know it will | |
42 // live until we need it. | |
43 if (result == MOJO_RESULT_SHOULD_WAIT) { | |
44 op->data_copy_.resize(num_bytes); | |
45 memcpy(&op->data_copy_[0], data, num_bytes); | |
46 op->data_ = &op->data_copy_[0]; | |
47 } | |
48 } | |
49 | |
50 MojoResult WebSocketWriteQueue::TryToWrite() { | |
51 Operation* op = queue_[0]; | |
52 uint32_t bytes_written = op->num_bytes_; | |
53 MojoResult result = WriteDataRaw( | |
54 handle_, op->data_, &bytes_written, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE); | |
55 if (result == MOJO_RESULT_SHOULD_WAIT) { | |
56 Wait(); | |
57 return result; | |
58 } | |
59 | |
60 // Ensure |op| is deleted, whether or not |this| goes away. | |
61 scoped_ptr<Operation> op_deleter(op); | |
62 queue_.weak_erase(queue_.begin()); | |
63 if (result != MOJO_RESULT_OK) | |
64 return result; | |
65 | |
66 op->callback_.Run(op->data_); // may delete |this| | |
67 return result; | |
68 } | |
69 | |
70 void WebSocketWriteQueue::Wait() { | |
71 is_waiting_ = true; | |
72 handle_watcher_.Start(handle_, | |
73 MOJO_HANDLE_SIGNAL_WRITABLE, | |
74 MOJO_DEADLINE_INDEFINITE, | |
75 base::Bind(&WebSocketWriteQueue::OnHandleReady, | |
76 base::Unretained(this))); | |
77 } | |
78 | |
79 void WebSocketWriteQueue::OnHandleReady(MojoResult result) { | |
80 is_waiting_ = false; | |
81 TryToWrite(); | |
82 } | |
83 | |
84 } // namespace mojo | |
OLD | NEW |