From e5ae075ff9b3cba2ee7b7393bc7f4b49c3d3ee79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 5 Dec 2016 12:28:57 +0200 Subject: [PATCH] GSocket: Fix race conditions on Win32 if multiple threads are waiting on conditions for the same socket WSAWaitForMultipleEvents() only returns for one of the waiting threads, and that one might not even be the one waiting for the condition that changed. As such, only let a single thread wait on the event and use a GCond for all other threads. With this it is possible to e.g. have an UDP socket that is written to from one thread and read from in another thread on Win32 too. On POSIX systems this was working before already. https://bugzilla.gnome.org/show_bug.cgi?id=762283 --- gio/gsocket.c | 94 ++++++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 68 insertions(+), 26 deletions(-) diff --git a/gio/gsocket.c b/gio/gsocket.c index 5a2f33285..ff5403b6f 100644 --- a/gio/gsocket.c +++ b/gio/gsocket.c @@ -249,11 +249,14 @@ struct _GSocketPrivate guint connect_pending : 1; #ifdef G_OS_WIN32 WSAEVENT event; + gboolean waiting; + DWORD waiting_result; int current_events; int current_errors; int selected_events; GList *requested_conditions; /* list of requested GIOCondition * */ GMutex win32_source_lock; + GCond win32_source_cond; #endif struct { @@ -333,8 +336,10 @@ socket_strerror (int err) static void _win32_unset_event_mask (GSocket *socket, int mask) { + g_mutex_lock (&socket->priv->win32_source_lock); socket->priv->current_events &= ~mask; socket->priv->current_errors &= ~mask; + g_mutex_unlock (&socket->priv->win32_source_lock); } #else #define win32_unset_event_mask(_socket, _mask) @@ -831,6 +836,7 @@ g_socket_finalize (GObject *object) g_assert (socket->priv->requested_conditions == NULL); g_mutex_clear (&socket->priv->win32_source_lock); + g_cond_clear (&socket->priv->win32_source_cond); #endif for (i = 0; i < RECV_ADDR_CACHE_SIZE; i++) @@ -1058,6 +1064,7 @@ g_socket_init (GSocket *socket) #ifdef G_OS_WIN32 socket->priv->event = WSA_INVALID_EVENT; g_mutex_init (&socket->priv->win32_source_lock); + g_cond_init (&socket->priv->win32_source_cond); #endif } @@ -2441,6 +2448,8 @@ g_socket_accept (GSocket *socket, while (TRUE) { + win32_unset_event_mask (socket, FD_ACCEPT); + if ((ret = accept (socket->priv->fd, NULL, 0)) < 0) { int errsv = get_socket_errno (); @@ -2455,8 +2464,6 @@ g_socket_accept (GSocket *socket, errsv == EAGAIN) #endif { - win32_unset_event_mask (socket, FD_ACCEPT); - if (socket->priv->blocking) { if (!g_socket_condition_wait (socket, @@ -2473,8 +2480,6 @@ g_socket_accept (GSocket *socket, break; } - win32_unset_event_mask (socket, FD_ACCEPT); - #ifdef G_OS_WIN32 { /* The socket inherits the accepting sockets event mask and even object, @@ -2563,6 +2568,8 @@ g_socket_connect (GSocket *socket, while (1) { + win32_unset_event_mask (socket, FD_CONNECT); + if (connect (socket->priv->fd, (struct sockaddr *) &buffer, g_socket_address_get_native_size (address)) < 0) { @@ -2577,8 +2584,6 @@ g_socket_connect (GSocket *socket, if (errsv == WSAEWOULDBLOCK) #endif { - win32_unset_event_mask (socket, FD_CONNECT); - if (socket->priv->blocking) { if (g_socket_condition_wait (socket, G_IO_OUT, cancellable, error)) @@ -2604,8 +2609,6 @@ g_socket_connect (GSocket *socket, break; } - win32_unset_event_mask (socket, FD_CONNECT); - socket->priv->connected_read = TRUE; socket->priv->connected_write = TRUE; @@ -2785,6 +2788,8 @@ g_socket_receive_with_timeout (GSocket *socket, while (1) { + win32_unset_event_mask (socket, FD_READ); + if ((ret = recv (socket->priv->fd, buffer, size, 0)) < 0) { int errsv = get_socket_errno (); @@ -2799,8 +2804,6 @@ g_socket_receive_with_timeout (GSocket *socket, errsv == EAGAIN) #endif { - win32_unset_event_mask (socket, FD_READ); - if (timeout != 0) { if (!block_on_timeout (socket, G_IO_IN, timeout, start_time, @@ -2811,14 +2814,10 @@ g_socket_receive_with_timeout (GSocket *socket, } } - win32_unset_event_mask (socket, FD_READ); - socket_set_error_lazy (error, errsv, _("Error receiving data: %s")); return -1; } - win32_unset_event_mask (socket, FD_READ); - break; } @@ -2984,6 +2983,8 @@ g_socket_send_with_timeout (GSocket *socket, while (1) { + win32_unset_event_mask (socket, FD_WRITE); + if ((ret = send (socket->priv->fd, buffer, size, G_SOCKET_DEFAULT_SEND_FLAGS)) < 0) { int errsv = get_socket_errno (); @@ -2998,8 +2999,6 @@ g_socket_send_with_timeout (GSocket *socket, errsv == EAGAIN) #endif { - win32_unset_event_mask (socket, FD_WRITE); - if (timeout != 0) { if (!block_on_timeout (socket, G_IO_OUT, timeout, start_time, @@ -3414,7 +3413,7 @@ remove_condition_watch (GSocket *socket, } static GIOCondition -update_condition (GSocket *socket) +update_condition_unlocked (GSocket *socket) { WSANETWORKEVENTS events; GIOCondition condition; @@ -3481,6 +3480,16 @@ update_condition (GSocket *socket) return condition; } + +static GIOCondition +update_condition (GSocket *socket) +{ + GIOCondition res; + g_mutex_lock (&socket->priv->win32_source_lock); + res = update_condition_unlocked (socket); + g_mutex_unlock (&socket->priv->win32_source_lock); + return res; +} #endif typedef struct { @@ -3876,11 +3885,44 @@ g_socket_condition_timed_wait (GSocket *socket, if (timeout == -1) timeout = WSA_INFINITE; - current_condition = update_condition (socket); + g_mutex_lock (&socket->priv->win32_source_lock); + current_condition = update_condition_unlocked (socket); while ((condition & current_condition) == 0) { - res = WSAWaitForMultipleEvents (num_events, events, - FALSE, timeout, FALSE); + if (!socket->priv->waiting) + { + socket->priv->waiting = TRUE; + socket->priv->waiting_result = 0; + g_mutex_unlock (&socket->priv->win32_source_lock); + + res = WSAWaitForMultipleEvents (num_events, events, FALSE, timeout, FALSE); + + g_mutex_lock (&socket->priv->win32_source_lock); + socket->priv->waiting = FALSE; + socket->priv->waiting_result = res; + g_cond_broadcast (&socket->priv->win32_source_cond); + } + else + { + if (timeout != WSA_INFINITE) + { + if (!g_cond_wait_until (&socket->priv->win32_source_cond, &socket->priv->win32_source_lock, timeout)) + { + res = WSA_WAIT_TIMEOUT; + break; + } + else + { + res = socket->priv->waiting_result; + } + } + else + { + g_cond_wait (&socket->priv->win32_source_cond, &socket->priv->win32_source_lock); + res = socket->priv->waiting_result; + } + } + if (res == WSA_WAIT_FAILED) { int errsv = get_socket_errno (); @@ -3901,7 +3943,7 @@ g_socket_condition_timed_wait (GSocket *socket, if (g_cancellable_set_error_if_cancelled (cancellable, error)) break; - current_condition = update_condition (socket); + current_condition = update_condition_unlocked (socket); if (timeout != WSA_INFINITE) { @@ -3910,6 +3952,7 @@ g_socket_condition_timed_wait (GSocket *socket, timeout = 0; } } + g_mutex_unlock (&socket->priv->win32_source_lock); remove_condition_watch (socket, &condition); if (num_events > 1) g_cancellable_release_fd (cancellable); @@ -4405,6 +4448,8 @@ g_socket_send_message_with_timeout (GSocket *socket, while (1) { + win32_unset_event_mask (socket, FD_WRITE); + if (address) result = WSASendTo (socket->priv->fd, bufs, num_vectors, @@ -4426,8 +4471,6 @@ g_socket_send_message_with_timeout (GSocket *socket, if (errsv == WSAEWOULDBLOCK) { - win32_unset_event_mask (socket, FD_WRITE); - if (timeout != 0) { if (!block_on_timeout (socket, G_IO_OUT, timeout, @@ -4875,6 +4918,8 @@ g_socket_receive_message_with_timeout (GSocket *socket, /* do it */ while (1) { + win32_unset_event_mask (socket, FD_READ); + addrlen = sizeof addr; if (address) result = WSARecvFrom (socket->priv->fd, @@ -4896,8 +4941,6 @@ g_socket_receive_message_with_timeout (GSocket *socket, if (errsv == WSAEWOULDBLOCK) { - win32_unset_event_mask (socket, FD_READ); - if (timeout != 0) { if (!block_on_timeout (socket, G_IO_IN, timeout, @@ -4911,7 +4954,6 @@ g_socket_receive_message_with_timeout (GSocket *socket, socket_set_error_lazy (error, errsv, _("Error receiving message: %s")); return -1; } - win32_unset_event_mask (socket, FD_READ); break; } -- 2.11.0