Index: base/sync_socket_win.cc |
diff --git a/base/sync_socket_win.cc b/base/sync_socket_win.cc |
index 99a6afea3ec2fdde77f7ac829cbf102fa6998098..58c4432efac5052a1a1bd2b92eb442d557f5f475 100644 |
--- a/base/sync_socket_win.cc |
+++ b/base/sync_socket_win.cc |
@@ -5,6 +5,7 @@ |
#include "base/sync_socket.h" |
#include "base/logging.h" |
+#include "base/threading/thread_restrictions.h" |
#include "base/win/scoped_handle.h" |
namespace base { |
@@ -27,9 +28,9 @@ const int kInBufferSize = 4096; |
const int kDefaultTimeoutMilliSeconds = 1000; |
bool CreatePairImpl(HANDLE* socket_a, HANDLE* socket_b, bool overlapped) { |
- DCHECK(socket_a != socket_b); |
- DCHECK(*socket_a == SyncSocket::kInvalidHandle); |
- DCHECK(*socket_b == SyncSocket::kInvalidHandle); |
+ DCHECK_NE(socket_a, socket_b); |
+ DCHECK_EQ(*socket_a, SyncSocket::kInvalidHandle); |
+ DCHECK_EQ(*socket_b, SyncSocket::kInvalidHandle); |
wchar_t name[kPipePathMax]; |
ScopedHandle handle_a; |
@@ -112,33 +113,49 @@ DWORD GetNextChunkSize(size_t current_pos, size_t max_size) { |
template <typename BufferType, typename Function> |
size_t CancelableFileOperation(Function operation, HANDLE file, |
BufferType* buffer, size_t length, |
- base::WaitableEvent* io_event, |
- base::WaitableEvent* cancel_event, |
+ WaitableEvent* io_event, |
+ WaitableEvent* cancel_event, |
CancelableSyncSocket* socket, |
DWORD timeout_in_ms) { |
+ ThreadRestrictions::AssertIOAllowed(); |
// The buffer must be byte size or the length check won't make much sense. |
COMPILE_ASSERT(sizeof(buffer[0]) == sizeof(char), incorrect_buffer_type); |
+ DCHECK_GT(length, 0u); |
DCHECK_LE(length, kMaxMessageLength); |
+ DCHECK_NE(file, SyncSocket::kInvalidHandle); |
+ |
+ // Track the finish time so we can calculate the timeout as data is read. |
+ TimeTicks current_time, finish_time; |
+ if (timeout_in_ms != INFINITE) { |
+ current_time = TimeTicks::Now(); |
+ finish_time = |
+ current_time + base::TimeDelta::FromMilliseconds(timeout_in_ms); |
+ } |
- OVERLAPPED ol = {0}; |
- ol.hEvent = io_event->handle(); |
size_t count = 0; |
- while (count < length) { |
- DWORD chunk = GetNextChunkSize(count, length); |
+ do { |
+ // The OVERLAPPED structure will be modified by ReadFile or WriteFile. |
+ OVERLAPPED ol = { 0 }; |
+ ol.hEvent = io_event->handle(); |
+ |
+ const DWORD chunk = GetNextChunkSize(count, length); |
// This is either the ReadFile or WriteFile call depending on whether |
// we're receiving or sending data. |
DWORD len = 0; |
- BOOL ok = operation(file, static_cast<BufferType*>(buffer) + count, chunk, |
- &len, &ol); |
- if (!ok) { |
+ const BOOL operation_ok = operation( |
+ file, static_cast<BufferType*>(buffer) + count, chunk, &len, &ol); |
+ if (!operation_ok) { |
if (::GetLastError() == ERROR_IO_PENDING) { |
HANDLE events[] = { io_event->handle(), cancel_event->handle() }; |
- int wait_result = WaitForMultipleObjects( |
- arraysize(events), events, FALSE, timeout_in_ms); |
+ const int wait_result = WaitForMultipleObjects( |
+ ARRAYSIZE_UNSAFE(events), events, FALSE, |
+ timeout_in_ms == INFINITE |
+ ? timeout_in_ms |
+ : (finish_time - current_time).InMilliseconds()); |
if (wait_result == (WAIT_OBJECT_0 + 0)) { |
GetOverlappedResult(file, &ol, &len, TRUE); |
} else if (wait_result == (WAIT_OBJECT_0 + 1)) { |
- VLOG(1) << "Shutdown was signaled. Closing socket."; |
+ DVLOG(1) << "Shutdown was signaled. Closing socket."; |
CancelIo(file); |
socket->Close(); |
count = 0; |
@@ -146,9 +163,8 @@ size_t CancelableFileOperation(Function operation, HANDLE file, |
} else { |
// Timeout happened. |
DCHECK_EQ(WAIT_TIMEOUT, wait_result); |
- if (!CancelIo(file)){ |
+ if (!CancelIo(file)) |
DLOG(WARNING) << "CancelIo() failed"; |
- } |
break; |
} |
} else { |
@@ -161,9 +177,15 @@ size_t CancelableFileOperation(Function operation, HANDLE file, |
// Quit the operation if we can't write/read anymore. |
if (len != chunk) |
break; |
- } |
- return (count > 0) ? count : 0; |
+ // Since TimeTicks::Now() is expensive, only bother updating the time if we |
+ // have more work to do. |
+ if (timeout_in_ms != INFINITE && count < length) |
+ current_time = base::TimeTicks::Now(); |
+ } while (count < length && |
+ (timeout_in_ms == INFINITE || current_time < finish_time)); |
+ |
+ return count; |
} |
} // namespace |
@@ -185,37 +207,50 @@ bool SyncSocket::CreatePair(SyncSocket* socket_a, SyncSocket* socket_b) { |
bool SyncSocket::Close() { |
if (handle_ == kInvalidHandle) |
- return false; |
+ return true; |
- BOOL retval = CloseHandle(handle_); |
+ const BOOL result = CloseHandle(handle_); |
handle_ = kInvalidHandle; |
- return retval ? true : false; |
+ return result == TRUE; |
} |
size_t SyncSocket::Send(const void* buffer, size_t length) { |
+ ThreadRestrictions::AssertIOAllowed(); |
+ DCHECK_GT(length, 0u); |
DCHECK_LE(length, kMaxMessageLength); |
+ DCHECK_NE(handle_, kInvalidHandle); |
size_t count = 0; |
while (count < length) { |
DWORD len; |
DWORD chunk = GetNextChunkSize(count, length); |
if (WriteFile(handle_, static_cast<const char*>(buffer) + count, |
chunk, &len, NULL) == FALSE) { |
- return (0 < count) ? count : 0; |
+ return count; |
} |
count += len; |
} |
return count; |
} |
+size_t SyncSocket::ReceiveWithTimeout(void* buffer, |
+ size_t length, |
+ TimeDelta timeout) { |
+ NOTIMPLEMENTED(); |
+ return 0; |
+} |
+ |
size_t SyncSocket::Receive(void* buffer, size_t length) { |
+ ThreadRestrictions::AssertIOAllowed(); |
+ DCHECK_GT(length, 0u); |
DCHECK_LE(length, kMaxMessageLength); |
+ DCHECK_NE(handle_, kInvalidHandle); |
size_t count = 0; |
while (count < length) { |
DWORD len; |
DWORD chunk = GetNextChunkSize(count, length); |
if (ReadFile(handle_, static_cast<char*>(buffer) + count, |
chunk, &len, NULL) == FALSE) { |
- return (0 < count) ? count : 0; |
+ return count; |
} |
count += len; |
} |
@@ -245,9 +280,9 @@ bool CancelableSyncSocket::Shutdown() { |
} |
bool CancelableSyncSocket::Close() { |
- bool ret = SyncSocket::Close(); |
+ const bool result = SyncSocket::Close(); |
shutdown_event_.Reset(); |
- return ret; |
+ return result; |
} |
size_t CancelableSyncSocket::Send(const void* buffer, size_t length) { |
@@ -258,9 +293,17 @@ size_t CancelableSyncSocket::Send(const void* buffer, size_t length) { |
} |
size_t CancelableSyncSocket::Receive(void* buffer, size_t length) { |
- return CancelableFileOperation(&ReadFile, handle_, |
- reinterpret_cast<char*>(buffer), length, &file_operation_, |
- &shutdown_event_, this, INFINITE); |
+ return CancelableFileOperation( |
+ &ReadFile, handle_, reinterpret_cast<char*>(buffer), length, |
+ &file_operation_, &shutdown_event_, this, INFINITE); |
+} |
+ |
+size_t CancelableSyncSocket::ReceiveWithTimeout(void* buffer, |
+ size_t length, |
+ TimeDelta timeout) { |
+ return CancelableFileOperation( |
+ &ReadFile, handle_, reinterpret_cast<char*>(buffer), length, |
+ &file_operation_, &shutdown_event_, this, timeout.InMilliseconds()); |
} |
// static |
@@ -269,5 +312,4 @@ bool CancelableSyncSocket::CreatePair(CancelableSyncSocket* socket_a, |
return CreatePairImpl(&socket_a->handle_, &socket_b->handle_, true); |
} |
- |
} // namespace base |