Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(17)

Side by Side Diff: runtime/bin/eventhandler_win.cc

Issue 1291163002: Join embeder threads on Windows. (Closed) Base URL: https://github.com/dart-lang/sdk.git@master
Patch Set: simplify locking Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « runtime/bin/eventhandler_win.h ('k') | runtime/bin/main.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include "platform/globals.h" 5 #include "platform/globals.h"
6 #if defined(TARGET_OS_WINDOWS) 6 #if defined(TARGET_OS_WINDOWS)
7 7
8 #include "bin/eventhandler.h" 8 #include "bin/eventhandler.h"
9 #include "bin/eventhandler_win.h" 9 #include "bin/eventhandler_win.h"
10 10
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after
114 114
115 Handle::Handle(intptr_t handle) 115 Handle::Handle(intptr_t handle)
116 : DescriptorInfoBase(handle), 116 : DescriptorInfoBase(handle),
117 handle_(reinterpret_cast<HANDLE>(handle)), 117 handle_(reinterpret_cast<HANDLE>(handle)),
118 completion_port_(INVALID_HANDLE_VALUE), 118 completion_port_(INVALID_HANDLE_VALUE),
119 event_handler_(NULL), 119 event_handler_(NULL),
120 data_ready_(NULL), 120 data_ready_(NULL),
121 pending_read_(NULL), 121 pending_read_(NULL),
122 pending_write_(NULL), 122 pending_write_(NULL),
123 last_error_(NOERROR), 123 last_error_(NOERROR),
124 flags_(0) { 124 flags_(0),
125 InitializeCriticalSection(&cs_); 125 read_thread_id_(Thread::kInvalidThreadId),
126 read_thread_starting_(false),
127 read_thread_finished_(false),
128 monitor_(new Monitor()) {
126 } 129 }
127 130
128 131
129 Handle::~Handle() { 132 Handle::~Handle() {
130 DeleteCriticalSection(&cs_); 133 delete monitor_;
131 } 134 }
132 135
133 136
134 void Handle::Lock() {
135 EnterCriticalSection(&cs_);
136 }
137
138
139 void Handle::Unlock() {
140 LeaveCriticalSection(&cs_);
141 }
142
143
144 bool Handle::CreateCompletionPort(HANDLE completion_port) { 137 bool Handle::CreateCompletionPort(HANDLE completion_port) {
145 completion_port_ = CreateIoCompletionPort(handle(), 138 completion_port_ = CreateIoCompletionPort(handle(),
146 completion_port, 139 completion_port,
147 reinterpret_cast<ULONG_PTR>(this), 140 reinterpret_cast<ULONG_PTR>(this),
148 0); 141 0);
149 if (completion_port_ == NULL) { 142 if (completion_port_ == NULL) {
150 return false; 143 return false;
151 } 144 }
152 return true; 145 return true;
153 } 146 }
154 147
155 148
156 void Handle::Close() { 149 void Handle::Close() {
157 ScopedLock lock(this); 150 MonitorLocker ml(monitor_);
158 if (!IsClosing()) { 151 if (!IsClosing()) {
159 // Close the socket and set the closing state. This close method can be 152 // Close the socket and set the closing state. This close method can be
160 // called again if this socket has pending IO operations in flight. 153 // called again if this socket has pending IO operations in flight.
161 MarkClosing(); 154 MarkClosing();
162 // Perform handle type specific closing. 155 // Perform handle type specific closing.
163 DoClose(); 156 DoClose();
164 } 157 }
165 ASSERT(IsHandleClosed()); 158 ASSERT(IsHandleClosed());
166 } 159 }
167 160
168 161
169 void Handle::DoClose() { 162 void Handle::DoClose() {
170 if (!IsHandleClosed()) { 163 if (!IsHandleClosed()) {
171 CloseHandle(handle_); 164 CloseHandle(handle_);
172 handle_ = INVALID_HANDLE_VALUE; 165 handle_ = INVALID_HANDLE_VALUE;
173 } 166 }
174 } 167 }
175 168
176 169
177 bool Handle::HasPendingRead() { 170 bool Handle::HasPendingRead() {
178 ScopedLock lock(this); 171 MonitorLocker ml(monitor_);
179 return pending_read_ != NULL; 172 return pending_read_ != NULL;
180 } 173 }
181 174
182 175
183 bool Handle::HasPendingWrite() { 176 bool Handle::HasPendingWrite() {
184 ScopedLock lock(this); 177 MonitorLocker ml(monitor_);
185 return pending_write_ != NULL; 178 return pending_write_ != NULL;
186 } 179 }
187 180
188 181
189 void Handle::ReadComplete(OverlappedBuffer* buffer) { 182 void Handle::WaitForReadThreadFinished() {
190 ScopedLock lock(this); 183 // Join the Reader thread if there is one.
191 // Currently only one outstanding read at the time. 184 ThreadId to_join = Thread::kInvalidThreadId;
192 ASSERT(pending_read_ == buffer); 185 {
193 ASSERT(data_ready_ == NULL); 186 MonitorLocker ml(monitor_);
194 if (!IsClosing() && !buffer->IsEmpty()) { 187 if (read_thread_id_ != Thread::kInvalidThreadId) {
195 data_ready_ = pending_read_; 188 while (!read_thread_finished_) {
196 } else { 189 ml.Wait();
197 OverlappedBuffer::DisposeBuffer(buffer); 190 }
191 read_thread_finished_ = false;
192 to_join = read_thread_id_;
193 read_thread_id_ = Thread::kInvalidThreadId;
194 }
198 } 195 }
199 pending_read_ = NULL; 196 if (to_join != Thread::kInvalidThreadId) {
197 Thread::Join(to_join);
198 }
200 } 199 }
201 200
202 201
202 void Handle::ReadComplete(OverlappedBuffer* buffer) {
203 {
204 MonitorLocker ml(monitor_);
205 // Currently only one outstanding read at the time.
206 ASSERT(pending_read_ == buffer);
207 ASSERT(data_ready_ == NULL);
208 if (!IsClosing() && !buffer->IsEmpty()) {
209 data_ready_ = pending_read_;
210 } else {
211 OverlappedBuffer::DisposeBuffer(buffer);
212 }
213 pending_read_ = NULL;
214 }
215 WaitForReadThreadFinished();
216 }
217
218
203 void Handle::RecvFromComplete(OverlappedBuffer* buffer) { 219 void Handle::RecvFromComplete(OverlappedBuffer* buffer) {
204 ReadComplete(buffer); 220 ReadComplete(buffer);
205 } 221 }
206 222
207 223
208 void Handle::WriteComplete(OverlappedBuffer* buffer) { 224 void Handle::WriteComplete(OverlappedBuffer* buffer) {
209 ScopedLock lock(this); 225 MonitorLocker ml(monitor_);
210 // Currently only one outstanding write at the time. 226 // Currently only one outstanding write at the time.
211 ASSERT(pending_write_ == buffer); 227 ASSERT(pending_write_ == buffer);
212 OverlappedBuffer::DisposeBuffer(buffer); 228 OverlappedBuffer::DisposeBuffer(buffer);
213 pending_write_ = NULL; 229 pending_write_ = NULL;
214 } 230 }
215 231
216 232
217 static void ReadFileThread(uword args) { 233 static void ReadFileThread(uword args) {
218 Handle* handle = reinterpret_cast<Handle*>(args); 234 Handle* handle = reinterpret_cast<Handle*>(args);
219 handle->ReadSyncCompleteAsync(); 235 handle->ReadSyncCompleteAsync();
220 } 236 }
221 237
222 238
223 void Handle::ReadSyncCompleteAsync() { 239 void Handle::ReadSyncCompleteAsync() {
240 NotifyReadThreadStarted();
224 ASSERT(pending_read_ != NULL); 241 ASSERT(pending_read_ != NULL);
225 ASSERT(pending_read_->GetBufferSize() >= kStdOverlappedBufferSize); 242 ASSERT(pending_read_->GetBufferSize() >= kStdOverlappedBufferSize);
226 243
227 DWORD buffer_size = pending_read_->GetBufferSize(); 244 DWORD buffer_size = pending_read_->GetBufferSize();
228 if (GetFileType(handle_) == FILE_TYPE_CHAR) { 245 if (GetFileType(handle_) == FILE_TYPE_CHAR) {
229 buffer_size = kStdOverlappedBufferSize; 246 buffer_size = kStdOverlappedBufferSize;
230 } 247 }
248 char* buffer_start = pending_read_->GetBufferStart();
231 DWORD bytes_read = 0; 249 DWORD bytes_read = 0;
232 BOOL ok = ReadFile(handle_, 250 BOOL ok = ReadFile(handle_,
233 pending_read_->GetBufferStart(), 251 buffer_start,
234 buffer_size, 252 buffer_size,
235 &bytes_read, 253 &bytes_read,
236 NULL); 254 NULL);
237 if (!ok) { 255 if (!ok) {
238 bytes_read = 0; 256 bytes_read = 0;
239 } 257 }
240 OVERLAPPED* overlapped = pending_read_->GetCleanOverlapped(); 258 OVERLAPPED* overlapped = pending_read_->GetCleanOverlapped();
241 ok = PostQueuedCompletionStatus(event_handler_->completion_port(), 259 ok = PostQueuedCompletionStatus(event_handler_->completion_port(),
242 bytes_read, 260 bytes_read,
243 reinterpret_cast<ULONG_PTR>(this), 261 reinterpret_cast<ULONG_PTR>(this),
244 overlapped); 262 overlapped);
245 if (!ok) { 263 if (!ok) {
246 FATAL("PostQueuedCompletionStatus failed"); 264 FATAL("PostQueuedCompletionStatus failed");
247 } 265 }
266 NotifyReadThreadFinished();
267 }
268
269
270 void Handle::NotifyReadThreadStarted() {
271 MonitorLocker ml(monitor_);
272 ASSERT(!read_thread_starting_);
273 ASSERT(read_thread_id_ == Thread::kInvalidThreadId);
274 read_thread_id_ = Thread::GetCurrentThreadId();
275 read_thread_starting_ = true;
276 ml.Notify();
277 while (read_thread_starting_) {
278 ml.Wait();
279 }
280 }
281
282 void Handle::NotifyReadThreadFinished() {
283 MonitorLocker ml(monitor_);
284 ASSERT(!read_thread_finished_);
285 ASSERT(read_thread_id_ != Thread::kInvalidThreadId);
286 read_thread_finished_ = true;
287 ml.Notify();
288 }
289
290
291 void Handle::WaitForReadThreadStarted() {
292 while (!read_thread_starting_) {
293 monitor_->Wait(Monitor::kNoTimeout);
294 }
295 read_thread_starting_ = false;
296 monitor_->Notify();
297 ASSERT(read_thread_id_ != Thread::kInvalidThreadId);
248 } 298 }
249 299
250 300
251 bool Handle::IssueRead() { 301 bool Handle::IssueRead() {
Søren Gjesse 2015/08/27 13:28:24 MonitorLocker ml(monitor_); here.
zra 2015/08/27 14:04:51 The monitor is already entered at calls to non-soc
252 ScopedLock lock(this);
253 ASSERT(type_ != kListenSocket); 302 ASSERT(type_ != kListenSocket);
254 ASSERT(pending_read_ == NULL); 303 ASSERT(pending_read_ == NULL);
255 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize); 304 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize);
256 if (SupportsOverlappedIO()) { 305 if (SupportsOverlappedIO()) {
257 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); 306 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
258 307
259 BOOL ok = ReadFile(handle_, 308 BOOL ok = ReadFile(handle_,
260 buffer->GetBufferStart(), 309 buffer->GetBufferStart(),
261 buffer->GetBufferSize(), 310 buffer->GetBufferSize(),
262 NULL, 311 NULL,
263 buffer->GetCleanOverlapped()); 312 buffer->GetCleanOverlapped());
264 if (ok || GetLastError() == ERROR_IO_PENDING) { 313 if (ok || GetLastError() == ERROR_IO_PENDING) {
265 // Completing asynchronously. 314 // Completing asynchronously.
266 pending_read_ = buffer; 315 pending_read_ = buffer;
267 return true; 316 return true;
268 } 317 }
269 OverlappedBuffer::DisposeBuffer(buffer); 318 OverlappedBuffer::DisposeBuffer(buffer);
270 HandleIssueError(); 319 HandleIssueError();
271 return false; 320 return false;
272 } else { 321 } else {
273 // Completing asynchronously through thread. 322 // Completing asynchronously through thread.
274 pending_read_ = buffer; 323 pending_read_ = buffer;
275 int result = Thread::Start(ReadFileThread, 324 int result = Thread::Start(ReadFileThread,
276 reinterpret_cast<uword>(this)); 325 reinterpret_cast<uword>(this));
277 if (result != 0) { 326 if (result != 0) {
278 FATAL1("Failed to start read file thread %d", result); 327 FATAL1("Failed to start read file thread %d", result);
279 } 328 }
329 WaitForReadThreadStarted();
280 return true; 330 return true;
281 } 331 }
282 } 332 }
283 333
284 334
285 bool Handle::IssueRecvFrom() { 335 bool Handle::IssueRecvFrom() {
286 return false; 336 return false;
287 } 337 }
288 338
289 339
290 bool Handle::IssueWrite() { 340 bool Handle::IssueWrite() {
291 ScopedLock lock(this); 341 MonitorLocker ml(monitor_);
292 ASSERT(type_ != kListenSocket); 342 ASSERT(type_ != kListenSocket);
293 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); 343 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
294 ASSERT(pending_write_ != NULL); 344 ASSERT(pending_write_ != NULL);
295 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite); 345 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite);
296 346
297 OverlappedBuffer* buffer = pending_write_; 347 OverlappedBuffer* buffer = pending_write_;
298 BOOL ok = WriteFile(handle_, 348 BOOL ok = WriteFile(handle_,
299 buffer->GetBufferStart(), 349 buffer->GetBufferStart(),
300 buffer->GetBufferSize(), 350 buffer->GetBufferSize(),
301 NULL, 351 NULL,
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
338 if (error == ERROR_BROKEN_PIPE) { 388 if (error == ERROR_BROKEN_PIPE) {
339 HandleClosed(this); 389 HandleClosed(this);
340 } else { 390 } else {
341 HandleError(this); 391 HandleError(this);
342 } 392 }
343 SetLastError(error); 393 SetLastError(error);
344 } 394 }
345 395
346 396
347 void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) { 397 void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) {
348 ScopedLock lock(this); 398 MonitorLocker ml(monitor_);
349 event_handler_ = event_handler; 399 event_handler_ = event_handler;
350 if (SupportsOverlappedIO() && completion_port_ == INVALID_HANDLE_VALUE) { 400 if (SupportsOverlappedIO() && completion_port_ == INVALID_HANDLE_VALUE) {
351 CreateCompletionPort(event_handler_->completion_port()); 401 CreateCompletionPort(event_handler_->completion_port());
352 } 402 }
353 } 403 }
354 404
355 405
356 bool FileHandle::IsClosed() { 406 bool FileHandle::IsClosed() {
357 return IsClosing() && !HasPendingRead() && !HasPendingWrite(); 407 return IsClosing() && !HasPendingRead() && !HasPendingWrite();
358 } 408 }
359 409
360 410
361 void DirectoryWatchHandle::EnsureInitialized( 411 void DirectoryWatchHandle::EnsureInitialized(
362 EventHandlerImplementation* event_handler) { 412 EventHandlerImplementation* event_handler) {
363 ScopedLock lock(this); 413 MonitorLocker ml(monitor_);
364 event_handler_ = event_handler; 414 event_handler_ = event_handler;
365 if (completion_port_ == INVALID_HANDLE_VALUE) { 415 if (completion_port_ == INVALID_HANDLE_VALUE) {
366 CreateCompletionPort(event_handler_->completion_port()); 416 CreateCompletionPort(event_handler_->completion_port());
367 } 417 }
368 } 418 }
369 419
370 420
371 bool DirectoryWatchHandle::IsClosed() { 421 bool DirectoryWatchHandle::IsClosed() {
372 return IsClosing() && pending_read_ == NULL; 422 return IsClosing() && pending_read_ == NULL;
373 } 423 }
374 424
375 425
376 bool DirectoryWatchHandle::IssueRead() { 426 bool DirectoryWatchHandle::IssueRead() {
Søren Gjesse 2015/08/27 13:28:24 MonitorLocker ml(monitor_); here as well.
zra 2015/08/27 14:04:51 Ditto.
377 ScopedLock lock(this);
378 // It may have been started before, as we start the directory-handler when 427 // It may have been started before, as we start the directory-handler when
379 // we create it. 428 // we create it.
380 if (pending_read_ != NULL || data_ready_ != NULL) return true; 429 if (pending_read_ != NULL || data_ready_ != NULL) return true;
381 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize); 430 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize);
382 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); 431 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
383 BOOL ok = ReadDirectoryChangesW(handle_, 432 BOOL ok = ReadDirectoryChangesW(handle_,
384 buffer->GetBufferStart(), 433 buffer->GetBufferStart(),
385 buffer->GetBufferSize(), 434 buffer->GetBufferSize(),
386 recursive_, 435 recursive_,
387 events_, 436 events_,
388 NULL, 437 NULL,
389 buffer->GetCleanOverlapped(), 438 buffer->GetCleanOverlapped(),
390 NULL); 439 NULL);
391 if (ok || GetLastError() == ERROR_IO_PENDING) { 440 if (ok || GetLastError() == ERROR_IO_PENDING) {
392 // Completing asynchronously. 441 // Completing asynchronously.
393 pending_read_ = buffer; 442 pending_read_ = buffer;
394 return true; 443 return true;
395 } 444 }
396 OverlappedBuffer::DisposeBuffer(buffer); 445 OverlappedBuffer::DisposeBuffer(buffer);
397 return false; 446 return false;
398 } 447 }
399 448
400 449
401 void DirectoryWatchHandle::Stop() { 450 void DirectoryWatchHandle::Stop() {
402 ScopedLock lock(this); 451 MonitorLocker ml(monitor_);
403 // Stop the outstanding read, so we can close the handle. 452 // Stop the outstanding read, so we can close the handle.
404 453
405 if (pending_read_ != NULL) { 454 if (pending_read_ != NULL) {
406 CancelIoEx(handle(), pending_read_->GetCleanOverlapped()); 455 CancelIoEx(handle(), pending_read_->GetCleanOverlapped());
407 // Don't dispose of the buffer, as it will still complete (with length 0). 456 // Don't dispose of the buffer, as it will still complete (with length 0).
408 } 457 }
409 458
410 DoClose(); 459 DoClose();
411 } 460 }
412 461
(...skipping 23 matching lines...) Expand all
436 NULL, 485 NULL,
437 NULL); 486 NULL);
438 if (status == SOCKET_ERROR) { 487 if (status == SOCKET_ERROR) {
439 return false; 488 return false;
440 } 489 }
441 return true; 490 return true;
442 } 491 }
443 492
444 493
445 bool ListenSocket::IssueAccept() { 494 bool ListenSocket::IssueAccept() {
446 ScopedLock lock(this); 495 MonitorLocker ml(monitor_);
447 496
448 // For AcceptEx there needs to be buffer storage for address 497 // For AcceptEx there needs to be buffer storage for address
449 // information for two addresses (local and remote address). The 498 // information for two addresses (local and remote address). The
450 // AcceptEx documentation says: "This value must be at least 16 499 // AcceptEx documentation says: "This value must be at least 16
451 // bytes more than the maximum address length for the transport 500 // bytes more than the maximum address length for the transport
452 // protocol in use." 501 // protocol in use."
453 static const int kAcceptExAddressAdditionalBytes = 16; 502 static const int kAcceptExAddressAdditionalBytes = 16;
454 static const int kAcceptExAddressStorageSize = 503 static const int kAcceptExAddressStorageSize =
455 sizeof(SOCKADDR_STORAGE) + kAcceptExAddressAdditionalBytes; 504 sizeof(SOCKADDR_STORAGE) + kAcceptExAddressAdditionalBytes;
456 OverlappedBuffer* buffer = 505 OverlappedBuffer* buffer =
(...skipping 19 matching lines...) Expand all
476 } 525 }
477 526
478 pending_accept_count_++; 527 pending_accept_count_++;
479 528
480 return true; 529 return true;
481 } 530 }
482 531
483 532
484 void ListenSocket::AcceptComplete(OverlappedBuffer* buffer, 533 void ListenSocket::AcceptComplete(OverlappedBuffer* buffer,
485 HANDLE completion_port) { 534 HANDLE completion_port) {
486 ScopedLock lock(this); 535 MonitorLocker ml(monitor_);
487 if (!IsClosing()) { 536 if (!IsClosing()) {
488 // Update the accepted socket to support the full range of API calls. 537 // Update the accepted socket to support the full range of API calls.
489 SOCKET s = socket(); 538 SOCKET s = socket();
490 int rc = setsockopt(buffer->client(), 539 int rc = setsockopt(buffer->client(),
491 SOL_SOCKET, 540 SOL_SOCKET,
492 SO_UPDATE_ACCEPT_CONTEXT, 541 SO_UPDATE_ACCEPT_CONTEXT,
493 reinterpret_cast<char*>(&s), sizeof(s)); 542 reinterpret_cast<char*>(&s), sizeof(s));
494 if (rc == NO_ERROR) { 543 if (rc == NO_ERROR) {
495 // Insert the accepted socket into the list. 544 // Insert the accepted socket into the list.
496 ClientSocket* client_socket = new ClientSocket(buffer->client()); 545 ClientSocket* client_socket = new ClientSocket(buffer->client());
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
537 client->Close(); 586 client->Close();
538 DeleteIfClosed(client); 587 DeleteIfClosed(client);
539 } else { 588 } else {
540 break; 589 break;
541 } 590 }
542 } 591 }
543 } 592 }
544 593
545 594
546 bool ListenSocket::CanAccept() { 595 bool ListenSocket::CanAccept() {
547 ScopedLock lock(this); 596 MonitorLocker ml(monitor_);
548 return accepted_head_ != NULL; 597 return accepted_head_ != NULL;
549 } 598 }
550 599
551 600
552 ClientSocket* ListenSocket::Accept() { 601 ClientSocket* ListenSocket::Accept() {
553 ScopedLock lock(this); 602 MonitorLocker ml(monitor_);
554 603
555 ClientSocket *result = NULL; 604 ClientSocket *result = NULL;
556 605
557 if (accepted_head_ != NULL) { 606 if (accepted_head_ != NULL) {
558 result = accepted_head_; 607 result = accepted_head_;
559 accepted_head_ = accepted_head_->next(); 608 accepted_head_ = accepted_head_->next();
560 if (accepted_head_ == NULL) accepted_tail_ = NULL; 609 if (accepted_head_ == NULL) accepted_tail_ = NULL;
561 result->set_next(NULL); 610 result->set_next(NULL);
562 accepted_count_--; 611 accepted_count_--;
563 } 612 }
564 613
565 if (pending_accept_count_ < 5) { 614 if (pending_accept_count_ < 5) {
566 // We have less than 5 pending accepts, queue another. 615 // We have less than 5 pending accepts, queue another.
567 if (!IsClosing()) { 616 if (!IsClosing()) {
568 if (!IssueAccept()) { 617 if (!IssueAccept()) {
569 HandleError(this); 618 HandleError(this);
570 } 619 }
571 } 620 }
572 } 621 }
573 622
574 return result; 623 return result;
575 } 624 }
576 625
577 626
578 void ListenSocket::EnsureInitialized( 627 void ListenSocket::EnsureInitialized(
579 EventHandlerImplementation* event_handler) { 628 EventHandlerImplementation* event_handler) {
580 ScopedLock lock(this); 629 MonitorLocker ml(monitor_);
581 if (AcceptEx_ == NULL) { 630 if (AcceptEx_ == NULL) {
582 ASSERT(completion_port_ == INVALID_HANDLE_VALUE); 631 ASSERT(completion_port_ == INVALID_HANDLE_VALUE);
583 ASSERT(event_handler_ == NULL); 632 ASSERT(event_handler_ == NULL);
584 event_handler_ = event_handler; 633 event_handler_ = event_handler;
585 CreateCompletionPort(event_handler_->completion_port()); 634 CreateCompletionPort(event_handler_->completion_port());
586 LoadAcceptEx(); 635 LoadAcceptEx();
587 } 636 }
588 } 637 }
589 638
590 639
591 bool ListenSocket::IsClosed() { 640 bool ListenSocket::IsClosed() {
592 return IsClosing() && !HasPendingAccept(); 641 return IsClosing() && !HasPendingAccept();
593 } 642 }
594 643
595 644
596 intptr_t Handle::Available() { 645 intptr_t Handle::Available() {
597 ScopedLock lock(this); 646 MonitorLocker ml(monitor_);
598 if (data_ready_ == NULL) return 0; 647 if (data_ready_ == NULL) return 0;
599 ASSERT(!data_ready_->IsEmpty()); 648 ASSERT(!data_ready_->IsEmpty());
600 return data_ready_->GetRemainingLength(); 649 return data_ready_->GetRemainingLength();
601 } 650 }
602 651
603 652
604 intptr_t Handle::Read(void* buffer, intptr_t num_bytes) { 653 intptr_t Handle::Read(void* buffer, intptr_t num_bytes) {
605 ScopedLock lock(this); 654 MonitorLocker ml(monitor_);
606 if (data_ready_ == NULL) return 0; 655 if (data_ready_ == NULL) return 0;
607 num_bytes = data_ready_->Read( 656 num_bytes = data_ready_->Read(
608 buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX)); 657 buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX));
609 if (data_ready_->IsEmpty()) { 658 if (data_ready_->IsEmpty()) {
610 OverlappedBuffer::DisposeBuffer(data_ready_); 659 OverlappedBuffer::DisposeBuffer(data_ready_);
611 data_ready_ = NULL; 660 data_ready_ = NULL;
612 if (!IsClosing() && !IsClosedRead()) IssueRead(); 661 if (!IsClosing() && !IsClosedRead()) IssueRead();
613 } 662 }
614 return num_bytes; 663 return num_bytes;
615 } 664 }
616 665
617 666
618 intptr_t Handle::RecvFrom( 667 intptr_t Handle::RecvFrom(
619 void* buffer, intptr_t num_bytes, struct sockaddr* sa, socklen_t sa_len) { 668 void* buffer, intptr_t num_bytes, struct sockaddr* sa, socklen_t sa_len) {
620 ScopedLock lock(this); 669 MonitorLocker ml(monitor_);
621 if (data_ready_ == NULL) return 0; 670 if (data_ready_ == NULL) return 0;
622 num_bytes = data_ready_->Read( 671 num_bytes = data_ready_->Read(
623 buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX)); 672 buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX));
624 if (data_ready_->from()->sa_family == AF_INET) { 673 if (data_ready_->from()->sa_family == AF_INET) {
625 ASSERT(sa_len >= sizeof(struct sockaddr_in)); 674 ASSERT(sa_len >= sizeof(struct sockaddr_in));
626 memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in)); 675 memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in));
627 } else { 676 } else {
628 ASSERT(data_ready_->from()->sa_family == AF_INET6); 677 ASSERT(data_ready_->from()->sa_family == AF_INET6);
629 ASSERT(sa_len >= sizeof(struct sockaddr_in6)); 678 ASSERT(sa_len >= sizeof(struct sockaddr_in6));
630 memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in6)); 679 memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in6));
631 } 680 }
632 // Always dispose of the buffer, as UDP messages must be read in their 681 // Always dispose of the buffer, as UDP messages must be read in their
633 // entirety to match how recvfrom works in a socket. 682 // entirety to match how recvfrom works in a socket.
634 OverlappedBuffer::DisposeBuffer(data_ready_); 683 OverlappedBuffer::DisposeBuffer(data_ready_);
635 data_ready_ = NULL; 684 data_ready_ = NULL;
636 if (!IsClosing() && !IsClosedRead()) IssueRecvFrom(); 685 if (!IsClosing() && !IsClosedRead()) IssueRecvFrom();
637 return num_bytes; 686 return num_bytes;
638 } 687 }
639 688
640 689
641 intptr_t Handle::Write(const void* buffer, intptr_t num_bytes) { 690 intptr_t Handle::Write(const void* buffer, intptr_t num_bytes) {
642 ScopedLock lock(this); 691 MonitorLocker ml(monitor_);
643 if (pending_write_ != NULL) return 0; 692 if (pending_write_ != NULL) return 0;
644 if (num_bytes > kBufferSize) num_bytes = kBufferSize; 693 if (num_bytes > kBufferSize) num_bytes = kBufferSize;
645 ASSERT(SupportsOverlappedIO()); 694 ASSERT(SupportsOverlappedIO());
646 if (completion_port_ == INVALID_HANDLE_VALUE) return 0; 695 if (completion_port_ == INVALID_HANDLE_VALUE) return 0;
647 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX); 696 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX);
648 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes); 697 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes);
649 pending_write_->Write(buffer, truncated_bytes); 698 pending_write_->Write(buffer, truncated_bytes);
650 if (!IssueWrite()) return -1; 699 if (!IssueWrite()) return -1;
651 return truncated_bytes; 700 return truncated_bytes;
652 } 701 }
653 702
654 703
655 intptr_t Handle::SendTo(const void* buffer, 704 intptr_t Handle::SendTo(const void* buffer,
656 intptr_t num_bytes, 705 intptr_t num_bytes,
657 struct sockaddr* sa, 706 struct sockaddr* sa,
658 socklen_t sa_len) { 707 socklen_t sa_len) {
659 ScopedLock lock(this); 708 MonitorLocker ml(monitor_);
660 if (pending_write_ != NULL) return 0; 709 if (pending_write_ != NULL) return 0;
661 if (num_bytes > kBufferSize) num_bytes = kBufferSize; 710 if (num_bytes > kBufferSize) num_bytes = kBufferSize;
662 ASSERT(SupportsOverlappedIO()); 711 ASSERT(SupportsOverlappedIO());
663 if (completion_port_ == INVALID_HANDLE_VALUE) return 0; 712 if (completion_port_ == INVALID_HANDLE_VALUE) return 0;
664 pending_write_ = OverlappedBuffer::AllocateSendToBuffer(num_bytes); 713 pending_write_ = OverlappedBuffer::AllocateSendToBuffer(num_bytes);
665 pending_write_->Write(buffer, num_bytes); 714 pending_write_->Write(buffer, num_bytes);
666 if (!IssueSendTo(sa, sa_len)) return -1; 715 if (!IssueSendTo(sa, sa_len)) return -1;
667 return num_bytes; 716 return num_bytes;
668 } 717 }
669 718
670 719
671 static void WriteFileThread(uword args) { 720 static void WriteFileThread(uword args) {
672 StdHandle* handle = reinterpret_cast<StdHandle*>(args); 721 StdHandle* handle = reinterpret_cast<StdHandle*>(args);
673 handle->RunWriteLoop(); 722 handle->RunWriteLoop();
674 } 723 }
675 724
676 725
677 void StdHandle::RunWriteLoop() { 726 void StdHandle::RunWriteLoop() {
678 write_monitor_->Enter(); 727 MonitorLocker ml(monitor_);
679 write_thread_running_ = true; 728 write_thread_running_ = true;
729 thread_id_ = Thread::GetCurrentThreadId();
680 // Notify we have started. 730 // Notify we have started.
681 write_monitor_->Notify(); 731 ml.Notify();
682 732
683 while (write_thread_running_) { 733 while (write_thread_running_) {
684 write_monitor_->Wait(Monitor::kNoTimeout); 734 ml.Wait(Monitor::kNoTimeout);
685 if (pending_write_ != NULL) { 735 if (pending_write_ != NULL) {
686 // We woke up and had a pending write. Execute it. 736 // We woke up and had a pending write. Execute it.
687 WriteSyncCompleteAsync(); 737 WriteSyncCompleteAsync();
688 } 738 }
689 } 739 }
690 740
691 write_thread_exists_ = false; 741 write_thread_exists_ = false;
692 write_monitor_->Notify(); 742 ml.Notify();
693 write_monitor_->Exit();
694 } 743 }
695 744
696 745
697 void StdHandle::WriteSyncCompleteAsync() { 746 void StdHandle::WriteSyncCompleteAsync() {
698 ASSERT(pending_write_ != NULL); 747 ASSERT(pending_write_ != NULL);
699 748
700 DWORD bytes_written = -1; 749 DWORD bytes_written = -1;
701 BOOL ok = WriteFile(handle_, 750 BOOL ok = WriteFile(handle_,
702 pending_write_->GetBufferStart(), 751 pending_write_->GetBufferStart(),
703 pending_write_->GetBufferSize(), 752 pending_write_->GetBufferSize(),
704 &bytes_written, 753 &bytes_written,
705 NULL); 754 NULL);
706 if (!ok) { 755 if (!ok) {
707 bytes_written = 0; 756 bytes_written = 0;
708 } 757 }
709 thread_wrote_ += bytes_written; 758 thread_wrote_ += bytes_written;
710 OVERLAPPED* overlapped = pending_write_->GetCleanOverlapped(); 759 OVERLAPPED* overlapped = pending_write_->GetCleanOverlapped();
711 ok = PostQueuedCompletionStatus(event_handler_->completion_port(), 760 ok = PostQueuedCompletionStatus(event_handler_->completion_port(),
712 bytes_written, 761 bytes_written,
713 reinterpret_cast<ULONG_PTR>(this), 762 reinterpret_cast<ULONG_PTR>(this),
714 overlapped); 763 overlapped);
715 if (!ok) { 764 if (!ok) {
716 FATAL("PostQueuedCompletionStatus failed"); 765 FATAL("PostQueuedCompletionStatus failed");
717 } 766 }
718 } 767 }
719 768
720 intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) { 769 intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) {
721 ScopedLock lock(this); 770 MonitorLocker ml(monitor_);
722 if (pending_write_ != NULL) return 0; 771 if (pending_write_ != NULL) return 0;
723 if (num_bytes > kBufferSize) num_bytes = kBufferSize; 772 if (num_bytes > kBufferSize) num_bytes = kBufferSize;
724 // In the case of stdout and stderr, OverlappedIO is not supported. 773 // In the case of stdout and stderr, OverlappedIO is not supported.
725 // Here we'll instead use a thread, to make it async. 774 // Here we'll instead use a thread, to make it async.
726 // This code is actually never exposed to the user, as stdout and stderr is 775 // This code is actually never exposed to the user, as stdout and stderr is
727 // not available as a RawSocket, but only wrapped in a Socket. 776 // not available as a RawSocket, but only wrapped in a Socket.
728 // Note that we return '0', unless a thread have already completed a write. 777 // Note that we return '0', unless a thread have already completed a write.
729 MonitorLocker locker(write_monitor_);
730 if (thread_wrote_ > 0) { 778 if (thread_wrote_ > 0) {
731 if (num_bytes > thread_wrote_) num_bytes = thread_wrote_; 779 if (num_bytes > thread_wrote_) num_bytes = thread_wrote_;
732 thread_wrote_ -= num_bytes; 780 thread_wrote_ -= num_bytes;
733 return num_bytes; 781 return num_bytes;
734 } 782 }
735 if (!write_thread_exists_) { 783 if (!write_thread_exists_) {
736 write_thread_exists_ = true; 784 write_thread_exists_ = true;
737 int result = Thread::Start(WriteFileThread, 785 int result = Thread::Start(
738 reinterpret_cast<uword>(this)); 786 WriteFileThread, reinterpret_cast<uword>(this));
739 if (result != 0) { 787 if (result != 0) {
740 FATAL1("Failed to start write file thread %d", result); 788 FATAL1("Failed to start write file thread %d", result);
741 } 789 }
742 while (!write_thread_running_) { 790 while (!write_thread_running_) {
743 // Wait until we the thread is running. 791 // Wait until we the thread is running.
744 locker.Wait(Monitor::kNoTimeout); 792 ml.Wait(Monitor::kNoTimeout);
745 } 793 }
746 } 794 }
747 // Only queue up to INT_MAX bytes. 795 // Only queue up to INT_MAX bytes.
748 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX); 796 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX);
749 // Create buffer and notify thread about the new handle. 797 // Create buffer and notify thread about the new handle.
750 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes); 798 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes);
751 pending_write_->Write(buffer, truncated_bytes); 799 pending_write_->Write(buffer, truncated_bytes);
752 locker.Notify(); 800 ml.Notify();
753 return 0; 801 return 0;
754 } 802 }
755 803
756 804
757 void StdHandle::DoClose() { 805 void StdHandle::DoClose() {
758 MonitorLocker locker(write_monitor_); 806 MonitorLocker ml(monitor_);
759 if (write_thread_exists_) { 807 if (write_thread_exists_) {
760 write_thread_running_ = false; 808 write_thread_running_ = false;
761 locker.Notify(); 809 ml.Notify();
762 while (write_thread_exists_) { 810 while (write_thread_exists_) {
763 locker.Wait(Monitor::kNoTimeout); 811 ml.Wait(Monitor::kNoTimeout);
764 } 812 }
813 Thread::Join(thread_id_);
765 } 814 }
766 Handle::DoClose(); 815 Handle::DoClose();
767 } 816 }
768 817
769 818
770 bool ClientSocket::LoadDisconnectEx() { 819 bool ClientSocket::LoadDisconnectEx() {
771 // Load the DisconnectEx function into memory using WSAIoctl. 820 // Load the DisconnectEx function into memory using WSAIoctl.
772 GUID guid_disconnect_ex = WSAID_DISCONNECTEX; 821 GUID guid_disconnect_ex = WSAID_DISCONNECTEX;
773 DWORD bytes; 822 DWORD bytes;
774 int status = WSAIoctl(socket(), 823 int status = WSAIoctl(socket(),
(...skipping 25 matching lines...) Expand all
800 849
801 void ClientSocket::DoClose() { 850 void ClientSocket::DoClose() {
802 // Always do a shutdown before initiating a disconnect. 851 // Always do a shutdown before initiating a disconnect.
803 shutdown(socket(), SD_BOTH); 852 shutdown(socket(), SD_BOTH);
804 IssueDisconnect(); 853 IssueDisconnect();
805 handle_ = INVALID_HANDLE_VALUE; 854 handle_ = INVALID_HANDLE_VALUE;
806 } 855 }
807 856
808 857
809 bool ClientSocket::IssueRead() { 858 bool ClientSocket::IssueRead() {
810 ScopedLock lock(this); 859 MonitorLocker ml(monitor_);
811 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); 860 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
812 ASSERT(pending_read_ == NULL); 861 ASSERT(pending_read_ == NULL);
813 862
814 // TODO(sgjesse): Use a MTU value here. Only the loopback adapter can 863 // TODO(sgjesse): Use a MTU value here. Only the loopback adapter can
815 // handle 64k datagrams. 864 // handle 64k datagrams.
816 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(65536); 865 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(65536);
817 866
818 DWORD flags; 867 DWORD flags;
819 flags = 0; 868 flags = 0;
820 int rc = WSARecv(socket(), 869 int rc = WSARecv(socket(),
821 buffer->GetWASBUF(), 870 buffer->GetWASBUF(),
822 1, 871 1,
823 NULL, 872 NULL,
824 &flags, 873 &flags,
825 buffer->GetCleanOverlapped(), 874 buffer->GetCleanOverlapped(),
826 NULL); 875 NULL);
827 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) { 876 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) {
828 pending_read_ = buffer; 877 pending_read_ = buffer;
829 return true; 878 return true;
830 } 879 }
831 OverlappedBuffer::DisposeBuffer(buffer); 880 OverlappedBuffer::DisposeBuffer(buffer);
832 pending_read_ = NULL; 881 pending_read_ = NULL;
833 HandleIssueError(); 882 HandleIssueError();
834 return false; 883 return false;
835 } 884 }
836 885
837 886
838 bool ClientSocket::IssueWrite() { 887 bool ClientSocket::IssueWrite() {
839 ScopedLock lock(this); 888 MonitorLocker ml(monitor_);
840 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); 889 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
841 ASSERT(pending_write_ != NULL); 890 ASSERT(pending_write_ != NULL);
842 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite); 891 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite);
843 892
844 int rc = WSASend(socket(), 893 int rc = WSASend(socket(),
845 pending_write_->GetWASBUF(), 894 pending_write_->GetWASBUF(),
846 1, 895 1,
847 NULL, 896 NULL,
848 0, 897 0,
849 pending_write_->GetCleanOverlapped(), 898 pending_write_->GetCleanOverlapped(),
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
893 } 942 }
894 if (!IsClosedWrite() && ((Mask() & (1 << kOutEvent)) != 0)) { 943 if (!IsClosedWrite() && ((Mask() & (1 << kOutEvent)) != 0)) {
895 Dart_Port port = NextNotifyDartPort(1 << kOutEvent); 944 Dart_Port port = NextNotifyDartPort(1 << kOutEvent);
896 DartUtils::PostInt32(port, 1 << kOutEvent); 945 DartUtils::PostInt32(port, 1 << kOutEvent);
897 } 946 }
898 } 947 }
899 948
900 949
901 void ClientSocket::EnsureInitialized( 950 void ClientSocket::EnsureInitialized(
902 EventHandlerImplementation* event_handler) { 951 EventHandlerImplementation* event_handler) {
903 ScopedLock lock(this); 952 MonitorLocker ml(monitor_);
904 if (completion_port_ == INVALID_HANDLE_VALUE) { 953 if (completion_port_ == INVALID_HANDLE_VALUE) {
905 ASSERT(event_handler_ == NULL); 954 ASSERT(event_handler_ == NULL);
906 event_handler_ = event_handler; 955 event_handler_ = event_handler;
907 CreateCompletionPort(event_handler_->completion_port()); 956 CreateCompletionPort(event_handler_->completion_port());
908 } 957 }
909 } 958 }
910 959
911 960
912 bool ClientSocket::IsClosed() { 961 bool ClientSocket::IsClosed() {
913 return connected_ && closed_; 962 return connected_ && closed_;
914 } 963 }
915 964
916 965
917 bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) { 966 bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) {
918 ScopedLock lock(this); 967 MonitorLocker ml(monitor_);
919 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); 968 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
920 ASSERT(pending_write_ != NULL); 969 ASSERT(pending_write_ != NULL);
921 ASSERT(pending_write_->operation() == OverlappedBuffer::kSendTo); 970 ASSERT(pending_write_->operation() == OverlappedBuffer::kSendTo);
922 971
923 int rc = WSASendTo(socket(), 972 int rc = WSASendTo(socket(),
924 pending_write_->GetWASBUF(), 973 pending_write_->GetWASBUF(),
925 1, 974 1,
926 NULL, 975 NULL,
927 0, 976 0,
928 sa, 977 sa,
929 sa_len, 978 sa_len,
930 pending_write_->GetCleanOverlapped(), 979 pending_write_->GetCleanOverlapped(),
931 NULL); 980 NULL);
932 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) { 981 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) {
933 return true; 982 return true;
934 } 983 }
935 OverlappedBuffer::DisposeBuffer(pending_write_); 984 OverlappedBuffer::DisposeBuffer(pending_write_);
936 pending_write_ = NULL; 985 pending_write_ = NULL;
937 HandleIssueError(); 986 HandleIssueError();
938 return false; 987 return false;
939 } 988 }
940 989
941 990
942 bool DatagramSocket::IssueRecvFrom() { 991 bool DatagramSocket::IssueRecvFrom() {
943 ScopedLock lock(this); 992 MonitorLocker ml(monitor_);
944 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); 993 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
945 ASSERT(pending_read_ == NULL); 994 ASSERT(pending_read_ == NULL);
946 995
947 OverlappedBuffer* buffer = OverlappedBuffer::AllocateRecvFromBuffer(1024); 996 OverlappedBuffer* buffer = OverlappedBuffer::AllocateRecvFromBuffer(1024);
948 997
949 DWORD flags; 998 DWORD flags;
950 flags = 0; 999 flags = 0;
951 int rc = WSARecvFrom(socket(), 1000 int rc = WSARecvFrom(socket(),
952 buffer->GetWASBUF(), 1001 buffer->GetWASBUF(),
953 1, 1002 1,
954 NULL, 1003 NULL,
955 &flags, 1004 &flags,
956 buffer->from(), 1005 buffer->from(),
957 buffer->from_len_addr(), 1006 buffer->from_len_addr(),
958 buffer->GetCleanOverlapped(), 1007 buffer->GetCleanOverlapped(),
959 NULL); 1008 NULL);
960 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) { 1009 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) {
961 pending_read_ = buffer; 1010 pending_read_ = buffer;
962 return true; 1011 return true;
963 } 1012 }
964 OverlappedBuffer::DisposeBuffer(buffer); 1013 OverlappedBuffer::DisposeBuffer(buffer);
965 pending_read_ = NULL; 1014 pending_read_ = NULL;
966 HandleIssueError(); 1015 HandleIssueError();
967 return false; 1016 return false;
968 } 1017 }
969 1018
970 1019
971 void DatagramSocket::EnsureInitialized( 1020 void DatagramSocket::EnsureInitialized(
972 EventHandlerImplementation* event_handler) { 1021 EventHandlerImplementation* event_handler) {
973 ScopedLock lock(this); 1022 MonitorLocker ml(monitor_);
974 if (completion_port_ == INVALID_HANDLE_VALUE) { 1023 if (completion_port_ == INVALID_HANDLE_VALUE) {
975 ASSERT(event_handler_ == NULL); 1024 ASSERT(event_handler_ == NULL);
976 event_handler_ = event_handler; 1025 event_handler_ = event_handler;
977 CreateCompletionPort(event_handler_->completion_port()); 1026 CreateCompletionPort(event_handler_->completion_port());
978 } 1027 }
979 } 1028 }
980 1029
981 1030
982 bool DatagramSocket::IsClosed() { 1031 bool DatagramSocket::IsClosed() {
983 return IsClosing() && !HasPendingRead() && !HasPendingWrite(); 1032 return IsClosing() && !HasPendingRead() && !HasPendingWrite();
(...skipping 19 matching lines...) Expand all
1003 shutdown_ = true; 1052 shutdown_ = true;
1004 } else { 1053 } else {
1005 Handle* handle = reinterpret_cast<Handle*>(msg->id); 1054 Handle* handle = reinterpret_cast<Handle*>(msg->id);
1006 ASSERT(handle != NULL); 1055 ASSERT(handle != NULL);
1007 1056
1008 if (handle->is_listen_socket()) { 1057 if (handle->is_listen_socket()) {
1009 ListenSocket* listen_socket = 1058 ListenSocket* listen_socket =
1010 reinterpret_cast<ListenSocket*>(handle); 1059 reinterpret_cast<ListenSocket*>(handle);
1011 listen_socket->EnsureInitialized(this); 1060 listen_socket->EnsureInitialized(this);
1012 1061
1013 Handle::ScopedLock lock(listen_socket); 1062 MonitorLocker ml(listen_socket->monitor_);
1014 1063
1015 if (IS_COMMAND(msg->data, kReturnTokenCommand)) { 1064 if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
1016 listen_socket->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data)); 1065 listen_socket->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data));
1017 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) { 1066 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) {
1018 // `events` can only have kInEvent/kOutEvent flags set. 1067 // `events` can only have kInEvent/kOutEvent flags set.
1019 intptr_t events = msg->data & EVENT_MASK; 1068 intptr_t events = msg->data & EVENT_MASK;
1020 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); 1069 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
1021 listen_socket->SetPortAndMask(msg->dart_port, events); 1070 listen_socket->SetPortAndMask(msg->dart_port, events);
1022 TryDispatchingPendingAccepts(listen_socket); 1071 TryDispatchingPendingAccepts(listen_socket);
1023 } else if (IS_COMMAND(msg->data, kCloseCommand)) { 1072 } else if (IS_COMMAND(msg->data, kCloseCommand)) {
1024 listen_socket->RemovePort(msg->dart_port); 1073 listen_socket->RemovePort(msg->dart_port);
1025 1074
1026 // We only close the socket file descriptor from the operating 1075 // We only close the socket file descriptor from the operating
1027 // system if there are no other dart socket objects which 1076 // system if there are no other dart socket objects which
1028 // are listening on the same (address, port) combination. 1077 // are listening on the same (address, port) combination.
1029 ListeningSocketRegistry *registry = 1078 ListeningSocketRegistry *registry =
1030 ListeningSocketRegistry::Instance(); 1079 ListeningSocketRegistry::Instance();
1031 MutexLocker locker(registry->mutex()); 1080 MutexLocker locker(registry->mutex());
1032 if (registry->CloseSafe(reinterpret_cast<intptr_t>(listen_socket))) { 1081 if (registry->CloseSafe(reinterpret_cast<intptr_t>(listen_socket))) {
1033 ASSERT(listen_socket->Mask() == 0); 1082 ASSERT(listen_socket->Mask() == 0);
1034 listen_socket->Close(); 1083 listen_socket->Close();
1035 } 1084 }
1036 1085
1037 DartUtils::PostInt32(msg->dart_port, 1 << kDestroyedEvent); 1086 DartUtils::PostInt32(msg->dart_port, 1 << kDestroyedEvent);
1038 } else { 1087 } else {
1039 UNREACHABLE(); 1088 UNREACHABLE();
1040 } 1089 }
1041 } else { 1090 } else {
1042 handle->EnsureInitialized(this); 1091 handle->EnsureInitialized(this);
1043 Handle::ScopedLock lock(handle); 1092 MonitorLocker ml(handle->monitor_);
1044 1093
1045 if (IS_COMMAND(msg->data, kReturnTokenCommand)) { 1094 if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
1046 handle->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data)); 1095 handle->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data));
1047 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) { 1096 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) {
1048 // `events` can only have kInEvent/kOutEvent flags set. 1097 // `events` can only have kInEvent/kOutEvent flags set.
1049 intptr_t events = msg->data & EVENT_MASK; 1098 intptr_t events = msg->data & EVENT_MASK;
1050 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); 1099 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
1051 1100
1052 handle->SetPortAndMask(msg->dart_port, events); 1101 handle->SetPortAndMask(msg->dart_port, events);
1053 1102
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
1107 DeleteIfClosed(handle); 1156 DeleteIfClosed(handle);
1108 } 1157 }
1109 } 1158 }
1110 1159
1111 1160
1112 void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket, 1161 void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket,
1113 OverlappedBuffer* buffer) { 1162 OverlappedBuffer* buffer) {
1114 listen_socket->AcceptComplete(buffer, completion_port_); 1163 listen_socket->AcceptComplete(buffer, completion_port_);
1115 1164
1116 { 1165 {
1117 Handle::ScopedLock lock(listen_socket); 1166 MonitorLocker ml(listen_socket->monitor_);
1118 TryDispatchingPendingAccepts(listen_socket); 1167 TryDispatchingPendingAccepts(listen_socket);
1119 } 1168 }
1120 1169
1121 DeleteIfClosed(listen_socket); 1170 DeleteIfClosed(listen_socket);
1122 } 1171 }
1123 1172
1124 1173
1125 void EventHandlerImplementation::TryDispatchingPendingAccepts( 1174 void EventHandlerImplementation::TryDispatchingPendingAccepts(
1126 ListenSocket *listen_socket) { 1175 ListenSocket *listen_socket) {
1127 if (!listen_socket->IsClosing() && listen_socket->CanAccept()) { 1176 if (!listen_socket->IsClosing() && listen_socket->CanAccept()) {
(...skipping 143 matching lines...) Expand 10 before | Expand all | Expand 10 after
1271 HandleConnect(client_socket, bytes, buffer); 1320 HandleConnect(client_socket, bytes, buffer);
1272 break; 1321 break;
1273 } 1322 }
1274 default: 1323 default:
1275 UNREACHABLE(); 1324 UNREACHABLE();
1276 } 1325 }
1277 } 1326 }
1278 1327
1279 1328
1280 EventHandlerImplementation::EventHandlerImplementation() { 1329 EventHandlerImplementation::EventHandlerImplementation() {
1330 startup_monitor_ = new Monitor();
1331 handler_thread_id_ = Thread::kInvalidThreadId;
1281 completion_port_ = 1332 completion_port_ =
1282 CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1); 1333 CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1);
1283 if (completion_port_ == NULL) { 1334 if (completion_port_ == NULL) {
1284 FATAL("Completion port creation failed"); 1335 FATAL("Completion port creation failed");
1285 } 1336 }
1286 shutdown_ = false; 1337 shutdown_ = false;
1287 } 1338 }
1288 1339
1289 1340
1290 EventHandlerImplementation::~EventHandlerImplementation() { 1341 EventHandlerImplementation::~EventHandlerImplementation() {
1342 Thread::Join(handler_thread_id_);
1343 delete startup_monitor_;
1291 CloseHandle(completion_port_); 1344 CloseHandle(completion_port_);
1292 } 1345 }
1293 1346
1294 1347
1295 int64_t EventHandlerImplementation::GetTimeout() { 1348 int64_t EventHandlerImplementation::GetTimeout() {
1296 if (!timeout_queue_.HasTimeout()) { 1349 if (!timeout_queue_.HasTimeout()) {
1297 return kInfinityTimeout; 1350 return kInfinityTimeout;
1298 } 1351 }
1299 int64_t millis = timeout_queue_.CurrentTimeout() - 1352 int64_t millis = timeout_queue_.CurrentTimeout() -
1300 TimerUtils::GetCurrentTimeMilliseconds(); 1353 TimerUtils::GetCurrentTimeMilliseconds();
(...skipping 14 matching lines...) Expand all
1315 FATAL("PostQueuedCompletionStatus failed"); 1368 FATAL("PostQueuedCompletionStatus failed");
1316 } 1369 }
1317 } 1370 }
1318 1371
1319 1372
1320 void EventHandlerImplementation::EventHandlerEntry(uword args) { 1373 void EventHandlerImplementation::EventHandlerEntry(uword args) {
1321 EventHandler* handler = reinterpret_cast<EventHandler*>(args); 1374 EventHandler* handler = reinterpret_cast<EventHandler*>(args);
1322 EventHandlerImplementation* handler_impl = &handler->delegate_; 1375 EventHandlerImplementation* handler_impl = &handler->delegate_;
1323 ASSERT(handler_impl != NULL); 1376 ASSERT(handler_impl != NULL);
1324 1377
1378 {
1379 MonitorLocker ml(handler_impl->startup_monitor_);
1380 handler_impl->handler_thread_id_ = Thread::GetCurrentThreadId();
1381 ml.Notify();
1382 }
1383
1325 while (!handler_impl->shutdown_) { 1384 while (!handler_impl->shutdown_) {
1326 DWORD bytes; 1385 DWORD bytes;
1327 ULONG_PTR key; 1386 ULONG_PTR key;
1328 OVERLAPPED* overlapped; 1387 OVERLAPPED* overlapped;
1329 int64_t millis = handler_impl->GetTimeout(); 1388 int64_t millis = handler_impl->GetTimeout();
1330 ASSERT(millis == kInfinityTimeout || millis >= 0); 1389 ASSERT(millis == kInfinityTimeout || millis >= 0);
1331 if (millis > kMaxInt32) millis = kMaxInt32; 1390 if (millis > kMaxInt32) millis = kMaxInt32;
1332 ASSERT(sizeof(int32_t) == sizeof(DWORD)); 1391 ASSERT(sizeof(int32_t) == sizeof(DWORD));
1333 BOOL ok = GetQueuedCompletionStatus(handler_impl->completion_port(), 1392 BOOL ok = GetQueuedCompletionStatus(handler_impl->completion_port(),
1334 &bytes, 1393 &bytes,
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
1375 } 1434 }
1376 1435
1377 1436
1378 void EventHandlerImplementation::Start(EventHandler* handler) { 1437 void EventHandlerImplementation::Start(EventHandler* handler) {
1379 int result = Thread::Start(EventHandlerEntry, 1438 int result = Thread::Start(EventHandlerEntry,
1380 reinterpret_cast<uword>(handler)); 1439 reinterpret_cast<uword>(handler));
1381 if (result != 0) { 1440 if (result != 0) {
1382 FATAL1("Failed to start event handler thread %d", result); 1441 FATAL1("Failed to start event handler thread %d", result);
1383 } 1442 }
1384 1443
1444 {
1445 MonitorLocker ml(startup_monitor_);
1446 while (handler_thread_id_ == Thread::kInvalidThreadId) {
1447 ml.Wait();
1448 }
1449 }
1450
1385 // Initialize Winsock32 1451 // Initialize Winsock32
1386 if (!Socket::Initialize()) { 1452 if (!Socket::Initialize()) {
1387 FATAL("Failed to initialized Windows sockets"); 1453 FATAL("Failed to initialized Windows sockets");
1388 } 1454 }
1389 } 1455 }
1390 1456
1391 1457
1392 void EventHandlerImplementation::Shutdown() { 1458 void EventHandlerImplementation::Shutdown() {
1393 SendData(kShutdownId, 0, 0); 1459 SendData(kShutdownId, 0, 0);
1394 } 1460 }
1395 1461
1396 } // namespace bin 1462 } // namespace bin
1397 } // namespace dart 1463 } // namespace dart
1398 1464
1399 #endif // defined(TARGET_OS_WINDOWS) 1465 #endif // defined(TARGET_OS_WINDOWS)
OLDNEW
« no previous file with comments | « runtime/bin/eventhandler_win.h ('k') | runtime/bin/main.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698