X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fworkqueue.c;h=10b9fdee5df22155875edda1ccf4fff77adf3f0f;hb=HEAD;hp=e5931b5e74bfee68c3efd1ff61b53e9bc68af23f;hpb=014775106c60f02818ca755b331f887030bd440f;p=urcu.git diff --git a/src/workqueue.c b/src/workqueue.c index e5931b5..10b9fde 100644 --- a/src/workqueue.c +++ b/src/workqueue.c @@ -1,24 +1,10 @@ +// SPDX-FileCopyrightText: 2010 Paul E. McKenney +// SPDX-FileCopyrightText: 2017 Mathieu Desnoyers +// +// SPDX-License-Identifier: LGPL-2.1-or-later + /* - * workqueue.c - * * Userspace RCU library - Userspace workqeues - * - * Copyright (c) 2010 Paul E. McKenney - * Copyright (c) 2017 Mathieu Desnoyers - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ #define _LGPL_SOURCE @@ -131,16 +117,25 @@ static void futex_wait(int32_t *futex) { /* Read condition before read futex */ cmm_smp_mb(); - if (uatomic_read(futex) != -1) - return; - while (futex_async(futex, FUTEX_WAIT, -1, NULL, NULL, 0)) { + while (uatomic_read(futex) == -1) { + if (!futex_async(futex, FUTEX_WAIT, -1, NULL, NULL, 0)) { + /* + * Prior queued wakeups queued by unrelated code + * using the same address can cause futex wait to + * return 0 even through the futex value is still + * -1 (spurious wakeups). Check the value again + * in user-space to validate whether it really + * differs from -1. + */ + continue; + } switch (errno) { - case EWOULDBLOCK: + case EAGAIN: /* Value already changed. */ return; case EINTR: /* Retry if interrupted by signal. */ - break; /* Get out of switch. */ + break; /* Get out of switch. Check again. */ default: /* Unexpected error. */ urcu_die(errno); @@ -275,6 +270,7 @@ struct urcu_workqueue *urcu_workqueue_create(unsigned long flags, { struct urcu_workqueue *workqueue; int ret; + sigset_t newmask, oldmask; workqueue = malloc(sizeof(*workqueue)); if (workqueue == NULL) @@ -295,10 +291,20 @@ struct urcu_workqueue *urcu_workqueue_create(unsigned long flags, workqueue->cpu_affinity = cpu_affinity; workqueue->loop_count = 0; cmm_smp_mb(); /* Structure initialized before pointer is planted. */ + + ret = sigfillset(&newmask); + urcu_posix_assert(!ret); + ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask); + urcu_posix_assert(!ret); + ret = pthread_create(&workqueue->tid, NULL, workqueue_thread, workqueue); if (ret) { urcu_die(ret); } + + ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL); + urcu_posix_assert(!ret); + return workqueue; } @@ -378,7 +384,7 @@ struct urcu_workqueue_completion *urcu_workqueue_create_completion(void) { struct urcu_workqueue_completion *completion; - completion = calloc(sizeof(*completion), 1); + completion = calloc(1, sizeof(*completion)); if (!completion) urcu_die(errno); urcu_ref_set(&completion->ref, 1); @@ -409,7 +415,7 @@ void urcu_workqueue_queue_completion(struct urcu_workqueue *workqueue, { struct urcu_workqueue_completion_work *work; - work = calloc(sizeof(*work), 1); + work = calloc(1, sizeof(*work)); if (!work) urcu_die(errno); work->completion = completion; @@ -455,13 +461,23 @@ void urcu_workqueue_resume_worker(struct urcu_workqueue *workqueue) void urcu_workqueue_create_worker(struct urcu_workqueue *workqueue) { int ret; + sigset_t newmask, oldmask; /* Clear workqueue state from parent. */ workqueue->flags &= ~URCU_WORKQUEUE_PAUSED; workqueue->flags &= ~URCU_WORKQUEUE_PAUSE; workqueue->tid = 0; + + ret = sigfillset(&newmask); + urcu_posix_assert(!ret); + ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask); + urcu_posix_assert(!ret); + ret = pthread_create(&workqueue->tid, NULL, workqueue_thread, workqueue); if (ret) { urcu_die(ret); } + + ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL); + urcu_posix_assert(!ret); }