Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(172)

Side by Side Diff: mojo/services/network/web_socket_data_pipe_queue.cc

Issue 550003005: Mojo: WebSocket interface now reuses the DataPipe for subsequent sends or (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: . Created 6 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/services/network/web_socket_data_pipe_queue.h"
6
7 #include "base/bind.h"
8
9 namespace mojo {
10
11 struct WebSocketReadQueue::Operation {
12 uint32_t num_bytes_;
13 base::Callback<void(const char*)> callback_;
14 };
15
16 struct WebSocketWriteQueue::Operation {
17 uint32_t num_bytes_;
18 base::Callback<void(const char*)> callback_;
19
20 const char* data_;
21 // Only initialized if the initial Write fails. This saves a copy in
22 // the common case.
23 std::vector<char> data_copy_;
24 };
25
26 WebSocketReadQueue::WebSocketReadQueue(DataPipeConsumerHandle handle)
27 : handle_(handle), is_waiting_(false) {
28 }
29
30 WebSocketReadQueue::~WebSocketReadQueue() {
31 }
32
33 void WebSocketReadQueue::Read(uint32_t num_bytes,
34 base::Callback<void(const char*)> callback) {
35 Operation* op = new Operation;
36 op->num_bytes_ = num_bytes;
37 op->callback_ = callback;
38 queue_.push_back(op);
39
40 if (!is_waiting_)
41 TryToRead();
42 }
43
44 void WebSocketReadQueue::TryToRead() {
45 Operation* op = queue_[0];
46 const void* buffer = NULL;
47 uint32_t bytes_read = op->num_bytes_;
48 MojoResult result = BeginReadDataRaw(
49 handle_, &buffer, &bytes_read, MOJO_READ_DATA_FLAG_ALL_OR_NONE);
50 if (result == MOJO_RESULT_SHOULD_WAIT) {
51 EndReadDataRaw(handle_, bytes_read);
52 Wait();
53 return;
54 }
55
56 // Ensure |op| is deleted, whether or not |this| goes away.
57 scoped_ptr<Operation> op_deleter(op);
58 queue_.weak_erase(queue_.begin());
59 if (result != MOJO_RESULT_OK)
60 return;
61 DataPipeConsumerHandle handle = handle_;
62 op->callback_.Run(static_cast<const char*>(buffer)); // may delete |this|
63 EndReadDataRaw(handle, bytes_read);
64 }
65
66 void WebSocketReadQueue::Wait() {
67 is_waiting_ = true;
68 handle_watcher_.Start(
69 handle_,
70 MOJO_HANDLE_SIGNAL_READABLE,
71 MOJO_DEADLINE_INDEFINITE,
72 base::Bind(&WebSocketReadQueue::OnHandleReady, base::Unretained(this)));
73 }
74
75 void WebSocketReadQueue::OnHandleReady(MojoResult result) {
76 is_waiting_ = false;
77 TryToRead();
78 }
79
80
81 WebSocketWriteQueue::WebSocketWriteQueue(DataPipeProducerHandle handle)
82 : handle_(handle), is_waiting_(false) {
83 }
84
85 WebSocketWriteQueue::~WebSocketWriteQueue() {
86 }
87
88 void WebSocketWriteQueue::Write(const char* data,
89 uint32_t num_bytes,
90 base::Callback<void(const char*)> callback) {
91 Operation* op = new Operation;
92 op->num_bytes_ = num_bytes;
93 op->callback_ = callback;
94 op->data_ = data;
95 queue_.push_back(op);
96
97 MojoResult result = MOJO_RESULT_SHOULD_WAIT;
98 if (!is_waiting_)
99 result = TryToWrite();
100
101 // If we have to wait, make a local copy of the data so we know it will
102 // live until we need it.
103 if (result == MOJO_RESULT_SHOULD_WAIT) {
104 op->data_copy_.resize(num_bytes);
105 memcpy(&op->data_copy_[0], data, num_bytes);
106 op->data_ = &op->data_copy_[0];
107 }
108 }
109
110 MojoResult WebSocketWriteQueue::TryToWrite() {
111 Operation* op = queue_[0];
112 uint32_t bytes_written = op->num_bytes_;
113 MojoResult result = WriteDataRaw(
114 handle_, op->data_, &bytes_written, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE);
115 if (result == MOJO_RESULT_SHOULD_WAIT) {
116 Wait();
117 return result;
118 }
119
120 // Ensure |op| is deleted, whether or not |this| goes away.
121 scoped_ptr<Operation> op_deleter(op);
122 queue_.weak_erase(queue_.begin());
123 if (result != MOJO_RESULT_OK)
124 return result;
125
126 op->callback_.Run(op->data_); // may delete |this|
127 return result;
128 }
129
130 void WebSocketWriteQueue::Wait() {
131 is_waiting_ = true;
132 handle_watcher_.Start(handle_,
133 MOJO_HANDLE_SIGNAL_WRITABLE,
134 MOJO_DEADLINE_INDEFINITE,
135 base::Bind(&WebSocketWriteQueue::OnHandleReady,
136 base::Unretained(this)));
137 }
138
139 void WebSocketWriteQueue::OnHandleReady(MojoResult result) {
140 is_waiting_ = false;
141 TryToWrite();
142 }
143
144 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698