Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(30)

Side by Side Diff: dart/runtime/bin/eventhandler_win.cc

Issue 896213002: Revert "Introduce optional 'bool shared' parameter to ServerSocket.bind() ..." (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge
Patch Set: Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « dart/runtime/bin/eventhandler_win.h ('k') | dart/runtime/bin/io_natives.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include "platform/globals.h" 5 #include "platform/globals.h"
6 #if defined(TARGET_OS_WINDOWS) 6 #if defined(TARGET_OS_WINDOWS)
7 7
8 #include "bin/eventhandler.h" 8 #include "bin/eventhandler.h"
9 #include "bin/eventhandler_win.h"
10 9
11 #include <winsock2.h> // NOLINT 10 #include <winsock2.h> // NOLINT
12 #include <ws2tcpip.h> // NOLINT 11 #include <ws2tcpip.h> // NOLINT
13 #include <mswsock.h> // NOLINT 12 #include <mswsock.h> // NOLINT
14 #include <io.h> // NOLINT 13 #include <io.h> // NOLINT
15 #include <fcntl.h> // NOLINT 14 #include <fcntl.h> // NOLINT
16 15
17 #include "bin/builtin.h" 16 #include "bin/builtin.h"
18 #include "bin/dartutils.h" 17 #include "bin/dartutils.h"
19 #include "bin/lockers.h" 18 #include "bin/lockers.h"
20 #include "bin/log.h" 19 #include "bin/log.h"
21 #include "bin/socket.h" 20 #include "bin/socket.h"
22 #include "bin/thread.h" 21 #include "bin/thread.h"
23 #include "bin/utils.h" 22 #include "bin/utils.h"
24 23
25 #include "platform/utils.h" 24 #include "platform/utils.h"
26 25
27 namespace dart { 26 namespace dart {
28 namespace bin { 27 namespace bin {
29 28
30 static const int kBufferSize = 64 * 1024; 29 static const int kBufferSize = 64 * 1024;
31 static const int kStdOverlappedBufferSize = 16 * 1024; 30 static const int kStdOverlappedBufferSize = 16 * 1024;
32 31
32 static const int kInfinityTimeout = -1;
33 static const int kTimeoutId = -1;
34 static const int kShutdownId = -2;
35
33 OverlappedBuffer* OverlappedBuffer::AllocateBuffer(int buffer_size, 36 OverlappedBuffer* OverlappedBuffer::AllocateBuffer(int buffer_size,
34 Operation operation) { 37 Operation operation) {
35 OverlappedBuffer* buffer = 38 OverlappedBuffer* buffer =
36 new(buffer_size) OverlappedBuffer(buffer_size, operation); 39 new(buffer_size) OverlappedBuffer(buffer_size, operation);
37 return buffer; 40 return buffer;
38 } 41 }
39 42
40 43
41 OverlappedBuffer* OverlappedBuffer::AllocateAcceptBuffer(int buffer_size) { 44 OverlappedBuffer* OverlappedBuffer::AllocateAcceptBuffer(int buffer_size) {
42 OverlappedBuffer* buffer = AllocateBuffer(buffer_size, kAccept); 45 OverlappedBuffer* buffer = AllocateBuffer(buffer_size, kAccept);
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
105 data_length_ = num_bytes; 108 data_length_ = num_bytes;
106 return num_bytes; 109 return num_bytes;
107 } 110 }
108 111
109 112
110 int OverlappedBuffer::GetRemainingLength() { 113 int OverlappedBuffer::GetRemainingLength() {
111 ASSERT(operation_ == kRead || operation_ == kRecvFrom); 114 ASSERT(operation_ == kRead || operation_ == kRecvFrom);
112 return data_length_ - index_; 115 return data_length_ - index_;
113 } 116 }
114 117
115 Handle::Handle(intptr_t handle) 118
116 : DescriptorInfoBase(handle), 119 Handle::Handle(HANDLE handle)
117 handle_(reinterpret_cast<HANDLE>(handle)), 120 : handle_(reinterpret_cast<HANDLE>(handle)),
121 port_(0),
122 mask_(0),
118 completion_port_(INVALID_HANDLE_VALUE), 123 completion_port_(INVALID_HANDLE_VALUE),
119 event_handler_(NULL), 124 event_handler_(NULL),
120 data_ready_(NULL), 125 data_ready_(NULL),
126 pending_read_(NULL),
127 pending_write_(NULL),
128 last_error_(NOERROR),
129 flags_(0) {
130 InitializeCriticalSection(&cs_);
131 }
132
133
134 Handle::Handle(HANDLE handle, Dart_Port port)
135 : handle_(reinterpret_cast<HANDLE>(handle)),
136 port_(port),
137 mask_(0),
138 completion_port_(INVALID_HANDLE_VALUE),
139 event_handler_(NULL),
140 data_ready_(NULL),
121 pending_read_(NULL), 141 pending_read_(NULL),
122 pending_write_(NULL), 142 pending_write_(NULL),
123 last_error_(NOERROR), 143 last_error_(NOERROR),
124 flags_(0) { 144 flags_(0) {
125 InitializeCriticalSection(&cs_); 145 InitializeCriticalSection(&cs_);
126 } 146 }
127 147
128 148
129 Handle::~Handle() { 149 Handle::~Handle() {
130 DeleteCriticalSection(&cs_); 150 DeleteCriticalSection(&cs_);
(...skipping 135 matching lines...) Expand 10 before | Expand all | Expand 10 after
266 pending_read_ = buffer; 286 pending_read_ = buffer;
267 return true; 287 return true;
268 } 288 }
269 OverlappedBuffer::DisposeBuffer(buffer); 289 OverlappedBuffer::DisposeBuffer(buffer);
270 HandleIssueError(); 290 HandleIssueError();
271 return false; 291 return false;
272 } else { 292 } else {
273 // Completing asynchronously through thread. 293 // Completing asynchronously through thread.
274 pending_read_ = buffer; 294 pending_read_ = buffer;
275 int result = Thread::Start(ReadFileThread, 295 int result = Thread::Start(ReadFileThread,
276 reinterpret_cast<uword>(this)); 296 reinterpret_cast<uword>(this));
277 if (result != 0) { 297 if (result != 0) {
278 FATAL1("Failed to start read file thread %d", result); 298 FATAL1("Failed to start read file thread %d", result);
279 } 299 }
280 return true; 300 return true;
281 } 301 }
282 } 302 }
283 303
284 304
285 bool Handle::IssueRecvFrom() { 305 bool Handle::IssueRecvFrom() {
286 return false; 306 return false;
(...skipping 25 matching lines...) Expand all
312 332
313 333
314 bool Handle::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) { 334 bool Handle::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) {
315 return false; 335 return false;
316 } 336 }
317 337
318 338
319 static void HandleClosed(Handle* handle) { 339 static void HandleClosed(Handle* handle) {
320 if (!handle->IsClosing()) { 340 if (!handle->IsClosing()) {
321 int event_mask = 1 << kCloseEvent; 341 int event_mask = 1 << kCloseEvent;
322 DartUtils::PostInt32(handle->NextPort(), event_mask); 342 DartUtils::PostInt32(handle->port(), event_mask);
323 } 343 }
324 } 344 }
325 345
326 346
327 static void HandleError(Handle* handle) { 347 static void HandleError(Handle* handle) {
328 handle->set_last_error(WSAGetLastError()); 348 handle->set_last_error(WSAGetLastError());
329 handle->MarkError(); 349 handle->MarkError();
330 if (!handle->IsClosing() && handle->HasNextPort()) { 350 if (!handle->IsClosing()) {
331 DartUtils::PostInt32(handle->NextPort(), 1 << kErrorEvent); 351 Dart_Port port = handle->port();
352 if (port != ILLEGAL_PORT) {
353 DartUtils::PostInt32(port, 1 << kErrorEvent);
354 }
332 } 355 }
333 } 356 }
334 357
335 358
336 void Handle::HandleIssueError() { 359 void Handle::HandleIssueError() {
337 DWORD error = GetLastError(); 360 DWORD error = GetLastError();
338 if (error == ERROR_BROKEN_PIPE) { 361 if (error == ERROR_BROKEN_PIPE) {
339 HandleClosed(this); 362 HandleClosed(this);
340 } else { 363 } else {
341 HandleError(this); 364 HandleError(this);
(...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after
437 NULL); 460 NULL);
438 if (status == SOCKET_ERROR) { 461 if (status == SOCKET_ERROR) {
439 return false; 462 return false;
440 } 463 }
441 return true; 464 return true;
442 } 465 }
443 466
444 467
445 bool ListenSocket::IssueAccept() { 468 bool ListenSocket::IssueAccept() {
446 ScopedLock lock(this); 469 ScopedLock lock(this);
447
448 // For AcceptEx there needs to be buffer storage for address 470 // For AcceptEx there needs to be buffer storage for address
449 // information for two addresses (local and remote address). The 471 // information for two addresses (local and remote address). The
450 // AcceptEx documentation says: "This value must be at least 16 472 // AcceptEx documentation says: "This value must be at least 16
451 // bytes more than the maximum address length for the transport 473 // bytes more than the maximum address length for the transport
452 // protocol in use." 474 // protocol in use."
453 static const int kAcceptExAddressAdditionalBytes = 16; 475 static const int kAcceptExAddressAdditionalBytes = 16;
454 static const int kAcceptExAddressStorageSize = 476 static const int kAcceptExAddressStorageSize =
455 sizeof(SOCKADDR_STORAGE) + kAcceptExAddressAdditionalBytes; 477 sizeof(SOCKADDR_STORAGE) + kAcceptExAddressAdditionalBytes;
456 OverlappedBuffer* buffer = 478 OverlappedBuffer* buffer =
457 OverlappedBuffer::AllocateAcceptBuffer(2 * kAcceptExAddressStorageSize); 479 OverlappedBuffer::AllocateAcceptBuffer(2 * kAcceptExAddressStorageSize);
(...skipping 28 matching lines...) Expand all
486 ScopedLock lock(this); 508 ScopedLock lock(this);
487 if (!IsClosing()) { 509 if (!IsClosing()) {
488 // Update the accepted socket to support the full range of API calls. 510 // Update the accepted socket to support the full range of API calls.
489 SOCKET s = socket(); 511 SOCKET s = socket();
490 int rc = setsockopt(buffer->client(), 512 int rc = setsockopt(buffer->client(),
491 SOL_SOCKET, 513 SOL_SOCKET,
492 SO_UPDATE_ACCEPT_CONTEXT, 514 SO_UPDATE_ACCEPT_CONTEXT,
493 reinterpret_cast<char*>(&s), sizeof(s)); 515 reinterpret_cast<char*>(&s), sizeof(s));
494 if (rc == NO_ERROR) { 516 if (rc == NO_ERROR) {
495 // Insert the accepted socket into the list. 517 // Insert the accepted socket into the list.
496 ClientSocket* client_socket = new ClientSocket(buffer->client()); 518 ClientSocket* client_socket = new ClientSocket(buffer->client(), 0);
497 client_socket->mark_connected(); 519 client_socket->mark_connected();
498 client_socket->CreateCompletionPort(completion_port); 520 client_socket->CreateCompletionPort(completion_port);
499 if (accepted_head_ == NULL) { 521 if (accepted_head_ == NULL) {
500 accepted_head_ = client_socket; 522 accepted_head_ = client_socket;
501 accepted_tail_ = client_socket; 523 accepted_tail_ = client_socket;
502 } else { 524 } else {
503 ASSERT(accepted_tail_ != NULL); 525 ASSERT(accepted_tail_ != NULL);
504 accepted_tail_->set_next(client_socket); 526 accepted_tail_->set_next(client_socket);
505 accepted_tail_ = client_socket; 527 accepted_tail_ = client_socket;
506 } 528 }
507 accepted_count_++;
508 } else { 529 } else {
509 closesocket(buffer->client()); 530 closesocket(buffer->client());
510 } 531 }
511 } else { 532 } else {
512 // Close the socket, as it's already accepted. 533 // Close the socket, as it's already accepted.
513 closesocket(buffer->client()); 534 closesocket(buffer->client());
514 } 535 }
515 536
516 pending_accept_count_--; 537 pending_accept_count_--;
517 OverlappedBuffer::DisposeBuffer(buffer); 538 OverlappedBuffer::DisposeBuffer(buffer);
518 } 539 }
519 540
520 541
521 static void DeleteIfClosed(Handle* handle) { 542 static void DeleteIfClosed(Handle* handle) {
522 if (handle->IsClosed()) { 543 if (handle->IsClosed()) {
523 handle->SendToAll(1 << kDestroyedEvent); 544 Dart_Port port = handle->port();
524 delete handle; 545 delete handle;
546 if (port != ILLEGAL_PORT) {
547 DartUtils::PostInt32(port, 1 << kDestroyedEvent);
548 }
525 } 549 }
526 } 550 }
527 551
528 552
529 void ListenSocket::DoClose() { 553 void ListenSocket::DoClose() {
530 closesocket(socket()); 554 closesocket(socket());
531 handle_ = INVALID_HANDLE_VALUE; 555 handle_ = INVALID_HANDLE_VALUE;
532 while (CanAccept()) { 556 while (CanAccept()) {
533 // Get rid of connections already accepted. 557 // Get rid of connections already accepted.
534 ClientSocket *client = Accept(); 558 ClientSocket *client = Accept();
535 if (client != NULL) { 559 if (client != NULL) {
536 client->Close(); 560 client->Close();
537 DeleteIfClosed(client); 561 DeleteIfClosed(client);
538 } else { 562 } else {
539 break; 563 break;
540 } 564 }
541 } 565 }
542 } 566 }
543 567
544 568
545 bool ListenSocket::CanAccept() { 569 bool ListenSocket::CanAccept() {
546 ScopedLock lock(this); 570 ScopedLock lock(this);
547 return accepted_head_ != NULL; 571 return accepted_head_ != NULL;
548 } 572 }
549 573
550 574
551 ClientSocket* ListenSocket::Accept() { 575 ClientSocket* ListenSocket::Accept() {
552 ScopedLock lock(this); 576 ScopedLock lock(this);
553 577 if (accepted_head_ == NULL) return NULL;
554 ClientSocket *result = NULL; 578 ClientSocket* result = accepted_head_;
555 579 accepted_head_ = accepted_head_->next();
556 if (accepted_head_ != NULL) { 580 if (accepted_head_ == NULL) accepted_tail_ = NULL;
557 result = accepted_head_; 581 result->set_next(NULL);
558 accepted_head_ = accepted_head_->next();
559 if (accepted_head_ == NULL) accepted_tail_ = NULL;
560 result->set_next(NULL);
561 accepted_count_--;
562 }
563
564 if (!IsClosing()) { 582 if (!IsClosing()) {
565 if (!IssueAccept()) { 583 if (!IssueAccept()) {
566 HandleError(this); 584 HandleError(this);
567 } 585 }
568 } 586 }
569
570 return result; 587 return result;
571 } 588 }
572 589
573 590
574 void ListenSocket::EnsureInitialized( 591 void ListenSocket::EnsureInitialized(
575 EventHandlerImplementation* event_handler) { 592 EventHandlerImplementation* event_handler) {
576 ScopedLock lock(this); 593 ScopedLock lock(this);
577 if (AcceptEx_ == NULL) { 594 if (AcceptEx_ == NULL) {
578 ASSERT(completion_port_ == INVALID_HANDLE_VALUE); 595 ASSERT(completion_port_ == INVALID_HANDLE_VALUE);
579 ASSERT(event_handler_ == NULL); 596 ASSERT(event_handler_ == NULL);
(...skipping 276 matching lines...) Expand 10 before | Expand all | Expand 10 after
856 873
857 void ClientSocket::IssueDisconnect() { 874 void ClientSocket::IssueDisconnect() {
858 OverlappedBuffer* buffer = OverlappedBuffer::AllocateDisconnectBuffer(); 875 OverlappedBuffer* buffer = OverlappedBuffer::AllocateDisconnectBuffer();
859 BOOL ok = DisconnectEx_( 876 BOOL ok = DisconnectEx_(
860 socket(), buffer->GetCleanOverlapped(), TF_REUSE_SOCKET, 0); 877 socket(), buffer->GetCleanOverlapped(), TF_REUSE_SOCKET, 0);
861 // DisconnectEx works like other OverlappedIO APIs, where we can get either an 878 // DisconnectEx works like other OverlappedIO APIs, where we can get either an
862 // immediate success or delayed operation by WSA_IO_PENDING being set. 879 // immediate success or delayed operation by WSA_IO_PENDING being set.
863 if (ok || WSAGetLastError() != WSA_IO_PENDING) { 880 if (ok || WSAGetLastError() != WSA_IO_PENDING) {
864 DisconnectComplete(buffer); 881 DisconnectComplete(buffer);
865 } 882 }
866 if (HasNextPort()) { 883 Dart_Port p = port();
867 Dart_Port p = NextPort(); 884 if (p != ILLEGAL_PORT) DartUtils::PostInt32(p, 1 << kDestroyedEvent);
868 DartUtils::PostInt32(p, 1 << kDestroyedEvent); 885 port_ = ILLEGAL_PORT;
869 RemovePort(p);
870 }
871 } 886 }
872 887
873 888
874 void ClientSocket::DisconnectComplete(OverlappedBuffer* buffer) { 889 void ClientSocket::DisconnectComplete(OverlappedBuffer* buffer) {
875 OverlappedBuffer::DisposeBuffer(buffer); 890 OverlappedBuffer::DisposeBuffer(buffer);
876 closesocket(socket()); 891 closesocket(socket());
877 if (data_ready_ != NULL) { 892 if (data_ready_ != NULL) {
878 OverlappedBuffer::DisposeBuffer(data_ready_); 893 OverlappedBuffer::DisposeBuffer(data_ready_);
879 } 894 }
880 closed_ = true; 895 closed_ = true;
881 } 896 }
882 897
883 898
884 void ClientSocket::ConnectComplete(OverlappedBuffer* buffer) { 899 void ClientSocket::ConnectComplete(OverlappedBuffer* buffer) {
885 OverlappedBuffer::DisposeBuffer(buffer); 900 OverlappedBuffer::DisposeBuffer(buffer);
886 // Update socket to support full socket API, after ConnectEx completed. 901 // Update socket to support full socket API, after ConnectEx completed.
887 setsockopt(socket(), SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0); 902 setsockopt(socket(), SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
888 if (HasNextPort()) { 903 Dart_Port p = port();
904 if (p != ILLEGAL_PORT) {
889 // If the port is set, we already listen for this socket in Dart. 905 // If the port is set, we already listen for this socket in Dart.
890 // Handle the cases here. 906 // Handle the cases here.
891 if (!IsClosedRead()) { 907 if (!IsClosedRead()) {
892 IssueRead(); 908 IssueRead();
893 } 909 }
894 if (!IsClosedWrite()) { 910 if (!IsClosedWrite()) {
895 DartUtils::PostInt32(NextPort(), 1 << kOutEvent); 911 DartUtils::PostInt32(p, 1 << kOutEvent);
896 } 912 }
897 } 913 }
898 } 914 }
899 915
900 916
901 void ClientSocket::EnsureInitialized( 917 void ClientSocket::EnsureInitialized(
902 EventHandlerImplementation* event_handler) { 918 EventHandlerImplementation* event_handler) {
903 ScopedLock lock(this); 919 ScopedLock lock(this);
904 if (completion_port_ == INVALID_HANDLE_VALUE) { 920 if (completion_port_ == INVALID_HANDLE_VALUE) {
905 ASSERT(event_handler_ == NULL); 921 ASSERT(event_handler_ == NULL);
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after
988 // Just close the socket. This will cause any queued requests to be aborted. 1004 // Just close the socket. This will cause any queued requests to be aborted.
989 closesocket(socket()); 1005 closesocket(socket());
990 MarkClosedRead(); 1006 MarkClosedRead();
991 MarkClosedWrite(); 1007 MarkClosedWrite();
992 handle_ = INVALID_HANDLE_VALUE; 1008 handle_ = INVALID_HANDLE_VALUE;
993 } 1009 }
994 1010
995 1011
996 void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { 1012 void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
997 ASSERT(this != NULL); 1013 ASSERT(this != NULL);
998 if (msg->id == kTimerId) { 1014 if (msg->id == kTimeoutId) {
999 // Change of timeout request. Just set the new timeout and port as the 1015 // Change of timeout request. Just set the new timeout and port as the
1000 // completion thread will use the new timeout value for its next wait. 1016 // completion thread will use the new timeout value for its next wait.
1001 timeout_queue_.UpdateTimeout(msg->dart_port, msg->data); 1017 timeout_queue_.UpdateTimeout(msg->dart_port, msg->data);
1002 } else if (msg->id == kShutdownId) { 1018 } else if (msg->id == kShutdownId) {
1003 shutdown_ = true; 1019 shutdown_ = true;
1004 } else { 1020 } else {
1021 // No tokens to return on Windows.
1022 if ((msg->data & (1 << kReturnTokenCommand)) != 0) return;
1005 Handle* handle = reinterpret_cast<Handle*>(msg->id); 1023 Handle* handle = reinterpret_cast<Handle*>(msg->id);
1006 ASSERT(handle != NULL); 1024 ASSERT(handle != NULL);
1007
1008 if (handle->is_listen_socket()) { 1025 if (handle->is_listen_socket()) {
1009 ListenSocket* listen_socket = 1026 ListenSocket* listen_socket =
1010 reinterpret_cast<ListenSocket*>(handle); 1027 reinterpret_cast<ListenSocket*>(handle);
1011 listen_socket->EnsureInitialized(this); 1028 listen_socket->EnsureInitialized(this);
1029 listen_socket->SetPortAndMask(msg->dart_port, msg->data);
1012 1030
1013 Handle::ScopedLock lock(listen_socket); 1031 Handle::ScopedLock lock(listen_socket);
1014 1032
1015 // If incoming connections are requested make sure to post already 1033 // If incoming connections are requested make sure to post already
1016 // accepted connections. 1034 // accepted connections.
1017 if ((msg->data & (1 << kInEvent)) != 0) { 1035 if ((msg->data & (1 << kInEvent)) != 0) {
1018 listen_socket->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK); 1036 if (listen_socket->CanAccept()) {
1019 TryDispatchingPendingAccepts(listen_socket); 1037 int event_mask = (1 << kInEvent);
1020 } 1038 handle->set_mask(handle->mask() & ~event_mask);
1021 1039 DartUtils::PostInt32(handle->port(), event_mask);
1022 if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
1023 int count = TOKEN_COUNT(msg->data);
1024 listen_socket->ReturnTokens(msg->dart_port, count);
1025 TryDispatchingPendingAccepts(listen_socket);
1026 return;
1027 } else if (IS_COMMAND(msg->data, kCloseCommand)) {
1028 Dart_Port port = msg->dart_port;
1029 listen_socket->RemovePort(port);
1030
1031 MutexLocker locker(globalTcpListeningSocketRegistry.mutex());
1032 if (globalTcpListeningSocketRegistry.CloseSafe(
1033 reinterpret_cast<intptr_t>(listen_socket))) {
1034 handle->Close();
1035 } 1040 }
1036 DartUtils::PostInt32(port, 1 << kDestroyedEvent);
1037 } 1041 }
1038 } else { 1042 } else {
1039 handle->EnsureInitialized(this); 1043 handle->EnsureInitialized(this);
1040 1044
1041 Handle::ScopedLock lock(handle); 1045 Handle::ScopedLock lock(handle);
1042 1046
1043 if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
1044 int count = TOKEN_COUNT(msg->data);
1045 handle->ReturnTokens(msg->dart_port, count);
1046 // TODO(kustermann): How can we continue with sending events
1047 // to dart from here?
1048 return;
1049 }
1050
1051 // Only set mask if we turned on kInEvent or kOutEvent. 1047 // Only set mask if we turned on kInEvent or kOutEvent.
1052 if ((msg->data & ((1 << kInEvent) | (1 << kOutEvent))) != 0) { 1048 if ((msg->data & ((1 << kInEvent) | (1 << kOutEvent))) != 0) {
1053 handle->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK); 1049 handle->SetPortAndMask(msg->dart_port, msg->data);
1054 } 1050 }
1055 1051
1056 // Issue a read. 1052 // Issue a read.
1057 if ((msg->data & (1 << kInEvent)) != 0) { 1053 if ((msg->data & (1 << kInEvent)) != 0) {
1058 if (handle->is_datagram_socket()) { 1054 if (handle->is_datagram_socket()) {
1059 handle->IssueRecvFrom(); 1055 handle->IssueRecvFrom();
1060 } else if (handle->is_client_socket()) { 1056 } else if (handle->is_client_socket()) {
1061 if (reinterpret_cast<ClientSocket*>(handle)->is_connected()) { 1057 if (reinterpret_cast<ClientSocket*>(handle)->is_connected()) {
1062 handle->IssueRead(); 1058 handle->IssueRead();
1063 } 1059 }
1064 } else { 1060 } else {
1065 handle->IssueRead(); 1061 handle->IssueRead();
1066 } 1062 }
1067 } 1063 }
1068 1064
1069 // If out events (can write events) have been requested, and there 1065 // If out events (can write events) have been requested, and there
1070 // are no pending writes, meaning any writes are already complete, 1066 // are no pending writes, meaning any writes are already complete,
1071 // post an out event immediately. 1067 // post an out event immediately.
1072 if ((msg->data & (1 << kOutEvent)) != 0) { 1068 if ((msg->data & (1 << kOutEvent)) != 0) {
1073 if (!handle->HasPendingWrite()) { 1069 if (!handle->HasPendingWrite()) {
1074 if (handle->is_client_socket()) { 1070 if (handle->is_client_socket()) {
1075 if (reinterpret_cast<ClientSocket*>(handle)->is_connected()) { 1071 if (reinterpret_cast<ClientSocket*>(handle)->is_connected()) {
1076 DartUtils::PostInt32(handle->NextPort(), 1 << kOutEvent); 1072 DartUtils::PostInt32(handle->port(), 1 << kOutEvent);
1077 } 1073 }
1078 } else { 1074 } else {
1079 DartUtils::PostInt32(handle->NextPort(), 1 << kOutEvent); 1075 DartUtils::PostInt32(handle->port(), 1 << kOutEvent);
1080 } 1076 }
1081 } 1077 }
1082 } 1078 }
1083 1079
1084 if (handle->is_client_socket()) { 1080 if (handle->is_client_socket()) {
1085 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle); 1081 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle);
1086 if ((msg->data & (1 << kShutdownReadCommand)) != 0) { 1082 if ((msg->data & (1 << kShutdownReadCommand)) != 0) {
1087 client_socket->Shutdown(SD_RECEIVE); 1083 client_socket->Shutdown(SD_RECEIVE);
1088 } 1084 }
1089 1085
1090 if ((msg->data & (1 << kShutdownWriteCommand)) != 0) { 1086 if ((msg->data & (1 << kShutdownWriteCommand)) != 0) {
1091 client_socket->Shutdown(SD_SEND); 1087 client_socket->Shutdown(SD_SEND);
1092 } 1088 }
1093 } 1089 }
1090 }
1094 1091
1095 if ((msg->data & (1 << kCloseCommand)) != 0) { 1092 if ((msg->data & (1 << kCloseCommand)) != 0) {
1096 handle->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK); 1093 handle->SetPortAndMask(msg->dart_port, msg->data);
1097 handle->Close(); 1094 handle->Close();
1098 }
1099 } 1095 }
1096
1100 DeleteIfClosed(handle); 1097 DeleteIfClosed(handle);
1101 } 1098 }
1102 } 1099 }
1103 1100
1104 1101
1105 void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket, 1102 void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket,
1106 OverlappedBuffer* buffer) { 1103 OverlappedBuffer* buffer) {
1107 listen_socket->AcceptComplete(buffer, completion_port_); 1104 listen_socket->AcceptComplete(buffer, completion_port_);
1108 1105
1109 TryDispatchingPendingAccepts(listen_socket); 1106 if (!listen_socket->IsClosing()) {
1107 int event_mask = 1 << kInEvent;
1108 if ((listen_socket->mask() & event_mask) != 0) {
1109 DartUtils::PostInt32(listen_socket->port(), event_mask);
1110 }
1111 }
1110 1112
1111 DeleteIfClosed(listen_socket); 1113 DeleteIfClosed(listen_socket);
1112 } 1114 }
1113 1115
1114 1116
1115 void EventHandlerImplementation::TryDispatchingPendingAccepts(
1116 ListenSocket *listen_socket) {
1117 Handle::ScopedLock lock(listen_socket);
1118
1119 if (!listen_socket->IsClosing() && listen_socket->CanAccept()) {
1120 for (int i = 0;
1121 i < listen_socket->accepted_count() && listen_socket->HasNextPort();
1122 i++) {
1123 Dart_Port port = listen_socket->NextPort();
1124 DartUtils::PostInt32(port, 1 << kInEvent);
1125 if (listen_socket->TakeToken()) {
1126 break;
1127 }
1128 }
1129 }
1130 }
1131
1132
1133 void EventHandlerImplementation::HandleRead(Handle* handle, 1117 void EventHandlerImplementation::HandleRead(Handle* handle,
1134 int bytes, 1118 int bytes,
1135 OverlappedBuffer* buffer) { 1119 OverlappedBuffer* buffer) {
1136 buffer->set_data_length(bytes); 1120 buffer->set_data_length(bytes);
1137 handle->ReadComplete(buffer); 1121 handle->ReadComplete(buffer);
1138 if (bytes > 0) { 1122 if (bytes > 0) {
1139 if (!handle->IsClosing()) { 1123 if (!handle->IsClosing()) {
1140 int event_mask = 1 << kInEvent; 1124 int event_mask = 1 << kInEvent;
1141 if ((handle->Mask() & event_mask) != 0) { 1125 if ((handle->mask() & event_mask) != 0) {
1142 DartUtils::PostInt32(handle->NextPort(), event_mask); 1126 DartUtils::PostInt32(handle->port(), event_mask);
1143 } 1127 }
1144 } 1128 }
1145 } else { 1129 } else {
1146 handle->MarkClosedRead(); 1130 handle->MarkClosedRead();
1147 if (bytes == 0) { 1131 if (bytes == 0) {
1148 HandleClosed(handle); 1132 HandleClosed(handle);
1149 } else { 1133 } else {
1150 HandleError(handle); 1134 HandleError(handle);
1151 } 1135 }
1152 } 1136 }
1153 1137
1154 DeleteIfClosed(handle); 1138 DeleteIfClosed(handle);
1155 } 1139 }
1156 1140
1157 1141
1158 void EventHandlerImplementation::HandleRecvFrom(Handle* handle, 1142 void EventHandlerImplementation::HandleRecvFrom(Handle* handle,
1159 int bytes, 1143 int bytes,
1160 OverlappedBuffer* buffer) { 1144 OverlappedBuffer* buffer) {
1161 ASSERT(handle->is_datagram_socket()); 1145 ASSERT(handle->is_datagram_socket());
1162 buffer->set_data_length(bytes); 1146 buffer->set_data_length(bytes);
1163 handle->ReadComplete(buffer); 1147 handle->ReadComplete(buffer);
1164 if (!handle->IsClosing()) { 1148 if (!handle->IsClosing()) {
1165 int event_mask = 1 << kInEvent; 1149 int event_mask = 1 << kInEvent;
1166 if ((handle->Mask() & event_mask) != 0) { 1150 if ((handle->mask() & event_mask) != 0) {
1167 DartUtils::PostInt32(handle->NextPort(), event_mask); 1151 DartUtils::PostInt32(handle->port(), event_mask);
1168 } 1152 }
1169 } 1153 }
1170 1154
1171 DeleteIfClosed(handle); 1155 DeleteIfClosed(handle);
1172 } 1156 }
1173 1157
1174 1158
1175 void EventHandlerImplementation::HandleWrite(Handle* handle, 1159 void EventHandlerImplementation::HandleWrite(Handle* handle,
1176 int bytes, 1160 int bytes,
1177 OverlappedBuffer* buffer) { 1161 OverlappedBuffer* buffer) {
1178 handle->WriteComplete(buffer); 1162 handle->WriteComplete(buffer);
1179 1163
1180 if (bytes >= 0) { 1164 if (bytes >= 0) {
1181 if (!handle->IsError() && !handle->IsClosing()) { 1165 if (!handle->IsError() && !handle->IsClosing()) {
1182 int event_mask = 1 << kOutEvent; 1166 int event_mask = 1 << kOutEvent;
1183 ASSERT(!handle->is_client_socket() || 1167 ASSERT(!handle->is_client_socket() ||
1184 reinterpret_cast<ClientSocket*>(handle)->is_connected()); 1168 reinterpret_cast<ClientSocket*>(handle)->is_connected());
1185 if ((handle->Mask() & event_mask) != 0) { 1169 if ((handle->mask() & event_mask) != 0) {
1186 DartUtils::PostInt32(handle->NextPort(), event_mask); 1170 DartUtils::PostInt32(handle->port(), event_mask);
1187 } 1171 }
1188 } 1172 }
1189 } else { 1173 } else {
1190 HandleError(handle); 1174 HandleError(handle);
1191 } 1175 }
1192 1176
1193 DeleteIfClosed(handle); 1177 DeleteIfClosed(handle);
1194 } 1178 }
1195 1179
1196 1180
(...skipping 162 matching lines...) Expand 10 before | Expand all | Expand 10 after
1359 } else { 1343 } else {
1360 handler_impl->HandleIOCompletion(bytes, key, overlapped); 1344 handler_impl->HandleIOCompletion(bytes, key, overlapped);
1361 } 1345 }
1362 } 1346 }
1363 delete handler; 1347 delete handler;
1364 } 1348 }
1365 1349
1366 1350
1367 void EventHandlerImplementation::Start(EventHandler* handler) { 1351 void EventHandlerImplementation::Start(EventHandler* handler) {
1368 int result = Thread::Start(EventHandlerEntry, 1352 int result = Thread::Start(EventHandlerEntry,
1369 reinterpret_cast<uword>(handler)); 1353 reinterpret_cast<uword>(handler));
1370 if (result != 0) { 1354 if (result != 0) {
1371 FATAL1("Failed to start event handler thread %d", result); 1355 FATAL1("Failed to start event handler thread %d", result);
1372 } 1356 }
1373 1357
1374 // Initialize Winsock32 1358 // Initialize Winsock32
1375 if (!Socket::Initialize()) { 1359 if (!Socket::Initialize()) {
1376 FATAL("Failed to initialized Windows sockets"); 1360 FATAL("Failed to initialized Windows sockets");
1377 } 1361 }
1378 } 1362 }
1379 1363
1380 1364
1381 void EventHandlerImplementation::Shutdown() { 1365 void EventHandlerImplementation::Shutdown() {
1382 SendData(kShutdownId, 0, 0); 1366 SendData(kShutdownId, 0, 0);
1383 } 1367 }
1384 1368
1385 } // namespace bin 1369 } // namespace bin
1386 } // namespace dart 1370 } // namespace dart
1387 1371
1388 #endif // defined(TARGET_OS_WINDOWS) 1372 #endif // defined(TARGET_OS_WINDOWS)
OLDNEW
« no previous file with comments | « dart/runtime/bin/eventhandler_win.h ('k') | dart/runtime/bin/io_natives.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698