Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(572)

Side by Side Diff: runtime/bin/eventhandler_win.cc

Issue 1291163002: Join embeder threads on Windows. (Closed) Base URL: https://github.com/dart-lang/sdk.git@master
Patch Set: Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include "platform/globals.h" 5 #include "platform/globals.h"
6 #if defined(TARGET_OS_WINDOWS) 6 #if defined(TARGET_OS_WINDOWS)
7 7
8 #include "bin/eventhandler.h" 8 #include "bin/eventhandler.h"
9 #include "bin/eventhandler_win.h" 9 #include "bin/eventhandler_win.h"
10 10
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after
114 114
115 Handle::Handle(intptr_t handle) 115 Handle::Handle(intptr_t handle)
116 : DescriptorInfoBase(handle), 116 : DescriptorInfoBase(handle),
117 handle_(reinterpret_cast<HANDLE>(handle)), 117 handle_(reinterpret_cast<HANDLE>(handle)),
118 completion_port_(INVALID_HANDLE_VALUE), 118 completion_port_(INVALID_HANDLE_VALUE),
119 event_handler_(NULL), 119 event_handler_(NULL),
120 data_ready_(NULL), 120 data_ready_(NULL),
121 pending_read_(NULL), 121 pending_read_(NULL),
122 pending_write_(NULL), 122 pending_write_(NULL),
123 last_error_(NOERROR), 123 last_error_(NOERROR),
124 flags_(0) { 124 flags_(0),
125 read_thread_id_(Thread::kInvalidThreadId),
126 read_thread_running_(false),
127 read_thread_finished_(false),
128 read_thread_monitor_(new Monitor()) {
Ivan Posva 2015/08/25 22:21:03 Can we share the monitor and the critical section?
125 InitializeCriticalSection(&cs_); 129 InitializeCriticalSection(&cs_);
126 } 130 }
127 131
128 132
129 Handle::~Handle() { 133 Handle::~Handle() {
134 delete read_thread_monitor_;
130 DeleteCriticalSection(&cs_); 135 DeleteCriticalSection(&cs_);
131 } 136 }
132 137
133 138
134 void Handle::Lock() { 139 void Handle::Lock() {
135 EnterCriticalSection(&cs_); 140 EnterCriticalSection(&cs_);
136 } 141 }
137 142
138 143
139 void Handle::Unlock() { 144 void Handle::Unlock() {
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
179 return pending_read_ != NULL; 184 return pending_read_ != NULL;
180 } 185 }
181 186
182 187
183 bool Handle::HasPendingWrite() { 188 bool Handle::HasPendingWrite() {
184 ScopedLock lock(this); 189 ScopedLock lock(this);
185 return pending_write_ != NULL; 190 return pending_write_ != NULL;
186 } 191 }
187 192
188 193
194 void Handle::NotifyReadThreadFinished() {
195 MonitorLocker ml(read_thread_monitor_);
196 read_thread_finished_ = true;
Ivan Posva 2015/08/25 22:21:03 Assertions?
zra 2015/08/26 05:24:39 Done.
197 ml.Notify();
198 }
199
200
201 void Handle::WaitForReadThreadFinished() {
202 // Join the Reader thread if there is one.
203 ThreadId to_join = Thread::kInvalidThreadId;
204 {
205 MonitorLocker ml(read_thread_monitor_);
206 if (read_thread_id_ != Thread::kInvalidThreadId) {
207 while (!read_thread_finished_) {
208 ml.Wait();
209 }
210 read_thread_finished_ = false;
211 read_thread_running_ = false;
212 to_join = read_thread_id_;
213 read_thread_id_ = Thread::kInvalidThreadId;
214 }
215 }
216 if (to_join != Thread::kInvalidThreadId) {
217 Thread::Join(to_join);
218 }
219 }
220
221
189 void Handle::ReadComplete(OverlappedBuffer* buffer) { 222 void Handle::ReadComplete(OverlappedBuffer* buffer) {
190 ScopedLock lock(this); 223 ScopedLock lock(this);
191 // Currently only one outstanding read at the time. 224 // Currently only one outstanding read at the time.
192 ASSERT(pending_read_ == buffer); 225 ASSERT(pending_read_ == buffer);
193 ASSERT(data_ready_ == NULL); 226 ASSERT(data_ready_ == NULL);
194 if (!IsClosing() && !buffer->IsEmpty()) { 227 if (!IsClosing() && !buffer->IsEmpty()) {
195 data_ready_ = pending_read_; 228 data_ready_ = pending_read_;
196 } else { 229 } else {
197 OverlappedBuffer::DisposeBuffer(buffer); 230 OverlappedBuffer::DisposeBuffer(buffer);
198 } 231 }
199 pending_read_ = NULL; 232 pending_read_ = NULL;
233 WaitForReadThreadFinished();
200 } 234 }
201 235
202 236
203 void Handle::RecvFromComplete(OverlappedBuffer* buffer) { 237 void Handle::RecvFromComplete(OverlappedBuffer* buffer) {
204 ReadComplete(buffer); 238 ReadComplete(buffer);
205 } 239 }
206 240
207 241
208 void Handle::WriteComplete(OverlappedBuffer* buffer) { 242 void Handle::WriteComplete(OverlappedBuffer* buffer) {
209 ScopedLock lock(this); 243 ScopedLock lock(this);
210 // Currently only one outstanding write at the time. 244 // Currently only one outstanding write at the time.
211 ASSERT(pending_write_ == buffer); 245 ASSERT(pending_write_ == buffer);
212 OverlappedBuffer::DisposeBuffer(buffer); 246 OverlappedBuffer::DisposeBuffer(buffer);
213 pending_write_ = NULL; 247 pending_write_ = NULL;
214 } 248 }
215 249
216 250
217 static void ReadFileThread(uword args) { 251 static void ReadFileThread(uword args) {
218 Handle* handle = reinterpret_cast<Handle*>(args); 252 Handle* handle = reinterpret_cast<Handle*>(args);
219 handle->ReadSyncCompleteAsync(); 253 handle->ReadSyncCompleteAsync();
220 } 254 }
221 255
222 256
257 void Handle::NotifyReadThreadStarted() {
258 MonitorLocker ml(read_thread_monitor_);
259 read_thread_id_ = Thread::GetCurrentThreadId();
260 read_thread_running_ = true;
261 ml.Notify();
262 }
263
264
265 void Handle::WaitForReadThreadStarted() {
266 MonitorLocker ml(read_thread_monitor_);
267 while (!read_thread_running_) {
268 ml.Wait();
269 }
270 ASSERT(read_thread_id_ != Thread::kInvalidThreadId);
271 }
272
273
223 void Handle::ReadSyncCompleteAsync() { 274 void Handle::ReadSyncCompleteAsync() {
275 NotifyReadThreadStarted();
224 ASSERT(pending_read_ != NULL); 276 ASSERT(pending_read_ != NULL);
225 ASSERT(pending_read_->GetBufferSize() >= kStdOverlappedBufferSize); 277 ASSERT(pending_read_->GetBufferSize() >= kStdOverlappedBufferSize);
226 278
227 DWORD buffer_size = pending_read_->GetBufferSize(); 279 DWORD buffer_size = pending_read_->GetBufferSize();
228 if (GetFileType(handle_) == FILE_TYPE_CHAR) { 280 if (GetFileType(handle_) == FILE_TYPE_CHAR) {
229 buffer_size = kStdOverlappedBufferSize; 281 buffer_size = kStdOverlappedBufferSize;
230 } 282 }
231 DWORD bytes_read = 0; 283 DWORD bytes_read = 0;
232 BOOL ok = ReadFile(handle_, 284 BOOL ok = ReadFile(handle_,
233 pending_read_->GetBufferStart(), 285 pending_read_->GetBufferStart(),
234 buffer_size, 286 buffer_size,
235 &bytes_read, 287 &bytes_read,
236 NULL); 288 NULL);
237 if (!ok) { 289 if (!ok) {
238 bytes_read = 0; 290 bytes_read = 0;
239 } 291 }
240 OVERLAPPED* overlapped = pending_read_->GetCleanOverlapped(); 292 OVERLAPPED* overlapped = pending_read_->GetCleanOverlapped();
241 ok = PostQueuedCompletionStatus(event_handler_->completion_port(), 293 ok = PostQueuedCompletionStatus(event_handler_->completion_port(),
242 bytes_read, 294 bytes_read,
243 reinterpret_cast<ULONG_PTR>(this), 295 reinterpret_cast<ULONG_PTR>(this),
244 overlapped); 296 overlapped);
245 if (!ok) { 297 if (!ok) {
246 FATAL("PostQueuedCompletionStatus failed"); 298 FATAL("PostQueuedCompletionStatus failed");
247 } 299 }
300 NotifyReadThreadFinished();
248 } 301 }
249 302
250 303
251 bool Handle::IssueRead() { 304 bool Handle::IssueRead() {
252 ScopedLock lock(this); 305 ScopedLock lock(this);
253 ASSERT(type_ != kListenSocket); 306 ASSERT(type_ != kListenSocket);
254 ASSERT(pending_read_ == NULL); 307 ASSERT(pending_read_ == NULL);
255 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize); 308 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize);
256 if (SupportsOverlappedIO()) { 309 if (SupportsOverlappedIO()) {
257 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); 310 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
(...skipping 12 matching lines...) Expand all
270 HandleIssueError(); 323 HandleIssueError();
271 return false; 324 return false;
272 } else { 325 } else {
273 // Completing asynchronously through thread. 326 // Completing asynchronously through thread.
274 pending_read_ = buffer; 327 pending_read_ = buffer;
275 int result = Thread::Start(ReadFileThread, 328 int result = Thread::Start(ReadFileThread,
276 reinterpret_cast<uword>(this)); 329 reinterpret_cast<uword>(this));
277 if (result != 0) { 330 if (result != 0) {
278 FATAL1("Failed to start read file thread %d", result); 331 FATAL1("Failed to start read file thread %d", result);
279 } 332 }
333 WaitForReadThreadStarted();
280 return true; 334 return true;
281 } 335 }
282 } 336 }
283 337
284 338
285 bool Handle::IssueRecvFrom() { 339 bool Handle::IssueRecvFrom() {
286 return false; 340 return false;
287 } 341 }
288 342
289 343
(...skipping 380 matching lines...) Expand 10 before | Expand all | Expand 10 after
670 724
671 static void WriteFileThread(uword args) { 725 static void WriteFileThread(uword args) {
672 StdHandle* handle = reinterpret_cast<StdHandle*>(args); 726 StdHandle* handle = reinterpret_cast<StdHandle*>(args);
673 handle->RunWriteLoop(); 727 handle->RunWriteLoop();
674 } 728 }
675 729
676 730
677 void StdHandle::RunWriteLoop() { 731 void StdHandle::RunWriteLoop() {
678 write_monitor_->Enter(); 732 write_monitor_->Enter();
679 write_thread_running_ = true; 733 write_thread_running_ = true;
734 thread_id_ = Thread::GetCurrentThreadId();
680 // Notify we have started. 735 // Notify we have started.
681 write_monitor_->Notify(); 736 write_monitor_->Notify();
682 737
683 while (write_thread_running_) { 738 while (write_thread_running_) {
684 write_monitor_->Wait(Monitor::kNoTimeout); 739 write_monitor_->Wait(Monitor::kNoTimeout);
685 if (pending_write_ != NULL) { 740 if (pending_write_ != NULL) {
686 // We woke up and had a pending write. Execute it. 741 // We woke up and had a pending write. Execute it.
687 WriteSyncCompleteAsync(); 742 WriteSyncCompleteAsync();
688 } 743 }
689 } 744 }
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after
755 810
756 811
757 void StdHandle::DoClose() { 812 void StdHandle::DoClose() {
758 MonitorLocker locker(write_monitor_); 813 MonitorLocker locker(write_monitor_);
759 if (write_thread_exists_) { 814 if (write_thread_exists_) {
760 write_thread_running_ = false; 815 write_thread_running_ = false;
761 locker.Notify(); 816 locker.Notify();
762 while (write_thread_exists_) { 817 while (write_thread_exists_) {
763 locker.Wait(Monitor::kNoTimeout); 818 locker.Wait(Monitor::kNoTimeout);
764 } 819 }
820 Thread::Join(thread_id_);
765 } 821 }
766 Handle::DoClose(); 822 Handle::DoClose();
767 } 823 }
768 824
769 825
770 bool ClientSocket::LoadDisconnectEx() { 826 bool ClientSocket::LoadDisconnectEx() {
771 // Load the DisconnectEx function into memory using WSAIoctl. 827 // Load the DisconnectEx function into memory using WSAIoctl.
772 GUID guid_disconnect_ex = WSAID_DISCONNECTEX; 828 GUID guid_disconnect_ex = WSAID_DISCONNECTEX;
773 DWORD bytes; 829 DWORD bytes;
774 int status = WSAIoctl(socket(), 830 int status = WSAIoctl(socket(),
(...skipping 496 matching lines...) Expand 10 before | Expand all | Expand 10 after
1271 HandleConnect(client_socket, bytes, buffer); 1327 HandleConnect(client_socket, bytes, buffer);
1272 break; 1328 break;
1273 } 1329 }
1274 default: 1330 default:
1275 UNREACHABLE(); 1331 UNREACHABLE();
1276 } 1332 }
1277 } 1333 }
1278 1334
1279 1335
1280 EventHandlerImplementation::EventHandlerImplementation() { 1336 EventHandlerImplementation::EventHandlerImplementation() {
1337 startup_monitor_ = new Monitor();
1338 handler_thread_id_ = Thread::kInvalidThreadId;
1281 completion_port_ = 1339 completion_port_ =
1282 CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1); 1340 CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1);
1283 if (completion_port_ == NULL) { 1341 if (completion_port_ == NULL) {
1284 FATAL("Completion port creation failed"); 1342 FATAL("Completion port creation failed");
1285 } 1343 }
1286 shutdown_ = false; 1344 shutdown_ = false;
1287 } 1345 }
1288 1346
1289 1347
1290 EventHandlerImplementation::~EventHandlerImplementation() { 1348 EventHandlerImplementation::~EventHandlerImplementation() {
1349 Thread::Join(handler_thread_id_);
1350 delete startup_monitor_;
1291 CloseHandle(completion_port_); 1351 CloseHandle(completion_port_);
1292 } 1352 }
1293 1353
1294 1354
1295 int64_t EventHandlerImplementation::GetTimeout() { 1355 int64_t EventHandlerImplementation::GetTimeout() {
1296 if (!timeout_queue_.HasTimeout()) { 1356 if (!timeout_queue_.HasTimeout()) {
1297 return kInfinityTimeout; 1357 return kInfinityTimeout;
1298 } 1358 }
1299 int64_t millis = timeout_queue_.CurrentTimeout() - 1359 int64_t millis = timeout_queue_.CurrentTimeout() -
1300 TimerUtils::GetCurrentTimeMilliseconds(); 1360 TimerUtils::GetCurrentTimeMilliseconds();
(...skipping 14 matching lines...) Expand all
1315 FATAL("PostQueuedCompletionStatus failed"); 1375 FATAL("PostQueuedCompletionStatus failed");
1316 } 1376 }
1317 } 1377 }
1318 1378
1319 1379
1320 void EventHandlerImplementation::EventHandlerEntry(uword args) { 1380 void EventHandlerImplementation::EventHandlerEntry(uword args) {
1321 EventHandler* handler = reinterpret_cast<EventHandler*>(args); 1381 EventHandler* handler = reinterpret_cast<EventHandler*>(args);
1322 EventHandlerImplementation* handler_impl = &handler->delegate_; 1382 EventHandlerImplementation* handler_impl = &handler->delegate_;
1323 ASSERT(handler_impl != NULL); 1383 ASSERT(handler_impl != NULL);
1324 1384
1385 {
1386 MonitorLocker ml(handler_impl->startup_monitor_);
1387 handler_impl->handler_thread_id_ = Thread::GetCurrentThreadId();
1388 ml.Notify();
1389 }
1390
1325 while (!handler_impl->shutdown_) { 1391 while (!handler_impl->shutdown_) {
1326 DWORD bytes; 1392 DWORD bytes;
1327 ULONG_PTR key; 1393 ULONG_PTR key;
1328 OVERLAPPED* overlapped; 1394 OVERLAPPED* overlapped;
1329 int64_t millis = handler_impl->GetTimeout(); 1395 int64_t millis = handler_impl->GetTimeout();
1330 ASSERT(millis == kInfinityTimeout || millis >= 0); 1396 ASSERT(millis == kInfinityTimeout || millis >= 0);
1331 if (millis > kMaxInt32) millis = kMaxInt32; 1397 if (millis > kMaxInt32) millis = kMaxInt32;
1332 ASSERT(sizeof(int32_t) == sizeof(DWORD)); 1398 ASSERT(sizeof(int32_t) == sizeof(DWORD));
1333 BOOL ok = GetQueuedCompletionStatus(handler_impl->completion_port(), 1399 BOOL ok = GetQueuedCompletionStatus(handler_impl->completion_port(),
1334 &bytes, 1400 &bytes,
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
1375 } 1441 }
1376 1442
1377 1443
1378 void EventHandlerImplementation::Start(EventHandler* handler) { 1444 void EventHandlerImplementation::Start(EventHandler* handler) {
1379 int result = Thread::Start(EventHandlerEntry, 1445 int result = Thread::Start(EventHandlerEntry,
1380 reinterpret_cast<uword>(handler)); 1446 reinterpret_cast<uword>(handler));
1381 if (result != 0) { 1447 if (result != 0) {
1382 FATAL1("Failed to start event handler thread %d", result); 1448 FATAL1("Failed to start event handler thread %d", result);
1383 } 1449 }
1384 1450
1451 {
1452 MonitorLocker ml(startup_monitor_);
1453 while (handler_thread_id_ == Thread::kInvalidThreadId) {
1454 ml.Wait();
1455 }
1456 }
1457
1385 // Initialize Winsock32 1458 // Initialize Winsock32
1386 if (!Socket::Initialize()) { 1459 if (!Socket::Initialize()) {
1387 FATAL("Failed to initialized Windows sockets"); 1460 FATAL("Failed to initialized Windows sockets");
1388 } 1461 }
1389 } 1462 }
1390 1463
1391 1464
1392 void EventHandlerImplementation::Shutdown() { 1465 void EventHandlerImplementation::Shutdown() {
1393 SendData(kShutdownId, 0, 0); 1466 SendData(kShutdownId, 0, 0);
1394 } 1467 }
1395 1468
1396 } // namespace bin 1469 } // namespace bin
1397 } // namespace dart 1470 } // namespace dart
1398 1471
1399 #endif // defined(TARGET_OS_WINDOWS) 1472 #endif // defined(TARGET_OS_WINDOWS)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698