OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include "platform/globals.h" | 5 #include "platform/globals.h" |
6 #if defined(TARGET_OS_WINDOWS) | 6 #if defined(TARGET_OS_WINDOWS) |
7 | 7 |
8 #include "bin/eventhandler.h" | 8 #include "bin/eventhandler.h" |
9 #include "bin/eventhandler_win.h" | 9 #include "bin/eventhandler_win.h" |
10 | 10 |
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
114 | 114 |
115 Handle::Handle(intptr_t handle) | 115 Handle::Handle(intptr_t handle) |
116 : DescriptorInfoBase(handle), | 116 : DescriptorInfoBase(handle), |
117 handle_(reinterpret_cast<HANDLE>(handle)), | 117 handle_(reinterpret_cast<HANDLE>(handle)), |
118 completion_port_(INVALID_HANDLE_VALUE), | 118 completion_port_(INVALID_HANDLE_VALUE), |
119 event_handler_(NULL), | 119 event_handler_(NULL), |
120 data_ready_(NULL), | 120 data_ready_(NULL), |
121 pending_read_(NULL), | 121 pending_read_(NULL), |
122 pending_write_(NULL), | 122 pending_write_(NULL), |
123 last_error_(NOERROR), | 123 last_error_(NOERROR), |
124 flags_(0) { | 124 flags_(0), |
125 InitializeCriticalSection(&cs_); | 125 read_thread_id_(Thread::kInvalidThreadId), |
126 read_thread_starting_(false), | |
127 read_thread_finished_(false), | |
128 monitor_(new Monitor()) { | |
126 } | 129 } |
127 | 130 |
128 | 131 |
129 Handle::~Handle() { | 132 Handle::~Handle() { |
130 DeleteCriticalSection(&cs_); | 133 delete monitor_; |
131 } | 134 } |
132 | 135 |
133 | 136 |
134 void Handle::Lock() { | |
135 EnterCriticalSection(&cs_); | |
136 } | |
137 | |
138 | |
139 void Handle::Unlock() { | |
140 LeaveCriticalSection(&cs_); | |
141 } | |
142 | |
143 | |
144 bool Handle::CreateCompletionPort(HANDLE completion_port) { | 137 bool Handle::CreateCompletionPort(HANDLE completion_port) { |
145 completion_port_ = CreateIoCompletionPort(handle(), | 138 completion_port_ = CreateIoCompletionPort(handle(), |
146 completion_port, | 139 completion_port, |
147 reinterpret_cast<ULONG_PTR>(this), | 140 reinterpret_cast<ULONG_PTR>(this), |
148 0); | 141 0); |
149 if (completion_port_ == NULL) { | 142 if (completion_port_ == NULL) { |
150 return false; | 143 return false; |
151 } | 144 } |
152 return true; | 145 return true; |
153 } | 146 } |
154 | 147 |
155 | 148 |
156 void Handle::Close() { | 149 void Handle::Close() { |
157 ScopedLock lock(this); | 150 MonitorLocker ml(monitor_); |
158 if (!IsClosing()) { | 151 if (!IsClosing()) { |
159 // Close the socket and set the closing state. This close method can be | 152 // Close the socket and set the closing state. This close method can be |
160 // called again if this socket has pending IO operations in flight. | 153 // called again if this socket has pending IO operations in flight. |
161 MarkClosing(); | 154 MarkClosing(); |
162 // Perform handle type specific closing. | 155 // Perform handle type specific closing. |
163 DoClose(); | 156 DoClose(); |
164 } | 157 } |
165 ASSERT(IsHandleClosed()); | 158 ASSERT(IsHandleClosed()); |
166 } | 159 } |
167 | 160 |
168 | 161 |
169 void Handle::DoClose() { | 162 void Handle::DoClose() { |
170 if (!IsHandleClosed()) { | 163 if (!IsHandleClosed()) { |
171 CloseHandle(handle_); | 164 CloseHandle(handle_); |
172 handle_ = INVALID_HANDLE_VALUE; | 165 handle_ = INVALID_HANDLE_VALUE; |
173 } | 166 } |
174 } | 167 } |
175 | 168 |
176 | 169 |
177 bool Handle::HasPendingRead() { | 170 bool Handle::HasPendingRead() { |
178 ScopedLock lock(this); | 171 MonitorLocker ml(monitor_); |
179 return pending_read_ != NULL; | 172 return pending_read_ != NULL; |
180 } | 173 } |
181 | 174 |
182 | 175 |
183 bool Handle::HasPendingWrite() { | 176 bool Handle::HasPendingWrite() { |
184 ScopedLock lock(this); | 177 MonitorLocker ml(monitor_); |
185 return pending_write_ != NULL; | 178 return pending_write_ != NULL; |
186 } | 179 } |
187 | 180 |
188 | 181 |
189 void Handle::ReadComplete(OverlappedBuffer* buffer) { | 182 void Handle::WaitForReadThreadFinished() { |
190 ScopedLock lock(this); | 183 // Join the Reader thread if there is one. |
191 // Currently only one outstanding read at the time. | 184 ThreadId to_join = Thread::kInvalidThreadId; |
192 ASSERT(pending_read_ == buffer); | 185 { |
193 ASSERT(data_ready_ == NULL); | 186 MonitorLocker ml(monitor_); |
194 if (!IsClosing() && !buffer->IsEmpty()) { | 187 if (read_thread_id_ != Thread::kInvalidThreadId) { |
195 data_ready_ = pending_read_; | 188 while (!read_thread_finished_) { |
196 } else { | 189 ml.Wait(); |
197 OverlappedBuffer::DisposeBuffer(buffer); | 190 } |
191 read_thread_finished_ = false; | |
192 to_join = read_thread_id_; | |
193 read_thread_id_ = Thread::kInvalidThreadId; | |
194 } | |
198 } | 195 } |
199 pending_read_ = NULL; | 196 if (to_join != Thread::kInvalidThreadId) { |
197 Thread::Join(to_join); | |
198 } | |
200 } | 199 } |
201 | 200 |
202 | 201 |
202 void Handle::ReadComplete(OverlappedBuffer* buffer) { | |
203 { | |
204 MonitorLocker ml(monitor_); | |
205 // Currently only one outstanding read at the time. | |
206 ASSERT(pending_read_ == buffer); | |
207 ASSERT(data_ready_ == NULL); | |
208 if (!IsClosing() && !buffer->IsEmpty()) { | |
209 data_ready_ = pending_read_; | |
210 } else { | |
211 OverlappedBuffer::DisposeBuffer(buffer); | |
212 } | |
213 pending_read_ = NULL; | |
214 } | |
215 WaitForReadThreadFinished(); | |
216 } | |
217 | |
218 | |
203 void Handle::RecvFromComplete(OverlappedBuffer* buffer) { | 219 void Handle::RecvFromComplete(OverlappedBuffer* buffer) { |
204 ReadComplete(buffer); | 220 ReadComplete(buffer); |
205 } | 221 } |
206 | 222 |
207 | 223 |
208 void Handle::WriteComplete(OverlappedBuffer* buffer) { | 224 void Handle::WriteComplete(OverlappedBuffer* buffer) { |
209 ScopedLock lock(this); | 225 MonitorLocker ml(monitor_); |
210 // Currently only one outstanding write at the time. | 226 // Currently only one outstanding write at the time. |
211 ASSERT(pending_write_ == buffer); | 227 ASSERT(pending_write_ == buffer); |
212 OverlappedBuffer::DisposeBuffer(buffer); | 228 OverlappedBuffer::DisposeBuffer(buffer); |
213 pending_write_ = NULL; | 229 pending_write_ = NULL; |
214 } | 230 } |
215 | 231 |
216 | 232 |
217 static void ReadFileThread(uword args) { | 233 static void ReadFileThread(uword args) { |
218 Handle* handle = reinterpret_cast<Handle*>(args); | 234 Handle* handle = reinterpret_cast<Handle*>(args); |
219 handle->ReadSyncCompleteAsync(); | 235 handle->ReadSyncCompleteAsync(); |
220 } | 236 } |
221 | 237 |
222 | 238 |
223 void Handle::ReadSyncCompleteAsync() { | 239 void Handle::ReadSyncCompleteAsync() { |
240 NotifyReadThreadStarted(); | |
224 ASSERT(pending_read_ != NULL); | 241 ASSERT(pending_read_ != NULL); |
225 ASSERT(pending_read_->GetBufferSize() >= kStdOverlappedBufferSize); | 242 ASSERT(pending_read_->GetBufferSize() >= kStdOverlappedBufferSize); |
226 | 243 |
227 DWORD buffer_size = pending_read_->GetBufferSize(); | 244 DWORD buffer_size = pending_read_->GetBufferSize(); |
228 if (GetFileType(handle_) == FILE_TYPE_CHAR) { | 245 if (GetFileType(handle_) == FILE_TYPE_CHAR) { |
229 buffer_size = kStdOverlappedBufferSize; | 246 buffer_size = kStdOverlappedBufferSize; |
230 } | 247 } |
248 char* buffer_start = pending_read_->GetBufferStart(); | |
231 DWORD bytes_read = 0; | 249 DWORD bytes_read = 0; |
232 BOOL ok = ReadFile(handle_, | 250 BOOL ok = ReadFile(handle_, |
233 pending_read_->GetBufferStart(), | 251 buffer_start, |
234 buffer_size, | 252 buffer_size, |
235 &bytes_read, | 253 &bytes_read, |
236 NULL); | 254 NULL); |
237 if (!ok) { | 255 if (!ok) { |
238 bytes_read = 0; | 256 bytes_read = 0; |
239 } | 257 } |
240 OVERLAPPED* overlapped = pending_read_->GetCleanOverlapped(); | 258 OVERLAPPED* overlapped = pending_read_->GetCleanOverlapped(); |
241 ok = PostQueuedCompletionStatus(event_handler_->completion_port(), | 259 ok = PostQueuedCompletionStatus(event_handler_->completion_port(), |
242 bytes_read, | 260 bytes_read, |
243 reinterpret_cast<ULONG_PTR>(this), | 261 reinterpret_cast<ULONG_PTR>(this), |
244 overlapped); | 262 overlapped); |
245 if (!ok) { | 263 if (!ok) { |
246 FATAL("PostQueuedCompletionStatus failed"); | 264 FATAL("PostQueuedCompletionStatus failed"); |
247 } | 265 } |
266 NotifyReadThreadFinished(); | |
267 } | |
268 | |
269 | |
270 void Handle::NotifyReadThreadStarted() { | |
271 MonitorLocker ml(monitor_); | |
272 ASSERT(!read_thread_starting_); | |
273 ASSERT(read_thread_id_ == Thread::kInvalidThreadId); | |
274 read_thread_id_ = Thread::GetCurrentThreadId(); | |
275 read_thread_starting_ = true; | |
276 ml.Notify(); | |
277 while (read_thread_starting_) { | |
278 ml.Wait(); | |
279 } | |
280 } | |
281 | |
282 void Handle::NotifyReadThreadFinished() { | |
283 MonitorLocker ml(monitor_); | |
284 ASSERT(!read_thread_finished_); | |
285 ASSERT(read_thread_id_ != Thread::kInvalidThreadId); | |
286 read_thread_finished_ = true; | |
287 ml.Notify(); | |
288 } | |
289 | |
290 | |
291 void Handle::WaitForReadThreadStarted() { | |
292 while (!read_thread_starting_) { | |
293 monitor_->Wait(Monitor::kNoTimeout); | |
294 } | |
295 read_thread_starting_ = false; | |
296 monitor_->Notify(); | |
297 ASSERT(read_thread_id_ != Thread::kInvalidThreadId); | |
248 } | 298 } |
249 | 299 |
250 | 300 |
251 bool Handle::IssueRead() { | 301 bool Handle::IssueRead() { |
Søren Gjesse
2015/08/27 13:28:24
MonitorLocker ml(monitor_); here.
zra
2015/08/27 14:04:51
The monitor is already entered at calls to non-soc
| |
252 ScopedLock lock(this); | |
253 ASSERT(type_ != kListenSocket); | 302 ASSERT(type_ != kListenSocket); |
254 ASSERT(pending_read_ == NULL); | 303 ASSERT(pending_read_ == NULL); |
255 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize); | 304 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize); |
256 if (SupportsOverlappedIO()) { | 305 if (SupportsOverlappedIO()) { |
257 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); | 306 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
258 | 307 |
259 BOOL ok = ReadFile(handle_, | 308 BOOL ok = ReadFile(handle_, |
260 buffer->GetBufferStart(), | 309 buffer->GetBufferStart(), |
261 buffer->GetBufferSize(), | 310 buffer->GetBufferSize(), |
262 NULL, | 311 NULL, |
263 buffer->GetCleanOverlapped()); | 312 buffer->GetCleanOverlapped()); |
264 if (ok || GetLastError() == ERROR_IO_PENDING) { | 313 if (ok || GetLastError() == ERROR_IO_PENDING) { |
265 // Completing asynchronously. | 314 // Completing asynchronously. |
266 pending_read_ = buffer; | 315 pending_read_ = buffer; |
267 return true; | 316 return true; |
268 } | 317 } |
269 OverlappedBuffer::DisposeBuffer(buffer); | 318 OverlappedBuffer::DisposeBuffer(buffer); |
270 HandleIssueError(); | 319 HandleIssueError(); |
271 return false; | 320 return false; |
272 } else { | 321 } else { |
273 // Completing asynchronously through thread. | 322 // Completing asynchronously through thread. |
274 pending_read_ = buffer; | 323 pending_read_ = buffer; |
275 int result = Thread::Start(ReadFileThread, | 324 int result = Thread::Start(ReadFileThread, |
276 reinterpret_cast<uword>(this)); | 325 reinterpret_cast<uword>(this)); |
277 if (result != 0) { | 326 if (result != 0) { |
278 FATAL1("Failed to start read file thread %d", result); | 327 FATAL1("Failed to start read file thread %d", result); |
279 } | 328 } |
329 WaitForReadThreadStarted(); | |
280 return true; | 330 return true; |
281 } | 331 } |
282 } | 332 } |
283 | 333 |
284 | 334 |
285 bool Handle::IssueRecvFrom() { | 335 bool Handle::IssueRecvFrom() { |
286 return false; | 336 return false; |
287 } | 337 } |
288 | 338 |
289 | 339 |
290 bool Handle::IssueWrite() { | 340 bool Handle::IssueWrite() { |
291 ScopedLock lock(this); | 341 MonitorLocker ml(monitor_); |
292 ASSERT(type_ != kListenSocket); | 342 ASSERT(type_ != kListenSocket); |
293 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); | 343 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
294 ASSERT(pending_write_ != NULL); | 344 ASSERT(pending_write_ != NULL); |
295 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite); | 345 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite); |
296 | 346 |
297 OverlappedBuffer* buffer = pending_write_; | 347 OverlappedBuffer* buffer = pending_write_; |
298 BOOL ok = WriteFile(handle_, | 348 BOOL ok = WriteFile(handle_, |
299 buffer->GetBufferStart(), | 349 buffer->GetBufferStart(), |
300 buffer->GetBufferSize(), | 350 buffer->GetBufferSize(), |
301 NULL, | 351 NULL, |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
338 if (error == ERROR_BROKEN_PIPE) { | 388 if (error == ERROR_BROKEN_PIPE) { |
339 HandleClosed(this); | 389 HandleClosed(this); |
340 } else { | 390 } else { |
341 HandleError(this); | 391 HandleError(this); |
342 } | 392 } |
343 SetLastError(error); | 393 SetLastError(error); |
344 } | 394 } |
345 | 395 |
346 | 396 |
347 void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) { | 397 void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) { |
348 ScopedLock lock(this); | 398 MonitorLocker ml(monitor_); |
349 event_handler_ = event_handler; | 399 event_handler_ = event_handler; |
350 if (SupportsOverlappedIO() && completion_port_ == INVALID_HANDLE_VALUE) { | 400 if (SupportsOverlappedIO() && completion_port_ == INVALID_HANDLE_VALUE) { |
351 CreateCompletionPort(event_handler_->completion_port()); | 401 CreateCompletionPort(event_handler_->completion_port()); |
352 } | 402 } |
353 } | 403 } |
354 | 404 |
355 | 405 |
356 bool FileHandle::IsClosed() { | 406 bool FileHandle::IsClosed() { |
357 return IsClosing() && !HasPendingRead() && !HasPendingWrite(); | 407 return IsClosing() && !HasPendingRead() && !HasPendingWrite(); |
358 } | 408 } |
359 | 409 |
360 | 410 |
361 void DirectoryWatchHandle::EnsureInitialized( | 411 void DirectoryWatchHandle::EnsureInitialized( |
362 EventHandlerImplementation* event_handler) { | 412 EventHandlerImplementation* event_handler) { |
363 ScopedLock lock(this); | 413 MonitorLocker ml(monitor_); |
364 event_handler_ = event_handler; | 414 event_handler_ = event_handler; |
365 if (completion_port_ == INVALID_HANDLE_VALUE) { | 415 if (completion_port_ == INVALID_HANDLE_VALUE) { |
366 CreateCompletionPort(event_handler_->completion_port()); | 416 CreateCompletionPort(event_handler_->completion_port()); |
367 } | 417 } |
368 } | 418 } |
369 | 419 |
370 | 420 |
371 bool DirectoryWatchHandle::IsClosed() { | 421 bool DirectoryWatchHandle::IsClosed() { |
372 return IsClosing() && pending_read_ == NULL; | 422 return IsClosing() && pending_read_ == NULL; |
373 } | 423 } |
374 | 424 |
375 | 425 |
376 bool DirectoryWatchHandle::IssueRead() { | 426 bool DirectoryWatchHandle::IssueRead() { |
Søren Gjesse
2015/08/27 13:28:24
MonitorLocker ml(monitor_); here as well.
zra
2015/08/27 14:04:51
Ditto.
| |
377 ScopedLock lock(this); | |
378 // It may have been started before, as we start the directory-handler when | 427 // It may have been started before, as we start the directory-handler when |
379 // we create it. | 428 // we create it. |
380 if (pending_read_ != NULL || data_ready_ != NULL) return true; | 429 if (pending_read_ != NULL || data_ready_ != NULL) return true; |
381 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize); | 430 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize); |
382 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); | 431 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
383 BOOL ok = ReadDirectoryChangesW(handle_, | 432 BOOL ok = ReadDirectoryChangesW(handle_, |
384 buffer->GetBufferStart(), | 433 buffer->GetBufferStart(), |
385 buffer->GetBufferSize(), | 434 buffer->GetBufferSize(), |
386 recursive_, | 435 recursive_, |
387 events_, | 436 events_, |
388 NULL, | 437 NULL, |
389 buffer->GetCleanOverlapped(), | 438 buffer->GetCleanOverlapped(), |
390 NULL); | 439 NULL); |
391 if (ok || GetLastError() == ERROR_IO_PENDING) { | 440 if (ok || GetLastError() == ERROR_IO_PENDING) { |
392 // Completing asynchronously. | 441 // Completing asynchronously. |
393 pending_read_ = buffer; | 442 pending_read_ = buffer; |
394 return true; | 443 return true; |
395 } | 444 } |
396 OverlappedBuffer::DisposeBuffer(buffer); | 445 OverlappedBuffer::DisposeBuffer(buffer); |
397 return false; | 446 return false; |
398 } | 447 } |
399 | 448 |
400 | 449 |
401 void DirectoryWatchHandle::Stop() { | 450 void DirectoryWatchHandle::Stop() { |
402 ScopedLock lock(this); | 451 MonitorLocker ml(monitor_); |
403 // Stop the outstanding read, so we can close the handle. | 452 // Stop the outstanding read, so we can close the handle. |
404 | 453 |
405 if (pending_read_ != NULL) { | 454 if (pending_read_ != NULL) { |
406 CancelIoEx(handle(), pending_read_->GetCleanOverlapped()); | 455 CancelIoEx(handle(), pending_read_->GetCleanOverlapped()); |
407 // Don't dispose of the buffer, as it will still complete (with length 0). | 456 // Don't dispose of the buffer, as it will still complete (with length 0). |
408 } | 457 } |
409 | 458 |
410 DoClose(); | 459 DoClose(); |
411 } | 460 } |
412 | 461 |
(...skipping 23 matching lines...) Expand all Loading... | |
436 NULL, | 485 NULL, |
437 NULL); | 486 NULL); |
438 if (status == SOCKET_ERROR) { | 487 if (status == SOCKET_ERROR) { |
439 return false; | 488 return false; |
440 } | 489 } |
441 return true; | 490 return true; |
442 } | 491 } |
443 | 492 |
444 | 493 |
445 bool ListenSocket::IssueAccept() { | 494 bool ListenSocket::IssueAccept() { |
446 ScopedLock lock(this); | 495 MonitorLocker ml(monitor_); |
447 | 496 |
448 // For AcceptEx there needs to be buffer storage for address | 497 // For AcceptEx there needs to be buffer storage for address |
449 // information for two addresses (local and remote address). The | 498 // information for two addresses (local and remote address). The |
450 // AcceptEx documentation says: "This value must be at least 16 | 499 // AcceptEx documentation says: "This value must be at least 16 |
451 // bytes more than the maximum address length for the transport | 500 // bytes more than the maximum address length for the transport |
452 // protocol in use." | 501 // protocol in use." |
453 static const int kAcceptExAddressAdditionalBytes = 16; | 502 static const int kAcceptExAddressAdditionalBytes = 16; |
454 static const int kAcceptExAddressStorageSize = | 503 static const int kAcceptExAddressStorageSize = |
455 sizeof(SOCKADDR_STORAGE) + kAcceptExAddressAdditionalBytes; | 504 sizeof(SOCKADDR_STORAGE) + kAcceptExAddressAdditionalBytes; |
456 OverlappedBuffer* buffer = | 505 OverlappedBuffer* buffer = |
(...skipping 19 matching lines...) Expand all Loading... | |
476 } | 525 } |
477 | 526 |
478 pending_accept_count_++; | 527 pending_accept_count_++; |
479 | 528 |
480 return true; | 529 return true; |
481 } | 530 } |
482 | 531 |
483 | 532 |
484 void ListenSocket::AcceptComplete(OverlappedBuffer* buffer, | 533 void ListenSocket::AcceptComplete(OverlappedBuffer* buffer, |
485 HANDLE completion_port) { | 534 HANDLE completion_port) { |
486 ScopedLock lock(this); | 535 MonitorLocker ml(monitor_); |
487 if (!IsClosing()) { | 536 if (!IsClosing()) { |
488 // Update the accepted socket to support the full range of API calls. | 537 // Update the accepted socket to support the full range of API calls. |
489 SOCKET s = socket(); | 538 SOCKET s = socket(); |
490 int rc = setsockopt(buffer->client(), | 539 int rc = setsockopt(buffer->client(), |
491 SOL_SOCKET, | 540 SOL_SOCKET, |
492 SO_UPDATE_ACCEPT_CONTEXT, | 541 SO_UPDATE_ACCEPT_CONTEXT, |
493 reinterpret_cast<char*>(&s), sizeof(s)); | 542 reinterpret_cast<char*>(&s), sizeof(s)); |
494 if (rc == NO_ERROR) { | 543 if (rc == NO_ERROR) { |
495 // Insert the accepted socket into the list. | 544 // Insert the accepted socket into the list. |
496 ClientSocket* client_socket = new ClientSocket(buffer->client()); | 545 ClientSocket* client_socket = new ClientSocket(buffer->client()); |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
537 client->Close(); | 586 client->Close(); |
538 DeleteIfClosed(client); | 587 DeleteIfClosed(client); |
539 } else { | 588 } else { |
540 break; | 589 break; |
541 } | 590 } |
542 } | 591 } |
543 } | 592 } |
544 | 593 |
545 | 594 |
546 bool ListenSocket::CanAccept() { | 595 bool ListenSocket::CanAccept() { |
547 ScopedLock lock(this); | 596 MonitorLocker ml(monitor_); |
548 return accepted_head_ != NULL; | 597 return accepted_head_ != NULL; |
549 } | 598 } |
550 | 599 |
551 | 600 |
552 ClientSocket* ListenSocket::Accept() { | 601 ClientSocket* ListenSocket::Accept() { |
553 ScopedLock lock(this); | 602 MonitorLocker ml(monitor_); |
554 | 603 |
555 ClientSocket *result = NULL; | 604 ClientSocket *result = NULL; |
556 | 605 |
557 if (accepted_head_ != NULL) { | 606 if (accepted_head_ != NULL) { |
558 result = accepted_head_; | 607 result = accepted_head_; |
559 accepted_head_ = accepted_head_->next(); | 608 accepted_head_ = accepted_head_->next(); |
560 if (accepted_head_ == NULL) accepted_tail_ = NULL; | 609 if (accepted_head_ == NULL) accepted_tail_ = NULL; |
561 result->set_next(NULL); | 610 result->set_next(NULL); |
562 accepted_count_--; | 611 accepted_count_--; |
563 } | 612 } |
564 | 613 |
565 if (pending_accept_count_ < 5) { | 614 if (pending_accept_count_ < 5) { |
566 // We have less than 5 pending accepts, queue another. | 615 // We have less than 5 pending accepts, queue another. |
567 if (!IsClosing()) { | 616 if (!IsClosing()) { |
568 if (!IssueAccept()) { | 617 if (!IssueAccept()) { |
569 HandleError(this); | 618 HandleError(this); |
570 } | 619 } |
571 } | 620 } |
572 } | 621 } |
573 | 622 |
574 return result; | 623 return result; |
575 } | 624 } |
576 | 625 |
577 | 626 |
578 void ListenSocket::EnsureInitialized( | 627 void ListenSocket::EnsureInitialized( |
579 EventHandlerImplementation* event_handler) { | 628 EventHandlerImplementation* event_handler) { |
580 ScopedLock lock(this); | 629 MonitorLocker ml(monitor_); |
581 if (AcceptEx_ == NULL) { | 630 if (AcceptEx_ == NULL) { |
582 ASSERT(completion_port_ == INVALID_HANDLE_VALUE); | 631 ASSERT(completion_port_ == INVALID_HANDLE_VALUE); |
583 ASSERT(event_handler_ == NULL); | 632 ASSERT(event_handler_ == NULL); |
584 event_handler_ = event_handler; | 633 event_handler_ = event_handler; |
585 CreateCompletionPort(event_handler_->completion_port()); | 634 CreateCompletionPort(event_handler_->completion_port()); |
586 LoadAcceptEx(); | 635 LoadAcceptEx(); |
587 } | 636 } |
588 } | 637 } |
589 | 638 |
590 | 639 |
591 bool ListenSocket::IsClosed() { | 640 bool ListenSocket::IsClosed() { |
592 return IsClosing() && !HasPendingAccept(); | 641 return IsClosing() && !HasPendingAccept(); |
593 } | 642 } |
594 | 643 |
595 | 644 |
596 intptr_t Handle::Available() { | 645 intptr_t Handle::Available() { |
597 ScopedLock lock(this); | 646 MonitorLocker ml(monitor_); |
598 if (data_ready_ == NULL) return 0; | 647 if (data_ready_ == NULL) return 0; |
599 ASSERT(!data_ready_->IsEmpty()); | 648 ASSERT(!data_ready_->IsEmpty()); |
600 return data_ready_->GetRemainingLength(); | 649 return data_ready_->GetRemainingLength(); |
601 } | 650 } |
602 | 651 |
603 | 652 |
604 intptr_t Handle::Read(void* buffer, intptr_t num_bytes) { | 653 intptr_t Handle::Read(void* buffer, intptr_t num_bytes) { |
605 ScopedLock lock(this); | 654 MonitorLocker ml(monitor_); |
606 if (data_ready_ == NULL) return 0; | 655 if (data_ready_ == NULL) return 0; |
607 num_bytes = data_ready_->Read( | 656 num_bytes = data_ready_->Read( |
608 buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX)); | 657 buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX)); |
609 if (data_ready_->IsEmpty()) { | 658 if (data_ready_->IsEmpty()) { |
610 OverlappedBuffer::DisposeBuffer(data_ready_); | 659 OverlappedBuffer::DisposeBuffer(data_ready_); |
611 data_ready_ = NULL; | 660 data_ready_ = NULL; |
612 if (!IsClosing() && !IsClosedRead()) IssueRead(); | 661 if (!IsClosing() && !IsClosedRead()) IssueRead(); |
613 } | 662 } |
614 return num_bytes; | 663 return num_bytes; |
615 } | 664 } |
616 | 665 |
617 | 666 |
618 intptr_t Handle::RecvFrom( | 667 intptr_t Handle::RecvFrom( |
619 void* buffer, intptr_t num_bytes, struct sockaddr* sa, socklen_t sa_len) { | 668 void* buffer, intptr_t num_bytes, struct sockaddr* sa, socklen_t sa_len) { |
620 ScopedLock lock(this); | 669 MonitorLocker ml(monitor_); |
621 if (data_ready_ == NULL) return 0; | 670 if (data_ready_ == NULL) return 0; |
622 num_bytes = data_ready_->Read( | 671 num_bytes = data_ready_->Read( |
623 buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX)); | 672 buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX)); |
624 if (data_ready_->from()->sa_family == AF_INET) { | 673 if (data_ready_->from()->sa_family == AF_INET) { |
625 ASSERT(sa_len >= sizeof(struct sockaddr_in)); | 674 ASSERT(sa_len >= sizeof(struct sockaddr_in)); |
626 memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in)); | 675 memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in)); |
627 } else { | 676 } else { |
628 ASSERT(data_ready_->from()->sa_family == AF_INET6); | 677 ASSERT(data_ready_->from()->sa_family == AF_INET6); |
629 ASSERT(sa_len >= sizeof(struct sockaddr_in6)); | 678 ASSERT(sa_len >= sizeof(struct sockaddr_in6)); |
630 memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in6)); | 679 memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in6)); |
631 } | 680 } |
632 // Always dispose of the buffer, as UDP messages must be read in their | 681 // Always dispose of the buffer, as UDP messages must be read in their |
633 // entirety to match how recvfrom works in a socket. | 682 // entirety to match how recvfrom works in a socket. |
634 OverlappedBuffer::DisposeBuffer(data_ready_); | 683 OverlappedBuffer::DisposeBuffer(data_ready_); |
635 data_ready_ = NULL; | 684 data_ready_ = NULL; |
636 if (!IsClosing() && !IsClosedRead()) IssueRecvFrom(); | 685 if (!IsClosing() && !IsClosedRead()) IssueRecvFrom(); |
637 return num_bytes; | 686 return num_bytes; |
638 } | 687 } |
639 | 688 |
640 | 689 |
641 intptr_t Handle::Write(const void* buffer, intptr_t num_bytes) { | 690 intptr_t Handle::Write(const void* buffer, intptr_t num_bytes) { |
642 ScopedLock lock(this); | 691 MonitorLocker ml(monitor_); |
643 if (pending_write_ != NULL) return 0; | 692 if (pending_write_ != NULL) return 0; |
644 if (num_bytes > kBufferSize) num_bytes = kBufferSize; | 693 if (num_bytes > kBufferSize) num_bytes = kBufferSize; |
645 ASSERT(SupportsOverlappedIO()); | 694 ASSERT(SupportsOverlappedIO()); |
646 if (completion_port_ == INVALID_HANDLE_VALUE) return 0; | 695 if (completion_port_ == INVALID_HANDLE_VALUE) return 0; |
647 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX); | 696 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX); |
648 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes); | 697 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes); |
649 pending_write_->Write(buffer, truncated_bytes); | 698 pending_write_->Write(buffer, truncated_bytes); |
650 if (!IssueWrite()) return -1; | 699 if (!IssueWrite()) return -1; |
651 return truncated_bytes; | 700 return truncated_bytes; |
652 } | 701 } |
653 | 702 |
654 | 703 |
655 intptr_t Handle::SendTo(const void* buffer, | 704 intptr_t Handle::SendTo(const void* buffer, |
656 intptr_t num_bytes, | 705 intptr_t num_bytes, |
657 struct sockaddr* sa, | 706 struct sockaddr* sa, |
658 socklen_t sa_len) { | 707 socklen_t sa_len) { |
659 ScopedLock lock(this); | 708 MonitorLocker ml(monitor_); |
660 if (pending_write_ != NULL) return 0; | 709 if (pending_write_ != NULL) return 0; |
661 if (num_bytes > kBufferSize) num_bytes = kBufferSize; | 710 if (num_bytes > kBufferSize) num_bytes = kBufferSize; |
662 ASSERT(SupportsOverlappedIO()); | 711 ASSERT(SupportsOverlappedIO()); |
663 if (completion_port_ == INVALID_HANDLE_VALUE) return 0; | 712 if (completion_port_ == INVALID_HANDLE_VALUE) return 0; |
664 pending_write_ = OverlappedBuffer::AllocateSendToBuffer(num_bytes); | 713 pending_write_ = OverlappedBuffer::AllocateSendToBuffer(num_bytes); |
665 pending_write_->Write(buffer, num_bytes); | 714 pending_write_->Write(buffer, num_bytes); |
666 if (!IssueSendTo(sa, sa_len)) return -1; | 715 if (!IssueSendTo(sa, sa_len)) return -1; |
667 return num_bytes; | 716 return num_bytes; |
668 } | 717 } |
669 | 718 |
670 | 719 |
671 static void WriteFileThread(uword args) { | 720 static void WriteFileThread(uword args) { |
672 StdHandle* handle = reinterpret_cast<StdHandle*>(args); | 721 StdHandle* handle = reinterpret_cast<StdHandle*>(args); |
673 handle->RunWriteLoop(); | 722 handle->RunWriteLoop(); |
674 } | 723 } |
675 | 724 |
676 | 725 |
677 void StdHandle::RunWriteLoop() { | 726 void StdHandle::RunWriteLoop() { |
678 write_monitor_->Enter(); | 727 MonitorLocker ml(monitor_); |
679 write_thread_running_ = true; | 728 write_thread_running_ = true; |
729 thread_id_ = Thread::GetCurrentThreadId(); | |
680 // Notify we have started. | 730 // Notify we have started. |
681 write_monitor_->Notify(); | 731 ml.Notify(); |
682 | 732 |
683 while (write_thread_running_) { | 733 while (write_thread_running_) { |
684 write_monitor_->Wait(Monitor::kNoTimeout); | 734 ml.Wait(Monitor::kNoTimeout); |
685 if (pending_write_ != NULL) { | 735 if (pending_write_ != NULL) { |
686 // We woke up and had a pending write. Execute it. | 736 // We woke up and had a pending write. Execute it. |
687 WriteSyncCompleteAsync(); | 737 WriteSyncCompleteAsync(); |
688 } | 738 } |
689 } | 739 } |
690 | 740 |
691 write_thread_exists_ = false; | 741 write_thread_exists_ = false; |
692 write_monitor_->Notify(); | 742 ml.Notify(); |
693 write_monitor_->Exit(); | |
694 } | 743 } |
695 | 744 |
696 | 745 |
697 void StdHandle::WriteSyncCompleteAsync() { | 746 void StdHandle::WriteSyncCompleteAsync() { |
698 ASSERT(pending_write_ != NULL); | 747 ASSERT(pending_write_ != NULL); |
699 | 748 |
700 DWORD bytes_written = -1; | 749 DWORD bytes_written = -1; |
701 BOOL ok = WriteFile(handle_, | 750 BOOL ok = WriteFile(handle_, |
702 pending_write_->GetBufferStart(), | 751 pending_write_->GetBufferStart(), |
703 pending_write_->GetBufferSize(), | 752 pending_write_->GetBufferSize(), |
704 &bytes_written, | 753 &bytes_written, |
705 NULL); | 754 NULL); |
706 if (!ok) { | 755 if (!ok) { |
707 bytes_written = 0; | 756 bytes_written = 0; |
708 } | 757 } |
709 thread_wrote_ += bytes_written; | 758 thread_wrote_ += bytes_written; |
710 OVERLAPPED* overlapped = pending_write_->GetCleanOverlapped(); | 759 OVERLAPPED* overlapped = pending_write_->GetCleanOverlapped(); |
711 ok = PostQueuedCompletionStatus(event_handler_->completion_port(), | 760 ok = PostQueuedCompletionStatus(event_handler_->completion_port(), |
712 bytes_written, | 761 bytes_written, |
713 reinterpret_cast<ULONG_PTR>(this), | 762 reinterpret_cast<ULONG_PTR>(this), |
714 overlapped); | 763 overlapped); |
715 if (!ok) { | 764 if (!ok) { |
716 FATAL("PostQueuedCompletionStatus failed"); | 765 FATAL("PostQueuedCompletionStatus failed"); |
717 } | 766 } |
718 } | 767 } |
719 | 768 |
720 intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) { | 769 intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) { |
721 ScopedLock lock(this); | 770 MonitorLocker ml(monitor_); |
722 if (pending_write_ != NULL) return 0; | 771 if (pending_write_ != NULL) return 0; |
723 if (num_bytes > kBufferSize) num_bytes = kBufferSize; | 772 if (num_bytes > kBufferSize) num_bytes = kBufferSize; |
724 // In the case of stdout and stderr, OverlappedIO is not supported. | 773 // In the case of stdout and stderr, OverlappedIO is not supported. |
725 // Here we'll instead use a thread, to make it async. | 774 // Here we'll instead use a thread, to make it async. |
726 // This code is actually never exposed to the user, as stdout and stderr is | 775 // This code is actually never exposed to the user, as stdout and stderr is |
727 // not available as a RawSocket, but only wrapped in a Socket. | 776 // not available as a RawSocket, but only wrapped in a Socket. |
728 // Note that we return '0', unless a thread have already completed a write. | 777 // Note that we return '0', unless a thread have already completed a write. |
729 MonitorLocker locker(write_monitor_); | |
730 if (thread_wrote_ > 0) { | 778 if (thread_wrote_ > 0) { |
731 if (num_bytes > thread_wrote_) num_bytes = thread_wrote_; | 779 if (num_bytes > thread_wrote_) num_bytes = thread_wrote_; |
732 thread_wrote_ -= num_bytes; | 780 thread_wrote_ -= num_bytes; |
733 return num_bytes; | 781 return num_bytes; |
734 } | 782 } |
735 if (!write_thread_exists_) { | 783 if (!write_thread_exists_) { |
736 write_thread_exists_ = true; | 784 write_thread_exists_ = true; |
737 int result = Thread::Start(WriteFileThread, | 785 int result = Thread::Start( |
738 reinterpret_cast<uword>(this)); | 786 WriteFileThread, reinterpret_cast<uword>(this)); |
739 if (result != 0) { | 787 if (result != 0) { |
740 FATAL1("Failed to start write file thread %d", result); | 788 FATAL1("Failed to start write file thread %d", result); |
741 } | 789 } |
742 while (!write_thread_running_) { | 790 while (!write_thread_running_) { |
743 // Wait until we the thread is running. | 791 // Wait until we the thread is running. |
744 locker.Wait(Monitor::kNoTimeout); | 792 ml.Wait(Monitor::kNoTimeout); |
745 } | 793 } |
746 } | 794 } |
747 // Only queue up to INT_MAX bytes. | 795 // Only queue up to INT_MAX bytes. |
748 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX); | 796 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX); |
749 // Create buffer and notify thread about the new handle. | 797 // Create buffer and notify thread about the new handle. |
750 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes); | 798 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes); |
751 pending_write_->Write(buffer, truncated_bytes); | 799 pending_write_->Write(buffer, truncated_bytes); |
752 locker.Notify(); | 800 ml.Notify(); |
753 return 0; | 801 return 0; |
754 } | 802 } |
755 | 803 |
756 | 804 |
757 void StdHandle::DoClose() { | 805 void StdHandle::DoClose() { |
758 MonitorLocker locker(write_monitor_); | 806 MonitorLocker ml(monitor_); |
759 if (write_thread_exists_) { | 807 if (write_thread_exists_) { |
760 write_thread_running_ = false; | 808 write_thread_running_ = false; |
761 locker.Notify(); | 809 ml.Notify(); |
762 while (write_thread_exists_) { | 810 while (write_thread_exists_) { |
763 locker.Wait(Monitor::kNoTimeout); | 811 ml.Wait(Monitor::kNoTimeout); |
764 } | 812 } |
813 Thread::Join(thread_id_); | |
765 } | 814 } |
766 Handle::DoClose(); | 815 Handle::DoClose(); |
767 } | 816 } |
768 | 817 |
769 | 818 |
770 bool ClientSocket::LoadDisconnectEx() { | 819 bool ClientSocket::LoadDisconnectEx() { |
771 // Load the DisconnectEx function into memory using WSAIoctl. | 820 // Load the DisconnectEx function into memory using WSAIoctl. |
772 GUID guid_disconnect_ex = WSAID_DISCONNECTEX; | 821 GUID guid_disconnect_ex = WSAID_DISCONNECTEX; |
773 DWORD bytes; | 822 DWORD bytes; |
774 int status = WSAIoctl(socket(), | 823 int status = WSAIoctl(socket(), |
(...skipping 25 matching lines...) Expand all Loading... | |
800 | 849 |
801 void ClientSocket::DoClose() { | 850 void ClientSocket::DoClose() { |
802 // Always do a shutdown before initiating a disconnect. | 851 // Always do a shutdown before initiating a disconnect. |
803 shutdown(socket(), SD_BOTH); | 852 shutdown(socket(), SD_BOTH); |
804 IssueDisconnect(); | 853 IssueDisconnect(); |
805 handle_ = INVALID_HANDLE_VALUE; | 854 handle_ = INVALID_HANDLE_VALUE; |
806 } | 855 } |
807 | 856 |
808 | 857 |
809 bool ClientSocket::IssueRead() { | 858 bool ClientSocket::IssueRead() { |
810 ScopedLock lock(this); | 859 MonitorLocker ml(monitor_); |
811 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); | 860 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
812 ASSERT(pending_read_ == NULL); | 861 ASSERT(pending_read_ == NULL); |
813 | 862 |
814 // TODO(sgjesse): Use a MTU value here. Only the loopback adapter can | 863 // TODO(sgjesse): Use a MTU value here. Only the loopback adapter can |
815 // handle 64k datagrams. | 864 // handle 64k datagrams. |
816 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(65536); | 865 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(65536); |
817 | 866 |
818 DWORD flags; | 867 DWORD flags; |
819 flags = 0; | 868 flags = 0; |
820 int rc = WSARecv(socket(), | 869 int rc = WSARecv(socket(), |
821 buffer->GetWASBUF(), | 870 buffer->GetWASBUF(), |
822 1, | 871 1, |
823 NULL, | 872 NULL, |
824 &flags, | 873 &flags, |
825 buffer->GetCleanOverlapped(), | 874 buffer->GetCleanOverlapped(), |
826 NULL); | 875 NULL); |
827 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) { | 876 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) { |
828 pending_read_ = buffer; | 877 pending_read_ = buffer; |
829 return true; | 878 return true; |
830 } | 879 } |
831 OverlappedBuffer::DisposeBuffer(buffer); | 880 OverlappedBuffer::DisposeBuffer(buffer); |
832 pending_read_ = NULL; | 881 pending_read_ = NULL; |
833 HandleIssueError(); | 882 HandleIssueError(); |
834 return false; | 883 return false; |
835 } | 884 } |
836 | 885 |
837 | 886 |
838 bool ClientSocket::IssueWrite() { | 887 bool ClientSocket::IssueWrite() { |
839 ScopedLock lock(this); | 888 MonitorLocker ml(monitor_); |
840 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); | 889 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
841 ASSERT(pending_write_ != NULL); | 890 ASSERT(pending_write_ != NULL); |
842 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite); | 891 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite); |
843 | 892 |
844 int rc = WSASend(socket(), | 893 int rc = WSASend(socket(), |
845 pending_write_->GetWASBUF(), | 894 pending_write_->GetWASBUF(), |
846 1, | 895 1, |
847 NULL, | 896 NULL, |
848 0, | 897 0, |
849 pending_write_->GetCleanOverlapped(), | 898 pending_write_->GetCleanOverlapped(), |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
893 } | 942 } |
894 if (!IsClosedWrite() && ((Mask() & (1 << kOutEvent)) != 0)) { | 943 if (!IsClosedWrite() && ((Mask() & (1 << kOutEvent)) != 0)) { |
895 Dart_Port port = NextNotifyDartPort(1 << kOutEvent); | 944 Dart_Port port = NextNotifyDartPort(1 << kOutEvent); |
896 DartUtils::PostInt32(port, 1 << kOutEvent); | 945 DartUtils::PostInt32(port, 1 << kOutEvent); |
897 } | 946 } |
898 } | 947 } |
899 | 948 |
900 | 949 |
901 void ClientSocket::EnsureInitialized( | 950 void ClientSocket::EnsureInitialized( |
902 EventHandlerImplementation* event_handler) { | 951 EventHandlerImplementation* event_handler) { |
903 ScopedLock lock(this); | 952 MonitorLocker ml(monitor_); |
904 if (completion_port_ == INVALID_HANDLE_VALUE) { | 953 if (completion_port_ == INVALID_HANDLE_VALUE) { |
905 ASSERT(event_handler_ == NULL); | 954 ASSERT(event_handler_ == NULL); |
906 event_handler_ = event_handler; | 955 event_handler_ = event_handler; |
907 CreateCompletionPort(event_handler_->completion_port()); | 956 CreateCompletionPort(event_handler_->completion_port()); |
908 } | 957 } |
909 } | 958 } |
910 | 959 |
911 | 960 |
912 bool ClientSocket::IsClosed() { | 961 bool ClientSocket::IsClosed() { |
913 return connected_ && closed_; | 962 return connected_ && closed_; |
914 } | 963 } |
915 | 964 |
916 | 965 |
917 bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) { | 966 bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) { |
918 ScopedLock lock(this); | 967 MonitorLocker ml(monitor_); |
919 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); | 968 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
920 ASSERT(pending_write_ != NULL); | 969 ASSERT(pending_write_ != NULL); |
921 ASSERT(pending_write_->operation() == OverlappedBuffer::kSendTo); | 970 ASSERT(pending_write_->operation() == OverlappedBuffer::kSendTo); |
922 | 971 |
923 int rc = WSASendTo(socket(), | 972 int rc = WSASendTo(socket(), |
924 pending_write_->GetWASBUF(), | 973 pending_write_->GetWASBUF(), |
925 1, | 974 1, |
926 NULL, | 975 NULL, |
927 0, | 976 0, |
928 sa, | 977 sa, |
929 sa_len, | 978 sa_len, |
930 pending_write_->GetCleanOverlapped(), | 979 pending_write_->GetCleanOverlapped(), |
931 NULL); | 980 NULL); |
932 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) { | 981 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) { |
933 return true; | 982 return true; |
934 } | 983 } |
935 OverlappedBuffer::DisposeBuffer(pending_write_); | 984 OverlappedBuffer::DisposeBuffer(pending_write_); |
936 pending_write_ = NULL; | 985 pending_write_ = NULL; |
937 HandleIssueError(); | 986 HandleIssueError(); |
938 return false; | 987 return false; |
939 } | 988 } |
940 | 989 |
941 | 990 |
942 bool DatagramSocket::IssueRecvFrom() { | 991 bool DatagramSocket::IssueRecvFrom() { |
943 ScopedLock lock(this); | 992 MonitorLocker ml(monitor_); |
944 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); | 993 ASSERT(completion_port_ != INVALID_HANDLE_VALUE); |
945 ASSERT(pending_read_ == NULL); | 994 ASSERT(pending_read_ == NULL); |
946 | 995 |
947 OverlappedBuffer* buffer = OverlappedBuffer::AllocateRecvFromBuffer(1024); | 996 OverlappedBuffer* buffer = OverlappedBuffer::AllocateRecvFromBuffer(1024); |
948 | 997 |
949 DWORD flags; | 998 DWORD flags; |
950 flags = 0; | 999 flags = 0; |
951 int rc = WSARecvFrom(socket(), | 1000 int rc = WSARecvFrom(socket(), |
952 buffer->GetWASBUF(), | 1001 buffer->GetWASBUF(), |
953 1, | 1002 1, |
954 NULL, | 1003 NULL, |
955 &flags, | 1004 &flags, |
956 buffer->from(), | 1005 buffer->from(), |
957 buffer->from_len_addr(), | 1006 buffer->from_len_addr(), |
958 buffer->GetCleanOverlapped(), | 1007 buffer->GetCleanOverlapped(), |
959 NULL); | 1008 NULL); |
960 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) { | 1009 if (rc == NO_ERROR || WSAGetLastError() == WSA_IO_PENDING) { |
961 pending_read_ = buffer; | 1010 pending_read_ = buffer; |
962 return true; | 1011 return true; |
963 } | 1012 } |
964 OverlappedBuffer::DisposeBuffer(buffer); | 1013 OverlappedBuffer::DisposeBuffer(buffer); |
965 pending_read_ = NULL; | 1014 pending_read_ = NULL; |
966 HandleIssueError(); | 1015 HandleIssueError(); |
967 return false; | 1016 return false; |
968 } | 1017 } |
969 | 1018 |
970 | 1019 |
971 void DatagramSocket::EnsureInitialized( | 1020 void DatagramSocket::EnsureInitialized( |
972 EventHandlerImplementation* event_handler) { | 1021 EventHandlerImplementation* event_handler) { |
973 ScopedLock lock(this); | 1022 MonitorLocker ml(monitor_); |
974 if (completion_port_ == INVALID_HANDLE_VALUE) { | 1023 if (completion_port_ == INVALID_HANDLE_VALUE) { |
975 ASSERT(event_handler_ == NULL); | 1024 ASSERT(event_handler_ == NULL); |
976 event_handler_ = event_handler; | 1025 event_handler_ = event_handler; |
977 CreateCompletionPort(event_handler_->completion_port()); | 1026 CreateCompletionPort(event_handler_->completion_port()); |
978 } | 1027 } |
979 } | 1028 } |
980 | 1029 |
981 | 1030 |
982 bool DatagramSocket::IsClosed() { | 1031 bool DatagramSocket::IsClosed() { |
983 return IsClosing() && !HasPendingRead() && !HasPendingWrite(); | 1032 return IsClosing() && !HasPendingRead() && !HasPendingWrite(); |
(...skipping 19 matching lines...) Expand all Loading... | |
1003 shutdown_ = true; | 1052 shutdown_ = true; |
1004 } else { | 1053 } else { |
1005 Handle* handle = reinterpret_cast<Handle*>(msg->id); | 1054 Handle* handle = reinterpret_cast<Handle*>(msg->id); |
1006 ASSERT(handle != NULL); | 1055 ASSERT(handle != NULL); |
1007 | 1056 |
1008 if (handle->is_listen_socket()) { | 1057 if (handle->is_listen_socket()) { |
1009 ListenSocket* listen_socket = | 1058 ListenSocket* listen_socket = |
1010 reinterpret_cast<ListenSocket*>(handle); | 1059 reinterpret_cast<ListenSocket*>(handle); |
1011 listen_socket->EnsureInitialized(this); | 1060 listen_socket->EnsureInitialized(this); |
1012 | 1061 |
1013 Handle::ScopedLock lock(listen_socket); | 1062 MonitorLocker ml(listen_socket->monitor_); |
1014 | 1063 |
1015 if (IS_COMMAND(msg->data, kReturnTokenCommand)) { | 1064 if (IS_COMMAND(msg->data, kReturnTokenCommand)) { |
1016 listen_socket->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data)); | 1065 listen_socket->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data)); |
1017 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) { | 1066 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) { |
1018 // `events` can only have kInEvent/kOutEvent flags set. | 1067 // `events` can only have kInEvent/kOutEvent flags set. |
1019 intptr_t events = msg->data & EVENT_MASK; | 1068 intptr_t events = msg->data & EVENT_MASK; |
1020 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); | 1069 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); |
1021 listen_socket->SetPortAndMask(msg->dart_port, events); | 1070 listen_socket->SetPortAndMask(msg->dart_port, events); |
1022 TryDispatchingPendingAccepts(listen_socket); | 1071 TryDispatchingPendingAccepts(listen_socket); |
1023 } else if (IS_COMMAND(msg->data, kCloseCommand)) { | 1072 } else if (IS_COMMAND(msg->data, kCloseCommand)) { |
1024 listen_socket->RemovePort(msg->dart_port); | 1073 listen_socket->RemovePort(msg->dart_port); |
1025 | 1074 |
1026 // We only close the socket file descriptor from the operating | 1075 // We only close the socket file descriptor from the operating |
1027 // system if there are no other dart socket objects which | 1076 // system if there are no other dart socket objects which |
1028 // are listening on the same (address, port) combination. | 1077 // are listening on the same (address, port) combination. |
1029 ListeningSocketRegistry *registry = | 1078 ListeningSocketRegistry *registry = |
1030 ListeningSocketRegistry::Instance(); | 1079 ListeningSocketRegistry::Instance(); |
1031 MutexLocker locker(registry->mutex()); | 1080 MutexLocker locker(registry->mutex()); |
1032 if (registry->CloseSafe(reinterpret_cast<intptr_t>(listen_socket))) { | 1081 if (registry->CloseSafe(reinterpret_cast<intptr_t>(listen_socket))) { |
1033 ASSERT(listen_socket->Mask() == 0); | 1082 ASSERT(listen_socket->Mask() == 0); |
1034 listen_socket->Close(); | 1083 listen_socket->Close(); |
1035 } | 1084 } |
1036 | 1085 |
1037 DartUtils::PostInt32(msg->dart_port, 1 << kDestroyedEvent); | 1086 DartUtils::PostInt32(msg->dart_port, 1 << kDestroyedEvent); |
1038 } else { | 1087 } else { |
1039 UNREACHABLE(); | 1088 UNREACHABLE(); |
1040 } | 1089 } |
1041 } else { | 1090 } else { |
1042 handle->EnsureInitialized(this); | 1091 handle->EnsureInitialized(this); |
1043 Handle::ScopedLock lock(handle); | 1092 MonitorLocker ml(handle->monitor_); |
1044 | 1093 |
1045 if (IS_COMMAND(msg->data, kReturnTokenCommand)) { | 1094 if (IS_COMMAND(msg->data, kReturnTokenCommand)) { |
1046 handle->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data)); | 1095 handle->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data)); |
1047 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) { | 1096 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) { |
1048 // `events` can only have kInEvent/kOutEvent flags set. | 1097 // `events` can only have kInEvent/kOutEvent flags set. |
1049 intptr_t events = msg->data & EVENT_MASK; | 1098 intptr_t events = msg->data & EVENT_MASK; |
1050 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); | 1099 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); |
1051 | 1100 |
1052 handle->SetPortAndMask(msg->dart_port, events); | 1101 handle->SetPortAndMask(msg->dart_port, events); |
1053 | 1102 |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1107 DeleteIfClosed(handle); | 1156 DeleteIfClosed(handle); |
1108 } | 1157 } |
1109 } | 1158 } |
1110 | 1159 |
1111 | 1160 |
1112 void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket, | 1161 void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket, |
1113 OverlappedBuffer* buffer) { | 1162 OverlappedBuffer* buffer) { |
1114 listen_socket->AcceptComplete(buffer, completion_port_); | 1163 listen_socket->AcceptComplete(buffer, completion_port_); |
1115 | 1164 |
1116 { | 1165 { |
1117 Handle::ScopedLock lock(listen_socket); | 1166 MonitorLocker ml(listen_socket->monitor_); |
1118 TryDispatchingPendingAccepts(listen_socket); | 1167 TryDispatchingPendingAccepts(listen_socket); |
1119 } | 1168 } |
1120 | 1169 |
1121 DeleteIfClosed(listen_socket); | 1170 DeleteIfClosed(listen_socket); |
1122 } | 1171 } |
1123 | 1172 |
1124 | 1173 |
1125 void EventHandlerImplementation::TryDispatchingPendingAccepts( | 1174 void EventHandlerImplementation::TryDispatchingPendingAccepts( |
1126 ListenSocket *listen_socket) { | 1175 ListenSocket *listen_socket) { |
1127 if (!listen_socket->IsClosing() && listen_socket->CanAccept()) { | 1176 if (!listen_socket->IsClosing() && listen_socket->CanAccept()) { |
(...skipping 143 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1271 HandleConnect(client_socket, bytes, buffer); | 1320 HandleConnect(client_socket, bytes, buffer); |
1272 break; | 1321 break; |
1273 } | 1322 } |
1274 default: | 1323 default: |
1275 UNREACHABLE(); | 1324 UNREACHABLE(); |
1276 } | 1325 } |
1277 } | 1326 } |
1278 | 1327 |
1279 | 1328 |
1280 EventHandlerImplementation::EventHandlerImplementation() { | 1329 EventHandlerImplementation::EventHandlerImplementation() { |
1330 startup_monitor_ = new Monitor(); | |
1331 handler_thread_id_ = Thread::kInvalidThreadId; | |
1281 completion_port_ = | 1332 completion_port_ = |
1282 CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1); | 1333 CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1); |
1283 if (completion_port_ == NULL) { | 1334 if (completion_port_ == NULL) { |
1284 FATAL("Completion port creation failed"); | 1335 FATAL("Completion port creation failed"); |
1285 } | 1336 } |
1286 shutdown_ = false; | 1337 shutdown_ = false; |
1287 } | 1338 } |
1288 | 1339 |
1289 | 1340 |
1290 EventHandlerImplementation::~EventHandlerImplementation() { | 1341 EventHandlerImplementation::~EventHandlerImplementation() { |
1342 Thread::Join(handler_thread_id_); | |
1343 delete startup_monitor_; | |
1291 CloseHandle(completion_port_); | 1344 CloseHandle(completion_port_); |
1292 } | 1345 } |
1293 | 1346 |
1294 | 1347 |
1295 int64_t EventHandlerImplementation::GetTimeout() { | 1348 int64_t EventHandlerImplementation::GetTimeout() { |
1296 if (!timeout_queue_.HasTimeout()) { | 1349 if (!timeout_queue_.HasTimeout()) { |
1297 return kInfinityTimeout; | 1350 return kInfinityTimeout; |
1298 } | 1351 } |
1299 int64_t millis = timeout_queue_.CurrentTimeout() - | 1352 int64_t millis = timeout_queue_.CurrentTimeout() - |
1300 TimerUtils::GetCurrentTimeMilliseconds(); | 1353 TimerUtils::GetCurrentTimeMilliseconds(); |
(...skipping 14 matching lines...) Expand all Loading... | |
1315 FATAL("PostQueuedCompletionStatus failed"); | 1368 FATAL("PostQueuedCompletionStatus failed"); |
1316 } | 1369 } |
1317 } | 1370 } |
1318 | 1371 |
1319 | 1372 |
1320 void EventHandlerImplementation::EventHandlerEntry(uword args) { | 1373 void EventHandlerImplementation::EventHandlerEntry(uword args) { |
1321 EventHandler* handler = reinterpret_cast<EventHandler*>(args); | 1374 EventHandler* handler = reinterpret_cast<EventHandler*>(args); |
1322 EventHandlerImplementation* handler_impl = &handler->delegate_; | 1375 EventHandlerImplementation* handler_impl = &handler->delegate_; |
1323 ASSERT(handler_impl != NULL); | 1376 ASSERT(handler_impl != NULL); |
1324 | 1377 |
1378 { | |
1379 MonitorLocker ml(handler_impl->startup_monitor_); | |
1380 handler_impl->handler_thread_id_ = Thread::GetCurrentThreadId(); | |
1381 ml.Notify(); | |
1382 } | |
1383 | |
1325 while (!handler_impl->shutdown_) { | 1384 while (!handler_impl->shutdown_) { |
1326 DWORD bytes; | 1385 DWORD bytes; |
1327 ULONG_PTR key; | 1386 ULONG_PTR key; |
1328 OVERLAPPED* overlapped; | 1387 OVERLAPPED* overlapped; |
1329 int64_t millis = handler_impl->GetTimeout(); | 1388 int64_t millis = handler_impl->GetTimeout(); |
1330 ASSERT(millis == kInfinityTimeout || millis >= 0); | 1389 ASSERT(millis == kInfinityTimeout || millis >= 0); |
1331 if (millis > kMaxInt32) millis = kMaxInt32; | 1390 if (millis > kMaxInt32) millis = kMaxInt32; |
1332 ASSERT(sizeof(int32_t) == sizeof(DWORD)); | 1391 ASSERT(sizeof(int32_t) == sizeof(DWORD)); |
1333 BOOL ok = GetQueuedCompletionStatus(handler_impl->completion_port(), | 1392 BOOL ok = GetQueuedCompletionStatus(handler_impl->completion_port(), |
1334 &bytes, | 1393 &bytes, |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1375 } | 1434 } |
1376 | 1435 |
1377 | 1436 |
1378 void EventHandlerImplementation::Start(EventHandler* handler) { | 1437 void EventHandlerImplementation::Start(EventHandler* handler) { |
1379 int result = Thread::Start(EventHandlerEntry, | 1438 int result = Thread::Start(EventHandlerEntry, |
1380 reinterpret_cast<uword>(handler)); | 1439 reinterpret_cast<uword>(handler)); |
1381 if (result != 0) { | 1440 if (result != 0) { |
1382 FATAL1("Failed to start event handler thread %d", result); | 1441 FATAL1("Failed to start event handler thread %d", result); |
1383 } | 1442 } |
1384 | 1443 |
1444 { | |
1445 MonitorLocker ml(startup_monitor_); | |
1446 while (handler_thread_id_ == Thread::kInvalidThreadId) { | |
1447 ml.Wait(); | |
1448 } | |
1449 } | |
1450 | |
1385 // Initialize Winsock32 | 1451 // Initialize Winsock32 |
1386 if (!Socket::Initialize()) { | 1452 if (!Socket::Initialize()) { |
1387 FATAL("Failed to initialized Windows sockets"); | 1453 FATAL("Failed to initialized Windows sockets"); |
1388 } | 1454 } |
1389 } | 1455 } |
1390 | 1456 |
1391 | 1457 |
1392 void EventHandlerImplementation::Shutdown() { | 1458 void EventHandlerImplementation::Shutdown() { |
1393 SendData(kShutdownId, 0, 0); | 1459 SendData(kShutdownId, 0, 0); |
1394 } | 1460 } |
1395 | 1461 |
1396 } // namespace bin | 1462 } // namespace bin |
1397 } // namespace dart | 1463 } // namespace dart |
1398 | 1464 |
1399 #endif // defined(TARGET_OS_WINDOWS) | 1465 #endif // defined(TARGET_OS_WINDOWS) |
OLD | NEW |