Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(92)

Side by Side Diff: runtime/bin/eventhandler_win.cc

Issue 1291163002: Join embeder threads on Windows. (Closed) Base URL: https://github.com/dart-lang/sdk.git@master
Patch Set: Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « runtime/bin/eventhandler_win.h ('k') | runtime/bin/main.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include "platform/globals.h" 5 #include "platform/globals.h"
6 #if defined(TARGET_OS_WINDOWS) 6 #if defined(TARGET_OS_WINDOWS)
7 7
8 #include "bin/eventhandler.h" 8 #include "bin/eventhandler.h"
9 #include "bin/eventhandler_win.h" 9 #include "bin/eventhandler_win.h"
10 10
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after
114 114
115 Handle::Handle(intptr_t handle) 115 Handle::Handle(intptr_t handle)
116 : DescriptorInfoBase(handle), 116 : DescriptorInfoBase(handle),
117 handle_(reinterpret_cast<HANDLE>(handle)), 117 handle_(reinterpret_cast<HANDLE>(handle)),
118 completion_port_(INVALID_HANDLE_VALUE), 118 completion_port_(INVALID_HANDLE_VALUE),
119 event_handler_(NULL), 119 event_handler_(NULL),
120 data_ready_(NULL), 120 data_ready_(NULL),
121 pending_read_(NULL), 121 pending_read_(NULL),
122 pending_write_(NULL), 122 pending_write_(NULL),
123 last_error_(NOERROR), 123 last_error_(NOERROR),
124 flags_(0) { 124 flags_(0),
125 InitializeCriticalSection(&cs_); 125 read_thread_id_(Thread::kInvalidThreadId),
126 read_thread_starting_(false),
127 read_thread_finished_(false),
128 monitor_(new Monitor()) {
126 } 129 }
127 130
128 131
129 Handle::~Handle() { 132 Handle::~Handle() {
130 DeleteCriticalSection(&cs_); 133 delete monitor_;
131 } 134 }
132 135
133 136
134 void Handle::Lock() {
135 EnterCriticalSection(&cs_);
136 }
137
138
139 void Handle::Unlock() {
140 LeaveCriticalSection(&cs_);
141 }
142
143
144 bool Handle::CreateCompletionPort(HANDLE completion_port) { 137 bool Handle::CreateCompletionPort(HANDLE completion_port) {
145 completion_port_ = CreateIoCompletionPort(handle(), 138 completion_port_ = CreateIoCompletionPort(handle(),
146 completion_port, 139 completion_port,
147 reinterpret_cast<ULONG_PTR>(this), 140 reinterpret_cast<ULONG_PTR>(this),
148 0); 141 0);
149 if (completion_port_ == NULL) { 142 if (completion_port_ == NULL) {
150 return false; 143 return false;
151 } 144 }
152 return true; 145 return true;
153 } 146 }
154 147
155 148
156 void Handle::Close() { 149 void Handle::Close() {
157 ScopedLock lock(this); 150 MonitorLocker ml(monitor_);
158 if (!IsClosing()) { 151 if (!IsClosing()) {
159 // Close the socket and set the closing state. This close method can be 152 // Close the socket and set the closing state. This close method can be
160 // called again if this socket has pending IO operations in flight. 153 // called again if this socket has pending IO operations in flight.
161 MarkClosing(); 154 MarkClosing();
162 // Perform handle type specific closing. 155 // Perform handle type specific closing.
163 DoClose(); 156 DoClose();
164 } 157 }
165 ASSERT(IsHandleClosed()); 158 ASSERT(IsHandleClosed());
166 } 159 }
167 160
168 161
169 void Handle::DoClose() { 162 void Handle::DoClose() {
170 if (!IsHandleClosed()) { 163 if (!IsHandleClosed()) {
171 CloseHandle(handle_); 164 CloseHandle(handle_);
172 handle_ = INVALID_HANDLE_VALUE; 165 handle_ = INVALID_HANDLE_VALUE;
173 } 166 }
174 } 167 }
175 168
176 169
177 bool Handle::HasPendingRead() { 170 bool Handle::HasPendingRead() {
178 ScopedLock lock(this); 171 MonitorLocker ml(monitor_);
179 return pending_read_ != NULL; 172 return pending_read_ != NULL;
180 } 173 }
181 174
182 175
183 bool Handle::HasPendingWrite() { 176 bool Handle::HasPendingWrite() {
184 ScopedLock lock(this); 177 MonitorLocker ml(monitor_);
185 return pending_write_ != NULL; 178 return pending_write_ != NULL;
186 } 179 }
187 180
188 181
189 void Handle::ReadComplete(OverlappedBuffer* buffer) { 182 void Handle::WaitForReadThreadStarted() {
190 ScopedLock lock(this); 183 MonitorLocker ml(monitor_);
191 // Currently only one outstanding read at the time. 184 while (read_thread_starting_) {
192 ASSERT(pending_read_ == buffer); 185 ml.Wait();
193 ASSERT(data_ready_ == NULL);
194 if (!IsClosing() && !buffer->IsEmpty()) {
195 data_ready_ = pending_read_;
196 } else {
197 OverlappedBuffer::DisposeBuffer(buffer);
198 } 186 }
199 pending_read_ = NULL;
200 } 187 }
201 188
202 189
190 void Handle::WaitForReadThreadFinished() {
191 // Join the Reader thread if there is one.
192 ThreadId to_join = Thread::kInvalidThreadId;
193 {
194 MonitorLocker ml(monitor_);
195 if (read_thread_id_ != Thread::kInvalidThreadId) {
196 while (!read_thread_finished_) {
197 ml.Wait();
198 }
199 read_thread_finished_ = false;
200 to_join = read_thread_id_;
201 read_thread_id_ = Thread::kInvalidThreadId;
202 }
203 }
204 if (to_join != Thread::kInvalidThreadId) {
205 Thread::Join(to_join);
206 }
207 }
208
209
210 void Handle::ReadComplete(OverlappedBuffer* buffer) {
211 WaitForReadThreadStarted();
212 {
213 MonitorLocker ml(monitor_);
214 // Currently only one outstanding read at the time.
215 ASSERT(pending_read_ == buffer);
216 ASSERT(data_ready_ == NULL);
217 if (!IsClosing() && !buffer->IsEmpty()) {
218 data_ready_ = pending_read_;
219 } else {
220 OverlappedBuffer::DisposeBuffer(buffer);
221 }
222 pending_read_ = NULL;
223 }
224 WaitForReadThreadFinished();
225 }
226
227
203 void Handle::RecvFromComplete(OverlappedBuffer* buffer) { 228 void Handle::RecvFromComplete(OverlappedBuffer* buffer) {
204 ReadComplete(buffer); 229 ReadComplete(buffer);
205 } 230 }
206 231
207 232
208 void Handle::WriteComplete(OverlappedBuffer* buffer) { 233 void Handle::WriteComplete(OverlappedBuffer* buffer) {
209 ScopedLock lock(this); 234 MonitorLocker ml(monitor_);
210 // Currently only one outstanding write at the time. 235 // Currently only one outstanding write at the time.
211 ASSERT(pending_write_ == buffer); 236 ASSERT(pending_write_ == buffer);
212 OverlappedBuffer::DisposeBuffer(buffer); 237 OverlappedBuffer::DisposeBuffer(buffer);
213 pending_write_ = NULL; 238 pending_write_ = NULL;
214 } 239 }
215 240
216 241
217 static void ReadFileThread(uword args) { 242 static void ReadFileThread(uword args) {
218 Handle* handle = reinterpret_cast<Handle*>(args); 243 Handle* handle = reinterpret_cast<Handle*>(args);
219 handle->ReadSyncCompleteAsync(); 244 handle->ReadSyncCompleteAsync();
220 } 245 }
221 246
222 247
248 void Handle::NotifyReadThreadStarted() {
249 MonitorLocker ml(monitor_);
250 ASSERT(read_thread_starting_);
251 ASSERT(read_thread_id_ == Thread::kInvalidThreadId);
252 read_thread_id_ = Thread::GetCurrentThreadId();
253 read_thread_starting_ = false;
254 ml.Notify();
255 }
256
257 void Handle::NotifyReadThreadFinished() {
258 MonitorLocker ml(monitor_);
259 ASSERT(!read_thread_finished_);
260 ASSERT(read_thread_id_ != Thread::kInvalidThreadId);
261 read_thread_finished_ = true;
262 ml.Notify();
263 }
264
265
223 void Handle::ReadSyncCompleteAsync() { 266 void Handle::ReadSyncCompleteAsync() {
267 NotifyReadThreadStarted();
224 ASSERT(pending_read_ != NULL); 268 ASSERT(pending_read_ != NULL);
225 ASSERT(pending_read_->GetBufferSize() >= kStdOverlappedBufferSize); 269 ASSERT(pending_read_->GetBufferSize() >= kStdOverlappedBufferSize);
226 270
227 DWORD buffer_size = pending_read_->GetBufferSize(); 271 DWORD buffer_size = pending_read_->GetBufferSize();
228 if (GetFileType(handle_) == FILE_TYPE_CHAR) { 272 if (GetFileType(handle_) == FILE_TYPE_CHAR) {
229 buffer_size = kStdOverlappedBufferSize; 273 buffer_size = kStdOverlappedBufferSize;
230 } 274 }
275 char* buffer_start = pending_read_->GetBufferStart();
231 DWORD bytes_read = 0; 276 DWORD bytes_read = 0;
232 BOOL ok = ReadFile(handle_, 277 BOOL ok = ReadFile(handle_,
233 pending_read_->GetBufferStart(), 278 buffer_start,
234 buffer_size, 279 buffer_size,
235 &bytes_read, 280 &bytes_read,
236 NULL); 281 NULL);
237 if (!ok) { 282 if (!ok) {
238 bytes_read = 0; 283 bytes_read = 0;
239 } 284 }
240 OVERLAPPED* overlapped = pending_read_->GetCleanOverlapped(); 285 OVERLAPPED* overlapped = pending_read_->GetCleanOverlapped();
241 ok = PostQueuedCompletionStatus(event_handler_->completion_port(), 286 ok = PostQueuedCompletionStatus(event_handler_->completion_port(),
242 bytes_read, 287 bytes_read,
243 reinterpret_cast<ULONG_PTR>(this), 288 reinterpret_cast<ULONG_PTR>(this),
244 overlapped); 289 overlapped);
245 if (!ok) { 290 if (!ok) {
246 FATAL("PostQueuedCompletionStatus failed"); 291 FATAL("PostQueuedCompletionStatus failed");
247 } 292 }
293 NotifyReadThreadFinished();
248 } 294 }
249 295
250 296
251 bool Handle::IssueRead() { 297 bool Handle::IssueRead() {
252 ScopedLock lock(this);
253 ASSERT(type_ != kListenSocket); 298 ASSERT(type_ != kListenSocket);
254 ASSERT(pending_read_ == NULL); 299 ASSERT(pending_read_ == NULL);
255 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize); 300 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize);
256 if (SupportsOverlappedIO()) { 301 if (SupportsOverlappedIO()) {
257 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); 302 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
258 303
259 BOOL ok = ReadFile(handle_, 304 BOOL ok = ReadFile(handle_,
260 buffer->GetBufferStart(), 305 buffer->GetBufferStart(),
261 buffer->GetBufferSize(), 306 buffer->GetBufferSize(),
262 NULL, 307 NULL,
263 buffer->GetCleanOverlapped()); 308 buffer->GetCleanOverlapped());
264 if (ok || GetLastError() == ERROR_IO_PENDING) { 309 if (ok || GetLastError() == ERROR_IO_PENDING) {
265 // Completing asynchronously. 310 // Completing asynchronously.
266 pending_read_ = buffer; 311 pending_read_ = buffer;
267 return true; 312 return true;
268 } 313 }
269 OverlappedBuffer::DisposeBuffer(buffer); 314 OverlappedBuffer::DisposeBuffer(buffer);
270 HandleIssueError(); 315 HandleIssueError();
271 return false; 316 return false;
272 } else { 317 } else {
273 // Completing asynchronously through thread. 318 // Completing asynchronously through thread.
274 pending_read_ = buffer; 319 pending_read_ = buffer;
320 read_thread_starting_ = true;
275 int result = Thread::Start(ReadFileThread, 321 int result = Thread::Start(ReadFileThread,
276 reinterpret_cast<uword>(this)); 322 reinterpret_cast<uword>(this));
277 if (result != 0) { 323 if (result != 0) {
278 FATAL1("Failed to start read file thread %d", result); 324 FATAL1("Failed to start read file thread %d", result);
279 } 325 }
280 return true; 326 return true;
281 } 327 }
282 } 328 }
283 329
284 330
285 bool Handle::IssueRecvFrom() { 331 bool Handle::IssueRecvFrom() {
286 return false; 332 return false;
287 } 333 }
288 334
289 335
290 bool Handle::IssueWrite() { 336 bool Handle::IssueWrite() {
291 ScopedLock lock(this); 337 MonitorLocker ml(monitor_);
292 ASSERT(type_ != kListenSocket); 338 ASSERT(type_ != kListenSocket);
293 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); 339 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
294 ASSERT(pending_write_ != NULL); 340 ASSERT(pending_write_ != NULL);
295 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite); 341 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite);
296 342
297 OverlappedBuffer* buffer = pending_write_; 343 OverlappedBuffer* buffer = pending_write_;
298 BOOL ok = WriteFile(handle_, 344 BOOL ok = WriteFile(handle_,
299 buffer->GetBufferStart(), 345 buffer->GetBufferStart(),
300 buffer->GetBufferSize(), 346 buffer->GetBufferSize(),
301 NULL, 347 NULL,
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
338 if (error == ERROR_BROKEN_PIPE) { 384 if (error == ERROR_BROKEN_PIPE) {
339 HandleClosed(this); 385 HandleClosed(this);
340 } else { 386 } else {
341 HandleError(this); 387 HandleError(this);
342 } 388 }
343 SetLastError(error); 389 SetLastError(error);
344 } 390 }
345 391
346 392
347 void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) { 393 void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) {
348 ScopedLock lock(this); 394 MonitorLocker ml(monitor_);
349 event_handler_ = event_handler; 395 event_handler_ = event_handler;
350 if (SupportsOverlappedIO() && completion_port_ == INVALID_HANDLE_VALUE) { 396 if (SupportsOverlappedIO() && completion_port_ == INVALID_HANDLE_VALUE) {
351 CreateCompletionPort(event_handler_->completion_port()); 397 CreateCompletionPort(event_handler_->completion_port());
352 } 398 }
353 } 399 }
354 400
355 401
356 bool FileHandle::IsClosed() { 402 bool FileHandle::IsClosed() {
357 return IsClosing() && !HasPendingRead() && !HasPendingWrite(); 403 return IsClosing() && !HasPendingRead() && !HasPendingWrite();
358 } 404 }
359 405
360 406
361 void DirectoryWatchHandle::EnsureInitialized( 407 void DirectoryWatchHandle::EnsureInitialized(
362 EventHandlerImplementation* event_handler) { 408 EventHandlerImplementation* event_handler) {
363 ScopedLock lock(this); 409 MonitorLocker ml(monitor_);
364 event_handler_ = event_handler; 410 event_handler_ = event_handler;
365 if (completion_port_ == INVALID_HANDLE_VALUE) { 411 if (completion_port_ == INVALID_HANDLE_VALUE) {
366 CreateCompletionPort(event_handler_->completion_port()); 412 CreateCompletionPort(event_handler_->completion_port());
367 } 413 }
368 } 414 }
369 415
370 416
371 bool DirectoryWatchHandle::IsClosed() { 417 bool DirectoryWatchHandle::IsClosed() {
372 return IsClosing() && pending_read_ == NULL; 418 return IsClosing() && pending_read_ == NULL;
373 } 419 }
374 420
375 421
376 bool DirectoryWatchHandle::IssueRead() { 422 bool DirectoryWatchHandle::IssueRead() {
377 ScopedLock lock(this);
378 // It may have been started before, as we start the directory-handler when 423 // It may have been started before, as we start the directory-handler when
379 // we create it. 424 // we create it.
380 if (pending_read_ != NULL || data_ready_ != NULL) return true; 425 if (pending_read_ != NULL || data_ready_ != NULL) return true;
381 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize); 426 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize);
382 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); 427 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
383 BOOL ok = ReadDirectoryChangesW(handle_, 428 BOOL ok = ReadDirectoryChangesW(handle_,
384 buffer->GetBufferStart(), 429 buffer->GetBufferStart(),
385 buffer->GetBufferSize(), 430 buffer->GetBufferSize(),
386 recursive_, 431 recursive_,
387 events_, 432 events_,
388 NULL, 433 NULL,
389 buffer->GetCleanOverlapped(), 434 buffer->GetCleanOverlapped(),
390 NULL); 435 NULL);
391 if (ok || GetLastError() == ERROR_IO_PENDING) { 436 if (ok || GetLastError() == ERROR_IO_PENDING) {
392 // Completing asynchronously. 437 // Completing asynchronously.
393 pending_read_ = buffer; 438 pending_read_ = buffer;
394 return true; 439 return true;
395 } 440 }
396 OverlappedBuffer::DisposeBuffer(buffer); 441 OverlappedBuffer::DisposeBuffer(buffer);
397 return false; 442 return false;
398 } 443 }
399 444
400 445
401 void DirectoryWatchHandle::Stop() { 446 void DirectoryWatchHandle::Stop() {
402 ScopedLock lock(this); 447 MonitorLocker ml(monitor_);
403 // Stop the outstanding read, so we can close the handle. 448 // Stop the outstanding read, so we can close the handle.
404 449
405 if (pending_read_ != NULL) { 450 if (pending_read_ != NULL) {
406 CancelIoEx(handle(), pending_read_->GetCleanOverlapped()); 451 CancelIoEx(handle(), pending_read_->GetCleanOverlapped());
407 // Don't dispose of the buffer, as it will still complete (with length 0). 452 // Don't dispose of the buffer, as it will still complete (with length 0).
408 } 453 }
409 454
410 DoClose(); 455 DoClose();
411 } 456 }
412 457
(...skipping 23 matching lines...) Expand all
436 NULL, 481 NULL,
437 NULL); 482 NULL);
438 if (status == SOCKET_ERROR) { 483 if (status == SOCKET_ERROR) {
439 return false; 484 return false;
440 } 485 }
441 return true; 486 return true;
442 } 487 }
443 488
444 489
445 bool ListenSocket::IssueAccept() { 490 bool ListenSocket::IssueAccept() {
446 ScopedLock lock(this); 491 MonitorLocker ml(monitor_);
447 492
448 // For AcceptEx there needs to be buffer storage for address 493 // For AcceptEx there needs to be buffer storage for address
449 // information for two addresses (local and remote address). The 494 // information for two addresses (local and remote address). The
450 // AcceptEx documentation says: "This value must be at least 16 495 // AcceptEx documentation says: "This value must be at least 16
451 // bytes more than the maximum address length for the transport 496 // bytes more than the maximum address length for the transport
452 // protocol in use." 497 // protocol in use."
453 static const int kAcceptExAddressAdditionalBytes = 16; 498 static const int kAcceptExAddressAdditionalBytes = 16;
454 static const int kAcceptExAddressStorageSize = 499 static const int kAcceptExAddressStorageSize =
455 sizeof(SOCKADDR_STORAGE) + kAcceptExAddressAdditionalBytes; 500 sizeof(SOCKADDR_STORAGE) + kAcceptExAddressAdditionalBytes;
456 OverlappedBuffer* buffer = 501 OverlappedBuffer* buffer =
(...skipping 19 matching lines...) Expand all
476 } 521 }
477 522
478 pending_accept_count_++; 523 pending_accept_count_++;
479 524
480 return true; 525 return true;
481 } 526 }
482 527
483 528
484 void ListenSocket::AcceptComplete(OverlappedBuffer* buffer, 529 void ListenSocket::AcceptComplete(OverlappedBuffer* buffer,
485 HANDLE completion_port) { 530 HANDLE completion_port) {
486 ScopedLock lock(this); 531 MonitorLocker ml(monitor_);
487 if (!IsClosing()) { 532 if (!IsClosing()) {
488 // Update the accepted socket to support the full range of API calls. 533 // Update the accepted socket to support the full range of API calls.
489 SOCKET s = socket(); 534 SOCKET s = socket();
490 int rc = setsockopt(buffer->client(), 535 int rc = setsockopt(buffer->client(),
491 SOL_SOCKET, 536 SOL_SOCKET,
492 SO_UPDATE_ACCEPT_CONTEXT, 537 SO_UPDATE_ACCEPT_CONTEXT,
493 reinterpret_cast<char*>(&s), sizeof(s)); 538 reinterpret_cast<char*>(&s), sizeof(s));
494 if (rc == NO_ERROR) { 539 if (rc == NO_ERROR) {
495 // Insert the accepted socket into the list. 540 // Insert the accepted socket into the list.
496 ClientSocket* client_socket = new ClientSocket(buffer->client()); 541 ClientSocket* client_socket = new ClientSocket(buffer->client());
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
537 client->Close(); 582 client->Close();
538 DeleteIfClosed(client); 583 DeleteIfClosed(client);
539 } else { 584 } else {
540 break; 585 break;
541 } 586 }
542 } 587 }
543 } 588 }
544 589
545 590
546 bool ListenSocket::CanAccept() { 591 bool ListenSocket::CanAccept() {
547 ScopedLock lock(this); 592 MonitorLocker ml(monitor_);
548 return accepted_head_ != NULL; 593 return accepted_head_ != NULL;
549 } 594 }
550 595
551 596
552 ClientSocket* ListenSocket::Accept() { 597 ClientSocket* ListenSocket::Accept() {
553 ScopedLock lock(this); 598 MonitorLocker ml(monitor_);
554 599
555 ClientSocket *result = NULL; 600 ClientSocket *result = NULL;
556 601
557 if (accepted_head_ != NULL) { 602 if (accepted_head_ != NULL) {
558 result = accepted_head_; 603 result = accepted_head_;
559 accepted_head_ = accepted_head_->next(); 604 accepted_head_ = accepted_head_->next();
560 if (accepted_head_ == NULL) accepted_tail_ = NULL; 605 if (accepted_head_ == NULL) accepted_tail_ = NULL;
561 result->set_next(NULL); 606 result->set_next(NULL);
562 accepted_count_--; 607 accepted_count_--;
563 } 608 }
564 609
565 if (pending_accept_count_ < 5) { 610 if (pending_accept_count_ < 5) {
566 // We have less than 5 pending accepts, queue another. 611 // We have less than 5 pending accepts, queue another.
567 if (!IsClosing()) { 612 if (!IsClosing()) {
568 if (!IssueAccept()) { 613 if (!IssueAccept()) {
569 HandleError(this); 614 HandleError(this);
570 } 615 }
571 } 616 }
572 } 617 }
573 618
574 return result; 619 return result;
575 } 620 }
576 621
577 622
578 void ListenSocket::EnsureInitialized( 623 void ListenSocket::EnsureInitialized(
579 EventHandlerImplementation* event_handler) { 624 EventHandlerImplementation* event_handler) {
580 ScopedLock lock(this); 625 MonitorLocker ml(monitor_);
581 if (AcceptEx_ == NULL) { 626 if (AcceptEx_ == NULL) {
582 ASSERT(completion_port_ == INVALID_HANDLE_VALUE); 627 ASSERT(completion_port_ == INVALID_HANDLE_VALUE);
583 ASSERT(event_handler_ == NULL); 628 ASSERT(event_handler_ == NULL);
584 event_handler_ = event_handler; 629 event_handler_ = event_handler;
585 CreateCompletionPort(event_handler_->completion_port()); 630 CreateCompletionPort(event_handler_->completion_port());
586 LoadAcceptEx(); 631 LoadAcceptEx();
587 } 632 }
588 } 633 }
589 634
590 635
591 bool ListenSocket::IsClosed() { 636 bool ListenSocket::IsClosed() {
592 return IsClosing() && !HasPendingAccept(); 637 return IsClosing() && !HasPendingAccept();
593 } 638 }
594 639
595 640
596 intptr_t Handle::Available() { 641 intptr_t Handle::Available() {
597 ScopedLock lock(this); 642 MonitorLocker ml(monitor_);
598 if (data_ready_ == NULL) return 0; 643 if (data_ready_ == NULL) return 0;
599 ASSERT(!data_ready_->IsEmpty()); 644 ASSERT(!data_ready_->IsEmpty());
600 return data_ready_->GetRemainingLength(); 645 return data_ready_->GetRemainingLength();
601 } 646 }
602 647
603 648
604 intptr_t Handle::Read(void* buffer, intptr_t num_bytes) { 649 intptr_t Handle::Read(void* buffer, intptr_t num_bytes) {
605 ScopedLock lock(this); 650 MonitorLocker ml(monitor_);
606 if (data_ready_ == NULL) return 0; 651 if (data_ready_ == NULL) return 0;
607 num_bytes = data_ready_->Read( 652 num_bytes = data_ready_->Read(
608 buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX)); 653 buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX));
609 if (data_ready_->IsEmpty()) { 654 if (data_ready_->IsEmpty()) {
610 OverlappedBuffer::DisposeBuffer(data_ready_); 655 OverlappedBuffer::DisposeBuffer(data_ready_);
611 data_ready_ = NULL; 656 data_ready_ = NULL;
612 if (!IsClosing() && !IsClosedRead()) IssueRead(); 657 if (!IsClosing() && !IsClosedRead()) IssueRead();
613 } 658 }
614 return num_bytes; 659 return num_bytes;
615 } 660 }
616 661
617 662
618 intptr_t Handle::RecvFrom( 663 intptr_t Handle::RecvFrom(
619 void* buffer, intptr_t num_bytes, struct sockaddr* sa, socklen_t sa_len) { 664 void* buffer, intptr_t num_bytes, struct sockaddr* sa, socklen_t sa_len) {
620 ScopedLock lock(this); 665 MonitorLocker ml(monitor_);
621 if (data_ready_ == NULL) return 0; 666 if (data_ready_ == NULL) return 0;
622 num_bytes = data_ready_->Read( 667 num_bytes = data_ready_->Read(
623 buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX)); 668 buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX));
624 if (data_ready_->from()->sa_family == AF_INET) { 669 if (data_ready_->from()->sa_family == AF_INET) {
625 ASSERT(sa_len >= sizeof(struct sockaddr_in)); 670 ASSERT(sa_len >= sizeof(struct sockaddr_in));
626 memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in)); 671 memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in));
627 } else { 672 } else {
628 ASSERT(data_ready_->from()->sa_family == AF_INET6); 673 ASSERT(data_ready_->from()->sa_family == AF_INET6);
629 ASSERT(sa_len >= sizeof(struct sockaddr_in6)); 674 ASSERT(sa_len >= sizeof(struct sockaddr_in6));
630 memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in6)); 675 memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in6));
631 } 676 }
632 // Always dispose of the buffer, as UDP messages must be read in their 677 // Always dispose of the buffer, as UDP messages must be read in their
633 // entirety to match how recvfrom works in a socket. 678 // entirety to match how recvfrom works in a socket.
634 OverlappedBuffer::DisposeBuffer(data_ready_); 679 OverlappedBuffer::DisposeBuffer(data_ready_);
635 data_ready_ = NULL; 680 data_ready_ = NULL;
636 if (!IsClosing() && !IsClosedRead()) IssueRecvFrom(); 681 if (!IsClosing() && !IsClosedRead()) IssueRecvFrom();
637 return num_bytes; 682 return num_bytes;
638 } 683 }
639 684
640 685
641 intptr_t Handle::Write(const void* buffer, intptr_t num_bytes) { 686 intptr_t Handle::Write(const void* buffer, intptr_t num_bytes) {
642 ScopedLock lock(this); 687 MonitorLocker ml(monitor_);
643 if (pending_write_ != NULL) return 0; 688 if (pending_write_ != NULL) return 0;
644 if (num_bytes > kBufferSize) num_bytes = kBufferSize; 689 if (num_bytes > kBufferSize) num_bytes = kBufferSize;
645 ASSERT(SupportsOverlappedIO()); 690 ASSERT(SupportsOverlappedIO());
646 if (completion_port_ == INVALID_HANDLE_VALUE) return 0; 691 if (completion_port_ == INVALID_HANDLE_VALUE) return 0;
647 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX); 692 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX);
648 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes); 693 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes);
649 pending_write_->Write(buffer, truncated_bytes); 694 pending_write_->Write(buffer, truncated_bytes);
650 if (!IssueWrite()) return -1; 695 if (!IssueWrite()) return -1;
651 return truncated_bytes; 696 return truncated_bytes;
652 } 697 }
653 698
654 699
655 intptr_t Handle::SendTo(const void* buffer, 700 intptr_t Handle::SendTo(const void* buffer,
656 intptr_t num_bytes, 701 intptr_t num_bytes,
657 struct sockaddr* sa, 702 struct sockaddr* sa,
658 socklen_t sa_len) { 703 socklen_t sa_len) {
659 ScopedLock lock(this); 704 MonitorLocker ml(monitor_);
660 if (pending_write_ != NULL) return 0; 705 if (pending_write_ != NULL) return 0;
661 if (num_bytes > kBufferSize) num_bytes = kBufferSize; 706 if (num_bytes > kBufferSize) num_bytes = kBufferSize;
662 ASSERT(SupportsOverlappedIO()); 707 ASSERT(SupportsOverlappedIO());
663 if (completion_port_ == INVALID_HANDLE_VALUE) return 0; 708 if (completion_port_ == INVALID_HANDLE_VALUE) return 0;
664 pending_write_ = OverlappedBuffer::AllocateSendToBuffer(num_bytes); 709 pending_write_ = OverlappedBuffer::AllocateSendToBuffer(num_bytes);
665 pending_write_->Write(buffer, num_bytes); 710 pending_write_->Write(buffer, num_bytes);
666 if (!IssueSendTo(sa, sa_len)) return -1; 711 if (!IssueSendTo(sa, sa_len)) return -1;
667 return num_bytes; 712 return num_bytes;
668 } 713 }
669 714
670 715
671 static void WriteFileThread(uword args) { 716 static void WriteFileThread(uword args) {
672 StdHandle* handle = reinterpret_cast<StdHandle*>(args); 717 StdHandle* handle = reinterpret_cast<StdHandle*>(args);
673 handle->RunWriteLoop(); 718 handle->RunWriteLoop();
674 } 719 }
675 720
676 721
677 void StdHandle::RunWriteLoop() { 722 void StdHandle::RunWriteLoop() {
678 write_monitor_->Enter(); 723 MonitorLocker ml(monitor_);
679 write_thread_running_ = true; 724 write_thread_running_ = true;
725 thread_id_ = Thread::GetCurrentThreadId();
680 // Notify we have started. 726 // Notify we have started.
681 write_monitor_->Notify(); 727 ml.Notify();
682 728
683 while (write_thread_running_) { 729 while (write_thread_running_) {
684 write_monitor_->Wait(Monitor::kNoTimeout); 730 ml.Wait(Monitor::kNoTimeout);
685 if (pending_write_ != NULL) { 731 if (pending_write_ != NULL) {
686 // We woke up and had a pending write. Execute it. 732 // We woke up and had a pending write. Execute it.
687 WriteSyncCompleteAsync(); 733 WriteSyncCompleteAsync();
688 } 734 }
689 } 735 }
690 736
691 write_thread_exists_ = false; 737 write_thread_exists_ = false;
692 write_monitor_->Notify(); 738 ml.Notify();
693 write_monitor_->Exit();
694 } 739 }
695 740
696 741
697 void StdHandle::WriteSyncCompleteAsync() { 742 void StdHandle::WriteSyncCompleteAsync() {
698 ASSERT(pending_write_ != NULL); 743 ASSERT(pending_write_ != NULL);
699 744
700 DWORD bytes_written = -1; 745 DWORD bytes_written = -1;
701 BOOL ok = WriteFile(handle_, 746 BOOL ok = WriteFile(handle_,
702 pending_write_->GetBufferStart(), 747 pending_write_->GetBufferStart(),
703 pending_write_->GetBufferSize(), 748 pending_write_->GetBufferSize(),
704 &bytes_written, 749 &bytes_written,
705 NULL); 750 NULL);
706 if (!ok) { 751 if (!ok) {
707 bytes_written = 0; 752 bytes_written = 0;
708 } 753 }
709 thread_wrote_ += bytes_written; 754 thread_wrote_ += bytes_written;
710 OVERLAPPED* overlapped = pending_write_->GetCleanOverlapped(); 755 OVERLAPPED* overlapped = pending_write_->GetCleanOverlapped();
711 ok = PostQueuedCompletionStatus(event_handler_->completion_port(), 756 ok = PostQueuedCompletionStatus(event_handler_->completion_port(),
712 bytes_written, 757 bytes_written,
713 reinterpret_cast<ULONG_PTR>(this), 758 reinterpret_cast<ULONG_PTR>(this),
714 overlapped); 759 overlapped);
715 if (!ok) { 760 if (!ok) {
716 FATAL("PostQueuedCompletionStatus failed"); 761 FATAL("PostQueuedCompletionStatus failed");
717 } 762 }
718 } 763 }
719 764
720 intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) { 765 intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) {
721 ScopedLock lock(this); 766 MonitorLocker ml(monitor_);
722 if (pending_write_ != NULL) return 0; 767 if (pending_write_ != NULL) return 0;
723 if (num_bytes > kBufferSize) num_bytes = kBufferSize; 768 if (num_bytes > kBufferSize) num_bytes = kBufferSize;
724 // In the case of stdout and stderr, OverlappedIO is not supported. 769 // In the case of stdout and stderr, OverlappedIO is not supported.
725 // Here we'll instead use a thread, to make it async. 770 // Here we'll instead use a thread, to make it async.
726 // This code is actually never exposed to the user, as stdout and stderr is 771 // This code is actually never exposed to the user, as stdout and stderr is
727 // not available as a RawSocket, but only wrapped in a Socket. 772 // not available as a RawSocket, but only wrapped in a Socket.
728 // Note that we return '0', unless a thread have already completed a write. 773 // Note that we return '0', unless a thread have already completed a write.
729 MonitorLocker locker(write_monitor_);
730 if (thread_wrote_ > 0) { 774 if (thread_wrote_ > 0) {
731 if (num_bytes > thread_wrote_) num_bytes = thread_wrote_; 775 if (num_bytes > thread_wrote_) num_bytes = thread_wrote_;
732 thread_wrote_ -= num_bytes; 776 thread_wrote_ -= num_bytes;
733 return num_bytes; 777 return num_bytes;
734 } 778 }
735 if (!write_thread_exists_) { 779 if (!write_thread_exists_) {
736 write_thread_exists_ = true; 780 write_thread_exists_ = true;
737 int result = Thread::Start(WriteFileThread, 781 int result = Thread::Start(
738 reinterpret_cast<uword>(this)); 782 WriteFileThread, reinterpret_cast<uword>(this));
739 if (result != 0) { 783 if (result != 0) {
740 FATAL1("Failed to start write file thread %d", result); 784 FATAL1("Failed to start write file thread %d", result);
741 } 785 }
742 while (!write_thread_running_) { 786 while (!write_thread_running_) {
743 // Wait until we the thread is running. 787 // Wait until we the thread is running.
744 locker.Wait(Monitor::kNoTimeout); 788 ml.Wait(Monitor::kNoTimeout);
745 } 789 }
746 } 790 }
747 // Only queue up to INT_MAX bytes. 791 // Only queue up to INT_MAX bytes.
748 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX); 792 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX);
749 // Create buffer and notify thread about the new handle. 793 // Create buffer and notify thread about the new handle.
750 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes); 794 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes);
751 pending_write_->Write(buffer, truncated_bytes); 795 pending_write_->Write(buffer, truncated_bytes);
752 locker.Notify(); 796 ml.Notify();
753 return 0; 797 return 0;
754 } 798 }
755 799
756 800
757 void StdHandle::DoClose() { 801 void StdHandle::DoClose() {
758 MonitorLocker locker(write_monitor_); 802 MonitorLocker ml(monitor_);
759 if (write_thread_exists_) { 803 if (write_thread_exists_) {
760 write_thread_running_ = false; 804 write_thread_running_ = false;
761 locker.Notify(); 805 ml.Notify();
762 while (write_thread_exists_) { 806 while (write_thread_exists_) {
763 locker.Wait(Monitor::kNoTimeout); 807 ml.Wait(Monitor::kNoTimeout);
764 } 808 }
809 Thread::Join(thread_id_);
765 } 810 }
766 Handle::DoClose(); 811 Handle::DoClose();
767 } 812 }
768 813
769 814
770 bool ClientSocket::LoadDisconnectEx() { 815 bool ClientSocket::LoadDisconnectEx() {
771 // Load the DisconnectEx function into memory using WSAIoctl. 816 // Load the DisconnectEx function into memory using WSAIoctl.
772 GUID guid_disconnect_ex = WSAID_DISCONNECTEX; 817 GUID guid_disconnect_ex = WSAID_DISCONNECTEX;
773 DWORD bytes; 818 DWORD bytes;
774 int status = WSAIoctl(socket(), 819 int status = WSAIoctl(socket(),
(...skipping 25 matching lines...) Expand all
800 845
801 void ClientSocket::DoClose() { 846 void ClientSocket::DoClose() {
802 // Always do a shutdown before initiating a disconnect. 847 // Always do a shutdown before initiating a disconnect.
803 shutdown(socket(), SD_BOTH); 848 shutdown(socket(), SD_BOTH);
804 IssueDisconnect(); 849 IssueDisconnect();
805 handle_ = INVALID_HANDLE_VALUE; 850 handle_ = INVALID_HANDLE_VALUE;
806 } 851 }
807 852
808 853
809 bool ClientSocket::IssueRead() { 854 bool ClientSocket::IssueRead() {
810 ScopedLock lock(this); 855 MonitorLocker ml(monitor_);
811 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); 856 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
812 ASSERT(pending_read_ == NULL); 857 ASSERT(pending_read_ == NULL);
813 858
814 // TODO(sgjesse): Use a MTU value here. Only the loopback adapter can 859 // TODO(sgjesse): Use a MTU value here. Only the loopback adapter can
815 // handle 64k datagrams. 860 // handle 64k datagrams.
816 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(65536); 861 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(65536);
817 862
818 DWORD flags; 863 DWORD flags;
819 flags = 0; 864 flags = 0;
820 int rc = WSARecv(socket(), 865 int rc = WSARecv(socket(),
821 buffer->GetWASBUF(), 866 buffer->GetWASBUF(),
822 1, 867 1,
823 NULL, 868 NULL,
824 &flags, 869 &flags,
825 buffer->GetCleanOverlapped(), 870 buffer->GetCleanOverlapped(),
826 NULL); 871 NULL);
827 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) { 872 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) {
828 pending_read_ = buffer; 873 pending_read_ = buffer;
829 return true; 874 return true;
830 } 875 }
831 OverlappedBuffer::DisposeBuffer(buffer); 876 OverlappedBuffer::DisposeBuffer(buffer);
832 pending_read_ = NULL; 877 pending_read_ = NULL;
833 HandleIssueError(); 878 HandleIssueError();
834 return false; 879 return false;
835 } 880 }
836 881
837 882
838 bool ClientSocket::IssueWrite() { 883 bool ClientSocket::IssueWrite() {
839 ScopedLock lock(this); 884 MonitorLocker ml(monitor_);
840 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); 885 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
841 ASSERT(pending_write_ != NULL); 886 ASSERT(pending_write_ != NULL);
842 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite); 887 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite);
843 888
844 int rc = WSASend(socket(), 889 int rc = WSASend(socket(),
845 pending_write_->GetWASBUF(), 890 pending_write_->GetWASBUF(),
846 1, 891 1,
847 NULL, 892 NULL,
848 0, 893 0,
849 pending_write_->GetCleanOverlapped(), 894 pending_write_->GetCleanOverlapped(),
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
893 } 938 }
894 if (!IsClosedWrite() && ((Mask() & (1 << kOutEvent)) != 0)) { 939 if (!IsClosedWrite() && ((Mask() & (1 << kOutEvent)) != 0)) {
895 Dart_Port port = NextNotifyDartPort(1 << kOutEvent); 940 Dart_Port port = NextNotifyDartPort(1 << kOutEvent);
896 DartUtils::PostInt32(port, 1 << kOutEvent); 941 DartUtils::PostInt32(port, 1 << kOutEvent);
897 } 942 }
898 } 943 }
899 944
900 945
901 void ClientSocket::EnsureInitialized( 946 void ClientSocket::EnsureInitialized(
902 EventHandlerImplementation* event_handler) { 947 EventHandlerImplementation* event_handler) {
903 ScopedLock lock(this); 948 MonitorLocker ml(monitor_);
904 if (completion_port_ == INVALID_HANDLE_VALUE) { 949 if (completion_port_ == INVALID_HANDLE_VALUE) {
905 ASSERT(event_handler_ == NULL); 950 ASSERT(event_handler_ == NULL);
906 event_handler_ = event_handler; 951 event_handler_ = event_handler;
907 CreateCompletionPort(event_handler_->completion_port()); 952 CreateCompletionPort(event_handler_->completion_port());
908 } 953 }
909 } 954 }
910 955
911 956
912 bool ClientSocket::IsClosed() { 957 bool ClientSocket::IsClosed() {
913 return connected_ && closed_; 958 return connected_ && closed_;
914 } 959 }
915 960
916 961
917 bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) { 962 bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) {
918 ScopedLock lock(this); 963 MonitorLocker ml(monitor_);
919 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); 964 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
920 ASSERT(pending_write_ != NULL); 965 ASSERT(pending_write_ != NULL);
921 ASSERT(pending_write_->operation() == OverlappedBuffer::kSendTo); 966 ASSERT(pending_write_->operation() == OverlappedBuffer::kSendTo);
922 967
923 int rc = WSASendTo(socket(), 968 int rc = WSASendTo(socket(),
924 pending_write_->GetWASBUF(), 969 pending_write_->GetWASBUF(),
925 1, 970 1,
926 NULL, 971 NULL,
927 0, 972 0,
928 sa, 973 sa,
929 sa_len, 974 sa_len,
930 pending_write_->GetCleanOverlapped(), 975 pending_write_->GetCleanOverlapped(),
931 NULL); 976 NULL);
932 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) { 977 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) {
933 return true; 978 return true;
934 } 979 }
935 OverlappedBuffer::DisposeBuffer(pending_write_); 980 OverlappedBuffer::DisposeBuffer(pending_write_);
936 pending_write_ = NULL; 981 pending_write_ = NULL;
937 HandleIssueError(); 982 HandleIssueError();
938 return false; 983 return false;
939 } 984 }
940 985
941 986
942 bool DatagramSocket::IssueRecvFrom() { 987 bool DatagramSocket::IssueRecvFrom() {
943 ScopedLock lock(this); 988 MonitorLocker ml(monitor_);
944 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); 989 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
945 ASSERT(pending_read_ == NULL); 990 ASSERT(pending_read_ == NULL);
946 991
947 OverlappedBuffer* buffer = OverlappedBuffer::AllocateRecvFromBuffer(1024); 992 OverlappedBuffer* buffer = OverlappedBuffer::AllocateRecvFromBuffer(1024);
948 993
949 DWORD flags; 994 DWORD flags;
950 flags = 0; 995 flags = 0;
951 int rc = WSARecvFrom(socket(), 996 int rc = WSARecvFrom(socket(),
952 buffer->GetWASBUF(), 997 buffer->GetWASBUF(),
953 1, 998 1,
954 NULL, 999 NULL,
955 &flags, 1000 &flags,
956 buffer->from(), 1001 buffer->from(),
957 buffer->from_len_addr(), 1002 buffer->from_len_addr(),
958 buffer->GetCleanOverlapped(), 1003 buffer->GetCleanOverlapped(),
959 NULL); 1004 NULL);
960 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) { 1005 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) {
961 pending_read_ = buffer; 1006 pending_read_ = buffer;
962 return true; 1007 return true;
963 } 1008 }
964 OverlappedBuffer::DisposeBuffer(buffer); 1009 OverlappedBuffer::DisposeBuffer(buffer);
965 pending_read_ = NULL; 1010 pending_read_ = NULL;
966 HandleIssueError(); 1011 HandleIssueError();
967 return false; 1012 return false;
968 } 1013 }
969 1014
970 1015
971 void DatagramSocket::EnsureInitialized( 1016 void DatagramSocket::EnsureInitialized(
972 EventHandlerImplementation* event_handler) { 1017 EventHandlerImplementation* event_handler) {
973 ScopedLock lock(this); 1018 MonitorLocker ml(monitor_);
974 if (completion_port_ == INVALID_HANDLE_VALUE) { 1019 if (completion_port_ == INVALID_HANDLE_VALUE) {
975 ASSERT(event_handler_ == NULL); 1020 ASSERT(event_handler_ == NULL);
976 event_handler_ = event_handler; 1021 event_handler_ = event_handler;
977 CreateCompletionPort(event_handler_->completion_port()); 1022 CreateCompletionPort(event_handler_->completion_port());
978 } 1023 }
979 } 1024 }
980 1025
981 1026
982 bool DatagramSocket::IsClosed() { 1027 bool DatagramSocket::IsClosed() {
983 return IsClosing() && !HasPendingRead() && !HasPendingWrite(); 1028 return IsClosing() && !HasPendingRead() && !HasPendingWrite();
(...skipping 19 matching lines...) Expand all
1003 shutdown_ = true; 1048 shutdown_ = true;
1004 } else { 1049 } else {
1005 Handle* handle = reinterpret_cast<Handle*>(msg->id); 1050 Handle* handle = reinterpret_cast<Handle*>(msg->id);
1006 ASSERT(handle != NULL); 1051 ASSERT(handle != NULL);
1007 1052
1008 if (handle->is_listen_socket()) { 1053 if (handle->is_listen_socket()) {
1009 ListenSocket* listen_socket = 1054 ListenSocket* listen_socket =
1010 reinterpret_cast<ListenSocket*>(handle); 1055 reinterpret_cast<ListenSocket*>(handle);
1011 listen_socket->EnsureInitialized(this); 1056 listen_socket->EnsureInitialized(this);
1012 1057
1013 Handle::ScopedLock lock(listen_socket); 1058 MonitorLocker ml(listen_socket->monitor_);
1014 1059
1015 if (IS_COMMAND(msg->data, kReturnTokenCommand)) { 1060 if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
1016 listen_socket->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data)); 1061 listen_socket->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data));
1017 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) { 1062 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) {
1018 // `events` can only have kInEvent/kOutEvent flags set. 1063 // `events` can only have kInEvent/kOutEvent flags set.
1019 intptr_t events = msg->data & EVENT_MASK; 1064 intptr_t events = msg->data & EVENT_MASK;
1020 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); 1065 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
1021 listen_socket->SetPortAndMask(msg->dart_port, events); 1066 listen_socket->SetPortAndMask(msg->dart_port, events);
1022 TryDispatchingPendingAccepts(listen_socket); 1067 TryDispatchingPendingAccepts(listen_socket);
1023 } else if (IS_COMMAND(msg->data, kCloseCommand)) { 1068 } else if (IS_COMMAND(msg->data, kCloseCommand)) {
1024 listen_socket->RemovePort(msg->dart_port); 1069 listen_socket->RemovePort(msg->dart_port);
1025 1070
1026 // We only close the socket file descriptor from the operating 1071 // We only close the socket file descriptor from the operating
1027 // system if there are no other dart socket objects which 1072 // system if there are no other dart socket objects which
1028 // are listening on the same (address, port) combination. 1073 // are listening on the same (address, port) combination.
1029 ListeningSocketRegistry *registry = 1074 ListeningSocketRegistry *registry =
1030 ListeningSocketRegistry::Instance(); 1075 ListeningSocketRegistry::Instance();
1031 MutexLocker locker(registry->mutex()); 1076 MutexLocker locker(registry->mutex());
1032 if (registry->CloseSafe(reinterpret_cast<intptr_t>(listen_socket))) { 1077 if (registry->CloseSafe(reinterpret_cast<intptr_t>(listen_socket))) {
1033 ASSERT(listen_socket->Mask() == 0); 1078 ASSERT(listen_socket->Mask() == 0);
1034 listen_socket->Close(); 1079 listen_socket->Close();
1035 } 1080 }
1036 1081
1037 DartUtils::PostInt32(msg->dart_port, 1 << kDestroyedEvent); 1082 DartUtils::PostInt32(msg->dart_port, 1 << kDestroyedEvent);
1038 } else { 1083 } else {
1039 UNREACHABLE(); 1084 UNREACHABLE();
1040 } 1085 }
1041 } else { 1086 } else {
1042 handle->EnsureInitialized(this); 1087 handle->EnsureInitialized(this);
1043 Handle::ScopedLock lock(handle); 1088 MonitorLocker ml(handle->monitor_);
1044 1089
1045 if (IS_COMMAND(msg->data, kReturnTokenCommand)) { 1090 if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
1046 handle->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data)); 1091 handle->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data));
1047 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) { 1092 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) {
1048 // `events` can only have kInEvent/kOutEvent flags set. 1093 // `events` can only have kInEvent/kOutEvent flags set.
1049 intptr_t events = msg->data & EVENT_MASK; 1094 intptr_t events = msg->data & EVENT_MASK;
1050 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); 1095 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
1051 1096
1052 handle->SetPortAndMask(msg->dart_port, events); 1097 handle->SetPortAndMask(msg->dart_port, events);
1053 1098
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
1107 DeleteIfClosed(handle); 1152 DeleteIfClosed(handle);
1108 } 1153 }
1109 } 1154 }
1110 1155
1111 1156
1112 void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket, 1157 void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket,
1113 OverlappedBuffer* buffer) { 1158 OverlappedBuffer* buffer) {
1114 listen_socket->AcceptComplete(buffer, completion_port_); 1159 listen_socket->AcceptComplete(buffer, completion_port_);
1115 1160
1116 { 1161 {
1117 Handle::ScopedLock lock(listen_socket); 1162 MonitorLocker ml(listen_socket->monitor_);
1118 TryDispatchingPendingAccepts(listen_socket); 1163 TryDispatchingPendingAccepts(listen_socket);
1119 } 1164 }
1120 1165
1121 DeleteIfClosed(listen_socket); 1166 DeleteIfClosed(listen_socket);
1122 } 1167 }
1123 1168
1124 1169
1125 void EventHandlerImplementation::TryDispatchingPendingAccepts( 1170 void EventHandlerImplementation::TryDispatchingPendingAccepts(
1126 ListenSocket *listen_socket) { 1171 ListenSocket *listen_socket) {
1127 if (!listen_socket->IsClosing() && listen_socket->CanAccept()) { 1172 if (!listen_socket->IsClosing() && listen_socket->CanAccept()) {
(...skipping 143 matching lines...) Expand 10 before | Expand all | Expand 10 after
1271 HandleConnect(client_socket, bytes, buffer); 1316 HandleConnect(client_socket, bytes, buffer);
1272 break; 1317 break;
1273 } 1318 }
1274 default: 1319 default:
1275 UNREACHABLE(); 1320 UNREACHABLE();
1276 } 1321 }
1277 } 1322 }
1278 1323
1279 1324
1280 EventHandlerImplementation::EventHandlerImplementation() { 1325 EventHandlerImplementation::EventHandlerImplementation() {
1326 startup_monitor_ = new Monitor();
1327 handler_thread_id_ = Thread::kInvalidThreadId;
1281 completion_port_ = 1328 completion_port_ =
1282 CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1); 1329 CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1);
1283 if (completion_port_ == NULL) { 1330 if (completion_port_ == NULL) {
1284 FATAL("Completion port creation failed"); 1331 FATAL("Completion port creation failed");
1285 } 1332 }
1286 shutdown_ = false; 1333 shutdown_ = false;
1287 } 1334 }
1288 1335
1289 1336
1290 EventHandlerImplementation::~EventHandlerImplementation() { 1337 EventHandlerImplementation::~EventHandlerImplementation() {
1338 Thread::Join(handler_thread_id_);
1339 delete startup_monitor_;
1291 CloseHandle(completion_port_); 1340 CloseHandle(completion_port_);
1292 } 1341 }
1293 1342
1294 1343
1295 int64_t EventHandlerImplementation::GetTimeout() { 1344 int64_t EventHandlerImplementation::GetTimeout() {
1296 if (!timeout_queue_.HasTimeout()) { 1345 if (!timeout_queue_.HasTimeout()) {
1297 return kInfinityTimeout; 1346 return kInfinityTimeout;
1298 } 1347 }
1299 int64_t millis = timeout_queue_.CurrentTimeout() - 1348 int64_t millis = timeout_queue_.CurrentTimeout() -
1300 TimerUtils::GetCurrentTimeMilliseconds(); 1349 TimerUtils::GetCurrentTimeMilliseconds();
(...skipping 14 matching lines...) Expand all
1315 FATAL("PostQueuedCompletionStatus failed"); 1364 FATAL("PostQueuedCompletionStatus failed");
1316 } 1365 }
1317 } 1366 }
1318 1367
1319 1368
1320 void EventHandlerImplementation::EventHandlerEntry(uword args) { 1369 void EventHandlerImplementation::EventHandlerEntry(uword args) {
1321 EventHandler* handler = reinterpret_cast<EventHandler*>(args); 1370 EventHandler* handler = reinterpret_cast<EventHandler*>(args);
1322 EventHandlerImplementation* handler_impl = &handler->delegate_; 1371 EventHandlerImplementation* handler_impl = &handler->delegate_;
1323 ASSERT(handler_impl != NULL); 1372 ASSERT(handler_impl != NULL);
1324 1373
1374 {
1375 MonitorLocker ml(handler_impl->startup_monitor_);
1376 handler_impl->handler_thread_id_ = Thread::GetCurrentThreadId();
1377 ml.Notify();
1378 }
1379
1325 while (!handler_impl->shutdown_) { 1380 while (!handler_impl->shutdown_) {
1326 DWORD bytes; 1381 DWORD bytes;
1327 ULONG_PTR key; 1382 ULONG_PTR key;
1328 OVERLAPPED* overlapped; 1383 OVERLAPPED* overlapped;
1329 int64_t millis = handler_impl->GetTimeout(); 1384 int64_t millis = handler_impl->GetTimeout();
1330 ASSERT(millis == kInfinityTimeout || millis >= 0); 1385 ASSERT(millis == kInfinityTimeout || millis >= 0);
1331 if (millis > kMaxInt32) millis = kMaxInt32; 1386 if (millis > kMaxInt32) millis = kMaxInt32;
1332 ASSERT(sizeof(int32_t) == sizeof(DWORD)); 1387 ASSERT(sizeof(int32_t) == sizeof(DWORD));
1333 BOOL ok = GetQueuedCompletionStatus(handler_impl->completion_port(), 1388 BOOL ok = GetQueuedCompletionStatus(handler_impl->completion_port(),
1334 &bytes, 1389 &bytes,
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
1375 } 1430 }
1376 1431
1377 1432
1378 void EventHandlerImplementation::Start(EventHandler* handler) { 1433 void EventHandlerImplementation::Start(EventHandler* handler) {
1379 int result = Thread::Start(EventHandlerEntry, 1434 int result = Thread::Start(EventHandlerEntry,
1380 reinterpret_cast<uword>(handler)); 1435 reinterpret_cast<uword>(handler));
1381 if (result != 0) { 1436 if (result != 0) {
1382 FATAL1("Failed to start event handler thread %d", result); 1437 FATAL1("Failed to start event handler thread %d", result);
1383 } 1438 }
1384 1439
1440 {
1441 MonitorLocker ml(startup_monitor_);
1442 while (handler_thread_id_ == Thread::kInvalidThreadId) {
1443 ml.Wait();
1444 }
1445 }
1446
1385 // Initialize Winsock32 1447 // Initialize Winsock32
1386 if (!Socket::Initialize()) { 1448 if (!Socket::Initialize()) {
1387 FATAL("Failed to initialized Windows sockets"); 1449 FATAL("Failed to initialized Windows sockets");
1388 } 1450 }
1389 } 1451 }
1390 1452
1391 1453
1392 void EventHandlerImplementation::Shutdown() { 1454 void EventHandlerImplementation::Shutdown() {
1393 SendData(kShutdownId, 0, 0); 1455 SendData(kShutdownId, 0, 0);
1394 } 1456 }
1395 1457
1396 } // namespace bin 1458 } // namespace bin
1397 } // namespace dart 1459 } // namespace dart
1398 1460
1399 #endif // defined(TARGET_OS_WINDOWS) 1461 #endif // defined(TARGET_OS_WINDOWS)
OLDNEW
« no previous file with comments | « runtime/bin/eventhandler_win.h ('k') | runtime/bin/main.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698