Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(548)

Side by Side Diff: dart/runtime/bin/eventhandler_win.cc

Issue 879353003: Introduce optional 'bool shared' parameter to ServerSocket.bind() ... (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge
Patch Set: Addressed comments Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « dart/runtime/bin/eventhandler_win.h ('k') | dart/runtime/bin/io_natives.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include "platform/globals.h" 5 #include "platform/globals.h"
6 #if defined(TARGET_OS_WINDOWS) 6 #if defined(TARGET_OS_WINDOWS)
7 7
8 #include "bin/eventhandler.h" 8 #include "bin/eventhandler.h"
9 #include "bin/eventhandler_win.h"
9 10
10 #include <winsock2.h> // NOLINT 11 #include <winsock2.h> // NOLINT
11 #include <ws2tcpip.h> // NOLINT 12 #include <ws2tcpip.h> // NOLINT
12 #include <mswsock.h> // NOLINT 13 #include <mswsock.h> // NOLINT
13 #include <io.h> // NOLINT 14 #include <io.h> // NOLINT
14 #include <fcntl.h> // NOLINT 15 #include <fcntl.h> // NOLINT
15 16
16 #include "bin/builtin.h" 17 #include "bin/builtin.h"
17 #include "bin/dartutils.h" 18 #include "bin/dartutils.h"
18 #include "bin/lockers.h" 19 #include "bin/lockers.h"
19 #include "bin/log.h" 20 #include "bin/log.h"
20 #include "bin/socket.h" 21 #include "bin/socket.h"
21 #include "bin/thread.h" 22 #include "bin/thread.h"
22 #include "bin/utils.h" 23 #include "bin/utils.h"
23 24
24 #include "platform/utils.h" 25 #include "platform/utils.h"
25 26
26 namespace dart { 27 namespace dart {
27 namespace bin { 28 namespace bin {
28 29
29 static const int kBufferSize = 64 * 1024; 30 static const int kBufferSize = 64 * 1024;
30 static const int kStdOverlappedBufferSize = 16 * 1024; 31 static const int kStdOverlappedBufferSize = 16 * 1024;
31 32
32 static const int kInfinityTimeout = -1;
33 static const int kTimeoutId = -1;
34 static const int kShutdownId = -2;
35
36 OverlappedBuffer* OverlappedBuffer::AllocateBuffer(int buffer_size, 33 OverlappedBuffer* OverlappedBuffer::AllocateBuffer(int buffer_size,
37 Operation operation) { 34 Operation operation) {
38 OverlappedBuffer* buffer = 35 OverlappedBuffer* buffer =
39 new(buffer_size) OverlappedBuffer(buffer_size, operation); 36 new(buffer_size) OverlappedBuffer(buffer_size, operation);
40 return buffer; 37 return buffer;
41 } 38 }
42 39
43 40
44 OverlappedBuffer* OverlappedBuffer::AllocateAcceptBuffer(int buffer_size) { 41 OverlappedBuffer* OverlappedBuffer::AllocateAcceptBuffer(int buffer_size) {
45 OverlappedBuffer* buffer = AllocateBuffer(buffer_size, kAccept); 42 OverlappedBuffer* buffer = AllocateBuffer(buffer_size, kAccept);
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
108 data_length_ = num_bytes; 105 data_length_ = num_bytes;
109 return num_bytes; 106 return num_bytes;
110 } 107 }
111 108
112 109
113 int OverlappedBuffer::GetRemainingLength() { 110 int OverlappedBuffer::GetRemainingLength() {
114 ASSERT(operation_ == kRead || operation_ == kRecvFrom); 111 ASSERT(operation_ == kRead || operation_ == kRecvFrom);
115 return data_length_ - index_; 112 return data_length_ - index_;
116 } 113 }
117 114
118 115 Handle::Handle(intptr_t handle)
119 Handle::Handle(HANDLE handle) 116 : DescriptorInfoBase(handle),
120 : handle_(reinterpret_cast<HANDLE>(handle)), 117 handle_(reinterpret_cast<HANDLE>(handle)),
121 port_(0),
122 mask_(0),
123 completion_port_(INVALID_HANDLE_VALUE), 118 completion_port_(INVALID_HANDLE_VALUE),
124 event_handler_(NULL), 119 event_handler_(NULL),
125 data_ready_(NULL), 120 data_ready_(NULL),
126 pending_read_(NULL),
127 pending_write_(NULL),
128 last_error_(NOERROR),
129 flags_(0) {
130 InitializeCriticalSection(&cs_);
131 }
132
133
134 Handle::Handle(HANDLE handle, Dart_Port port)
135 : handle_(reinterpret_cast<HANDLE>(handle)),
136 port_(port),
137 mask_(0),
138 completion_port_(INVALID_HANDLE_VALUE),
139 event_handler_(NULL),
140 data_ready_(NULL),
141 pending_read_(NULL), 121 pending_read_(NULL),
142 pending_write_(NULL), 122 pending_write_(NULL),
143 last_error_(NOERROR), 123 last_error_(NOERROR),
144 flags_(0) { 124 flags_(0) {
145 InitializeCriticalSection(&cs_); 125 InitializeCriticalSection(&cs_);
146 } 126 }
147 127
148 128
149 Handle::~Handle() { 129 Handle::~Handle() {
150 DeleteCriticalSection(&cs_); 130 DeleteCriticalSection(&cs_);
(...skipping 135 matching lines...) Expand 10 before | Expand all | Expand 10 after
286 pending_read_ = buffer; 266 pending_read_ = buffer;
287 return true; 267 return true;
288 } 268 }
289 OverlappedBuffer::DisposeBuffer(buffer); 269 OverlappedBuffer::DisposeBuffer(buffer);
290 HandleIssueError(); 270 HandleIssueError();
291 return false; 271 return false;
292 } else { 272 } else {
293 // Completing asynchronously through thread. 273 // Completing asynchronously through thread.
294 pending_read_ = buffer; 274 pending_read_ = buffer;
295 int result = Thread::Start(ReadFileThread, 275 int result = Thread::Start(ReadFileThread,
296 reinterpret_cast<uword>(this)); 276 reinterpret_cast<uword>(this));
297 if (result != 0) { 277 if (result != 0) {
298 FATAL1("Failed to start read file thread %d", result); 278 FATAL1("Failed to start read file thread %d", result);
299 } 279 }
300 return true; 280 return true;
301 } 281 }
302 } 282 }
303 283
304 284
305 bool Handle::IssueRecvFrom() { 285 bool Handle::IssueRecvFrom() {
306 return false; 286 return false;
(...skipping 25 matching lines...) Expand all
332 312
333 313
334 bool Handle::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) { 314 bool Handle::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) {
335 return false; 315 return false;
336 } 316 }
337 317
338 318
339 static void HandleClosed(Handle* handle) { 319 static void HandleClosed(Handle* handle) {
340 if (!handle->IsClosing()) { 320 if (!handle->IsClosing()) {
341 int event_mask = 1 << kCloseEvent; 321 int event_mask = 1 << kCloseEvent;
342 DartUtils::PostInt32(handle->port(), event_mask); 322 DartUtils::PostInt32(handle->NextPort(), event_mask);
343 } 323 }
344 } 324 }
345 325
346 326
347 static void HandleError(Handle* handle) { 327 static void HandleError(Handle* handle) {
348 handle->set_last_error(WSAGetLastError()); 328 handle->set_last_error(WSAGetLastError());
349 handle->MarkError(); 329 handle->MarkError();
350 if (!handle->IsClosing()) { 330 if (!handle->IsClosing() && handle->HasNextPort()) {
351 Dart_Port port = handle->port(); 331 DartUtils::PostInt32(handle->NextPort(), 1 << kErrorEvent);
352 if (port != ILLEGAL_PORT) {
353 DartUtils::PostInt32(port, 1 << kErrorEvent);
354 }
355 } 332 }
356 } 333 }
357 334
358 335
359 void Handle::HandleIssueError() { 336 void Handle::HandleIssueError() {
360 DWORD error = GetLastError(); 337 DWORD error = GetLastError();
361 if (error == ERROR_BROKEN_PIPE) { 338 if (error == ERROR_BROKEN_PIPE) {
362 HandleClosed(this); 339 HandleClosed(this);
363 } else { 340 } else {
364 HandleError(this); 341 HandleError(this);
(...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after
460 NULL); 437 NULL);
461 if (status == SOCKET_ERROR) { 438 if (status == SOCKET_ERROR) {
462 return false; 439 return false;
463 } 440 }
464 return true; 441 return true;
465 } 442 }
466 443
467 444
468 bool ListenSocket::IssueAccept() { 445 bool ListenSocket::IssueAccept() {
469 ScopedLock lock(this); 446 ScopedLock lock(this);
447
470 // For AcceptEx there needs to be buffer storage for address 448 // For AcceptEx there needs to be buffer storage for address
471 // information for two addresses (local and remote address). The 449 // information for two addresses (local and remote address). The
472 // AcceptEx documentation says: "This value must be at least 16 450 // AcceptEx documentation says: "This value must be at least 16
473 // bytes more than the maximum address length for the transport 451 // bytes more than the maximum address length for the transport
474 // protocol in use." 452 // protocol in use."
475 static const int kAcceptExAddressAdditionalBytes = 16; 453 static const int kAcceptExAddressAdditionalBytes = 16;
476 static const int kAcceptExAddressStorageSize = 454 static const int kAcceptExAddressStorageSize =
477 sizeof(SOCKADDR_STORAGE) + kAcceptExAddressAdditionalBytes; 455 sizeof(SOCKADDR_STORAGE) + kAcceptExAddressAdditionalBytes;
478 OverlappedBuffer* buffer = 456 OverlappedBuffer* buffer =
479 OverlappedBuffer::AllocateAcceptBuffer(2 * kAcceptExAddressStorageSize); 457 OverlappedBuffer::AllocateAcceptBuffer(2 * kAcceptExAddressStorageSize);
(...skipping 28 matching lines...) Expand all
508 ScopedLock lock(this); 486 ScopedLock lock(this);
509 if (!IsClosing()) { 487 if (!IsClosing()) {
510 // Update the accepted socket to support the full range of API calls. 488 // Update the accepted socket to support the full range of API calls.
511 SOCKET s = socket(); 489 SOCKET s = socket();
512 int rc = setsockopt(buffer->client(), 490 int rc = setsockopt(buffer->client(),
513 SOL_SOCKET, 491 SOL_SOCKET,
514 SO_UPDATE_ACCEPT_CONTEXT, 492 SO_UPDATE_ACCEPT_CONTEXT,
515 reinterpret_cast<char*>(&s), sizeof(s)); 493 reinterpret_cast<char*>(&s), sizeof(s));
516 if (rc == NO_ERROR) { 494 if (rc == NO_ERROR) {
517 // Insert the accepted socket into the list. 495 // Insert the accepted socket into the list.
518 ClientSocket* client_socket = new ClientSocket(buffer->client(), 0); 496 ClientSocket* client_socket = new ClientSocket(buffer->client());
519 client_socket->mark_connected(); 497 client_socket->mark_connected();
520 client_socket->CreateCompletionPort(completion_port); 498 client_socket->CreateCompletionPort(completion_port);
521 if (accepted_head_ == NULL) { 499 if (accepted_head_ == NULL) {
522 accepted_head_ = client_socket; 500 accepted_head_ = client_socket;
523 accepted_tail_ = client_socket; 501 accepted_tail_ = client_socket;
524 } else { 502 } else {
525 ASSERT(accepted_tail_ != NULL); 503 ASSERT(accepted_tail_ != NULL);
526 accepted_tail_->set_next(client_socket); 504 accepted_tail_->set_next(client_socket);
527 accepted_tail_ = client_socket; 505 accepted_tail_ = client_socket;
528 } 506 }
507 accepted_count_++;
529 } else { 508 } else {
530 closesocket(buffer->client()); 509 closesocket(buffer->client());
531 } 510 }
532 } else { 511 } else {
533 // Close the socket, as it's already accepted. 512 // Close the socket, as it's already accepted.
534 closesocket(buffer->client()); 513 closesocket(buffer->client());
535 } 514 }
536 515
537 pending_accept_count_--; 516 pending_accept_count_--;
538 OverlappedBuffer::DisposeBuffer(buffer); 517 OverlappedBuffer::DisposeBuffer(buffer);
539 } 518 }
540 519
541 520
542 static void DeleteIfClosed(Handle* handle) { 521 static void DeleteIfClosed(Handle* handle) {
543 if (handle->IsClosed()) { 522 if (handle->IsClosed()) {
544 Dart_Port port = handle->port(); 523 handle->SendToAll(1 << kDestroyedEvent);
545 delete handle; 524 delete handle;
546 if (port != ILLEGAL_PORT) {
547 DartUtils::PostInt32(port, 1 << kDestroyedEvent);
548 }
549 } 525 }
550 } 526 }
551 527
552 528
553 void ListenSocket::DoClose() { 529 void ListenSocket::DoClose() {
554 closesocket(socket()); 530 closesocket(socket());
555 handle_ = INVALID_HANDLE_VALUE; 531 handle_ = INVALID_HANDLE_VALUE;
556 while (CanAccept()) { 532 while (CanAccept()) {
557 // Get rid of connections already accepted. 533 // Get rid of connections already accepted.
558 ClientSocket *client = Accept(); 534 ClientSocket *client = Accept();
559 if (client != NULL) { 535 if (client != NULL) {
560 client->Close(); 536 client->Close();
561 DeleteIfClosed(client); 537 DeleteIfClosed(client);
562 } else { 538 } else {
563 break; 539 break;
564 } 540 }
565 } 541 }
566 } 542 }
567 543
568 544
569 bool ListenSocket::CanAccept() { 545 bool ListenSocket::CanAccept() {
570 ScopedLock lock(this); 546 ScopedLock lock(this);
571 return accepted_head_ != NULL; 547 return accepted_head_ != NULL;
572 } 548 }
573 549
574 550
575 ClientSocket* ListenSocket::Accept() { 551 ClientSocket* ListenSocket::Accept() {
576 ScopedLock lock(this); 552 ScopedLock lock(this);
577 if (accepted_head_ == NULL) return NULL; 553
578 ClientSocket* result = accepted_head_; 554 ClientSocket *result = NULL;
579 accepted_head_ = accepted_head_->next(); 555
580 if (accepted_head_ == NULL) accepted_tail_ = NULL; 556 if (accepted_head_ != NULL) {
581 result->set_next(NULL); 557 result = accepted_head_;
558 accepted_head_ = accepted_head_->next();
559 if (accepted_head_ == NULL) accepted_tail_ = NULL;
560 result->set_next(NULL);
561 accepted_count_--;
562 }
563
582 if (!IsClosing()) { 564 if (!IsClosing()) {
583 if (!IssueAccept()) { 565 if (!IssueAccept()) {
584 HandleError(this); 566 HandleError(this);
585 } 567 }
586 } 568 }
569
587 return result; 570 return result;
588 } 571 }
589 572
590 573
591 void ListenSocket::EnsureInitialized( 574 void ListenSocket::EnsureInitialized(
592 EventHandlerImplementation* event_handler) { 575 EventHandlerImplementation* event_handler) {
593 ScopedLock lock(this); 576 ScopedLock lock(this);
594 if (AcceptEx_ == NULL) { 577 if (AcceptEx_ == NULL) {
595 ASSERT(completion_port_ == INVALID_HANDLE_VALUE); 578 ASSERT(completion_port_ == INVALID_HANDLE_VALUE);
596 ASSERT(event_handler_ == NULL); 579 ASSERT(event_handler_ == NULL);
(...skipping 276 matching lines...) Expand 10 before | Expand all | Expand 10 after
873 856
874 void ClientSocket::IssueDisconnect() { 857 void ClientSocket::IssueDisconnect() {
875 OverlappedBuffer* buffer = OverlappedBuffer::AllocateDisconnectBuffer(); 858 OverlappedBuffer* buffer = OverlappedBuffer::AllocateDisconnectBuffer();
876 BOOL ok = DisconnectEx_( 859 BOOL ok = DisconnectEx_(
877 socket(), buffer->GetCleanOverlapped(), TF_REUSE_SOCKET, 0); 860 socket(), buffer->GetCleanOverlapped(), TF_REUSE_SOCKET, 0);
878 // DisconnectEx works like other OverlappedIO APIs, where we can get either an 861 // DisconnectEx works like other OverlappedIO APIs, where we can get either an
879 // immediate success or delayed operation by WSA_IO_PENDING being set. 862 // immediate success or delayed operation by WSA_IO_PENDING being set.
880 if (ok || WSAGetLastError() != WSA_IO_PENDING) { 863 if (ok || WSAGetLastError() != WSA_IO_PENDING) {
881 DisconnectComplete(buffer); 864 DisconnectComplete(buffer);
882 } 865 }
883 Dart_Port p = port(); 866 if (HasNextPort()) {
884 if (p != ILLEGAL_PORT) DartUtils::PostInt32(p, 1 << kDestroyedEvent); 867 Dart_Port p = NextPort();
885 port_ = ILLEGAL_PORT; 868 DartUtils::PostInt32(p, 1 << kDestroyedEvent);
869 RemovePort(p);
870 }
886 } 871 }
887 872
888 873
889 void ClientSocket::DisconnectComplete(OverlappedBuffer* buffer) { 874 void ClientSocket::DisconnectComplete(OverlappedBuffer* buffer) {
890 OverlappedBuffer::DisposeBuffer(buffer); 875 OverlappedBuffer::DisposeBuffer(buffer);
891 closesocket(socket()); 876 closesocket(socket());
892 if (data_ready_ != NULL) { 877 if (data_ready_ != NULL) {
893 OverlappedBuffer::DisposeBuffer(data_ready_); 878 OverlappedBuffer::DisposeBuffer(data_ready_);
894 } 879 }
895 closed_ = true; 880 closed_ = true;
896 } 881 }
897 882
898 883
899 void ClientSocket::ConnectComplete(OverlappedBuffer* buffer) { 884 void ClientSocket::ConnectComplete(OverlappedBuffer* buffer) {
900 OverlappedBuffer::DisposeBuffer(buffer); 885 OverlappedBuffer::DisposeBuffer(buffer);
901 // Update socket to support full socket API, after ConnectEx completed. 886 // Update socket to support full socket API, after ConnectEx completed.
902 setsockopt(socket(), SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0); 887 setsockopt(socket(), SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
903 Dart_Port p = port(); 888 if (HasNextPort()) {
904 if (p != ILLEGAL_PORT) {
905 // If the port is set, we already listen for this socket in Dart. 889 // If the port is set, we already listen for this socket in Dart.
906 // Handle the cases here. 890 // Handle the cases here.
907 if (!IsClosedRead()) { 891 if (!IsClosedRead()) {
908 IssueRead(); 892 IssueRead();
909 } 893 }
910 if (!IsClosedWrite()) { 894 if (!IsClosedWrite()) {
911 DartUtils::PostInt32(p, 1 << kOutEvent); 895 DartUtils::PostInt32(NextPort(), 1 << kOutEvent);
912 } 896 }
913 } 897 }
914 } 898 }
915 899
916 900
917 void ClientSocket::EnsureInitialized( 901 void ClientSocket::EnsureInitialized(
918 EventHandlerImplementation* event_handler) { 902 EventHandlerImplementation* event_handler) {
919 ScopedLock lock(this); 903 ScopedLock lock(this);
920 if (completion_port_ == INVALID_HANDLE_VALUE) { 904 if (completion_port_ == INVALID_HANDLE_VALUE) {
921 ASSERT(event_handler_ == NULL); 905 ASSERT(event_handler_ == NULL);
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after
1004 // Just close the socket. This will cause any queued requests to be aborted. 988 // Just close the socket. This will cause any queued requests to be aborted.
1005 closesocket(socket()); 989 closesocket(socket());
1006 MarkClosedRead(); 990 MarkClosedRead();
1007 MarkClosedWrite(); 991 MarkClosedWrite();
1008 handle_ = INVALID_HANDLE_VALUE; 992 handle_ = INVALID_HANDLE_VALUE;
1009 } 993 }
1010 994
1011 995
1012 void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { 996 void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
1013 ASSERT(this != NULL); 997 ASSERT(this != NULL);
1014 if (msg->id == kTimeoutId) { 998 if (msg->id == kTimerId) {
1015 // Change of timeout request. Just set the new timeout and port as the 999 // Change of timeout request. Just set the new timeout and port as the
1016 // completion thread will use the new timeout value for its next wait. 1000 // completion thread will use the new timeout value for its next wait.
1017 timeout_queue_.UpdateTimeout(msg->dart_port, msg->data); 1001 timeout_queue_.UpdateTimeout(msg->dart_port, msg->data);
1018 } else if (msg->id == kShutdownId) { 1002 } else if (msg->id == kShutdownId) {
1019 shutdown_ = true; 1003 shutdown_ = true;
1020 } else { 1004 } else {
1021 // No tokens to return on Windows.
1022 if ((msg->data & (1 << kReturnTokenCommand)) != 0) return;
1023 Handle* handle = reinterpret_cast<Handle*>(msg->id); 1005 Handle* handle = reinterpret_cast<Handle*>(msg->id);
1024 ASSERT(handle != NULL); 1006 ASSERT(handle != NULL);
1007
1025 if (handle->is_listen_socket()) { 1008 if (handle->is_listen_socket()) {
1026 ListenSocket* listen_socket = 1009 ListenSocket* listen_socket =
1027 reinterpret_cast<ListenSocket*>(handle); 1010 reinterpret_cast<ListenSocket*>(handle);
1028 listen_socket->EnsureInitialized(this); 1011 listen_socket->EnsureInitialized(this);
1029 listen_socket->SetPortAndMask(msg->dart_port, msg->data);
1030 1012
1031 Handle::ScopedLock lock(listen_socket); 1013 Handle::ScopedLock lock(listen_socket);
1032 1014
1033 // If incoming connections are requested make sure to post already 1015 // If incoming connections are requested make sure to post already
1034 // accepted connections. 1016 // accepted connections.
1035 if ((msg->data & (1 << kInEvent)) != 0) { 1017 if ((msg->data & (1 << kInEvent)) != 0) {
1036 if (listen_socket->CanAccept()) { 1018 listen_socket->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK);
1037 int event_mask = (1 << kInEvent); 1019 TryDispatchingPendingAccepts(listen_socket);
1038 handle->set_mask(handle->mask() & ~event_mask); 1020 }
1039 DartUtils::PostInt32(handle->port(), event_mask); 1021
1022 if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
1023 int count = TOKEN_COUNT(msg->data);
1024 listen_socket->ReturnTokens(msg->dart_port, count);
1025 TryDispatchingPendingAccepts(listen_socket);
1026 return;
1027 } else if (IS_COMMAND(msg->data, kCloseCommand)) {
1028 Dart_Port port = msg->dart_port;
1029 listen_socket->RemovePort(port);
1030
1031 MutexLocker locker(globalTcpListeningSocketRegistry.mutex());
1032 if (globalTcpListeningSocketRegistry.CloseSafe(
1033 reinterpret_cast<intptr_t>(listen_socket))) {
1034 handle->Close();
1040 } 1035 }
1036 DartUtils::PostInt32(port, 1 << kDestroyedEvent);
1041 } 1037 }
1042 } else { 1038 } else {
1043 handle->EnsureInitialized(this); 1039 handle->EnsureInitialized(this);
1044 1040
1045 Handle::ScopedLock lock(handle); 1041 Handle::ScopedLock lock(handle);
1046 1042
1043 if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
1044 int count = TOKEN_COUNT(msg->data);
1045 handle->ReturnTokens(msg->dart_port, count);
1046 // TODO(kustermann): How can we continue with sending events
1047 // to dart from here?
1048 return;
1049 }
1050
1047 // Only set mask if we turned on kInEvent or kOutEvent. 1051 // Only set mask if we turned on kInEvent or kOutEvent.
1048 if ((msg->data & ((1 << kInEvent) | (1 << kOutEvent))) != 0) { 1052 if ((msg->data & ((1 << kInEvent) | (1 << kOutEvent))) != 0) {
1049 handle->SetPortAndMask(msg->dart_port, msg->data); 1053 handle->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK);
1050 } 1054 }
1051 1055
1052 // Issue a read. 1056 // Issue a read.
1053 if ((msg->data & (1 << kInEvent)) != 0) { 1057 if ((msg->data & (1 << kInEvent)) != 0) {
1054 if (handle->is_datagram_socket()) { 1058 if (handle->is_datagram_socket()) {
1055 handle->IssueRecvFrom(); 1059 handle->IssueRecvFrom();
1056 } else if (handle->is_client_socket()) { 1060 } else if (handle->is_client_socket()) {
1057 if (reinterpret_cast<ClientSocket*>(handle)->is_connected()) { 1061 if (reinterpret_cast<ClientSocket*>(handle)->is_connected()) {
1058 handle->IssueRead(); 1062 handle->IssueRead();
1059 } 1063 }
1060 } else { 1064 } else {
1061 handle->IssueRead(); 1065 handle->IssueRead();
1062 } 1066 }
1063 } 1067 }
1064 1068
1065 // If out events (can write events) have been requested, and there 1069 // If out events (can write events) have been requested, and there
1066 // are no pending writes, meaning any writes are already complete, 1070 // are no pending writes, meaning any writes are already complete,
1067 // post an out event immediately. 1071 // post an out event immediately.
1068 if ((msg->data & (1 << kOutEvent)) != 0) { 1072 if ((msg->data & (1 << kOutEvent)) != 0) {
1069 if (!handle->HasPendingWrite()) { 1073 if (!handle->HasPendingWrite()) {
1070 if (handle->is_client_socket()) { 1074 if (handle->is_client_socket()) {
1071 if (reinterpret_cast<ClientSocket*>(handle)->is_connected()) { 1075 if (reinterpret_cast<ClientSocket*>(handle)->is_connected()) {
1072 DartUtils::PostInt32(handle->port(), 1 << kOutEvent); 1076 DartUtils::PostInt32(handle->NextPort(), 1 << kOutEvent);
1073 } 1077 }
1074 } else { 1078 } else {
1075 DartUtils::PostInt32(handle->port(), 1 << kOutEvent); 1079 DartUtils::PostInt32(handle->NextPort(), 1 << kOutEvent);
1076 } 1080 }
1077 } 1081 }
1078 } 1082 }
1079 1083
1080 if (handle->is_client_socket()) { 1084 if (handle->is_client_socket()) {
1081 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle); 1085 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle);
1082 if ((msg->data & (1 << kShutdownReadCommand)) != 0) { 1086 if ((msg->data & (1 << kShutdownReadCommand)) != 0) {
1083 client_socket->Shutdown(SD_RECEIVE); 1087 client_socket->Shutdown(SD_RECEIVE);
1084 } 1088 }
1085 1089
1086 if ((msg->data & (1 << kShutdownWriteCommand)) != 0) { 1090 if ((msg->data & (1 << kShutdownWriteCommand)) != 0) {
1087 client_socket->Shutdown(SD_SEND); 1091 client_socket->Shutdown(SD_SEND);
1088 } 1092 }
1089 } 1093 }
1094
1095 if ((msg->data & (1 << kCloseCommand)) != 0) {
1096 handle->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK);
1097 handle->Close();
1098 }
1090 } 1099 }
1091
1092 if ((msg->data & (1 << kCloseCommand)) != 0) {
1093 handle->SetPortAndMask(msg->dart_port, msg->data);
1094 handle->Close();
1095 }
1096
1097 DeleteIfClosed(handle); 1100 DeleteIfClosed(handle);
1098 } 1101 }
1099 } 1102 }
1100 1103
1101 1104
1102 void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket, 1105 void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket,
1103 OverlappedBuffer* buffer) { 1106 OverlappedBuffer* buffer) {
1104 listen_socket->AcceptComplete(buffer, completion_port_); 1107 listen_socket->AcceptComplete(buffer, completion_port_);
1105 1108
1106 if (!listen_socket->IsClosing()) { 1109 TryDispatchingPendingAccepts(listen_socket);
1107 int event_mask = 1 << kInEvent; 1110
1108 if ((listen_socket->mask() & event_mask) != 0) { 1111 DeleteIfClosed(listen_socket);
1109 DartUtils::PostInt32(listen_socket->port(), event_mask); 1112 }
1113
1114
1115 void EventHandlerImplementation::TryDispatchingPendingAccepts(
1116 ListenSocket *listen_socket) {
1117 Handle::ScopedLock lock(listen_socket);
1118
1119 if (!listen_socket->IsClosing() && listen_socket->CanAccept()) {
1120 for (int i = 0;
1121 i < listen_socket->accepted_count() && listen_socket->HasNextPort();
1122 i++) {
1123 Dart_Port port = listen_socket->NextPort();
1124 DartUtils::PostInt32(port, 1 << kInEvent);
1125 if (listen_socket->TakeToken()) {
1126 break;
1127 }
1110 } 1128 }
1111 } 1129 }
1112
1113 DeleteIfClosed(listen_socket);
1114 } 1130 }
1115 1131
1116 1132
1117 void EventHandlerImplementation::HandleRead(Handle* handle, 1133 void EventHandlerImplementation::HandleRead(Handle* handle,
1118 int bytes, 1134 int bytes,
1119 OverlappedBuffer* buffer) { 1135 OverlappedBuffer* buffer) {
1120 buffer->set_data_length(bytes); 1136 buffer->set_data_length(bytes);
1121 handle->ReadComplete(buffer); 1137 handle->ReadComplete(buffer);
1122 if (bytes > 0) { 1138 if (bytes > 0) {
1123 if (!handle->IsClosing()) { 1139 if (!handle->IsClosing()) {
1124 int event_mask = 1 << kInEvent; 1140 int event_mask = 1 << kInEvent;
1125 if ((handle->mask() & event_mask) != 0) { 1141 if ((handle->Mask() & event_mask) != 0) {
1126 DartUtils::PostInt32(handle->port(), event_mask); 1142 DartUtils::PostInt32(handle->NextPort(), event_mask);
1127 } 1143 }
1128 } 1144 }
1129 } else { 1145 } else {
1130 handle->MarkClosedRead(); 1146 handle->MarkClosedRead();
1131 if (bytes == 0) { 1147 if (bytes == 0) {
1132 HandleClosed(handle); 1148 HandleClosed(handle);
1133 } else { 1149 } else {
1134 HandleError(handle); 1150 HandleError(handle);
1135 } 1151 }
1136 } 1152 }
1137 1153
1138 DeleteIfClosed(handle); 1154 DeleteIfClosed(handle);
1139 } 1155 }
1140 1156
1141 1157
1142 void EventHandlerImplementation::HandleRecvFrom(Handle* handle, 1158 void EventHandlerImplementation::HandleRecvFrom(Handle* handle,
1143 int bytes, 1159 int bytes,
1144 OverlappedBuffer* buffer) { 1160 OverlappedBuffer* buffer) {
1145 ASSERT(handle->is_datagram_socket()); 1161 ASSERT(handle->is_datagram_socket());
1146 buffer->set_data_length(bytes); 1162 buffer->set_data_length(bytes);
1147 handle->ReadComplete(buffer); 1163 handle->ReadComplete(buffer);
1148 if (!handle->IsClosing()) { 1164 if (!handle->IsClosing()) {
1149 int event_mask = 1 << kInEvent; 1165 int event_mask = 1 << kInEvent;
1150 if ((handle->mask() & event_mask) != 0) { 1166 if ((handle->Mask() & event_mask) != 0) {
1151 DartUtils::PostInt32(handle->port(), event_mask); 1167 DartUtils::PostInt32(handle->NextPort(), event_mask);
1152 } 1168 }
1153 } 1169 }
1154 1170
1155 DeleteIfClosed(handle); 1171 DeleteIfClosed(handle);
1156 } 1172 }
1157 1173
1158 1174
1159 void EventHandlerImplementation::HandleWrite(Handle* handle, 1175 void EventHandlerImplementation::HandleWrite(Handle* handle,
1160 int bytes, 1176 int bytes,
1161 OverlappedBuffer* buffer) { 1177 OverlappedBuffer* buffer) {
1162 handle->WriteComplete(buffer); 1178 handle->WriteComplete(buffer);
1163 1179
1164 if (bytes >= 0) { 1180 if (bytes >= 0) {
1165 if (!handle->IsError() && !handle->IsClosing()) { 1181 if (!handle->IsError() && !handle->IsClosing()) {
1166 int event_mask = 1 << kOutEvent; 1182 int event_mask = 1 << kOutEvent;
1167 ASSERT(!handle->is_client_socket() || 1183 ASSERT(!handle->is_client_socket() ||
1168 reinterpret_cast<ClientSocket*>(handle)->is_connected()); 1184 reinterpret_cast<ClientSocket*>(handle)->is_connected());
1169 if ((handle->mask() & event_mask) != 0) { 1185 if ((handle->Mask() & event_mask) != 0) {
1170 DartUtils::PostInt32(handle->port(), event_mask); 1186 DartUtils::PostInt32(handle->NextPort(), event_mask);
1171 } 1187 }
1172 } 1188 }
1173 } else { 1189 } else {
1174 HandleError(handle); 1190 HandleError(handle);
1175 } 1191 }
1176 1192
1177 DeleteIfClosed(handle); 1193 DeleteIfClosed(handle);
1178 } 1194 }
1179 1195
1180 1196
(...skipping 162 matching lines...) Expand 10 before | Expand all | Expand 10 after
1343 } else { 1359 } else {
1344 handler_impl->HandleIOCompletion(bytes, key, overlapped); 1360 handler_impl->HandleIOCompletion(bytes, key, overlapped);
1345 } 1361 }
1346 } 1362 }
1347 delete handler; 1363 delete handler;
1348 } 1364 }
1349 1365
1350 1366
1351 void EventHandlerImplementation::Start(EventHandler* handler) { 1367 void EventHandlerImplementation::Start(EventHandler* handler) {
1352 int result = Thread::Start(EventHandlerEntry, 1368 int result = Thread::Start(EventHandlerEntry,
1353 reinterpret_cast<uword>(handler)); 1369 reinterpret_cast<uword>(handler));
1354 if (result != 0) { 1370 if (result != 0) {
1355 FATAL1("Failed to start event handler thread %d", result); 1371 FATAL1("Failed to start event handler thread %d", result);
1356 } 1372 }
1357 1373
1358 // Initialize Winsock32 1374 // Initialize Winsock32
1359 if (!Socket::Initialize()) { 1375 if (!Socket::Initialize()) {
1360 FATAL("Failed to initialized Windows sockets"); 1376 FATAL("Failed to initialized Windows sockets");
1361 } 1377 }
1362 } 1378 }
1363 1379
1364 1380
1365 void EventHandlerImplementation::Shutdown() { 1381 void EventHandlerImplementation::Shutdown() {
1366 SendData(kShutdownId, 0, 0); 1382 SendData(kShutdownId, 0, 0);
1367 } 1383 }
1368 1384
1369 } // namespace bin 1385 } // namespace bin
1370 } // namespace dart 1386 } // namespace dart
1371 1387
1372 #endif // defined(TARGET_OS_WINDOWS) 1388 #endif // defined(TARGET_OS_WINDOWS)
OLDNEW
« no previous file with comments | « dart/runtime/bin/eventhandler_win.h ('k') | dart/runtime/bin/io_natives.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698