From 574b8304cfcc314adb615bb1fd4b159a59ab0441 Mon Sep 17 00:00:00 2001
From: David Edmondson
Date: Thu, 25 Mar 2021 12:29:36 +0100
Subject: [PATCH 1/6] block/vdi: When writing new bmap entry fails, don't leak
 the buffer
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

If a new bitmap entry is allocated, requiring the entire block to be
written, avoid leaking the buffer allocated for the block should the
write fail.

Reviewed-by: Philippe Mathieu-Daudé
Signed-off-by: David Edmondson
Signed-off-by: Paolo Bonzini
Acked-by: Max Reitz
Message-id: 20210325112941.365238-2-pbonzini@redhat.com
Message-Id: <20210309144015.557477-2-david.edmondson@oracle.com>
Acked-by: Max Reitz
Signed-off-by: Paolo Bonzini
Signed-off-by: Stefan Hajnoczi
---
 block/vdi.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/block/vdi.c b/block/vdi.c
index 5627e7d764..2a6dc26124 100644
--- a/block/vdi.c
+++ b/block/vdi.c
@@ -690,6 +690,7 @@ nonallocating_write:
 
     logout("finished data write\n");
     if (ret < 0) {
+        g_free(block);
         return ret;
     }
 

From 07ee2ab4fd0147edb64ba88e55407dd9d6656175 Mon Sep 17 00:00:00 2001
From: David Edmondson
Date: Thu, 25 Mar 2021 12:29:37 +0100
Subject: [PATCH 2/6] block/vdi: Don't assume that blocks are larger than
 VdiHeader

Given that the block size is read from the header of the VDI file, a
wide variety of sizes might be seen. Rather than re-using a block-sized
memory region when writing the VDI header, allocate an appropriately
sized buffer.

Signed-off-by: David Edmondson
Signed-off-by: Paolo Bonzini
Acked-by: Max Reitz
Message-id: 20210325112941.365238-3-pbonzini@redhat.com
Message-Id: <20210309144015.557477-3-david.edmondson@oracle.com>
Acked-by: Max Reitz
Signed-off-by: Paolo Bonzini
Signed-off-by: Stefan Hajnoczi
---
 block/vdi.c | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/block/vdi.c b/block/vdi.c
index 2a6dc26124..548f8a057b 100644
--- a/block/vdi.c
+++ b/block/vdi.c
@@ -696,18 +696,20 @@ nonallocating_write:
 
     if (block) {
         /* One or more new blocks were allocated. */
-        VdiHeader *header = (VdiHeader *) block;
+        VdiHeader *header;
         uint8_t *base;
         uint64_t offset;
         uint32_t n_sectors;
 
+        g_free(block);
+        header = g_malloc(sizeof(*header));
+
         logout("now writing modified header\n");
         assert(VDI_IS_ALLOCATED(bmap_first));
         *header = s->header;
         vdi_header_to_le(header);
-        ret = bdrv_pwrite(bs->file, 0, block, sizeof(VdiHeader));
-        g_free(block);
-        block = NULL;
+        ret = bdrv_pwrite(bs->file, 0, header, sizeof(*header));
+        g_free(header);
 
         if (ret < 0) {
             return ret;

From 2f6ef0393b54383c204d4d6aa5b8eec2bcad566f Mon Sep 17 00:00:00 2001
From: David Edmondson
Date: Thu, 25 Mar 2021 12:29:38 +0100
Subject: [PATCH 3/6] coroutine-lock: Store the coroutine in the CoWaitRecord
 only once
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When taking the slow path for mutex acquisition, set the coroutine
value in the CoWaitRecord in push_waiter(), rather than both there and
in the caller.
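
For reference, push_waiter() already records the coroutine itself; a
sketch of the helper as it reads in util/qemu-coroutine-lock.c
(reproduced from memory, so treat the exact body as illustrative):

    static void push_waiter(CoMutex *mutex, CoWaitRecord *w)
    {
        /* The waiter records itself, so callers need not set w->co. */
        w->co = qemu_coroutine_self();
        QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next);
    }

This is what makes the extra assignment in qemu_co_mutex_lock_slowpath()
redundant.
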
Reviewed-by: Paolo Bonzini
Reviewed-by: Philippe Mathieu-Daudé
Signed-off-by: David Edmondson
Signed-off-by: Paolo Bonzini
Message-id: 20210325112941.365238-4-pbonzini@redhat.com
Message-Id: <20210309144015.557477-4-david.edmondson@oracle.com>
Signed-off-by: Paolo Bonzini
Signed-off-by: Stefan Hajnoczi
---
 util/qemu-coroutine-lock.c | 1 -
 1 file changed, 1 deletion(-)

diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index 5816bf8900..eb73cf11dc 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -204,7 +204,6 @@ static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
     unsigned old_handoff;
 
     trace_qemu_co_mutex_lock_entry(mutex, self);
-    w.co = self;
     push_waiter(mutex, &w);
 
     /* This is the "Responsibility Hand-Off" protocol; a lock() picks from

From 050de36b13f7a841b7805391bca44f36370e86e4 Mon Sep 17 00:00:00 2001
From: Paolo Bonzini
Date: Thu, 25 Mar 2021 12:29:39 +0100
Subject: [PATCH 4/6] coroutine-lock: Reimplement CoRwlock to fix downgrade bug

An invariant of the current rwlock is that if multiple coroutines hold
a reader lock, all must be runnable. The unlock implementation relies
on this, choosing to wake a single coroutine when the final read lock
holder exits the critical section, assuming that it will wake a
coroutine attempting to acquire a write lock.

The downgrade implementation violates this assumption by creating a
read-lock-owning coroutine that is exclusively runnable - any other
coroutines that are waiting to acquire a read lock are *not* made
runnable when the write lock holder converts its ownership to read
only.

More generally, the old implementation had many other fairness bugs.
The root cause was that CoQueue would wake up readers even if there
were pending writers, and would wake up writers even if there were
readers. In that case, the woken coroutine would go back to sleep *at
the end* of the CoQueue, losing its place at the head of the line.

To fix this, keep the queue of waiters explicitly in the CoRwlock
instead of using CoQueue, and store for each waiter whether it is a
potential reader or a writer. This way, downgrade can look at the first
queued coroutine and wake it only if it is a reader, causing all other
readers in line to be released in turn.

Reported-by: David Edmondson
Reviewed-by: David Edmondson
Signed-off-by: Paolo Bonzini
Message-id: 20210325112941.365238-5-pbonzini@redhat.com
Signed-off-by: Stefan Hajnoczi
---
 include/qemu/coroutine.h   |  17 ++--
 util/qemu-coroutine-lock.c | 154 ++++++++++++++++++++++++-------------
 2 files changed, 109 insertions(+), 62 deletions(-)

diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index 84eab6e3bf..ce5b9c6851 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -237,11 +237,15 @@ bool qemu_co_enter_next_impl(CoQueue *queue, QemuLockable *lock);
 bool qemu_co_queue_empty(CoQueue *queue);
 
 
+typedef struct CoRwTicket CoRwTicket;
 typedef struct CoRwlock {
-    int pending_writer;
-    int reader;
     CoMutex mutex;
-    CoQueue queue;
+
+    /* Number of readers, or -1 if owned for writing. */
+    int owners;
+
+    /* Waiting coroutines. */
+    QSIMPLEQ_HEAD(, CoRwTicket) tickets;
 } CoRwlock;
 
 /**
@@ -260,10 +264,9 @@ void qemu_co_rwlock_rdlock(CoRwlock *lock);
 /**
  * Write Locks the CoRwlock from a reader. This is a bit more efficient than
  * @qemu_co_rwlock_unlock followed by a separate @qemu_co_rwlock_wrlock.
- * However, if the lock cannot be upgraded immediately, control is transferred
- * to the caller of the current coroutine. Also, @qemu_co_rwlock_upgrade
- * only overrides CoRwlock fairness if there are no concurrent readers, so
- * another writer might run while @qemu_co_rwlock_upgrade blocks.
+ * Note that if the lock cannot be upgraded immediately, control is transferred
+ * to the caller of the current coroutine; another writer might run while
+ * @qemu_co_rwlock_upgrade blocks.
  */
 void qemu_co_rwlock_upgrade(CoRwlock *lock);
 
diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index eb73cf11dc..2669403839 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -327,11 +327,51 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
     trace_qemu_co_mutex_unlock_return(mutex, self);
 }
 
+struct CoRwTicket {
+    bool read;
+    Coroutine *co;
+    QSIMPLEQ_ENTRY(CoRwTicket) next;
+};
+
 void qemu_co_rwlock_init(CoRwlock *lock)
 {
-    memset(lock, 0, sizeof(*lock));
-    qemu_co_queue_init(&lock->queue);
     qemu_co_mutex_init(&lock->mutex);
+    lock->owners = 0;
+    QSIMPLEQ_INIT(&lock->tickets);
+}
+
+/* Releases the internal CoMutex. */
+static void qemu_co_rwlock_maybe_wake_one(CoRwlock *lock)
+{
+    CoRwTicket *tkt = QSIMPLEQ_FIRST(&lock->tickets);
+    Coroutine *co = NULL;
+
+    /*
+     * Setting lock->owners here prevents rdlock and wrlock from
+     * sneaking in between unlock and wake.
+     */
+
+    if (tkt) {
+        if (tkt->read) {
+            if (lock->owners >= 0) {
+                lock->owners++;
+                co = tkt->co;
+            }
+        } else {
+            if (lock->owners == 0) {
+                lock->owners = -1;
+                co = tkt->co;
+            }
+        }
+    }
+
+    if (co) {
+        QSIMPLEQ_REMOVE_HEAD(&lock->tickets, next);
+        qemu_co_mutex_unlock(&lock->mutex);
+        aio_co_wake(co);
+    } else {
+        qemu_co_mutex_unlock(&lock->mutex);
+    }
 }
 
 void qemu_co_rwlock_rdlock(CoRwlock *lock)
@@ -340,13 +380,22 @@ void qemu_co_rwlock_rdlock(CoRwlock *lock)
     Coroutine *self = qemu_coroutine_self();
 
     qemu_co_mutex_lock(&lock->mutex);
     /* For fairness, wait if a writer is in line. */
-    while (lock->pending_writer) {
-        qemu_co_queue_wait(&lock->queue, &lock->mutex);
-    }
-    lock->reader++;
-    qemu_co_mutex_unlock(&lock->mutex);
+    if (lock->owners == 0 || (lock->owners > 0 && QSIMPLEQ_EMPTY(&lock->tickets))) {
+        lock->owners++;
+        qemu_co_mutex_unlock(&lock->mutex);
+    } else {
+        CoRwTicket my_ticket = { true, self };
+
+        QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
+        qemu_co_mutex_unlock(&lock->mutex);
+        qemu_coroutine_yield();
+        assert(lock->owners >= 1);
+
+        /* Possibly wake another reader, which will wake the next in line. */
+        qemu_co_mutex_lock(&lock->mutex);
+        qemu_co_rwlock_maybe_wake_one(lock);
+    }
 
-    /* The rest of the read-side critical section is run without the mutex. */
     self->locks_held++;
 }
 
@@ -355,69 +404,64 @@ void qemu_co_rwlock_unlock(CoRwlock *lock)
     Coroutine *self = qemu_coroutine_self();
 
     assert(qemu_in_coroutine());
-    if (!lock->reader) {
-        /* The critical section started in qemu_co_rwlock_wrlock. */
-        qemu_co_queue_restart_all(&lock->queue);
-    } else {
-        self->locks_held--;
+    self->locks_held--;
 
-        qemu_co_mutex_lock(&lock->mutex);
-        lock->reader--;
-        assert(lock->reader >= 0);
-        /* Wakeup only one waiting writer */
-        if (!lock->reader) {
-            qemu_co_queue_next(&lock->queue);
-        }
+    qemu_co_mutex_lock(&lock->mutex);
+    if (lock->owners > 0) {
+        lock->owners--;
+    } else {
+        assert(lock->owners == -1);
+        lock->owners = 0;
     }
-    qemu_co_mutex_unlock(&lock->mutex);
+
+    qemu_co_rwlock_maybe_wake_one(lock);
 }
 
 void qemu_co_rwlock_downgrade(CoRwlock *lock)
 {
-    Coroutine *self = qemu_coroutine_self();
+    qemu_co_mutex_lock(&lock->mutex);
+    assert(lock->owners == -1);
+    lock->owners = 1;
 
-    /* lock->mutex critical section started in qemu_co_rwlock_wrlock or
-     * qemu_co_rwlock_upgrade.
-     */
-    assert(lock->reader == 0);
-    lock->reader++;
-    qemu_co_mutex_unlock(&lock->mutex);
-
-    /* The rest of the read-side critical section is run without the mutex. */
-    self->locks_held++;
+    /* Possibly wake another reader, which will wake the next in line. */
+    qemu_co_rwlock_maybe_wake_one(lock);
 }
 
 void qemu_co_rwlock_wrlock(CoRwlock *lock)
 {
-    qemu_co_mutex_lock(&lock->mutex);
-    lock->pending_writer++;
-    while (lock->reader) {
-        qemu_co_queue_wait(&lock->queue, &lock->mutex);
-    }
-    lock->pending_writer--;
+    Coroutine *self = qemu_coroutine_self();
 
-    /* The rest of the write-side critical section is run with
-     * the mutex taken, so that lock->reader remains zero.
-     * There is no need to update self->locks_held.
-     */
+    qemu_co_mutex_lock(&lock->mutex);
+    if (lock->owners == 0) {
+        lock->owners = -1;
+        qemu_co_mutex_unlock(&lock->mutex);
+    } else {
+        CoRwTicket my_ticket = { false, qemu_coroutine_self() };
+
+        QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
+        qemu_co_mutex_unlock(&lock->mutex);
+        qemu_coroutine_yield();
+        assert(lock->owners == -1);
+    }
+
+    self->locks_held++;
 }
 
 void qemu_co_rwlock_upgrade(CoRwlock *lock)
 {
-    Coroutine *self = qemu_coroutine_self();
-
     qemu_co_mutex_lock(&lock->mutex);
-    assert(lock->reader > 0);
-    lock->reader--;
-    lock->pending_writer++;
-    while (lock->reader) {
-        qemu_co_queue_wait(&lock->queue, &lock->mutex);
-    }
-    lock->pending_writer--;
+    assert(lock->owners > 0);
+    /* For fairness, wait if a writer is in line. */
+    if (lock->owners == 1 && QSIMPLEQ_EMPTY(&lock->tickets)) {
+        lock->owners = -1;
+        qemu_co_mutex_unlock(&lock->mutex);
+    } else {
+        CoRwTicket my_ticket = { false, qemu_coroutine_self() };
 
-    /* The rest of the write-side critical section is run with
-     * the mutex taken, similar to qemu_co_rwlock_wrlock. Do
-     * not account for the lock twice in self->locks_held.
-     */
-    self->locks_held--;
+        lock->owners--;
+        QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
+        qemu_co_rwlock_maybe_wake_one(lock);
+        qemu_coroutine_yield();
+        assert(lock->owners == -1);
+    }
 }

From 25bc2daed0482732a2dd258dde4386f505582fa9 Mon Sep 17 00:00:00 2001
From: Paolo Bonzini
Date: Thu, 25 Mar 2021 12:29:40 +0100
Subject: [PATCH 5/6] test-coroutine: Add rwlock upgrade test

Test that rwlock upgrade is fair, and that readers go back to sleep if
a writer is in line.
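
The test drives the coroutines by hand; a minimal sketch of the driving
pattern it relies on, assuming the usual qemu-coroutine semantics (each
qemu_coroutine_enter() runs the coroutine until it next yields or
returns, and blocking on the rwlock yields back to the driver):

    bool done = false;
    Coroutine *co = qemu_coroutine_create(fn, &done);

    qemu_coroutine_enter(co);   /* runs fn() up to its first yield */
    qemu_coroutine_enter(co);   /* resumes fn() after that yield   */

This is how the assertions below can observe a coroutine being put to
sleep on the lock.
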
Signed-off-by: Paolo Bonzini
Message-id: 20210325112941.365238-6-pbonzini@redhat.com
Signed-off-by: Stefan Hajnoczi
---
 tests/unit/test-coroutine.c | 62 +++++++++++++++++++++++++++++++++++++
 1 file changed, 62 insertions(+)

diff --git a/tests/unit/test-coroutine.c b/tests/unit/test-coroutine.c
index e946d93a65..6e6f51d480 100644
--- a/tests/unit/test-coroutine.c
+++ b/tests/unit/test-coroutine.c
@@ -264,6 +264,67 @@ static void test_co_mutex_lockable(void)
     g_assert(QEMU_MAKE_LOCKABLE(null_pointer) == NULL);
 }
 
+static CoRwlock rwlock;
+
+/* Test that readers are properly sent back to the queue when upgrading,
+ * even if they are the sole readers. The test scenario is as follows:
+ *
+ *
+ * | c1           | c2         |
+ * |--------------+------------+
+ * | rdlock       |            |
+ * | yield        |            |
+ * |              | wrlock     |
+ * |              | <queued>   |
+ * | upgrade      |            |
+ * | <queued>     | <dequeued> |
+ * |              | unlock     |
+ * | <dequeued>   |            |
+ * | unlock       |            |
+ */
+
+static void coroutine_fn rwlock_yield_upgrade(void *opaque)
+{
+    qemu_co_rwlock_rdlock(&rwlock);
+    qemu_coroutine_yield();
+
+    qemu_co_rwlock_upgrade(&rwlock);
+    qemu_co_rwlock_unlock(&rwlock);
+
+    *(bool *)opaque = true;
+}
+
+static void coroutine_fn rwlock_wrlock_yield(void *opaque)
+{
+    qemu_co_rwlock_wrlock(&rwlock);
+    qemu_coroutine_yield();
+
+    qemu_co_rwlock_unlock(&rwlock);
+    *(bool *)opaque = true;
+}
+
+static void test_co_rwlock_upgrade(void)
+{
+    bool c1_done = false;
+    bool c2_done = false;
+    Coroutine *c1, *c2;
+
+    qemu_co_rwlock_init(&rwlock);
+    c1 = qemu_coroutine_create(rwlock_yield_upgrade, &c1_done);
+    c2 = qemu_coroutine_create(rwlock_wrlock_yield, &c2_done);
+
+    qemu_coroutine_enter(c1);
+    qemu_coroutine_enter(c2);
+
+    /* c1 now should go to sleep. */
+    qemu_coroutine_enter(c1);
+    g_assert(!c1_done);
+
+    qemu_coroutine_enter(c2);
+    g_assert(c1_done);
+    g_assert(c2_done);
+}
+
 /*
  * Check that creation, enter, and return work
  */
@@ -501,6 +562,7 @@ int main(int argc, char **argv)
     g_test_add_func("/basic/order", test_order);
     g_test_add_func("/locking/co-mutex", test_co_mutex);
     g_test_add_func("/locking/co-mutex/lockable", test_co_mutex_lockable);
+    g_test_add_func("/locking/co-rwlock/upgrade", test_co_rwlock_upgrade);
     if (g_test_perf()) {
         g_test_add_func("/perf/lifecycle", perf_lifecycle);
         g_test_add_func("/perf/nesting", perf_nesting);

From b6489ac06695e257ea0a9841364577e247fdee30 Mon Sep 17 00:00:00 2001
From: David Edmondson
Date: Thu, 25 Mar 2021 12:29:41 +0100
Subject: [PATCH 6/6] test-coroutine: Add rwlock downgrade test

Test that downgrading an rwlock does not result in a failure to
schedule coroutines queued on the rwlock.

The diagram associated with test_co_rwlock_downgrade() describes the
intended behaviour, but what was observed previously corresponds to:

| c1     | c2         | c3         | c4       |
|--------+------------+------------+----------|
| rdlock |            |            |          |
| yield  |            |            |          |
|        | wrlock     |            |          |
|        | <queued>   |            |          |
|        |            | rdlock     |          |
|        |            | <queued>   |          |
|        |            |            | wrlock   |
|        |            |            | <queued> |
| unlock |            |            |          |
| yield  |            |            |          |
|        | <dequeued> |            |          |
|        | downgrade  |            |          |
|        | ...        |            |          |
|        | unlock     |            |          |
|        |            | <dequeued> |          |
|        |            | <queued>   |          |

This results in a failure...

ERROR:../tests/test-coroutine.c:369:test_co_rwlock_downgrade: assertion failed: (c3_done)
Bail out! ERROR:../tests/test-coroutine.c:369:test_co_rwlock_downgrade: assertion failed: (c3_done)

...as a result of the c3 coroutine failing to run to completion.
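
With the reimplemented CoRwlock earlier in this series, downgrade is
expected to hand the lock over fairly. In API terms (a sketch against
the new interface, not part of the test itself; the owners values refer
to the field added in the CoRwlock rewrite):

    qemu_co_rwlock_wrlock(&lock);     /* exclusive: owners == -1        */
    qemu_co_rwlock_downgrade(&lock);  /* shared: owners becomes 1; any
                                       * readers at the head of the
                                       * ticket queue are woken in turn */
    qemu_co_rwlock_unlock(&lock);     /* owners drops to 0; the next
                                       * waiter, reader or writer, wakes */

The test added below exercises exactly this hand-off with four
coroutines.
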
Signed-off-by: David Edmondson
Signed-off-by: Paolo Bonzini
Message-id: 20210325112941.365238-7-pbonzini@redhat.com
Message-Id: <20210309144015.557477-5-david.edmondson@oracle.com>
Signed-off-by: Paolo Bonzini
Signed-off-by: Stefan Hajnoczi
---
 tests/unit/test-coroutine.c | 99 +++++++++++++++++++++++++++++++++++++
 1 file changed, 99 insertions(+)

diff --git a/tests/unit/test-coroutine.c b/tests/unit/test-coroutine.c
index 6e6f51d480..aa77a3bcb3 100644
--- a/tests/unit/test-coroutine.c
+++ b/tests/unit/test-coroutine.c
@@ -325,6 +325,104 @@ static void test_co_rwlock_upgrade(void)
     g_assert(c2_done);
 }
 
+static void coroutine_fn rwlock_rdlock_yield(void *opaque)
+{
+    qemu_co_rwlock_rdlock(&rwlock);
+    qemu_coroutine_yield();
+
+    qemu_co_rwlock_unlock(&rwlock);
+    qemu_coroutine_yield();
+
+    *(bool *)opaque = true;
+}
+
+static void coroutine_fn rwlock_wrlock_downgrade(void *opaque)
+{
+    qemu_co_rwlock_wrlock(&rwlock);
+
+    qemu_co_rwlock_downgrade(&rwlock);
+    qemu_co_rwlock_unlock(&rwlock);
+    *(bool *)opaque = true;
+}
+
+static void coroutine_fn rwlock_rdlock(void *opaque)
+{
+    qemu_co_rwlock_rdlock(&rwlock);
+
+    qemu_co_rwlock_unlock(&rwlock);
+    *(bool *)opaque = true;
+}
+
+static void coroutine_fn rwlock_wrlock(void *opaque)
+{
+    qemu_co_rwlock_wrlock(&rwlock);
+
+    qemu_co_rwlock_unlock(&rwlock);
+    *(bool *)opaque = true;
+}
+
+/*
+ * Check that downgrading a reader-writer lock does not cause a hang.
+ *
+ * Four coroutines are used to produce a situation where there are
+ * both reader and writer hopefuls waiting to acquire an rwlock that
+ * is held by a reader.
+ *
+ * The correct sequence of operations we aim to provoke can be
+ * represented as:
+ *
+ * | c1     | c2         | c3         | c4         |
+ * |--------+------------+------------+------------|
+ * | rdlock |            |            |            |
+ * | yield  |            |            |            |
+ * |        | wrlock     |            |            |
+ * |        | <queued>   |            |            |
+ * |        |            | rdlock     |            |
+ * |        |            | <queued>   |            |
+ * |        |            |            | wrlock     |
+ * |        |            |            | <queued>   |
+ * | unlock |            |            |            |
+ * | yield  |            |            |            |
+ * |        | <dequeued> |            |            |
+ * |        | downgrade  |            |            |
+ * |        |            | <dequeued> |            |
+ * |        |            | unlock     |            |
+ * |        | ...        |            |            |
+ * |        | unlock     |            |            |
+ * |        |            |            | <dequeued> |
+ * |        |            |            | unlock     |
+ */
+static void test_co_rwlock_downgrade(void)
+{
+    bool c1_done = false;
+    bool c2_done = false;
+    bool c3_done = false;
+    bool c4_done = false;
+    Coroutine *c1, *c2, *c3, *c4;
+
+    qemu_co_rwlock_init(&rwlock);
+
+    c1 = qemu_coroutine_create(rwlock_rdlock_yield, &c1_done);
+    c2 = qemu_coroutine_create(rwlock_wrlock_downgrade, &c2_done);
+    c3 = qemu_coroutine_create(rwlock_rdlock, &c3_done);
+    c4 = qemu_coroutine_create(rwlock_wrlock, &c4_done);
+
+    qemu_coroutine_enter(c1);
+    qemu_coroutine_enter(c2);
+    qemu_coroutine_enter(c3);
+    qemu_coroutine_enter(c4);
+
+    qemu_coroutine_enter(c1);
+
+    g_assert(c2_done);
+    g_assert(c3_done);
+    g_assert(c4_done);
+
+    qemu_coroutine_enter(c1);
+
+    g_assert(c1_done);
+}
+
 /*
  * Check that creation, enter, and return work
  */
@@ -563,6 +661,7 @@ int main(int argc, char **argv)
     g_test_add_func("/locking/co-mutex", test_co_mutex);
     g_test_add_func("/locking/co-mutex/lockable", test_co_mutex_lockable);
    g_test_add_func("/locking/co-rwlock/upgrade", test_co_rwlock_upgrade);
+    g_test_add_func("/locking/co-rwlock/downgrade", test_co_rwlock_downgrade);
     if (g_test_perf()) {
         g_test_add_func("/perf/lifecycle", perf_lifecycle);
         g_test_add_func("/perf/nesting", perf_nesting);
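
Taken together, the series leaves CoRwlock usage looking like the
sketch below. The lock calls are the real API from this series; the
surrounding scaffolding is illustrative only, and the lock must first
be initialised with qemu_co_rwlock_init():

    static CoRwlock lock;   /* qemu_co_rwlock_init(&lock) during setup */

    static void coroutine_fn reader(void *opaque)
    {
        qemu_co_rwlock_rdlock(&lock);
        /* shared, read-side critical section */
        qemu_co_rwlock_unlock(&lock);
    }

    static void coroutine_fn writer(void *opaque)
    {
        qemu_co_rwlock_wrlock(&lock);
        /* exclusive, write-side critical section */
        qemu_co_rwlock_downgrade(&lock);
        /* continue as a reader; queued readers can now run as well */
        qemu_co_rwlock_unlock(&lock);
    }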