Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(683)

Side by Side Diff: runtime/bin/eventhandler_win.cc

Issue 8503006: Fix handling of async IO from pipes on Windows (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Addressed review comments from ager@ Created 9 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « runtime/bin/eventhandler_win.h ('k') | runtime/bin/process_test.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include <process.h> 5 #include <process.h>
6 #include <winsock2.h> 6 #include <winsock2.h>
7 #include <ws2tcpip.h> 7 #include <ws2tcpip.h>
8 #include <mswsock.h> 8 #include <mswsock.h>
9 9
10 #include "bin/builtin.h" 10 #include "bin/builtin.h"
(...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after
84 84
85 85
86 int IOBuffer::GetRemainingLength() { 86 int IOBuffer::GetRemainingLength() {
87 ASSERT(operation_ == kRead); 87 ASSERT(operation_ == kRead);
88 return data_length_ - index_; 88 return data_length_ - index_;
89 } 89 }
90 90
91 91
92 Handle::Handle(HANDLE handle) 92 Handle::Handle(HANDLE handle)
93 : handle_(reinterpret_cast<HANDLE>(handle)), 93 : handle_(reinterpret_cast<HANDLE>(handle)),
94 closing_(false), 94 flags_(0),
95 port_(0), 95 port_(0),
96 completion_port_(INVALID_HANDLE_VALUE), 96 completion_port_(INVALID_HANDLE_VALUE),
97 event_handler_(NULL), 97 event_handler_(NULL),
98 data_ready_(NULL), 98 data_ready_(NULL),
99 pending_read_(NULL), 99 pending_read_(NULL),
100 pending_write_(NULL) { 100 pending_write_(NULL) {
101 InitializeCriticalSection(&cs_); 101 InitializeCriticalSection(&cs_);
102 } 102 }
103 103
104 104
105 Handle::Handle(HANDLE handle, Dart_Port port) 105 Handle::Handle(HANDLE handle, Dart_Port port)
106 : handle_(reinterpret_cast<HANDLE>(handle)), 106 : handle_(reinterpret_cast<HANDLE>(handle)),
107 closing_(false), 107 flags_(0),
108 port_(port), 108 port_(port),
109 completion_port_(INVALID_HANDLE_VALUE), 109 completion_port_(INVALID_HANDLE_VALUE),
110 event_handler_(NULL), 110 event_handler_(NULL),
111 data_ready_(NULL), 111 data_ready_(NULL),
112 pending_read_(NULL), 112 pending_read_(NULL),
113 pending_write_(NULL) { 113 pending_write_(NULL) {
114 InitializeCriticalSection(&cs_); 114 InitializeCriticalSection(&cs_);
115 } 115 }
116 116
117 117
(...skipping 20 matching lines...) Expand all
138 if (completion_port_ == NULL) { 138 if (completion_port_ == NULL) {
139 fprintf(stderr, "Error CreateIoCompletionPort: %d\n", GetLastError()); 139 fprintf(stderr, "Error CreateIoCompletionPort: %d\n", GetLastError());
140 return false; 140 return false;
141 } 141 }
142 return true; 142 return true;
143 } 143 }
144 144
145 145
146 void Handle::close() { 146 void Handle::close() {
147 ScopedLock lock(this); 147 ScopedLock lock(this);
148 if (!closing_) { 148 if (!IsClosing()) {
149 // Close the socket and set the closing state. This close method can be 149 // Close the socket and set the closing state. This close method can be
150 // called again if this socket has pending IO operations in flight. 150 // called again if this socket has pending IO operations in flight.
151 ASSERT(handle_ != INVALID_HANDLE_VALUE); 151 ASSERT(handle_ != INVALID_HANDLE_VALUE);
152 closing_ = true; 152 MarkClosing();
153 // According to the documentation from Microsoft socket handles should 153 // According to the documentation from Microsoft socket handles should
154 // not be closed using CloseHandle but using closesocket. 154 // not be closed using CloseHandle but using closesocket.
155 if (is_socket()) { 155 if (is_socket()) {
156 closesocket(reinterpret_cast<SOCKET>(handle_)); 156 closesocket(reinterpret_cast<SOCKET>(handle_));
157 } else { 157 } else {
158 CloseHandle(handle_); 158 CloseHandle(handle_);
159 } 159 }
160 handle_ = INVALID_HANDLE_VALUE; 160 handle_ = INVALID_HANDLE_VALUE;
161 } 161 }
162 162
(...skipping 12 matching lines...) Expand all
175 ScopedLock lock(this); 175 ScopedLock lock(this);
176 return pending_write_ != NULL; 176 return pending_write_ != NULL;
177 } 177 }
178 178
179 179
180 void Handle::ReadComplete(IOBuffer* buffer) { 180 void Handle::ReadComplete(IOBuffer* buffer) {
181 ScopedLock lock(this); 181 ScopedLock lock(this);
182 // Currently only one outstanding read at the time. 182 // Currently only one outstanding read at the time.
183 ASSERT(pending_read_ == buffer); 183 ASSERT(pending_read_ == buffer);
184 ASSERT(data_ready_ == NULL); 184 ASSERT(data_ready_ == NULL);
185 if (!closing_ && !buffer->IsEmpty()) { 185 if (!IsClosing() && !buffer->IsEmpty()) {
186 data_ready_ = pending_read_; 186 data_ready_ = pending_read_;
187 } else { 187 } else {
188 IOBuffer::DisposeBuffer(buffer); 188 IOBuffer::DisposeBuffer(buffer);
189 } 189 }
190 pending_read_ = NULL; 190 pending_read_ = NULL;
191 } 191 }
192 192
193 193
194 void Handle::WriteComplete(IOBuffer* buffer) { 194 void Handle::WriteComplete(IOBuffer* buffer) {
195 ScopedLock lock(this); 195 ScopedLock lock(this);
(...skipping 138 matching lines...) Expand 10 before | Expand all | Expand 10 after
334 } 334 }
335 335
336 pending_accept_count_++; 336 pending_accept_count_++;
337 337
338 return true; 338 return true;
339 } 339 }
340 340
341 341
342 void ListenSocket::AcceptComplete(IOBuffer* buffer, HANDLE completion_port) { 342 void ListenSocket::AcceptComplete(IOBuffer* buffer, HANDLE completion_port) {
343 ScopedLock lock(this); 343 ScopedLock lock(this);
344 if (!closing_) { 344 if (!IsClosing()) {
345 // Update the accepted socket to support the full range of API calls. 345 // Update the accepted socket to support the full range of API calls.
346 SOCKET s = socket(); 346 SOCKET s = socket();
347 int rc = setsockopt(buffer->client(), 347 int rc = setsockopt(buffer->client(),
348 SOL_SOCKET, 348 SOL_SOCKET,
349 SO_UPDATE_ACCEPT_CONTEXT, 349 SO_UPDATE_ACCEPT_CONTEXT,
350 reinterpret_cast<char*>(&s), sizeof(s)); 350 reinterpret_cast<char*>(&s), sizeof(s));
351 if (rc == NO_ERROR) { 351 if (rc == NO_ERROR) {
352 // Insert the accepted socket into the list. 352 // Insert the accepted socket into the list.
353 ClientSocket* client_socket = new ClientSocket(buffer->client(), 0); 353 ClientSocket* client_socket = new ClientSocket(buffer->client(), 0);
354 client_socket->CreateCompletionPort(completion_port); 354 client_socket->CreateCompletionPort(completion_port);
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
403 if (client != NULL) { 403 if (client != NULL) {
404 client->close(); 404 client->close();
405 } else { 405 } else {
406 break; 406 break;
407 } 407 }
408 } 408 }
409 } 409 }
410 410
411 411
412 bool ListenSocket::IsClosed() { 412 bool ListenSocket::IsClosed() {
413 return closing_ && !HasPendingAccept(); 413 return IsClosing() && !HasPendingAccept();
414 } 414 }
415 415
416 416
417 int Handle::Available() { 417 int Handle::Available() {
418 ScopedLock lock(this); 418 ScopedLock lock(this);
419 if (data_ready_ == NULL) return 0; 419 if (data_ready_ == NULL) return 0;
420 ASSERT(!data_ready_->IsEmpty()); 420 ASSERT(!data_ready_->IsEmpty());
421 return data_ready_->GetRemainingLength(); 421 return data_ready_->GetRemainingLength();
422 } 422 }
423 423
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after
527 void ClientSocket::AfterClose() { 527 void ClientSocket::AfterClose() {
528 ScopedLock lock(this); 528 ScopedLock lock(this);
529 if (data_ready_ != NULL) { 529 if (data_ready_ != NULL) {
530 IOBuffer::DisposeBuffer(data_ready_); 530 IOBuffer::DisposeBuffer(data_ready_);
531 data_ready_ = NULL; 531 data_ready_ = NULL;
532 } 532 }
533 } 533 }
534 534
535 535
536 bool ClientSocket::IsClosed() { 536 bool ClientSocket::IsClosed() {
537 return closing_ && !HasPendingRead() && !HasPendingWrite(); 537 return IsClosing() && !HasPendingRead() && !HasPendingWrite();
538 } 538 }
539 539
540 540
541 void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { 541 void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
542 if (msg->id == -1) { 542 if (msg->id == -1) {
543 // Change of timeout request. Just set the new timeout and port as the 543 // Change of timeout request. Just set the new timeout and port as the
544 // completion thread will use the new timeout value for its next wait. 544 // completion thread will use the new timeout value for its next wait.
545 timeout_ = msg->data; 545 timeout_ = msg->data;
546 timeout_port_ = msg->dart_port; 546 timeout_port_ = msg->dart_port;
547 } else { 547 } else {
548 bool delete_socket = false; 548 bool delete_handle = false;
549 Handle* socket_desc = 549 Handle* handle = reinterpret_cast<Handle*>(msg->id);
550 reinterpret_cast<Handle*>(msg->id); 550 ASSERT(handle != NULL);
551 ASSERT(socket_desc != NULL); 551 if (handle->is_listen_socket()) {
552 if (socket_desc->is_listen_socket()) {
553 ListenSocket* listen_socket = 552 ListenSocket* listen_socket =
554 reinterpret_cast<ListenSocket*>(socket_desc); 553 reinterpret_cast<ListenSocket*>(handle);
555 listen_socket->EnsureInitialized(this); 554 listen_socket->EnsureInitialized(this);
556 listen_socket->SetPortAndMask(msg->dart_port, msg->data); 555 listen_socket->SetPortAndMask(msg->dart_port, msg->data);
557 556
558 Handle::ScopedLock lock(listen_socket); 557 Handle::ScopedLock lock(listen_socket);
559 558
560 // If incomming connections are requested make sure that pending accepts 559 // If incomming connections are requested make sure that pending accepts
561 // are issued. 560 // are issued.
562 if ((msg->data & (1 << kInEvent)) != 0) { 561 if ((msg->data & (1 << kInEvent)) != 0) {
563 while (listen_socket->pending_accept_count() < 5) { 562 while (listen_socket->pending_accept_count() < 5) {
564 listen_socket->IssueAccept(); 563 listen_socket->IssueAccept();
565 } 564 }
566 } 565 }
567 566
568 if ((msg->data & (1 << kCloseCommand)) != 0) { 567 if ((msg->data & (1 << kCloseCommand)) != 0) {
569 listen_socket->close(); 568 listen_socket->close();
570 if (listen_socket->IsClosed()) { 569 if (listen_socket->IsClosed()) {
571 delete_socket = true; 570 delete_handle = true;
572 } 571 }
573 } 572 }
574 } else { 573 } else {
575 ClientSocket* client_socket = 574 handle->SetPortAndMask(msg->dart_port, msg->data);
576 reinterpret_cast<ClientSocket*>(socket_desc); 575 handle->EnsureInitialized(this);
577 client_socket->SetPortAndMask(msg->dart_port, msg->data);
578 client_socket->EnsureInitialized(this);
579 576
580 Handle::ScopedLock lock(client_socket); 577 Handle::ScopedLock lock(handle);
581 578
582 // If the data available callback has been requested and data are 579 // If the data available callback has been requested and data are
583 // available post it immediately. Otherwise make sure that a pending 580 // available post it immediately. Otherwise make sure that a pending
584 // read is issued unless the socket is already closed for read. 581 // read is issued unless the socket is already closed for read.
585 if ((msg->data & (1 << kInEvent)) != 0) { 582 if ((msg->data & (1 << kInEvent)) != 0) {
586 if (client_socket->Available() > 0) { 583 if (handle->Available() > 0) {
587 int event_mask = (1 << kInEvent); 584 int event_mask = (1 << kInEvent);
588 Dart_PostIntArray(client_socket->port(), 1, &event_mask); 585 Dart_PostIntArray(handle->port(), 1, &event_mask);
589 } else if (!client_socket->HasPendingRead() && 586 } else if (!handle->HasPendingRead() &&
590 !client_socket->IsClosedRead()) { 587 !handle->IsClosedRead()) {
591 client_socket->IssueRead(); 588 handle->IssueRead();
592 } 589 }
593 } 590 }
594 591
595 // If can send callback had been requested and there is no pending 592 // If can send callback had been requested and there is no pending
596 // send post it immediately. 593 // send post it immediately.
597 if ((msg->data & (1 << kOutEvent)) != 0) { 594 if ((msg->data & (1 << kOutEvent)) != 0) {
598 if (!client_socket->HasPendingWrite()) { 595 if (!handle->HasPendingWrite()) {
599 int event_mask = (1 << kOutEvent); 596 int event_mask = (1 << kOutEvent);
600 Dart_PostIntArray(client_socket->port(), 1, &event_mask); 597 Dart_PostIntArray(handle->port(), 1, &event_mask);
601 } 598 }
602 } 599 }
603 600
604 if ((msg->data & (1 << kShutdownReadCommand)) != 0) { 601 if (handle->is_client_socket()) {
605 client_socket->Shutdown(SD_RECEIVE); 602 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle);
606 } 603 if ((msg->data & (1 << kShutdownReadCommand)) != 0) {
604 client_socket->Shutdown(SD_RECEIVE);
605 }
607 606
608 if ((msg->data & (1 << kShutdownWriteCommand)) != 0) { 607 if ((msg->data & (1 << kShutdownWriteCommand)) != 0) {
609 client_socket->Shutdown(SD_SEND); 608 client_socket->Shutdown(SD_SEND);
609 }
610 } 610 }
611 611
612 if ((msg->data & (1 << kCloseCommand)) != 0) { 612 if ((msg->data & (1 << kCloseCommand)) != 0) {
613 client_socket->close(); 613 handle->close();
614 if (client_socket->IsClosed()) { 614 if (handle->IsClosed()) {
615 delete_socket = true; 615 delete_handle = true;
616 } 616 }
617 } 617 }
618 } 618 }
619 if (delete_socket) { 619 if (delete_handle) {
620 delete socket_desc; 620 delete handle;
621 } 621 }
622 } 622 }
623 } 623 }
624 624
625 625
626 void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket, 626 void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket,
627 IOBuffer* buffer) { 627 IOBuffer* buffer) {
628 listen_socket->AcceptComplete(buffer, completion_port_); 628 listen_socket->AcceptComplete(buffer, completion_port_);
629 629
630 if (!listen_socket->is_closing()) { 630 if (!listen_socket->IsClosing()) {
631 int event_mask = 1 << kInEvent; 631 int event_mask = 1 << kInEvent;
632 if ((listen_socket->mask() & event_mask) != 0) { 632 if ((listen_socket->mask() & event_mask) != 0) {
633 Dart_PostIntArray(listen_socket->port(), 1, &event_mask); 633 Dart_PostIntArray(listen_socket->port(), 1, &event_mask);
634 } 634 }
635 } 635 }
636 636
637 if (listen_socket->IsClosed()) { 637 if (listen_socket->IsClosed()) {
638 delete listen_socket; 638 delete listen_socket;
639 } 639 }
640 } 640 }
641 641
642 642
643 void EventHandlerImplementation::HandleClosed(Handle* handle) { 643 void EventHandlerImplementation::HandleClosed(Handle* handle) {
644 if (!handle->is_closing()) { 644 if (!handle->IsClosing()) {
645 int event_mask = 1 << kCloseEvent; 645 int event_mask = 1 << kCloseEvent;
646 if ((handle->mask() & event_mask) != 0) { 646 if ((handle->mask() & event_mask) != 0) {
647 Dart_PostIntArray(handle->port(), 1, &event_mask); 647 Dart_PostIntArray(handle->port(), 1, &event_mask);
648 } 648 }
649 } 649 }
650 } 650 }
651 651
652 652
653 void EventHandlerImplementation::HandleRead(ClientSocket* client_socket, 653 void EventHandlerImplementation::HandleRead(Handle* handle,
654 int bytes, 654 int bytes,
655 IOBuffer* buffer) { 655 IOBuffer* buffer) {
656 buffer->set_data_length(bytes); 656 buffer->set_data_length(bytes);
657 client_socket->ReadComplete(buffer); 657 handle->ReadComplete(buffer);
658 if (bytes > 0) { 658 if (bytes > 0) {
659 if (!client_socket->is_closing()) { 659 if (!handle->IsClosing()) {
660 int event_mask = 1 << kInEvent; 660 int event_mask = 1 << kInEvent;
661 if ((client_socket->mask() & event_mask) != 0) { 661 if ((handle->mask() & event_mask) != 0) {
662 Dart_PostIntArray(client_socket->port(), 1, &event_mask); 662 Dart_PostIntArray(handle->port(), 1, &event_mask);
663 } 663 }
664 } 664 }
665 } else { 665 } else {
666 ASSERT(bytes == 0); 666 ASSERT(bytes == 0);
667 client_socket->MarkClosedRead(); 667 handle->MarkClosedRead();
668 HandleClosed(client_socket); 668 HandleClosed(handle);
669 } 669 }
670 670
671 if (client_socket->IsClosed()) { 671 if (handle->IsClosed()) {
672 delete client_socket; 672 delete handle;
673 } 673 }
674 } 674 }
675 675
676 676
677 void EventHandlerImplementation::HandleWrite(ClientSocket* client_socket, 677 void EventHandlerImplementation::HandleWrite(Handle* handle,
678 int bytes, 678 int bytes,
679 IOBuffer* buffer) { 679 IOBuffer* buffer) {
680 client_socket->WriteComplete(buffer); 680 handle->WriteComplete(buffer);
681 681
682 if (bytes > 0) { 682 if (bytes > 0) {
683 if (!client_socket->is_closing()) { 683 if (!handle->IsClosing()) {
684 int event_mask = 1 << kOutEvent; 684 int event_mask = 1 << kOutEvent;
685 if ((client_socket->mask() & event_mask) != 0) { 685 if ((handle->mask() & event_mask) != 0) {
686 Dart_PostIntArray(client_socket->port(), 1, &event_mask); 686 Dart_PostIntArray(handle->port(), 1, &event_mask);
687 } 687 }
688 } 688 }
689 } else { 689 } else {
690 ASSERT(bytes == 0); 690 ASSERT(bytes == 0);
691 HandleClosed(client_socket); 691 HandleClosed(handle);
692 } 692 }
693 693
694 if (client_socket->IsClosed()) { 694 if (handle->IsClosed()) {
695 delete client_socket; 695 delete handle;
696 } 696 }
697 } 697 }
698 698
699 699
700 void EventHandlerImplementation::HandleTimeout() { 700 void EventHandlerImplementation::HandleTimeout() {
701 // TODO(sgjesse) check if there actually is a timeout. 701 // TODO(sgjesse) check if there actually is a timeout.
702 Dart_PostIntArray(timeout_port_, 0, NULL); 702 Dart_PostIntArray(timeout_port_, 0, NULL);
703 timeout_ = kInfinityTimeout; 703 timeout_ = kInfinityTimeout;
704 timeout_port_ = 0; 704 timeout_port_ = 0;
705 } 705 }
706 706
707 707
708 void EventHandlerImplementation::HandleIOCompletion(DWORD bytes, 708 void EventHandlerImplementation::HandleIOCompletion(DWORD bytes,
709 ULONG_PTR key, 709 ULONG_PTR key,
710 OVERLAPPED* overlapped) { 710 OVERLAPPED* overlapped) {
711 IOBuffer* buffer = IOBuffer::GetFromOverlapped(overlapped); 711 IOBuffer* buffer = IOBuffer::GetFromOverlapped(overlapped);
712 switch (buffer->operation()) { 712 switch (buffer->operation()) {
713 case IOBuffer::kAccept: { 713 case IOBuffer::kAccept: {
714 ListenSocket* listen_socket = reinterpret_cast<ListenSocket*>(key); 714 ListenSocket* listen_socket = reinterpret_cast<ListenSocket*>(key);
715 HandleAccept(listen_socket, buffer); 715 HandleAccept(listen_socket, buffer);
716 break; 716 break;
717 } 717 }
718 case IOBuffer::kRead: { 718 case IOBuffer::kRead: {
719 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(key); 719 Handle* handle = reinterpret_cast<Handle*>(key);
720 HandleRead(client_socket, bytes, buffer); 720 HandleRead(handle, bytes, buffer);
721 break; 721 break;
722 } 722 }
723 case IOBuffer::kWrite: { 723 case IOBuffer::kWrite: {
724 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(key); 724 Handle* handle = reinterpret_cast<Handle*>(key);
725 HandleWrite(client_socket, bytes, buffer); 725 HandleWrite(handle, bytes, buffer);
726 break; 726 break;
727 } 727 }
728 default: 728 default:
729 UNREACHABLE(); 729 UNREACHABLE();
730 } 730 }
731 } 731 }
732 732
733 733
734 EventHandlerImplementation::EventHandlerImplementation() { 734 EventHandlerImplementation::EventHandlerImplementation() {
735 intptr_t result; 735 intptr_t result;
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after
826 _beginthreadex(NULL, 32 * 1024, EventHandlerThread, this, 0, &tid); 826 _beginthreadex(NULL, 32 * 1024, EventHandlerThread, this, 0, &tid);
827 if (thread_handle == -1) { 827 if (thread_handle == -1) {
828 FATAL("Failed to start event handler thread"); 828 FATAL("Failed to start event handler thread");
829 } 829 }
830 830
831 // Initialize Winsock32 831 // Initialize Winsock32
832 if (!Socket::Initialize()) { 832 if (!Socket::Initialize()) {
833 FATAL("Failed to initialized Windows sockets"); 833 FATAL("Failed to initialized Windows sockets");
834 } 834 }
835 } 835 }
OLDNEW
« no previous file with comments | « runtime/bin/eventhandler_win.h ('k') | runtime/bin/process_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698