OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include "platform/globals.h" | 5 #include "platform/globals.h" |
6 #if defined(TARGET_OS_WINDOWS) | 6 #if defined(TARGET_OS_WINDOWS) |
7 | 7 |
8 #include "bin/eventhandler.h" | 8 #include "bin/eventhandler.h" |
9 #include "bin/eventhandler_win.h" | 9 #include "bin/eventhandler_win.h" |
10 | 10 |
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
114 | 114 |
115 Handle::Handle(intptr_t handle) | 115 Handle::Handle(intptr_t handle) |
116 : DescriptorInfoBase(handle), | 116 : DescriptorInfoBase(handle), |
117 handle_(reinterpret_cast<HANDLE>(handle)), | 117 handle_(reinterpret_cast<HANDLE>(handle)), |
118 completion_port_(INVALID_HANDLE_VALUE), | 118 completion_port_(INVALID_HANDLE_VALUE), |
119 event_handler_(NULL), | 119 event_handler_(NULL), |
120 data_ready_(NULL), | 120 data_ready_(NULL), |
121 pending_read_(NULL), | 121 pending_read_(NULL), |
122 pending_write_(NULL), | 122 pending_write_(NULL), |
123 last_error_(NOERROR), | 123 last_error_(NOERROR), |
124 flags_(0) { | 124 flags_(0), |
125 InitializeCriticalSection(&cs_); | 125 read_thread_id_(Thread::kInvalidThreadId), |
| 126 read_thread_starting_(false), |
| 127 read_thread_finished_(false), |
| 128 monitor_(new Monitor()) { |
126 } | 129 } |
127 | 130 |
128 | 131 |
129 Handle::~Handle() { | 132 Handle::~Handle() { |
130 DeleteCriticalSection(&cs_); | 133 delete monitor_; |
131 } | 134 } |
132 | 135 |
133 | 136 |
134 void Handle::Lock() { | |
135 EnterCriticalSection(&cs_); | |
136 } | |
137 | |
138 | |
139 void Handle::Unlock() { | |
140 LeaveCriticalSection(&cs_); | |
141 } | |
142 | |
143 | |
144 bool Handle::CreateCompletionPort(HANDLE completion_port) { | 137 bool Handle::CreateCompletionPort(HANDLE completion_port) { |
145 completion_port_ = CreateIoCompletionPort(handle(), | 138 completion_port_ = CreateIoCompletionPort(handle(), |
146 completion_port, | 139 completion_port, |
147 reinterpret_cast<ULONG_PTR>(this), | 140 reinterpret_cast<ULONG_PTR>(this), |
148 0); | 141 0); |
149 if (completion_port_ == NULL) { | 142 if (completion_port_ == NULL) { |
150 return false; | 143 return false; |
151 } | 144 } |
152 return true; | 145 return true; |
153 } | 146 } |
154 | 147 |
155 | 148 |
156 void Handle::Close() { | 149 void Handle::Close() { |
157 ScopedLock lock(this); | 150 MonitorLocker ml(monitor_); |
158 if (!IsClosing()) { | 151 if (!IsClosing()) { |
159 // Close the socket and set the closing state. This close method can be | 152 // Close the socket and set the closing state. This close method can be |
160 // called again if this socket has pending IO operations in flight. | 153 // called again if this socket has pending IO operations in flight. |
161 MarkClosing(); | 154 MarkClosing(); |
162 // Perform handle type specific closing. | 155 // Perform handle type specific closing. |
163 DoClose(); | 156 DoClose(); |
164 } | 157 } |
165 ASSERT(IsHandleClosed()); | 158 ASSERT(IsHandleClosed()); |
166 } | 159 } |
167 | 160 |
168 | 161 |
169 void Handle::DoClose() { | 162 void Handle::DoClose() { |
170 if (!IsHandleClosed()) { | 163 if (!IsHandleClosed()) { |
171 CloseHandle(handle_); | 164 CloseHandle(handle_); |
172 handle_ = INVALID_HANDLE_VALUE; | 165 handle_ = INVALID_HANDLE_VALUE; |
173 } | 166 } |
174 } | 167 } |
175 | 168 |
176 | 169 |
177 bool Handle::HasPendingRead() { | 170 bool Handle::HasPendingRead() { |
178 ScopedLock lock(this); | 171 MonitorLocker ml(monitor_); |
179 return pending_read_ != NULL; | 172 return pending_read_ != NULL; |
180 } | 173 } |
181 | 174 |
182 | 175 |
183 bool Handle::HasPendingWrite() { | 176 bool Handle::HasPendingWrite() { |
184 ScopedLock lock(this); | 177 MonitorLocker ml(monitor_); |
185 return pending_write_ != NULL; | 178 return pending_write_ != NULL; |
186 } | 179 } |
187 | 180 |
188 | 181 |
189 void Handle::ReadComplete(OverlappedBuffer* buffer) { | 182 void Handle::WaitForReadThreadStarted() { |
190 ScopedLock lock(this); | 183 MonitorLocker ml(monitor_); |
191 // Currently only one outstanding read at the time. | 184 while (read_thread_starting_) { |
192 ASSERT(pending_read_ == buffer); | 185 ml.Wait(); |
193 ASSERT(data_ready_ == NULL); | |
194 if (!IsClosing() && !buffer->IsEmpty()) { | |
195 data_ready_ = pending_read_; | |
196 } else { | |
197 OverlappedBuffer::DisposeBuffer(buffer); | |
198 } | 186 } |
199 pending_read_ = NULL; | |
200 } | 187 } |
201 | 188 |
202 | 189 |
| 190 void Handle::WaitForReadThreadFinished() { |
| 191 // Join the Reader thread if there is one. |
| 192 ThreadId to_join = Thread::kInvalidThreadId; |
| 193 { |
| 194 MonitorLocker ml(monitor_); |
| 195 if (read_thread_id_ != Thread::kInvalidThreadId) { |
| 196 while (!read_thread_finished_) { |
| 197 ml.Wait(); |
| 198 } |
| 199 read_thread_finished_ = false; |
| 200 to_join = read_thread_id_; |
| 201 read_thread_id_ = Thread::kInvalidThreadId; |
| 202 } |
| 203 } |
| 204 if (to_join != Thread::kInvalidThreadId) { |
| 205 Thread::Join(to_join); |
| 206 } |
| 207 } |
| 208 |
| 209 |
| 210 void Handle::ReadComplete(OverlappedBuffer* buffer) { |
| 211 WaitForReadThreadStarted(); |
| 212 { |
| 213 MonitorLocker ml(monitor_); |
| 214 // Currently only one outstanding read at the time. |
| 215 ASSERT(pending_read_ == buffer); |
| 216 ASSERT(data_ready_ == NULL); |
| 217 if (!IsClosing() && !buffer->IsEmpty()) { |
| 218 data_ready_ = pending_read_; |
| 219 } else { |
| 220 OverlappedBuffer::DisposeBuffer(buffer); |
| 221 } |
| 222 pending_read_ = NULL; |
| 223 } |
| 224 WaitForReadThreadFinished(); |
| 225 } |
| 226 |
| 227 |
203 void Handle::RecvFromComplete(OverlappedBuffer* buffer) { | 228 void Handle::RecvFromComplete(OverlappedBuffer* buffer) { |
204 ReadComplete(buffer); | 229 ReadComplete(buffer); |
205 } | 230 } |
206 | 231 |
207 | 232 |
208 void Handle::WriteComplete(OverlappedBuffer* buffer) { | 233 void Handle::WriteComplete(OverlappedBuffer* buffer) { |
209 ScopedLock lock(this); | 234 MonitorLocker ml(monitor_); |
210 // Currently only one outstanding write at the time. | 235 // Currently only one outstanding write at the time. |
211 ASSERT(pending_write_ == buffer); | 236 ASSERT(pending_write_ == buffer); |
212 OverlappedBuffer::DisposeBuffer(buffer); | 237 OverlappedBuffer::DisposeBuffer(buffer); |
213 pending_write_ = NULL; | 238 pending_write_ = NULL; |
214 } | 239 } |
215 | 240 |
216 | 241 |
217 static void ReadFileThread(uword args) { | 242 static void ReadFileThread(uword args) { |
218 Handle* handle = reinterpret_cast<Handle*>(args); | 243 Handle* handle = reinterpret_cast<Handle*>(args); |
219 handle->ReadSyncCompleteAsync(); | 244 handle->ReadSyncCompleteAsync(); |
220 } | 245 } |
221 | 246 |
222 | 247 |
| 248 void Handle::NotifyReadThreadStarted() { |
| 249 MonitorLocker ml(monitor_); |
| 250 ASSERT(read_thread_starting_); |
| 251 ASSERT(read_thread_id_ == Thread::kInvalidThreadId); |
| 252 read_thread_id_ = Thread::GetCurrentThreadId(); |
| 253 read_thread_starting_ = false; |
| 254 ml.Notify(); |
| 255 } |
| 256 |
| 257 void Handle::NotifyReadThreadFinished() { |
| 258 MonitorLocker ml(monitor_); |
| 259 ASSERT(!read_thread_finished_); |
| 260 ASSERT(read_thread_id_ != Thread::kInvalidThreadId); |
| 261 read_thread_finished_ = true; |
| 262 ml.Notify(); |
| 263 } |
| 264 |
| 265 |
223 void Handle::ReadSyncCompleteAsync() { | 266 void Handle::ReadSyncCompleteAsync() { |
| 267 NotifyReadThreadStarted(); |
224 ASSERT(pending_read_ != NULL); | 268 ASSERT(pending_read_ != NULL); |
225 ASSERT(pending_read_->GetBufferSize() >= kStdOverlappedBufferSize); | 269 ASSERT(pending_read_->GetBufferSize() >= kStdOverlappedBufferSize); |
226 | 270 |
227 DWORD buffer_size = pending_read_->GetBufferSize(); | 271 DWORD buffer_size = pending_read_->GetBufferSize(); |
228 if (GetFileType(handle_) == FILE_TYPE_CHAR) { | 272 if (GetFileType(handle_) == FILE_TYPE_CHAR) { |
229 buffer_size = kStdOverlappedBufferSize; | 273 buffer_size = kStdOverlappedBufferSize; |
230 } | 274 } |
| 275 char* buffer_start = pending_read_->GetBufferStart(); |
231 DWORD bytes_read = 0; | 276 DWORD bytes_read = 0; |
232 BOOL ok = ReadFile(handle_, | 277 BOOL ok = ReadFile(handle_, |
233 pending_read_->GetBufferStart(), | 278 buffer_start, |
234 buffer_size, | 279 buffer_size, |
235 &bytes_read, | 280 &bytes_read, |
236 NULL); | 281 NULL); |
237 if (!ok) { | 282 if (!ok) { |
238 bytes_read = 0; | 283 bytes_read = 0; |
239 } | 284 } |
240 OVERLAPPED* overlapped = pending_read_->GetCleanOverlapped(); | 285 OVERLAPPED* overlapped = pending_read_->GetCleanOverlapped(); |
241 ok = PostQueuedCompletionStatus(event_handler_->completion_port(), | 286 ok = PostQueuedCompletionStatus(event_handler_->completion_port(), |
242 bytes_read, | 287 bytes_read, |
243 reinterpret_cast<ULONG_PTR>(this), | 288 reinterpret_cast<ULONG_PTR>(this), |
244 overlapped); | 289 overlapped); |
245 if (!ok) { | 290 if (!ok) { |
246 FATAL("PostQueuedCompletionStatus failed"); | 291 FATAL("PostQueuedCompletionStatus failed"); |
247 } | 292 } |
| 293 NotifyReadThreadFinished(); |
248 } | 294 } |
249 | 295 |
250 | 296 |
251 bool Handle::IssueRead() { | 297 bool Handle::IssueRead() { |
252 ScopedLock lock(this); | |
253 ASSERT(type_ != kListenSocket); | 298 ASSERT(type_ != kListenSocket); |
254 ASSERT(pending_read_ == NULL); | 299 ASSERT(pending_read_ == NULL); |
255 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize); | 300 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize); |
256 if (SupportsOverlappedIO()) { | 301 if (SupportsOverlappedIO()) { |
257 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); | 302 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
258 | 303 |
259 BOOL ok = ReadFile(handle_, | 304 BOOL ok = ReadFile(handle_, |
260 buffer->GetBufferStart(), | 305 buffer->GetBufferStart(), |
261 buffer->GetBufferSize(), | 306 buffer->GetBufferSize(), |
262 NULL, | 307 NULL, |
263 buffer->GetCleanOverlapped()); | 308 buffer->GetCleanOverlapped()); |
264 if (ok || GetLastError() == ERROR_IO_PENDING) { | 309 if (ok || GetLastError() == ERROR_IO_PENDING) { |
265 // Completing asynchronously. | 310 // Completing asynchronously. |
266 pending_read_ = buffer; | 311 pending_read_ = buffer; |
267 return true; | 312 return true; |
268 } | 313 } |
269 OverlappedBuffer::DisposeBuffer(buffer); | 314 OverlappedBuffer::DisposeBuffer(buffer); |
270 HandleIssueError(); | 315 HandleIssueError(); |
271 return false; | 316 return false; |
272 } else { | 317 } else { |
273 // Completing asynchronously through thread. | 318 // Completing asynchronously through thread. |
274 pending_read_ = buffer; | 319 pending_read_ = buffer; |
| 320 read_thread_starting_ = true; |
275 int result = Thread::Start(ReadFileThread, | 321 int result = Thread::Start(ReadFileThread, |
276 reinterpret_cast<uword>(this)); | 322 reinterpret_cast<uword>(this)); |
277 if (result != 0) { | 323 if (result != 0) { |
278 FATAL1("Failed to start read file thread %d", result); | 324 FATAL1("Failed to start read file thread %d", result); |
279 } | 325 } |
280 return true; | 326 return true; |
281 } | 327 } |
282 } | 328 } |
283 | 329 |
284 | 330 |
285 bool Handle::IssueRecvFrom() { | 331 bool Handle::IssueRecvFrom() { |
286 return false; | 332 return false; |
287 } | 333 } |
288 | 334 |
289 | 335 |
290 bool Handle::IssueWrite() { | 336 bool Handle::IssueWrite() { |
291 ScopedLock lock(this); | 337 MonitorLocker ml(monitor_); |
292 ASSERT(type_ != kListenSocket); | 338 ASSERT(type_ != kListenSocket); |
293 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); | 339 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
294 ASSERT(pending_write_ != NULL); | 340 ASSERT(pending_write_ != NULL); |
295 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite); | 341 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite); |
296 | 342 |
297 OverlappedBuffer* buffer = pending_write_; | 343 OverlappedBuffer* buffer = pending_write_; |
298 BOOL ok = WriteFile(handle_, | 344 BOOL ok = WriteFile(handle_, |
299 buffer->GetBufferStart(), | 345 buffer->GetBufferStart(), |
300 buffer->GetBufferSize(), | 346 buffer->GetBufferSize(), |
301 NULL, | 347 NULL, |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
338 if (error == ERROR_BROKEN_PIPE) { | 384 if (error == ERROR_BROKEN_PIPE) { |
339 HandleClosed(this); | 385 HandleClosed(this); |
340 } else { | 386 } else { |
341 HandleError(this); | 387 HandleError(this); |
342 } | 388 } |
343 SetLastError(error); | 389 SetLastError(error); |
344 } | 390 } |
345 | 391 |
346 | 392 |
347 void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) { | 393 void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) { |
348 ScopedLock lock(this); | 394 MonitorLocker ml(monitor_); |
349 event_handler_ = event_handler; | 395 event_handler_ = event_handler; |
350 if (SupportsOverlappedIO() && completion_port_ == INVALID_HANDLE_VALUE) { | 396 if (SupportsOverlappedIO() && completion_port_ == INVALID_HANDLE_VALUE) { |
351 CreateCompletionPort(event_handler_->completion_port()); | 397 CreateCompletionPort(event_handler_->completion_port()); |
352 } | 398 } |
353 } | 399 } |
354 | 400 |
355 | 401 |
356 bool FileHandle::IsClosed() { | 402 bool FileHandle::IsClosed() { |
357 return IsClosing() && !HasPendingRead() && !HasPendingWrite(); | 403 return IsClosing() && !HasPendingRead() && !HasPendingWrite(); |
358 } | 404 } |
359 | 405 |
360 | 406 |
361 void DirectoryWatchHandle::EnsureInitialized( | 407 void DirectoryWatchHandle::EnsureInitialized( |
362 EventHandlerImplementation* event_handler) { | 408 EventHandlerImplementation* event_handler) { |
363 ScopedLock lock(this); | 409 MonitorLocker ml(monitor_); |
364 event_handler_ = event_handler; | 410 event_handler_ = event_handler; |
365 if (completion_port_ == INVALID_HANDLE_VALUE) { | 411 if (completion_port_ == INVALID_HANDLE_VALUE) { |
366 CreateCompletionPort(event_handler_->completion_port()); | 412 CreateCompletionPort(event_handler_->completion_port()); |
367 } | 413 } |
368 } | 414 } |
369 | 415 |
370 | 416 |
371 bool DirectoryWatchHandle::IsClosed() { | 417 bool DirectoryWatchHandle::IsClosed() { |
372 return IsClosing() && pending_read_ == NULL; | 418 return IsClosing() && pending_read_ == NULL; |
373 } | 419 } |
374 | 420 |
375 | 421 |
376 bool DirectoryWatchHandle::IssueRead() { | 422 bool DirectoryWatchHandle::IssueRead() { |
377 ScopedLock lock(this); | |
378 // It may have been started before, as we start the directory-handler when | 423 // It may have been started before, as we start the directory-handler when |
379 // we create it. | 424 // we create it. |
380 if (pending_read_ != NULL || data_ready_ != NULL) return true; | 425 if (pending_read_ != NULL || data_ready_ != NULL) return true; |
381 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize); | 426 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize); |
382 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); | 427 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
383 BOOL ok = ReadDirectoryChangesW(handle_, | 428 BOOL ok = ReadDirectoryChangesW(handle_, |
384 buffer->GetBufferStart(), | 429 buffer->GetBufferStart(), |
385 buffer->GetBufferSize(), | 430 buffer->GetBufferSize(), |
386 recursive_, | 431 recursive_, |
387 events_, | 432 events_, |
388 NULL, | 433 NULL, |
389 buffer->GetCleanOverlapped(), | 434 buffer->GetCleanOverlapped(), |
390 NULL); | 435 NULL); |
391 if (ok || GetLastError() == ERROR_IO_PENDING) { | 436 if (ok || GetLastError() == ERROR_IO_PENDING) { |
392 // Completing asynchronously. | 437 // Completing asynchronously. |
393 pending_read_ = buffer; | 438 pending_read_ = buffer; |
394 return true; | 439 return true; |
395 } | 440 } |
396 OverlappedBuffer::DisposeBuffer(buffer); | 441 OverlappedBuffer::DisposeBuffer(buffer); |
397 return false; | 442 return false; |
398 } | 443 } |
399 | 444 |
400 | 445 |
401 void DirectoryWatchHandle::Stop() { | 446 void DirectoryWatchHandle::Stop() { |
402 ScopedLock lock(this); | 447 MonitorLocker ml(monitor_); |
403 // Stop the outstanding read, so we can close the handle. | 448 // Stop the outstanding read, so we can close the handle. |
404 | 449 |
405 if (pending_read_ != NULL) { | 450 if (pending_read_ != NULL) { |
406 CancelIoEx(handle(), pending_read_->GetCleanOverlapped()); | 451 CancelIoEx(handle(), pending_read_->GetCleanOverlapped()); |
407 // Don't dispose of the buffer, as it will still complete (with length 0). | 452 // Don't dispose of the buffer, as it will still complete (with length 0). |
408 } | 453 } |
409 | 454 |
410 DoClose(); | 455 DoClose(); |
411 } | 456 } |
412 | 457 |
(...skipping 23 matching lines...) Expand all Loading... |
436 NULL, | 481 NULL, |
437 NULL); | 482 NULL); |
438 if (status == SOCKET_ERROR) { | 483 if (status == SOCKET_ERROR) { |
439 return false; | 484 return false; |
440 } | 485 } |
441 return true; | 486 return true; |
442 } | 487 } |
443 | 488 |
444 | 489 |
445 bool ListenSocket::IssueAccept() { | 490 bool ListenSocket::IssueAccept() { |
446 ScopedLock lock(this); | 491 MonitorLocker ml(monitor_); |
447 | 492 |
448 // For AcceptEx there needs to be buffer storage for address | 493 // For AcceptEx there needs to be buffer storage for address |
449 // information for two addresses (local and remote address). The | 494 // information for two addresses (local and remote address). The |
450 // AcceptEx documentation says: "This value must be at least 16 | 495 // AcceptEx documentation says: "This value must be at least 16 |
451 // bytes more than the maximum address length for the transport | 496 // bytes more than the maximum address length for the transport |
452 // protocol in use." | 497 // protocol in use." |
453 static const int kAcceptExAddressAdditionalBytes = 16; | 498 static const int kAcceptExAddressAdditionalBytes = 16; |
454 static const int kAcceptExAddressStorageSize = | 499 static const int kAcceptExAddressStorageSize = |
455 sizeof(SOCKADDR_STORAGE) + kAcceptExAddressAdditionalBytes; | 500 sizeof(SOCKADDR_STORAGE) + kAcceptExAddressAdditionalBytes; |
456 OverlappedBuffer* buffer = | 501 OverlappedBuffer* buffer = |
(...skipping 19 matching lines...) Expand all Loading... |
476 } | 521 } |
477 | 522 |
478 pending_accept_count_++; | 523 pending_accept_count_++; |
479 | 524 |
480 return true; | 525 return true; |
481 } | 526 } |
482 | 527 |
483 | 528 |
484 void ListenSocket::AcceptComplete(OverlappedBuffer* buffer, | 529 void ListenSocket::AcceptComplete(OverlappedBuffer* buffer, |
485 HANDLE completion_port) { | 530 HANDLE completion_port) { |
486 ScopedLock lock(this); | 531 MonitorLocker ml(monitor_); |
487 if (!IsClosing()) { | 532 if (!IsClosing()) { |
488 // Update the accepted socket to support the full range of API calls. | 533 // Update the accepted socket to support the full range of API calls. |
489 SOCKET s = socket(); | 534 SOCKET s = socket(); |
490 int rc = setsockopt(buffer->client(), | 535 int rc = setsockopt(buffer->client(), |
491 SOL_SOCKET, | 536 SOL_SOCKET, |
492 SO_UPDATE_ACCEPT_CONTEXT, | 537 SO_UPDATE_ACCEPT_CONTEXT, |
493 reinterpret_cast<char*>(&s), sizeof(s)); | 538 reinterpret_cast<char*>(&s), sizeof(s)); |
494 if (rc == NO_ERROR) { | 539 if (rc == NO_ERROR) { |
495 // Insert the accepted socket into the list. | 540 // Insert the accepted socket into the list. |
496 ClientSocket* client_socket = new ClientSocket(buffer->client()); | 541 ClientSocket* client_socket = new ClientSocket(buffer->client()); |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
537 client->Close(); | 582 client->Close(); |
538 DeleteIfClosed(client); | 583 DeleteIfClosed(client); |
539 } else { | 584 } else { |
540 break; | 585 break; |
541 } | 586 } |
542 } | 587 } |
543 } | 588 } |
544 | 589 |
545 | 590 |
546 bool ListenSocket::CanAccept() { | 591 bool ListenSocket::CanAccept() { |
547 ScopedLock lock(this); | 592 MonitorLocker ml(monitor_); |
548 return accepted_head_ != NULL; | 593 return accepted_head_ != NULL; |
549 } | 594 } |
550 | 595 |
551 | 596 |
552 ClientSocket* ListenSocket::Accept() { | 597 ClientSocket* ListenSocket::Accept() { |
553 ScopedLock lock(this); | 598 MonitorLocker ml(monitor_); |
554 | 599 |
555 ClientSocket *result = NULL; | 600 ClientSocket *result = NULL; |
556 | 601 |
557 if (accepted_head_ != NULL) { | 602 if (accepted_head_ != NULL) { |
558 result = accepted_head_; | 603 result = accepted_head_; |
559 accepted_head_ = accepted_head_->next(); | 604 accepted_head_ = accepted_head_->next(); |
560 if (accepted_head_ == NULL) accepted_tail_ = NULL; | 605 if (accepted_head_ == NULL) accepted_tail_ = NULL; |
561 result->set_next(NULL); | 606 result->set_next(NULL); |
562 accepted_count_--; | 607 accepted_count_--; |
563 } | 608 } |
564 | 609 |
565 if (pending_accept_count_ < 5) { | 610 if (pending_accept_count_ < 5) { |
566 // We have less than 5 pending accepts, queue another. | 611 // We have less than 5 pending accepts, queue another. |
567 if (!IsClosing()) { | 612 if (!IsClosing()) { |
568 if (!IssueAccept()) { | 613 if (!IssueAccept()) { |
569 HandleError(this); | 614 HandleError(this); |
570 } | 615 } |
571 } | 616 } |
572 } | 617 } |
573 | 618 |
574 return result; | 619 return result; |
575 } | 620 } |
576 | 621 |
577 | 622 |
578 void ListenSocket::EnsureInitialized( | 623 void ListenSocket::EnsureInitialized( |
579 EventHandlerImplementation* event_handler) { | 624 EventHandlerImplementation* event_handler) { |
580 ScopedLock lock(this); | 625 MonitorLocker ml(monitor_); |
581 if (AcceptEx_ == NULL) { | 626 if (AcceptEx_ == NULL) { |
582 ASSERT(completion_port_ == INVALID_HANDLE_VALUE); | 627 ASSERT(completion_port_ == INVALID_HANDLE_VALUE); |
583 ASSERT(event_handler_ == NULL); | 628 ASSERT(event_handler_ == NULL); |
584 event_handler_ = event_handler; | 629 event_handler_ = event_handler; |
585 CreateCompletionPort(event_handler_->completion_port()); | 630 CreateCompletionPort(event_handler_->completion_port()); |
586 LoadAcceptEx(); | 631 LoadAcceptEx(); |
587 } | 632 } |
588 } | 633 } |
589 | 634 |
590 | 635 |
591 bool ListenSocket::IsClosed() { | 636 bool ListenSocket::IsClosed() { |
592 return IsClosing() && !HasPendingAccept(); | 637 return IsClosing() && !HasPendingAccept(); |
593 } | 638 } |
594 | 639 |
595 | 640 |
596 intptr_t Handle::Available() { | 641 intptr_t Handle::Available() { |
597 ScopedLock lock(this); | 642 MonitorLocker ml(monitor_); |
598 if (data_ready_ == NULL) return 0; | 643 if (data_ready_ == NULL) return 0; |
599 ASSERT(!data_ready_->IsEmpty()); | 644 ASSERT(!data_ready_->IsEmpty()); |
600 return data_ready_->GetRemainingLength(); | 645 return data_ready_->GetRemainingLength(); |
601 } | 646 } |
602 | 647 |
603 | 648 |
604 intptr_t Handle::Read(void* buffer, intptr_t num_bytes) { | 649 intptr_t Handle::Read(void* buffer, intptr_t num_bytes) { |
605 ScopedLock lock(this); | 650 MonitorLocker ml(monitor_); |
606 if (data_ready_ == NULL) return 0; | 651 if (data_ready_ == NULL) return 0; |
607 num_bytes = data_ready_->Read( | 652 num_bytes = data_ready_->Read( |
608 buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX)); | 653 buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX)); |
609 if (data_ready_->IsEmpty()) { | 654 if (data_ready_->IsEmpty()) { |
610 OverlappedBuffer::DisposeBuffer(data_ready_); | 655 OverlappedBuffer::DisposeBuffer(data_ready_); |
611 data_ready_ = NULL; | 656 data_ready_ = NULL; |
612 if (!IsClosing() && !IsClosedRead()) IssueRead(); | 657 if (!IsClosing() && !IsClosedRead()) IssueRead(); |
613 } | 658 } |
614 return num_bytes; | 659 return num_bytes; |
615 } | 660 } |
616 | 661 |
617 | 662 |
618 intptr_t Handle::RecvFrom( | 663 intptr_t Handle::RecvFrom( |
619 void* buffer, intptr_t num_bytes, struct sockaddr* sa, socklen_t sa_len) { | 664 void* buffer, intptr_t num_bytes, struct sockaddr* sa, socklen_t sa_len) { |
620 ScopedLock lock(this); | 665 MonitorLocker ml(monitor_); |
621 if (data_ready_ == NULL) return 0; | 666 if (data_ready_ == NULL) return 0; |
622 num_bytes = data_ready_->Read( | 667 num_bytes = data_ready_->Read( |
623 buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX)); | 668 buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX)); |
624 if (data_ready_->from()->sa_family == AF_INET) { | 669 if (data_ready_->from()->sa_family == AF_INET) { |
625 ASSERT(sa_len >= sizeof(struct sockaddr_in)); | 670 ASSERT(sa_len >= sizeof(struct sockaddr_in)); |
626 memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in)); | 671 memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in)); |
627 } else { | 672 } else { |
628 ASSERT(data_ready_->from()->sa_family == AF_INET6); | 673 ASSERT(data_ready_->from()->sa_family == AF_INET6); |
629 ASSERT(sa_len >= sizeof(struct sockaddr_in6)); | 674 ASSERT(sa_len >= sizeof(struct sockaddr_in6)); |
630 memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in6)); | 675 memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in6)); |
631 } | 676 } |
632 // Always dispose of the buffer, as UDP messages must be read in their | 677 // Always dispose of the buffer, as UDP messages must be read in their |
633 // entirety to match how recvfrom works in a socket. | 678 // entirety to match how recvfrom works in a socket. |
634 OverlappedBuffer::DisposeBuffer(data_ready_); | 679 OverlappedBuffer::DisposeBuffer(data_ready_); |
635 data_ready_ = NULL; | 680 data_ready_ = NULL; |
636 if (!IsClosing() && !IsClosedRead()) IssueRecvFrom(); | 681 if (!IsClosing() && !IsClosedRead()) IssueRecvFrom(); |
637 return num_bytes; | 682 return num_bytes; |
638 } | 683 } |
639 | 684 |
640 | 685 |
641 intptr_t Handle::Write(const void* buffer, intptr_t num_bytes) { | 686 intptr_t Handle::Write(const void* buffer, intptr_t num_bytes) { |
642 ScopedLock lock(this); | 687 MonitorLocker ml(monitor_); |
643 if (pending_write_ != NULL) return 0; | 688 if (pending_write_ != NULL) return 0; |
644 if (num_bytes > kBufferSize) num_bytes = kBufferSize; | 689 if (num_bytes > kBufferSize) num_bytes = kBufferSize; |
645 ASSERT(SupportsOverlappedIO()); | 690 ASSERT(SupportsOverlappedIO()); |
646 if (completion_port_ == INVALID_HANDLE_VALUE) return 0; | 691 if (completion_port_ == INVALID_HANDLE_VALUE) return 0; |
647 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX); | 692 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX); |
648 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes); | 693 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes); |
649 pending_write_->Write(buffer, truncated_bytes); | 694 pending_write_->Write(buffer, truncated_bytes); |
650 if (!IssueWrite()) return -1; | 695 if (!IssueWrite()) return -1; |
651 return truncated_bytes; | 696 return truncated_bytes; |
652 } | 697 } |
653 | 698 |
654 | 699 |
655 intptr_t Handle::SendTo(const void* buffer, | 700 intptr_t Handle::SendTo(const void* buffer, |
656 intptr_t num_bytes, | 701 intptr_t num_bytes, |
657 struct sockaddr* sa, | 702 struct sockaddr* sa, |
658 socklen_t sa_len) { | 703 socklen_t sa_len) { |
659 ScopedLock lock(this); | 704 MonitorLocker ml(monitor_); |
660 if (pending_write_ != NULL) return 0; | 705 if (pending_write_ != NULL) return 0; |
661 if (num_bytes > kBufferSize) num_bytes = kBufferSize; | 706 if (num_bytes > kBufferSize) num_bytes = kBufferSize; |
662 ASSERT(SupportsOverlappedIO()); | 707 ASSERT(SupportsOverlappedIO()); |
663 if (completion_port_ == INVALID_HANDLE_VALUE) return 0; | 708 if (completion_port_ == INVALID_HANDLE_VALUE) return 0; |
664 pending_write_ = OverlappedBuffer::AllocateSendToBuffer(num_bytes); | 709 pending_write_ = OverlappedBuffer::AllocateSendToBuffer(num_bytes); |
665 pending_write_->Write(buffer, num_bytes); | 710 pending_write_->Write(buffer, num_bytes); |
666 if (!IssueSendTo(sa, sa_len)) return -1; | 711 if (!IssueSendTo(sa, sa_len)) return -1; |
667 return num_bytes; | 712 return num_bytes; |
668 } | 713 } |
669 | 714 |
670 | 715 |
671 static void WriteFileThread(uword args) { | 716 static void WriteFileThread(uword args) { |
672 StdHandle* handle = reinterpret_cast<StdHandle*>(args); | 717 StdHandle* handle = reinterpret_cast<StdHandle*>(args); |
673 handle->RunWriteLoop(); | 718 handle->RunWriteLoop(); |
674 } | 719 } |
675 | 720 |
676 | 721 |
677 void StdHandle::RunWriteLoop() { | 722 void StdHandle::RunWriteLoop() { |
678 write_monitor_->Enter(); | 723 MonitorLocker ml(monitor_); |
679 write_thread_running_ = true; | 724 write_thread_running_ = true; |
| 725 thread_id_ = Thread::GetCurrentThreadId(); |
680 // Notify we have started. | 726 // Notify we have started. |
681 write_monitor_->Notify(); | 727 ml.Notify(); |
682 | 728 |
683 while (write_thread_running_) { | 729 while (write_thread_running_) { |
684 write_monitor_->Wait(Monitor::kNoTimeout); | 730 ml.Wait(Monitor::kNoTimeout); |
685 if (pending_write_ != NULL) { | 731 if (pending_write_ != NULL) { |
686 // We woke up and had a pending write. Execute it. | 732 // We woke up and had a pending write. Execute it. |
687 WriteSyncCompleteAsync(); | 733 WriteSyncCompleteAsync(); |
688 } | 734 } |
689 } | 735 } |
690 | 736 |
691 write_thread_exists_ = false; | 737 write_thread_exists_ = false; |
692 write_monitor_->Notify(); | 738 ml.Notify(); |
693 write_monitor_->Exit(); | |
694 } | 739 } |
695 | 740 |
696 | 741 |
697 void StdHandle::WriteSyncCompleteAsync() { | 742 void StdHandle::WriteSyncCompleteAsync() { |
698 ASSERT(pending_write_ != NULL); | 743 ASSERT(pending_write_ != NULL); |
699 | 744 |
700 DWORD bytes_written = -1; | 745 DWORD bytes_written = -1; |
701 BOOL ok = WriteFile(handle_, | 746 BOOL ok = WriteFile(handle_, |
702 pending_write_->GetBufferStart(), | 747 pending_write_->GetBufferStart(), |
703 pending_write_->GetBufferSize(), | 748 pending_write_->GetBufferSize(), |
704 &bytes_written, | 749 &bytes_written, |
705 NULL); | 750 NULL); |
706 if (!ok) { | 751 if (!ok) { |
707 bytes_written = 0; | 752 bytes_written = 0; |
708 } | 753 } |
709 thread_wrote_ += bytes_written; | 754 thread_wrote_ += bytes_written; |
710 OVERLAPPED* overlapped = pending_write_->GetCleanOverlapped(); | 755 OVERLAPPED* overlapped = pending_write_->GetCleanOverlapped(); |
711 ok = PostQueuedCompletionStatus(event_handler_->completion_port(), | 756 ok = PostQueuedCompletionStatus(event_handler_->completion_port(), |
712 bytes_written, | 757 bytes_written, |
713 reinterpret_cast<ULONG_PTR>(this), | 758 reinterpret_cast<ULONG_PTR>(this), |
714 overlapped); | 759 overlapped); |
715 if (!ok) { | 760 if (!ok) { |
716 FATAL("PostQueuedCompletionStatus failed"); | 761 FATAL("PostQueuedCompletionStatus failed"); |
717 } | 762 } |
718 } | 763 } |
719 | 764 |
720 intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) { | 765 intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) { |
721 ScopedLock lock(this); | 766 MonitorLocker ml(monitor_); |
722 if (pending_write_ != NULL) return 0; | 767 if (pending_write_ != NULL) return 0; |
723 if (num_bytes > kBufferSize) num_bytes = kBufferSize; | 768 if (num_bytes > kBufferSize) num_bytes = kBufferSize; |
724 // In the case of stdout and stderr, OverlappedIO is not supported. | 769 // In the case of stdout and stderr, OverlappedIO is not supported. |
725 // Here we'll instead use a thread, to make it async. | 770 // Here we'll instead use a thread, to make it async. |
726 // This code is actually never exposed to the user, as stdout and stderr is | 771 // This code is actually never exposed to the user, as stdout and stderr is |
727 // not available as a RawSocket, but only wrapped in a Socket. | 772 // not available as a RawSocket, but only wrapped in a Socket. |
728 // Note that we return '0', unless a thread have already completed a write. | 773 // Note that we return '0', unless a thread have already completed a write. |
729 MonitorLocker locker(write_monitor_); | |
730 if (thread_wrote_ > 0) { | 774 if (thread_wrote_ > 0) { |
731 if (num_bytes > thread_wrote_) num_bytes = thread_wrote_; | 775 if (num_bytes > thread_wrote_) num_bytes = thread_wrote_; |
732 thread_wrote_ -= num_bytes; | 776 thread_wrote_ -= num_bytes; |
733 return num_bytes; | 777 return num_bytes; |
734 } | 778 } |
735 if (!write_thread_exists_) { | 779 if (!write_thread_exists_) { |
736 write_thread_exists_ = true; | 780 write_thread_exists_ = true; |
737 int result = Thread::Start(WriteFileThread, | 781 int result = Thread::Start( |
738 reinterpret_cast<uword>(this)); | 782 WriteFileThread, reinterpret_cast<uword>(this)); |
739 if (result != 0) { | 783 if (result != 0) { |
740 FATAL1("Failed to start write file thread %d", result); | 784 FATAL1("Failed to start write file thread %d", result); |
741 } | 785 } |
742 while (!write_thread_running_) { | 786 while (!write_thread_running_) { |
743 // Wait until we the thread is running. | 787 // Wait until we the thread is running. |
744 locker.Wait(Monitor::kNoTimeout); | 788 ml.Wait(Monitor::kNoTimeout); |
745 } | 789 } |
746 } | 790 } |
747 // Only queue up to INT_MAX bytes. | 791 // Only queue up to INT_MAX bytes. |
748 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX); | 792 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX); |
749 // Create buffer and notify thread about the new handle. | 793 // Create buffer and notify thread about the new handle. |
750 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes); | 794 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes); |
751 pending_write_->Write(buffer, truncated_bytes); | 795 pending_write_->Write(buffer, truncated_bytes); |
752 locker.Notify(); | 796 ml.Notify(); |
753 return 0; | 797 return 0; |
754 } | 798 } |
755 | 799 |
756 | 800 |
757 void StdHandle::DoClose() { | 801 void StdHandle::DoClose() { |
758 MonitorLocker locker(write_monitor_); | 802 MonitorLocker ml(monitor_); |
759 if (write_thread_exists_) { | 803 if (write_thread_exists_) { |
760 write_thread_running_ = false; | 804 write_thread_running_ = false; |
761 locker.Notify(); | 805 ml.Notify(); |
762 while (write_thread_exists_) { | 806 while (write_thread_exists_) { |
763 locker.Wait(Monitor::kNoTimeout); | 807 ml.Wait(Monitor::kNoTimeout); |
764 } | 808 } |
| 809 Thread::Join(thread_id_); |
765 } | 810 } |
766 Handle::DoClose(); | 811 Handle::DoClose(); |
767 } | 812 } |
768 | 813 |
769 | 814 |
770 bool ClientSocket::LoadDisconnectEx() { | 815 bool ClientSocket::LoadDisconnectEx() { |
771 // Load the DisconnectEx function into memory using WSAIoctl. | 816 // Load the DisconnectEx function into memory using WSAIoctl. |
772 GUID guid_disconnect_ex = WSAID_DISCONNECTEX; | 817 GUID guid_disconnect_ex = WSAID_DISCONNECTEX; |
773 DWORD bytes; | 818 DWORD bytes; |
774 int status = WSAIoctl(socket(), | 819 int status = WSAIoctl(socket(), |
(...skipping 25 matching lines...) Expand all Loading... |
800 | 845 |
801 void ClientSocket::DoClose() { | 846 void ClientSocket::DoClose() { |
802 // Always do a shutdown before initiating a disconnect. | 847 // Always do a shutdown before initiating a disconnect. |
803 shutdown(socket(), SD_BOTH); | 848 shutdown(socket(), SD_BOTH); |
804 IssueDisconnect(); | 849 IssueDisconnect(); |
805 handle_ = INVALID_HANDLE_VALUE; | 850 handle_ = INVALID_HANDLE_VALUE; |
806 } | 851 } |
807 | 852 |
808 | 853 |
809 bool ClientSocket::IssueRead() { | 854 bool ClientSocket::IssueRead() { |
810 ScopedLock lock(this); | 855 MonitorLocker ml(monitor_); |
811 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); | 856 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
812 ASSERT(pending_read_ == NULL); | 857 ASSERT(pending_read_ == NULL); |
813 | 858 |
814 // TODO(sgjesse): Use a MTU value here. Only the loopback adapter can | 859 // TODO(sgjesse): Use a MTU value here. Only the loopback adapter can |
815 // handle 64k datagrams. | 860 // handle 64k datagrams. |
816 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(65536); | 861 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(65536); |
817 | 862 |
818 DWORD flags; | 863 DWORD flags; |
819 flags = 0; | 864 flags = 0; |
820 int rc = WSARecv(socket(), | 865 int rc = WSARecv(socket(), |
821 buffer->GetWASBUF(), | 866 buffer->GetWASBUF(), |
822 1, | 867 1, |
823 NULL, | 868 NULL, |
824 &flags, | 869 &flags, |
825 buffer->GetCleanOverlapped(), | 870 buffer->GetCleanOverlapped(), |
826 NULL); | 871 NULL); |
827 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) { | 872 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) { |
828 pending_read_ = buffer; | 873 pending_read_ = buffer; |
829 return true; | 874 return true; |
830 } | 875 } |
831 OverlappedBuffer::DisposeBuffer(buffer); | 876 OverlappedBuffer::DisposeBuffer(buffer); |
832 pending_read_ = NULL; | 877 pending_read_ = NULL; |
833 HandleIssueError(); | 878 HandleIssueError(); |
834 return false; | 879 return false; |
835 } | 880 } |
836 | 881 |
837 | 882 |
838 bool ClientSocket::IssueWrite() { | 883 bool ClientSocket::IssueWrite() { |
839 ScopedLock lock(this); | 884 MonitorLocker ml(monitor_); |
840 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); | 885 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
841 ASSERT(pending_write_ != NULL); | 886 ASSERT(pending_write_ != NULL); |
842 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite); | 887 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite); |
843 | 888 |
844 int rc = WSASend(socket(), | 889 int rc = WSASend(socket(), |
845 pending_write_->GetWASBUF(), | 890 pending_write_->GetWASBUF(), |
846 1, | 891 1, |
847 NULL, | 892 NULL, |
848 0, | 893 0, |
849 pending_write_->GetCleanOverlapped(), | 894 pending_write_->GetCleanOverlapped(), |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
893 } | 938 } |
894 if (!IsClosedWrite() && ((Mask() & (1 << kOutEvent)) != 0)) { | 939 if (!IsClosedWrite() && ((Mask() & (1 << kOutEvent)) != 0)) { |
895 Dart_Port port = NextNotifyDartPort(1 << kOutEvent); | 940 Dart_Port port = NextNotifyDartPort(1 << kOutEvent); |
896 DartUtils::PostInt32(port, 1 << kOutEvent); | 941 DartUtils::PostInt32(port, 1 << kOutEvent); |
897 } | 942 } |
898 } | 943 } |
899 | 944 |
900 | 945 |
901 void ClientSocket::EnsureInitialized( | 946 void ClientSocket::EnsureInitialized( |
902 EventHandlerImplementation* event_handler) { | 947 EventHandlerImplementation* event_handler) { |
903 ScopedLock lock(this); | 948 MonitorLocker ml(monitor_); |
904 if (completion_port_ == INVALID_HANDLE_VALUE) { | 949 if (completion_port_ == INVALID_HANDLE_VALUE) { |
905 ASSERT(event_handler_ == NULL); | 950 ASSERT(event_handler_ == NULL); |
906 event_handler_ = event_handler; | 951 event_handler_ = event_handler; |
907 CreateCompletionPort(event_handler_->completion_port()); | 952 CreateCompletionPort(event_handler_->completion_port()); |
908 } | 953 } |
909 } | 954 } |
910 | 955 |
911 | 956 |
912 bool ClientSocket::IsClosed() { | 957 bool ClientSocket::IsClosed() { |
913 return connected_ && closed_; | 958 return connected_ && closed_; |
914 } | 959 } |
915 | 960 |
916 | 961 |
917 bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) { | 962 bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) { |
918 ScopedLock lock(this); | 963 MonitorLocker ml(monitor_); |
919 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); | 964 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
920 ASSERT(pending_write_ != NULL); | 965 ASSERT(pending_write_ != NULL); |
921 ASSERT(pending_write_->operation() == OverlappedBuffer::kSendTo); | 966 ASSERT(pending_write_->operation() == OverlappedBuffer::kSendTo); |
922 | 967 |
923 int rc = WSASendTo(socket(), | 968 int rc = WSASendTo(socket(), |
924 pending_write_->GetWASBUF(), | 969 pending_write_->GetWASBUF(), |
925 1, | 970 1, |
926 NULL, | 971 NULL, |
927 0, | 972 0, |
928 sa, | 973 sa, |
929 sa_len, | 974 sa_len, |
930 pending_write_->GetCleanOverlapped(), | 975 pending_write_->GetCleanOverlapped(), |
931 NULL); | 976 NULL); |
932 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) { | 977 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) { |
933 return true; | 978 return true; |
934 } | 979 } |
935 OverlappedBuffer::DisposeBuffer(pending_write_); | 980 OverlappedBuffer::DisposeBuffer(pending_write_); |
936 pending_write_ = NULL; | 981 pending_write_ = NULL; |
937 HandleIssueError(); | 982 HandleIssueError(); |
938 return false; | 983 return false; |
939 } | 984 } |
940 | 985 |
941 | 986 |
942 bool DatagramSocket::IssueRecvFrom() { | 987 bool DatagramSocket::IssueRecvFrom() { |
943 ScopedLock lock(this); | 988 MonitorLocker ml(monitor_); |
944 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); | 989 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
945 ASSERT(pending_read_ == NULL); | 990 ASSERT(pending_read_ == NULL); |
946 | 991 |
947 OverlappedBuffer* buffer = OverlappedBuffer::AllocateRecvFromBuffer(1024); | 992 OverlappedBuffer* buffer = OverlappedBuffer::AllocateRecvFromBuffer(1024); |
948 | 993 |
949 DWORD flags; | 994 DWORD flags; |
950 flags = 0; | 995 flags = 0; |
951 int rc = WSARecvFrom(socket(), | 996 int rc = WSARecvFrom(socket(), |
952 buffer->GetWASBUF(), | 997 buffer->GetWASBUF(), |
953 1, | 998 1, |
954 NULL, | 999 NULL, |
955 &flags, | 1000 &flags, |
956 buffer->from(), | 1001 buffer->from(), |
957 buffer->from_len_addr(), | 1002 buffer->from_len_addr(), |
958 buffer->GetCleanOverlapped(), | 1003 buffer->GetCleanOverlapped(), |
959 NULL); | 1004 NULL); |
960 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) { | 1005 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) { |
961 pending_read_ = buffer; | 1006 pending_read_ = buffer; |
962 return true; | 1007 return true; |
963 } | 1008 } |
964 OverlappedBuffer::DisposeBuffer(buffer); | 1009 OverlappedBuffer::DisposeBuffer(buffer); |
965 pending_read_ = NULL; | 1010 pending_read_ = NULL; |
966 HandleIssueError(); | 1011 HandleIssueError(); |
967 return false; | 1012 return false; |
968 } | 1013 } |
969 | 1014 |
970 | 1015 |
971 void DatagramSocket::EnsureInitialized( | 1016 void DatagramSocket::EnsureInitialized( |
972 EventHandlerImplementation* event_handler) { | 1017 EventHandlerImplementation* event_handler) { |
973 ScopedLock lock(this); | 1018 MonitorLocker ml(monitor_); |
974 if (completion_port_ == INVALID_HANDLE_VALUE) { | 1019 if (completion_port_ == INVALID_HANDLE_VALUE) { |
975 ASSERT(event_handler_ == NULL); | 1020 ASSERT(event_handler_ == NULL); |
976 event_handler_ = event_handler; | 1021 event_handler_ = event_handler; |
977 CreateCompletionPort(event_handler_->completion_port()); | 1022 CreateCompletionPort(event_handler_->completion_port()); |
978 } | 1023 } |
979 } | 1024 } |
980 | 1025 |
981 | 1026 |
982 bool DatagramSocket::IsClosed() { | 1027 bool DatagramSocket::IsClosed() { |
983 return IsClosing() && !HasPendingRead() && !HasPendingWrite(); | 1028 return IsClosing() && !HasPendingRead() && !HasPendingWrite(); |
(...skipping 19 matching lines...) Expand all Loading... |
1003 shutdown_ = true; | 1048 shutdown_ = true; |
1004 } else { | 1049 } else { |
1005 Handle* handle = reinterpret_cast<Handle*>(msg->id); | 1050 Handle* handle = reinterpret_cast<Handle*>(msg->id); |
1006 ASSERT(handle != NULL); | 1051 ASSERT(handle != NULL); |
1007 | 1052 |
1008 if (handle->is_listen_socket()) { | 1053 if (handle->is_listen_socket()) { |
1009 ListenSocket* listen_socket = | 1054 ListenSocket* listen_socket = |
1010 reinterpret_cast<ListenSocket*>(handle); | 1055 reinterpret_cast<ListenSocket*>(handle); |
1011 listen_socket->EnsureInitialized(this); | 1056 listen_socket->EnsureInitialized(this); |
1012 | 1057 |
1013 Handle::ScopedLock lock(listen_socket); | 1058 MonitorLocker ml(listen_socket->monitor_); |
1014 | 1059 |
1015 if (IS_COMMAND(msg->data, kReturnTokenCommand)) { | 1060 if (IS_COMMAND(msg->data, kReturnTokenCommand)) { |
1016 listen_socket->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data)); | 1061 listen_socket->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data)); |
1017 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) { | 1062 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) { |
1018 // `events` can only have kInEvent/kOutEvent flags set. | 1063 // `events` can only have kInEvent/kOutEvent flags set. |
1019 intptr_t events = msg->data & EVENT_MASK; | 1064 intptr_t events = msg->data & EVENT_MASK; |
1020 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); | 1065 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); |
1021 listen_socket->SetPortAndMask(msg->dart_port, events); | 1066 listen_socket->SetPortAndMask(msg->dart_port, events); |
1022 TryDispatchingPendingAccepts(listen_socket); | 1067 TryDispatchingPendingAccepts(listen_socket); |
1023 } else if (IS_COMMAND(msg->data, kCloseCommand)) { | 1068 } else if (IS_COMMAND(msg->data, kCloseCommand)) { |
1024 listen_socket->RemovePort(msg->dart_port); | 1069 listen_socket->RemovePort(msg->dart_port); |
1025 | 1070 |
1026 // We only close the socket file descriptor from the operating | 1071 // We only close the socket file descriptor from the operating |
1027 // system if there are no other dart socket objects which | 1072 // system if there are no other dart socket objects which |
1028 // are listening on the same (address, port) combination. | 1073 // are listening on the same (address, port) combination. |
1029 ListeningSocketRegistry *registry = | 1074 ListeningSocketRegistry *registry = |
1030 ListeningSocketRegistry::Instance(); | 1075 ListeningSocketRegistry::Instance(); |
1031 MutexLocker locker(registry->mutex()); | 1076 MutexLocker locker(registry->mutex()); |
1032 if (registry->CloseSafe(reinterpret_cast<intptr_t>(listen_socket))) { | 1077 if (registry->CloseSafe(reinterpret_cast<intptr_t>(listen_socket))) { |
1033 ASSERT(listen_socket->Mask() == 0); | 1078 ASSERT(listen_socket->Mask() == 0); |
1034 listen_socket->Close(); | 1079 listen_socket->Close(); |
1035 } | 1080 } |
1036 | 1081 |
1037 DartUtils::PostInt32(msg->dart_port, 1 << kDestroyedEvent); | 1082 DartUtils::PostInt32(msg->dart_port, 1 << kDestroyedEvent); |
1038 } else { | 1083 } else { |
1039 UNREACHABLE(); | 1084 UNREACHABLE(); |
1040 } | 1085 } |
1041 } else { | 1086 } else { |
1042 handle->EnsureInitialized(this); | 1087 handle->EnsureInitialized(this); |
1043 Handle::ScopedLock lock(handle); | 1088 MonitorLocker ml(handle->monitor_); |
1044 | 1089 |
1045 if (IS_COMMAND(msg->data, kReturnTokenCommand)) { | 1090 if (IS_COMMAND(msg->data, kReturnTokenCommand)) { |
1046 handle->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data)); | 1091 handle->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data)); |
1047 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) { | 1092 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) { |
1048 // `events` can only have kInEvent/kOutEvent flags set. | 1093 // `events` can only have kInEvent/kOutEvent flags set. |
1049 intptr_t events = msg->data & EVENT_MASK; | 1094 intptr_t events = msg->data & EVENT_MASK; |
1050 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); | 1095 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); |
1051 | 1096 |
1052 handle->SetPortAndMask(msg->dart_port, events); | 1097 handle->SetPortAndMask(msg->dart_port, events); |
1053 | 1098 |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1107 DeleteIfClosed(handle); | 1152 DeleteIfClosed(handle); |
1108 } | 1153 } |
1109 } | 1154 } |
1110 | 1155 |
1111 | 1156 |
1112 void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket, | 1157 void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket, |
1113 OverlappedBuffer* buffer) { | 1158 OverlappedBuffer* buffer) { |
1114 listen_socket->AcceptComplete(buffer, completion_port_); | 1159 listen_socket->AcceptComplete(buffer, completion_port_); |
1115 | 1160 |
1116 { | 1161 { |
1117 Handle::ScopedLock lock(listen_socket); | 1162 MonitorLocker ml(listen_socket->monitor_); |
1118 TryDispatchingPendingAccepts(listen_socket); | 1163 TryDispatchingPendingAccepts(listen_socket); |
1119 } | 1164 } |
1120 | 1165 |
1121 DeleteIfClosed(listen_socket); | 1166 DeleteIfClosed(listen_socket); |
1122 } | 1167 } |
1123 | 1168 |
1124 | 1169 |
1125 void EventHandlerImplementation::TryDispatchingPendingAccepts( | 1170 void EventHandlerImplementation::TryDispatchingPendingAccepts( |
1126 ListenSocket *listen_socket) { | 1171 ListenSocket *listen_socket) { |
1127 if (!listen_socket->IsClosing() && listen_socket->CanAccept()) { | 1172 if (!listen_socket->IsClosing() && listen_socket->CanAccept()) { |
(...skipping 143 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1271 HandleConnect(client_socket, bytes, buffer); | 1316 HandleConnect(client_socket, bytes, buffer); |
1272 break; | 1317 break; |
1273 } | 1318 } |
1274 default: | 1319 default: |
1275 UNREACHABLE(); | 1320 UNREACHABLE(); |
1276 } | 1321 } |
1277 } | 1322 } |
1278 | 1323 |
1279 | 1324 |
1280 EventHandlerImplementation::EventHandlerImplementation() { | 1325 EventHandlerImplementation::EventHandlerImplementation() { |
| 1326 startup_monitor_ = new Monitor(); |
| 1327 handler_thread_id_ = Thread::kInvalidThreadId; |
1281 completion_port_ = | 1328 completion_port_ = |
1282 CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1); | 1329 CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1); |
1283 if (completion_port_ == NULL) { | 1330 if (completion_port_ == NULL) { |
1284 FATAL("Completion port creation failed"); | 1331 FATAL("Completion port creation failed"); |
1285 } | 1332 } |
1286 shutdown_ = false; | 1333 shutdown_ = false; |
1287 } | 1334 } |
1288 | 1335 |
1289 | 1336 |
1290 EventHandlerImplementation::~EventHandlerImplementation() { | 1337 EventHandlerImplementation::~EventHandlerImplementation() { |
| 1338 Thread::Join(handler_thread_id_); |
| 1339 delete startup_monitor_; |
1291 CloseHandle(completion_port_); | 1340 CloseHandle(completion_port_); |
1292 } | 1341 } |
1293 | 1342 |
1294 | 1343 |
1295 int64_t EventHandlerImplementation::GetTimeout() { | 1344 int64_t EventHandlerImplementation::GetTimeout() { |
1296 if (!timeout_queue_.HasTimeout()) { | 1345 if (!timeout_queue_.HasTimeout()) { |
1297 return kInfinityTimeout; | 1346 return kInfinityTimeout; |
1298 } | 1347 } |
1299 int64_t millis = timeout_queue_.CurrentTimeout() - | 1348 int64_t millis = timeout_queue_.CurrentTimeout() - |
1300 TimerUtils::GetCurrentTimeMilliseconds(); | 1349 TimerUtils::GetCurrentTimeMilliseconds(); |
(...skipping 14 matching lines...) Expand all Loading... |
1315 FATAL("PostQueuedCompletionStatus failed"); | 1364 FATAL("PostQueuedCompletionStatus failed"); |
1316 } | 1365 } |
1317 } | 1366 } |
1318 | 1367 |
1319 | 1368 |
1320 void EventHandlerImplementation::EventHandlerEntry(uword args) { | 1369 void EventHandlerImplementation::EventHandlerEntry(uword args) { |
1321 EventHandler* handler = reinterpret_cast<EventHandler*>(args); | 1370 EventHandler* handler = reinterpret_cast<EventHandler*>(args); |
1322 EventHandlerImplementation* handler_impl = &handler->delegate_; | 1371 EventHandlerImplementation* handler_impl = &handler->delegate_; |
1323 ASSERT(handler_impl != NULL); | 1372 ASSERT(handler_impl != NULL); |
1324 | 1373 |
| 1374 { |
| 1375 MonitorLocker ml(handler_impl->startup_monitor_); |
| 1376 handler_impl->handler_thread_id_ = Thread::GetCurrentThreadId(); |
| 1377 ml.Notify(); |
| 1378 } |
| 1379 |
1325 while (!handler_impl->shutdown_) { | 1380 while (!handler_impl->shutdown_) { |
1326 DWORD bytes; | 1381 DWORD bytes; |
1327 ULONG_PTR key; | 1382 ULONG_PTR key; |
1328 OVERLAPPED* overlapped; | 1383 OVERLAPPED* overlapped; |
1329 int64_t millis = handler_impl->GetTimeout(); | 1384 int64_t millis = handler_impl->GetTimeout(); |
1330 ASSERT(millis == kInfinityTimeout || millis >= 0); | 1385 ASSERT(millis == kInfinityTimeout || millis >= 0); |
1331 if (millis > kMaxInt32) millis = kMaxInt32; | 1386 if (millis > kMaxInt32) millis = kMaxInt32; |
1332 ASSERT(sizeof(int32_t) == sizeof(DWORD)); | 1387 ASSERT(sizeof(int32_t) == sizeof(DWORD)); |
1333 BOOL ok = GetQueuedCompletionStatus(handler_impl->completion_port(), | 1388 BOOL ok = GetQueuedCompletionStatus(handler_impl->completion_port(), |
1334 &bytes, | 1389 &bytes, |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1375 } | 1430 } |
1376 | 1431 |
1377 | 1432 |
1378 void EventHandlerImplementation::Start(EventHandler* handler) { | 1433 void EventHandlerImplementation::Start(EventHandler* handler) { |
1379 int result = Thread::Start(EventHandlerEntry, | 1434 int result = Thread::Start(EventHandlerEntry, |
1380 reinterpret_cast<uword>(handler)); | 1435 reinterpret_cast<uword>(handler)); |
1381 if (result != 0) { | 1436 if (result != 0) { |
1382 FATAL1("Failed to start event handler thread %d", result); | 1437 FATAL1("Failed to start event handler thread %d", result); |
1383 } | 1438 } |
1384 | 1439 |
| 1440 { |
| 1441 MonitorLocker ml(startup_monitor_); |
| 1442 while (handler_thread_id_ == Thread::kInvalidThreadId) { |
| 1443 ml.Wait(); |
| 1444 } |
| 1445 } |
| 1446 |
1385 // Initialize Winsock32 | 1447 // Initialize Winsock32 |
1386 if (!Socket::Initialize()) { | 1448 if (!Socket::Initialize()) { |
1387 FATAL("Failed to initialized Windows sockets"); | 1449 FATAL("Failed to initialized Windows sockets"); |
1388 } | 1450 } |
1389 } | 1451 } |
1390 | 1452 |
1391 | 1453 |
1392 void EventHandlerImplementation::Shutdown() { | 1454 void EventHandlerImplementation::Shutdown() { |
1393 SendData(kShutdownId, 0, 0); | 1455 SendData(kShutdownId, 0, 0); |
1394 } | 1456 } |
1395 | 1457 |
1396 } // namespace bin | 1458 } // namespace bin |
1397 } // namespace dart | 1459 } // namespace dart |
1398 | 1460 |
1399 #endif // defined(TARGET_OS_WINDOWS) | 1461 #endif // defined(TARGET_OS_WINDOWS) |
OLD | NEW |