NBD patches for 2023-09-07

- Andrey Drobyshev - fix regression in iotest 197 under -nbd
 - Stefan Hajnoczi - allow coroutine read and write context to split
 across threads
 - Philippe Mathieu-Daudé - remove a VLA allocation
 - Denis V. Lunev - fix regression in iotest 233 with qemu-nbd -v --fork
 -----BEGIN PGP SIGNATURE-----
 
 iQEzBAABCAAdFiEEccLMIrHEYCkn0vOqp6FrSiUnQ2oFAmT7EsUACgkQp6FrSiUn
 Q2qiKgf9EqCWPmcsH2nvXrDvZmDc0/I4tineaNY+hSdPtSb6RFA1IH8AvzkrkPYU
 9ojX6QFp1Z30fUs+pwweQhBMYta03QyjCFhsbPRmDq391dtIDCeww3o+RD1kw/pg
 2ZC+P9N1U3pi2Hi8FhxH17GYYgOQnHMKM9gt1V7JOQvFsDFWbTo9sFj8p/BPoWxV
 I3TeLQDWqVnNjf57lG2pwhdKc8DbKoqRmA3XNiXiKI5inEBeRJsTdMMGn4YWpwJE
 Y5imM/PbyCqRKQ6MYyJenVk4QVTe1IKO6D4vf1ZHLDBEiaw9NaeYHlk6lnDC4O9v
 PeTycAwND6cMKYlKMyEzcJXv9IdRBw==
 =jAZi
 -----END PGP SIGNATURE-----

Merge tag 'pull-nbd-2023-09-07-v2' of https://repo.or.cz/qemu/ericb into staging

NBD patches for 2023-09-07

- Andrey Drobyshev - fix regression in iotest 197 under -nbd
- Stefan Hajnoczi - allow coroutine read and write context to split
across threads
- Philippe Mathieu-Daudé - remove a VLA allocation
- Denis V. Lunev - fix regression in iotest 233 with qemu-nbd -v --fork

# -----BEGIN PGP SIGNATURE-----
#
# iQEzBAABCAAdFiEEccLMIrHEYCkn0vOqp6FrSiUnQ2oFAmT7EsUACgkQp6FrSiUn
# Q2qiKgf9EqCWPmcsH2nvXrDvZmDc0/I4tineaNY+hSdPtSb6RFA1IH8AvzkrkPYU
# 9ojX6QFp1Z30fUs+pwweQhBMYta03QyjCFhsbPRmDq391dtIDCeww3o+RD1kw/pg
# 2ZC+P9N1U3pi2Hi8FhxH17GYYgOQnHMKM9gt1V7JOQvFsDFWbTo9sFj8p/BPoWxV
# I3TeLQDWqVnNjf57lG2pwhdKc8DbKoqRmA3XNiXiKI5inEBeRJsTdMMGn4YWpwJE
# Y5imM/PbyCqRKQ6MYyJenVk4QVTe1IKO6D4vf1ZHLDBEiaw9NaeYHlk6lnDC4O9v
# PeTycAwND6cMKYlKMyEzcJXv9IdRBw==
# =jAZi
# -----END PGP SIGNATURE-----
# gpg: Signature made Fri 08 Sep 2023 08:25:41 EDT
# gpg:                using RSA key 71C2CC22B1C4602927D2F3AAA7A16B4A2527436A
# gpg: Good signature from "Eric Blake <eblake@redhat.com>" [full]
# gpg:                 aka "Eric Blake (Free Software Programmer) <ebb9@byu.net>" [full]
# gpg:                 aka "[jpeg image of size 6874]" [full]
# Primary key fingerprint: 71C2 CC22 B1C4 6029 27D2  F3AA A7A1 6B4A 2527 436A

* tag 'pull-nbd-2023-09-07-v2' of https://repo.or.cz/qemu/ericb:
  qemu-nbd: document -v behavior in respect to --fork in man
  qemu-nbd: Restore "qemu-nbd -v --fork" output
  qemu-nbd: invent nbd_client_release_pipe() helper
  qemu-nbd: put saddr into into struct NbdClientOpts
  qemu-nbd: move srcpath into struct NbdClientOpts
  qemu-nbd: define struct NbdClientOpts when HAVE_NBD_DEVICE is not defined
  qemu-nbd: improve error message for dup2 error
  util/iov: Avoid dynamic stack allocation
  io: follow coroutine AioContext in qio_channel_yield()
  io: check there are no qio_channel_yield() coroutines during ->finalize()
  nbd: drop unused nbd_start_negotiate() aio_context argument
  nbd: drop unused nbd_receive_negotiate() aio_context argument
  qemu-iotests/197: use more generic commands for formats other than qcow2

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
This commit is contained in:
Stefan Hajnoczi 2023-09-08 10:06:01 -04:00
commit 0b63052a46
24 changed files with 328 additions and 219 deletions

View File

@ -352,7 +352,7 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
} }
qio_channel_set_blocking(s->ioc, false, NULL); qio_channel_set_blocking(s->ioc, false, NULL);
qio_channel_attach_aio_context(s->ioc, bdrv_get_aio_context(bs)); qio_channel_set_follow_coroutine_ctx(s->ioc, true);
/* successfully connected */ /* successfully connected */
WITH_QEMU_LOCK_GUARD(&s->requests_lock) { WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
@ -397,7 +397,6 @@ static void coroutine_fn GRAPH_RDLOCK nbd_reconnect_attempt(BDRVNBDState *s)
/* Finalize previous connection if any */ /* Finalize previous connection if any */
if (s->ioc) { if (s->ioc) {
qio_channel_detach_aio_context(s->ioc);
yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name), yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
nbd_yank, s->bs); nbd_yank, s->bs);
object_unref(OBJECT(s->ioc)); object_unref(OBJECT(s->ioc));
@ -2089,10 +2088,6 @@ static void nbd_attach_aio_context(BlockDriverState *bs,
* the reconnect_delay_timer cannot be active here. * the reconnect_delay_timer cannot be active here.
*/ */
assert(!s->reconnect_delay_timer); assert(!s->reconnect_delay_timer);
if (s->ioc) {
qio_channel_attach_aio_context(s->ioc, new_context);
}
} }
static void nbd_detach_aio_context(BlockDriverState *bs) static void nbd_detach_aio_context(BlockDriverState *bs)
@ -2101,10 +2096,6 @@ static void nbd_detach_aio_context(BlockDriverState *bs)
assert(!s->open_timer); assert(!s->open_timer);
assert(!s->reconnect_delay_timer); assert(!s->reconnect_delay_timer);
if (s->ioc) {
qio_channel_detach_aio_context(s->ioc);
}
} }
static BlockDriver bdrv_nbd = { static BlockDriver bdrv_nbd = {

View File

@ -197,7 +197,9 @@ driver options if :option:`--image-opts` is specified.
.. option:: -v, --verbose .. option:: -v, --verbose
Display extra debugging information. Display extra debugging information. This option also keeps the original
*STDERR* stream open if the ``qemu-nbd`` process is daemonized due to
other options like :option:`--fork` or :option:`-c`.
.. option:: -h, --help .. option:: -h, --help

View File

@ -324,8 +324,7 @@ typedef struct NBDExportInfo {
char **contexts; char **contexts;
} NBDExportInfo; } NBDExportInfo;
int nbd_receive_negotiate(AioContext *aio_context, QIOChannel *ioc, int nbd_receive_negotiate(QIOChannel *ioc, QCryptoTLSCreds *tlscreds,
QCryptoTLSCreds *tlscreds,
const char *hostname, QIOChannel **outioc, const char *hostname, QIOChannel **outioc,
NBDExportInfo *info, Error **errp); NBDExportInfo *info, Error **errp);
void nbd_free_export_list(NBDExportInfo *info, int count); void nbd_free_export_list(NBDExportInfo *info, int count);

View File

@ -49,4 +49,27 @@
QIOChannel *qio_channel_new_fd(int fd, QIOChannel *qio_channel_new_fd(int fd,
Error **errp); Error **errp);
/**
* qio_channel_util_set_aio_fd_handler:
* @read_fd: the file descriptor for the read handler
* @read_ctx: the AioContext for the read handler
* @io_read: the read handler
* @write_fd: the file descriptor for the write handler
* @write_ctx: the AioContext for the write handler
* @io_write: the write handler
* @opaque: the opaque argument to the read and write handler
*
* Set the read and write handlers when @read_ctx and @write_ctx are non-NULL,
* respectively. To leave a handler in its current state, pass a NULL
* AioContext. To clear a handler, pass a non-NULL AioContext and a NULL
* handler.
*/
void qio_channel_util_set_aio_fd_handler(int read_fd,
AioContext *read_ctx,
IOHandler *io_read,
int write_fd,
AioContext *write_ctx,
IOHandler *io_write,
void *opaque);
#endif /* QIO_CHANNEL_UTIL_H */ #endif /* QIO_CHANNEL_UTIL_H */

View File

@ -81,9 +81,11 @@ struct QIOChannel {
Object parent; Object parent;
unsigned int features; /* bitmask of QIOChannelFeatures */ unsigned int features; /* bitmask of QIOChannelFeatures */
char *name; char *name;
AioContext *ctx; AioContext *read_ctx;
Coroutine *read_coroutine; Coroutine *read_coroutine;
AioContext *write_ctx;
Coroutine *write_coroutine; Coroutine *write_coroutine;
bool follow_coroutine_ctx;
#ifdef _WIN32 #ifdef _WIN32
HANDLE event; /* For use with GSource on Win32 */ HANDLE event; /* For use with GSource on Win32 */
#endif #endif
@ -140,8 +142,9 @@ struct QIOChannelClass {
int whence, int whence,
Error **errp); Error **errp);
void (*io_set_aio_fd_handler)(QIOChannel *ioc, void (*io_set_aio_fd_handler)(QIOChannel *ioc,
AioContext *ctx, AioContext *read_ctx,
IOHandler *io_read, IOHandler *io_read,
AioContext *write_ctx,
IOHandler *io_write, IOHandler *io_write,
void *opaque); void *opaque);
int (*io_flush)(QIOChannel *ioc, int (*io_flush)(QIOChannel *ioc,
@ -498,6 +501,21 @@ int qio_channel_set_blocking(QIOChannel *ioc,
bool enabled, bool enabled,
Error **errp); Error **errp);
/**
* qio_channel_set_follow_coroutine_ctx:
* @ioc: the channel object
* @enabled: whether or not to follow the coroutine's AioContext
*
* If @enabled is true, calls to qio_channel_yield() use the current
* coroutine's AioContext. Usually this is desirable.
*
* If @enabled is false, calls to qio_channel_yield() use the global iohandler
* AioContext. This is may be used by coroutines that run in the main loop and
* do not wish to respond to I/O during nested event loops. This is the
* default for compatibility with code that is not aware of AioContexts.
*/
void qio_channel_set_follow_coroutine_ctx(QIOChannel *ioc, bool enabled);
/** /**
* qio_channel_close: * qio_channel_close:
* @ioc: the channel object * @ioc: the channel object
@ -703,41 +721,6 @@ GSource *qio_channel_add_watch_source(QIOChannel *ioc,
GDestroyNotify notify, GDestroyNotify notify,
GMainContext *context); GMainContext *context);
/**
* qio_channel_attach_aio_context:
* @ioc: the channel object
* @ctx: the #AioContext to set the handlers on
*
* Request that qio_channel_yield() sets I/O handlers on
* the given #AioContext. If @ctx is %NULL, qio_channel_yield()
* uses QEMU's main thread event loop.
*
* You can move a #QIOChannel from one #AioContext to another even if
* I/O handlers are set for a coroutine. However, #QIOChannel provides
* no synchronization between the calls to qio_channel_yield() and
* qio_channel_attach_aio_context().
*
* Therefore you should first call qio_channel_detach_aio_context()
* to ensure that the coroutine is not entered concurrently. Then,
* while the coroutine has yielded, call qio_channel_attach_aio_context(),
* and then aio_co_schedule() to place the coroutine on the new
* #AioContext. The calls to qio_channel_detach_aio_context()
* and qio_channel_attach_aio_context() should be protected with
* aio_context_acquire() and aio_context_release().
*/
void qio_channel_attach_aio_context(QIOChannel *ioc,
AioContext *ctx);
/**
* qio_channel_detach_aio_context:
* @ioc: the channel object
*
* Disable any I/O handlers set by qio_channel_yield(). With the
* help of aio_co_schedule(), this allows moving a coroutine that was
* paused by qio_channel_yield() to another context.
*/
void qio_channel_detach_aio_context(QIOChannel *ioc);
/** /**
* qio_channel_yield: * qio_channel_yield:
* @ioc: the channel object * @ioc: the channel object
@ -785,8 +768,9 @@ void qio_channel_wait(QIOChannel *ioc,
/** /**
* qio_channel_set_aio_fd_handler: * qio_channel_set_aio_fd_handler:
* @ioc: the channel object * @ioc: the channel object
* @ctx: the AioContext to set the handlers on * @read_ctx: the AioContext to set the read handler on or NULL
* @io_read: the read handler * @io_read: the read handler
* @write_ctx: the AioContext to set the write handler on or NULL
* @io_write: the write handler * @io_write: the write handler
* @opaque: the opaque value passed to the handler * @opaque: the opaque value passed to the handler
* *
@ -794,10 +778,17 @@ void qio_channel_wait(QIOChannel *ioc,
* be used by channel implementations to forward the handlers * be used by channel implementations to forward the handlers
* to another channel (e.g. from #QIOChannelTLS to the * to another channel (e.g. from #QIOChannelTLS to the
* underlying socket). * underlying socket).
*
* When @read_ctx is NULL, don't touch the read handler. When @write_ctx is
* NULL, don't touch the write handler. Note that setting the read handler
* clears the write handler, and vice versa, if they share the same AioContext.
* Therefore the caller must pass both handlers together when sharing the same
* AioContext.
*/ */
void qio_channel_set_aio_fd_handler(QIOChannel *ioc, void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
AioContext *ctx, AioContext *read_ctx,
IOHandler *io_read, IOHandler *io_read,
AioContext *write_ctx,
IOHandler *io_write, IOHandler *io_write,
void *opaque); void *opaque);

View File

@ -43,6 +43,7 @@ typedef struct {
unsigned int in_flight; /* atomic */ unsigned int in_flight; /* atomic */
/* Protected by ctx lock */ /* Protected by ctx lock */
bool in_qio_channel_yield;
bool wait_idle; bool wait_idle;
VuDev vu_dev; VuDev vu_dev;
QIOChannel *ioc; /* The I/O channel with the client */ QIOChannel *ioc; /* The I/O channel with the client */

View File

@ -20,6 +20,7 @@
#include "qemu/osdep.h" #include "qemu/osdep.h"
#include "io/channel-command.h" #include "io/channel-command.h"
#include "io/channel-util.h"
#include "io/channel-watch.h" #include "io/channel-watch.h"
#include "qapi/error.h" #include "qapi/error.h"
#include "qemu/module.h" #include "qemu/module.h"
@ -331,14 +332,17 @@ static int qio_channel_command_close(QIOChannel *ioc,
static void qio_channel_command_set_aio_fd_handler(QIOChannel *ioc, static void qio_channel_command_set_aio_fd_handler(QIOChannel *ioc,
AioContext *ctx, AioContext *read_ctx,
IOHandler *io_read, IOHandler *io_read,
AioContext *write_ctx,
IOHandler *io_write, IOHandler *io_write,
void *opaque) void *opaque)
{ {
QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc); QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
aio_set_fd_handler(ctx, cioc->readfd, io_read, NULL, NULL, NULL, opaque);
aio_set_fd_handler(ctx, cioc->writefd, NULL, io_write, NULL, NULL, opaque); qio_channel_util_set_aio_fd_handler(cioc->readfd, read_ctx, io_read,
cioc->writefd, write_ctx, io_write,
opaque);
} }

View File

@ -20,6 +20,7 @@
#include "qemu/osdep.h" #include "qemu/osdep.h"
#include "io/channel-file.h" #include "io/channel-file.h"
#include "io/channel-util.h"
#include "io/channel-watch.h" #include "io/channel-watch.h"
#include "qapi/error.h" #include "qapi/error.h"
#include "qemu/module.h" #include "qemu/module.h"
@ -192,13 +193,17 @@ static int qio_channel_file_close(QIOChannel *ioc,
static void qio_channel_file_set_aio_fd_handler(QIOChannel *ioc, static void qio_channel_file_set_aio_fd_handler(QIOChannel *ioc,
AioContext *ctx, AioContext *read_ctx,
IOHandler *io_read, IOHandler *io_read,
AioContext *write_ctx,
IOHandler *io_write, IOHandler *io_write,
void *opaque) void *opaque)
{ {
QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc); QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);
aio_set_fd_handler(ctx, fioc->fd, io_read, io_write, NULL, NULL, opaque);
qio_channel_util_set_aio_fd_handler(fioc->fd, read_ctx, io_read,
fioc->fd, write_ctx, io_write,
opaque);
} }
static GSource *qio_channel_file_create_watch(QIOChannel *ioc, static GSource *qio_channel_file_create_watch(QIOChannel *ioc,

View File

@ -128,8 +128,9 @@ qio_channel_null_close(QIOChannel *ioc,
static void static void
qio_channel_null_set_aio_fd_handler(QIOChannel *ioc G_GNUC_UNUSED, qio_channel_null_set_aio_fd_handler(QIOChannel *ioc G_GNUC_UNUSED,
AioContext *ctx G_GNUC_UNUSED, AioContext *read_ctx G_GNUC_UNUSED,
IOHandler *io_read G_GNUC_UNUSED, IOHandler *io_read G_GNUC_UNUSED,
AioContext *write_ctx G_GNUC_UNUSED,
IOHandler *io_write G_GNUC_UNUSED, IOHandler *io_write G_GNUC_UNUSED,
void *opaque G_GNUC_UNUSED) void *opaque G_GNUC_UNUSED)
{ {

View File

@ -22,6 +22,7 @@
#include "qapi/qapi-visit-sockets.h" #include "qapi/qapi-visit-sockets.h"
#include "qemu/module.h" #include "qemu/module.h"
#include "io/channel-socket.h" #include "io/channel-socket.h"
#include "io/channel-util.h"
#include "io/channel-watch.h" #include "io/channel-watch.h"
#include "trace.h" #include "trace.h"
#include "qapi/clone-visitor.h" #include "qapi/clone-visitor.h"
@ -893,13 +894,17 @@ qio_channel_socket_shutdown(QIOChannel *ioc,
} }
static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc, static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
AioContext *ctx, AioContext *read_ctx,
IOHandler *io_read, IOHandler *io_read,
AioContext *write_ctx,
IOHandler *io_write, IOHandler *io_write,
void *opaque) void *opaque)
{ {
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
aio_set_fd_handler(ctx, sioc->fd, io_read, io_write, NULL, NULL, opaque);
qio_channel_util_set_aio_fd_handler(sioc->fd, read_ctx, io_read,
sioc->fd, write_ctx, io_write,
opaque);
} }
static GSource *qio_channel_socket_create_watch(QIOChannel *ioc, static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,

View File

@ -388,14 +388,16 @@ static int qio_channel_tls_close(QIOChannel *ioc,
} }
static void qio_channel_tls_set_aio_fd_handler(QIOChannel *ioc, static void qio_channel_tls_set_aio_fd_handler(QIOChannel *ioc,
AioContext *ctx, AioContext *read_ctx,
IOHandler *io_read, IOHandler *io_read,
AioContext *write_ctx,
IOHandler *io_write, IOHandler *io_write,
void *opaque) void *opaque)
{ {
QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc); QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc);
qio_channel_set_aio_fd_handler(tioc->master, ctx, io_read, io_write, opaque); qio_channel_set_aio_fd_handler(tioc->master, read_ctx, io_read,
write_ctx, io_write, opaque);
} }
typedef struct QIOChannelTLSSource QIOChannelTLSSource; typedef struct QIOChannelTLSSource QIOChannelTLSSource;

View File

@ -36,3 +36,27 @@ QIOChannel *qio_channel_new_fd(int fd,
} }
return ioc; return ioc;
} }
void qio_channel_util_set_aio_fd_handler(int read_fd,
AioContext *read_ctx,
IOHandler *io_read,
int write_fd,
AioContext *write_ctx,
IOHandler *io_write,
void *opaque)
{
if (read_fd == write_fd && read_ctx == write_ctx) {
aio_set_fd_handler(read_ctx, read_fd, io_read, io_write,
NULL, NULL, opaque);
} else {
if (read_ctx) {
aio_set_fd_handler(read_ctx, read_fd, io_read, NULL,
NULL, NULL, opaque);
}
if (write_ctx) {
aio_set_fd_handler(write_ctx, write_fd, NULL, io_write,
NULL, NULL, opaque);
}
}
}

View File

@ -365,6 +365,12 @@ int qio_channel_set_blocking(QIOChannel *ioc,
} }
void qio_channel_set_follow_coroutine_ctx(QIOChannel *ioc, bool enabled)
{
ioc->follow_coroutine_ctx = enabled;
}
int qio_channel_close(QIOChannel *ioc, int qio_channel_close(QIOChannel *ioc,
Error **errp) Error **errp)
{ {
@ -388,14 +394,16 @@ GSource *qio_channel_create_watch(QIOChannel *ioc,
void qio_channel_set_aio_fd_handler(QIOChannel *ioc, void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
AioContext *ctx, AioContext *read_ctx,
IOHandler *io_read, IOHandler *io_read,
AioContext *write_ctx,
IOHandler *io_write, IOHandler *io_write,
void *opaque) void *opaque)
{ {
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque); klass->io_set_aio_fd_handler(ioc, read_ctx, io_read, write_ctx, io_write,
opaque);
} }
guint qio_channel_add_watch_full(QIOChannel *ioc, guint qio_channel_add_watch_full(QIOChannel *ioc,
@ -542,56 +550,101 @@ static void qio_channel_restart_write(void *opaque)
aio_co_wake(co); aio_co_wake(co);
} }
static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc) static void coroutine_fn
qio_channel_set_fd_handlers(QIOChannel *ioc, GIOCondition condition)
{ {
IOHandler *rd_handler = NULL, *wr_handler = NULL; AioContext *ctx = ioc->follow_coroutine_ctx ?
qemu_coroutine_get_aio_context(qemu_coroutine_self()) :
iohandler_get_aio_context();
AioContext *read_ctx = NULL;
IOHandler *io_read = NULL;
AioContext *write_ctx = NULL;
IOHandler *io_write = NULL;
if (condition == G_IO_IN) {
ioc->read_coroutine = qemu_coroutine_self();
ioc->read_ctx = ctx;
read_ctx = ctx;
io_read = qio_channel_restart_read;
/*
* Thread safety: if the other coroutine is set and its AioContext
* matches ours, then there is mutual exclusion between read and write
* because they share a single thread and it's safe to set both read
* and write fd handlers here. If the AioContext does not match ours,
* then both threads may run in parallel but there is no shared state
* to worry about.
*/
if (ioc->write_coroutine && ioc->write_ctx == ctx) {
write_ctx = ctx;
io_write = qio_channel_restart_write;
}
} else if (condition == G_IO_OUT) {
ioc->write_coroutine = qemu_coroutine_self();
ioc->write_ctx = ctx;
write_ctx = ctx;
io_write = qio_channel_restart_write;
if (ioc->read_coroutine && ioc->read_ctx == ctx) {
read_ctx = ctx;
io_read = qio_channel_restart_read;
}
} else {
abort();
}
qio_channel_set_aio_fd_handler(ioc, read_ctx, io_read,
write_ctx, io_write, ioc);
}
static void coroutine_fn
qio_channel_clear_fd_handlers(QIOChannel *ioc, GIOCondition condition)
{
AioContext *read_ctx = NULL;
IOHandler *io_read = NULL;
AioContext *write_ctx = NULL;
IOHandler *io_write = NULL;
AioContext *ctx; AioContext *ctx;
if (ioc->read_coroutine) { if (condition == G_IO_IN) {
rd_handler = qio_channel_restart_read; ctx = ioc->read_ctx;
read_ctx = ctx;
io_read = NULL;
if (ioc->write_coroutine && ioc->write_ctx == ctx) {
write_ctx = ctx;
io_write = qio_channel_restart_write;
} }
if (ioc->write_coroutine) { } else if (condition == G_IO_OUT) {
wr_handler = qio_channel_restart_write; ctx = ioc->write_ctx;
write_ctx = ctx;
io_write = NULL;
if (ioc->read_coroutine && ioc->read_ctx == ctx) {
read_ctx = ctx;
io_read = qio_channel_restart_read;
}
} else {
abort();
} }
ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context(); qio_channel_set_aio_fd_handler(ioc, read_ctx, io_read,
qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc); write_ctx, io_write, ioc);
}
void qio_channel_attach_aio_context(QIOChannel *ioc,
AioContext *ctx)
{
assert(!ioc->read_coroutine);
assert(!ioc->write_coroutine);
ioc->ctx = ctx;
}
void qio_channel_detach_aio_context(QIOChannel *ioc)
{
ioc->read_coroutine = NULL;
ioc->write_coroutine = NULL;
qio_channel_set_aio_fd_handlers(ioc);
ioc->ctx = NULL;
} }
void coroutine_fn qio_channel_yield(QIOChannel *ioc, void coroutine_fn qio_channel_yield(QIOChannel *ioc,
GIOCondition condition) GIOCondition condition)
{ {
AioContext *ioc_ctx = ioc->ctx ?: qemu_get_aio_context(); AioContext *ioc_ctx;
assert(qemu_in_coroutine()); assert(qemu_in_coroutine());
assert(in_aio_context_home_thread(ioc_ctx)); ioc_ctx = qemu_coroutine_get_aio_context(qemu_coroutine_self());
if (condition == G_IO_IN) { if (condition == G_IO_IN) {
assert(!ioc->read_coroutine); assert(!ioc->read_coroutine);
ioc->read_coroutine = qemu_coroutine_self();
} else if (condition == G_IO_OUT) { } else if (condition == G_IO_OUT) {
assert(!ioc->write_coroutine); assert(!ioc->write_coroutine);
ioc->write_coroutine = qemu_coroutine_self();
} else { } else {
abort(); abort();
} }
qio_channel_set_aio_fd_handlers(ioc); qio_channel_set_fd_handlers(ioc, condition);
qemu_coroutine_yield(); qemu_coroutine_yield();
assert(in_aio_context_home_thread(ioc_ctx)); assert(in_aio_context_home_thread(ioc_ctx));
@ -599,11 +652,10 @@ void coroutine_fn qio_channel_yield(QIOChannel *ioc,
* through the aio_fd_handlers. */ * through the aio_fd_handlers. */
if (condition == G_IO_IN) { if (condition == G_IO_IN) {
assert(ioc->read_coroutine == NULL); assert(ioc->read_coroutine == NULL);
qio_channel_set_aio_fd_handlers(ioc);
} else if (condition == G_IO_OUT) { } else if (condition == G_IO_OUT) {
assert(ioc->write_coroutine == NULL); assert(ioc->write_coroutine == NULL);
qio_channel_set_aio_fd_handlers(ioc);
} }
qio_channel_clear_fd_handlers(ioc, condition);
} }
void qio_channel_wake_read(QIOChannel *ioc) void qio_channel_wake_read(QIOChannel *ioc)
@ -653,6 +705,10 @@ static void qio_channel_finalize(Object *obj)
{ {
QIOChannel *ioc = QIO_CHANNEL(obj); QIOChannel *ioc = QIO_CHANNEL(obj);
/* Must not have coroutines in qio_channel_yield() */
assert(!ioc->read_coroutine);
assert(!ioc->write_coroutine);
g_free(ioc->name); g_free(ioc->name);
#ifdef _WIN32 #ifdef _WIN32

View File

@ -158,8 +158,9 @@ qio_channel_block_close(QIOChannel *ioc,
static void static void
qio_channel_block_set_aio_fd_handler(QIOChannel *ioc, qio_channel_block_set_aio_fd_handler(QIOChannel *ioc,
AioContext *ctx, AioContext *read_ctx,
IOHandler *io_read, IOHandler *io_read,
AioContext *write_ctx,
IOHandler *io_write, IOHandler *io_write,
void *opaque) void *opaque)
{ {

View File

@ -3103,22 +3103,23 @@ static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
} }
static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc, static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
AioContext *ctx, AioContext *read_ctx,
IOHandler *io_read, IOHandler *io_read,
AioContext *write_ctx,
IOHandler *io_write, IOHandler *io_write,
void *opaque) void *opaque)
{ {
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
if (io_read) { if (io_read) {
aio_set_fd_handler(ctx, rioc->rdmain->recv_comp_channel->fd, io_read, aio_set_fd_handler(read_ctx, rioc->rdmain->recv_comp_channel->fd,
io_write, NULL, NULL, opaque); io_read, io_write, NULL, NULL, opaque);
aio_set_fd_handler(ctx, rioc->rdmain->send_comp_channel->fd, io_read, aio_set_fd_handler(read_ctx, rioc->rdmain->send_comp_channel->fd,
io_write, NULL, NULL, opaque); io_read, io_write, NULL, NULL, opaque);
} else { } else {
aio_set_fd_handler(ctx, rioc->rdmaout->recv_comp_channel->fd, io_read, aio_set_fd_handler(write_ctx, rioc->rdmaout->recv_comp_channel->fd,
io_write, NULL, NULL, opaque); io_read, io_write, NULL, NULL, opaque);
aio_set_fd_handler(ctx, rioc->rdmaout->send_comp_channel->fd, io_read, aio_set_fd_handler(write_ctx, rioc->rdmaout->send_comp_channel->fd,
io_write, NULL, NULL, opaque); io_read, io_write, NULL, NULL, opaque);
} }
} }

View File

@ -146,8 +146,7 @@ static int nbd_connect(QIOChannelSocket *sioc, SocketAddress *addr,
return 0; return 0;
} }
ret = nbd_receive_negotiate(NULL, QIO_CHANNEL(sioc), tlscreds, ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), tlscreds, tlshostname,
tlshostname,
outioc, info, errp); outioc, info, errp);
if (ret < 0) { if (ret < 0) {
/* /*

View File

@ -877,8 +877,7 @@ static int nbd_list_meta_contexts(QIOChannel *ioc,
* Returns: negative errno: failure talking to server * Returns: negative errno: failure talking to server
* non-negative: enum NBDMode describing server abilities * non-negative: enum NBDMode describing server abilities
*/ */
static int nbd_start_negotiate(AioContext *aio_context, QIOChannel *ioc, static int nbd_start_negotiate(QIOChannel *ioc, QCryptoTLSCreds *tlscreds,
QCryptoTLSCreds *tlscreds,
const char *hostname, QIOChannel **outioc, const char *hostname, QIOChannel **outioc,
bool structured_reply, bool *zeroes, bool structured_reply, bool *zeroes,
Error **errp) Error **errp)
@ -946,10 +945,6 @@ static int nbd_start_negotiate(AioContext *aio_context, QIOChannel *ioc,
return -EINVAL; return -EINVAL;
} }
ioc = *outioc; ioc = *outioc;
if (aio_context) {
qio_channel_set_blocking(ioc, false, NULL);
qio_channel_attach_aio_context(ioc, aio_context);
}
} else { } else {
error_setg(errp, "Server does not support STARTTLS"); error_setg(errp, "Server does not support STARTTLS");
return -EINVAL; return -EINVAL;
@ -1014,8 +1009,7 @@ static int nbd_negotiate_finish_oldstyle(QIOChannel *ioc, NBDExportInfo *info,
* Returns: negative errno: failure talking to server * Returns: negative errno: failure talking to server
* 0: server is connected * 0: server is connected
*/ */
int nbd_receive_negotiate(AioContext *aio_context, QIOChannel *ioc, int nbd_receive_negotiate(QIOChannel *ioc, QCryptoTLSCreds *tlscreds,
QCryptoTLSCreds *tlscreds,
const char *hostname, QIOChannel **outioc, const char *hostname, QIOChannel **outioc,
NBDExportInfo *info, Error **errp) NBDExportInfo *info, Error **errp)
{ {
@ -1027,7 +1021,7 @@ int nbd_receive_negotiate(AioContext *aio_context, QIOChannel *ioc,
assert(info->name && strlen(info->name) <= NBD_MAX_STRING_SIZE); assert(info->name && strlen(info->name) <= NBD_MAX_STRING_SIZE);
trace_nbd_receive_negotiate_name(info->name); trace_nbd_receive_negotiate_name(info->name);
result = nbd_start_negotiate(aio_context, ioc, tlscreds, hostname, outioc, result = nbd_start_negotiate(ioc, tlscreds, hostname, outioc,
info->structured_reply, &zeroes, errp); info->structured_reply, &zeroes, errp);
if (result < 0) { if (result < 0) {
return result; return result;
@ -1150,7 +1144,7 @@ int nbd_receive_export_list(QIOChannel *ioc, QCryptoTLSCreds *tlscreds,
QIOChannel *sioc = NULL; QIOChannel *sioc = NULL;
*info = NULL; *info = NULL;
result = nbd_start_negotiate(NULL, ioc, tlscreds, hostname, &sioc, true, result = nbd_start_negotiate(ioc, tlscreds, hostname, &sioc, true,
NULL, errp); NULL, errp);
if (tlscreds && sioc) { if (tlscreds && sioc) {
ioc = sioc; ioc = sioc;

View File

@ -1333,6 +1333,7 @@ static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
*/ */
qio_channel_set_blocking(client->ioc, false, NULL); qio_channel_set_blocking(client->ioc, false, NULL);
qio_channel_set_follow_coroutine_ctx(client->ioc, true);
trace_nbd_negotiate_begin(); trace_nbd_negotiate_begin();
memcpy(buf, "NBDMAGIC", 8); memcpy(buf, "NBDMAGIC", 8);
@ -1352,11 +1353,6 @@ static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
return ret; return ret;
} }
/* Attach the channel to the same AioContext as the export */
if (client->exp && client->exp->common.ctx) {
qio_channel_attach_aio_context(client->ioc, client->exp->common.ctx);
}
assert(!client->optlen); assert(!client->optlen);
trace_nbd_negotiate_success(); trace_nbd_negotiate_success();
@ -1465,7 +1461,6 @@ void nbd_client_put(NBDClient *client)
*/ */
assert(client->closing); assert(client->closing);
qio_channel_detach_aio_context(client->ioc);
object_unref(OBJECT(client->sioc)); object_unref(OBJECT(client->sioc));
object_unref(OBJECT(client->ioc)); object_unref(OBJECT(client->ioc));
if (client->tlscreds) { if (client->tlscreds) {
@ -1544,8 +1539,6 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
exp->common.ctx = ctx; exp->common.ctx = ctx;
QTAILQ_FOREACH(client, &exp->clients, next) { QTAILQ_FOREACH(client, &exp->clients, next) {
qio_channel_attach_aio_context(client->ioc, ctx);
assert(client->nb_requests == 0); assert(client->nb_requests == 0);
assert(client->recv_coroutine == NULL); assert(client->recv_coroutine == NULL);
assert(client->send_coroutine == NULL); assert(client->send_coroutine == NULL);
@ -1555,14 +1548,9 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
static void blk_aio_detach(void *opaque) static void blk_aio_detach(void *opaque)
{ {
NBDExport *exp = opaque; NBDExport *exp = opaque;
NBDClient *client;
trace_nbd_blk_aio_detach(exp->name, exp->common.ctx); trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
QTAILQ_FOREACH(client, &exp->clients, next) {
qio_channel_detach_aio_context(client->ioc);
}
exp->common.ctx = NULL; exp->common.ctx = NULL;
} }

View File

@ -73,8 +73,6 @@
#define MBR_SIZE 512 #define MBR_SIZE 512
static char *srcpath;
static SocketAddress *saddr;
static int persistent = 0; static int persistent = 0;
static enum { RUNNING, TERMINATE, TERMINATED } state; static enum { RUNNING, TERMINATE, TERMINATED } state;
static int shared = 1; static int shared = 1;
@ -253,6 +251,29 @@ static int qemu_nbd_client_list(SocketAddress *saddr, QCryptoTLSCreds *tls,
} }
struct NbdClientOpts {
char *device;
char *srcpath;
SocketAddress *saddr;
int old_stderr;
bool fork_process;
bool verbose;
};
static void nbd_client_release_pipe(int old_stderr)
{
/* Close stderr so that the qemu-nbd process exits. */
if (dup2(old_stderr, STDERR_FILENO) < 0) {
error_report("Could not release pipe to parent: %s",
strerror(errno));
exit(EXIT_FAILURE);
}
if (old_stderr != STDOUT_FILENO && close(old_stderr) < 0) {
error_report("Could not release qemu-nbd: %s", strerror(errno));
exit(EXIT_FAILURE);
}
}
#if HAVE_NBD_DEVICE #if HAVE_NBD_DEVICE
static void *show_parts(void *arg) static void *show_parts(void *arg)
{ {
@ -271,12 +292,6 @@ static void *show_parts(void *arg)
return NULL; return NULL;
} }
struct NbdClientOpts {
char *device;
bool fork_process;
bool verbose;
};
static void *nbd_client_thread(void *arg) static void *nbd_client_thread(void *arg)
{ {
struct NbdClientOpts *opts = arg; struct NbdClientOpts *opts = arg;
@ -289,14 +304,14 @@ static void *nbd_client_thread(void *arg)
sioc = qio_channel_socket_new(); sioc = qio_channel_socket_new();
if (qio_channel_socket_connect_sync(sioc, if (qio_channel_socket_connect_sync(sioc,
saddr, opts->saddr,
&local_error) < 0) { &local_error) < 0) {
error_report_err(local_error); error_report_err(local_error);
goto out; goto out;
} }
if (nbd_receive_negotiate(NULL, QIO_CHANNEL(sioc), if (nbd_receive_negotiate(QIO_CHANNEL(sioc), NULL, NULL, NULL,
NULL, NULL, NULL, &info, &local_error) < 0) { &info, &local_error) < 0) {
if (local_error) { if (local_error) {
error_report_err(local_error); error_report_err(local_error);
} }
@ -320,14 +335,9 @@ static void *nbd_client_thread(void *arg)
if (opts->verbose && !opts->fork_process) { if (opts->verbose && !opts->fork_process) {
fprintf(stderr, "NBD device %s is now connected to %s\n", fprintf(stderr, "NBD device %s is now connected to %s\n",
opts->device, srcpath); opts->device, opts->srcpath);
} else { } else {
/* Close stderr so that the qemu-nbd process exits. */ nbd_client_release_pipe(opts->old_stderr);
if (dup2(STDOUT_FILENO, STDERR_FILENO) < 0) {
error_report("Could not set stderr to /dev/null: %s",
strerror(errno));
exit(EXIT_FAILURE);
}
} }
if (nbd_client(fd) < 0) { if (nbd_client(fd) < 0) {
@ -519,7 +529,6 @@ int main(int argc, char **argv)
const char *bindto = NULL; const char *bindto = NULL;
const char *port = NULL; const char *port = NULL;
char *sockpath = NULL; char *sockpath = NULL;
char *device = NULL;
QemuOpts *sn_opts = NULL; QemuOpts *sn_opts = NULL;
const char *sn_id_or_name = NULL; const char *sn_id_or_name = NULL;
const char *sopt = "hVb:o:p:rsnc:dvk:e:f:tl:x:T:D:AB:L"; const char *sopt = "hVb:o:p:rsnc:dvk:e:f:tl:x:T:D:AB:L";
@ -582,16 +591,19 @@ int main(int argc, char **argv)
const char *tlshostname = NULL; const char *tlshostname = NULL;
bool imageOpts = false; bool imageOpts = false;
bool writethrough = false; /* Client will flush as needed. */ bool writethrough = false; /* Client will flush as needed. */
bool verbose = false;
bool fork_process = false;
bool list = false; bool list = false;
unsigned socket_activation; unsigned socket_activation;
const char *pid_file_name = NULL; const char *pid_file_name = NULL;
const char *selinux_label = NULL; const char *selinux_label = NULL;
BlockExportOptions *export_opts; BlockExportOptions *export_opts;
#if HAVE_NBD_DEVICE struct NbdClientOpts opts = {
struct NbdClientOpts opts; .fork_process = false,
#endif .verbose = false,
.device = NULL,
.srcpath = NULL,
.saddr = NULL,
.old_stderr = STDOUT_FILENO,
};
#ifdef CONFIG_POSIX #ifdef CONFIG_POSIX
os_setup_early_signal_handling(); os_setup_early_signal_handling();
@ -719,7 +731,7 @@ int main(int argc, char **argv)
disconnect = true; disconnect = true;
break; break;
case 'c': case 'c':
device = optarg; opts.device = optarg;
break; break;
case 'e': case 'e':
if (qemu_strtoi(optarg, NULL, 0, &shared) < 0 || if (qemu_strtoi(optarg, NULL, 0, &shared) < 0 ||
@ -750,7 +762,7 @@ int main(int argc, char **argv)
} }
break; break;
case 'v': case 'v':
verbose = true; opts.verbose = true;
break; break;
case 'V': case 'V':
version(argv[0]); version(argv[0]);
@ -782,7 +794,7 @@ int main(int argc, char **argv)
tlsauthz = optarg; tlsauthz = optarg;
break; break;
case QEMU_NBD_OPT_FORK: case QEMU_NBD_OPT_FORK:
fork_process = true; opts.fork_process = true;
break; break;
case 'L': case 'L':
list = true; list = true;
@ -802,12 +814,12 @@ int main(int argc, char **argv)
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
if (export_name || export_description || dev_offset || if (export_name || export_description || dev_offset ||
device || disconnect || fmt || sn_id_or_name || bitmaps || opts.device || disconnect || fmt || sn_id_or_name || bitmaps ||
alloc_depth || seen_aio || seen_discard || seen_cache) { alloc_depth || seen_aio || seen_discard || seen_cache) {
error_report("List mode is incompatible with per-device settings"); error_report("List mode is incompatible with per-device settings");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
if (fork_process) { if (opts.fork_process) {
error_report("List mode is incompatible with forking"); error_report("List mode is incompatible with forking");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
@ -832,7 +844,8 @@ int main(int argc, char **argv)
} }
} else { } else {
/* Using socket activation - check user didn't use -p etc. */ /* Using socket activation - check user didn't use -p etc. */
const char *err_msg = socket_activation_validate_opts(device, sockpath, const char *err_msg = socket_activation_validate_opts(opts.device,
sockpath,
bindto, port, bindto, port,
selinux_label, selinux_label,
list); list);
@ -850,7 +863,7 @@ int main(int argc, char **argv)
} }
if (tlscredsid) { if (tlscredsid) {
if (device) { if (opts.device) {
error_report("TLS is not supported with a host device"); error_report("TLS is not supported with a host device");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
@ -880,7 +893,7 @@ int main(int argc, char **argv)
if (selinux_label) { if (selinux_label) {
#ifdef CONFIG_SELINUX #ifdef CONFIG_SELINUX
if (sockpath == NULL && device == NULL) { if (sockpath == NULL && opts.device == NULL) {
error_report("--selinux-label is not permitted without --socket"); error_report("--selinux-label is not permitted without --socket");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
@ -891,13 +904,13 @@ int main(int argc, char **argv)
} }
if (list) { if (list) {
saddr = nbd_build_socket_address(sockpath, bindto, port); opts.saddr = nbd_build_socket_address(sockpath, bindto, port);
return qemu_nbd_client_list(saddr, tlscreds, return qemu_nbd_client_list(opts.saddr, tlscreds,
tlshostname ? tlshostname : bindto); tlshostname ? tlshostname : bindto);
} }
#if !HAVE_NBD_DEVICE #if !HAVE_NBD_DEVICE
if (disconnect || device) { if (disconnect || opts.device) {
error_report("Kernel /dev/nbdN support not available"); error_report("Kernel /dev/nbdN support not available");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
@ -919,7 +932,7 @@ int main(int argc, char **argv)
} }
#endif #endif
if ((device && !verbose) || fork_process) { if ((opts.device && !opts.verbose) || opts.fork_process) {
#ifndef WIN32 #ifndef WIN32
g_autoptr(GError) err = NULL; g_autoptr(GError) err = NULL;
int stderr_fd[2]; int stderr_fd[2];
@ -944,6 +957,16 @@ int main(int argc, char **argv)
close(stderr_fd[0]); close(stderr_fd[0]);
/* Remember parent's stderr if we will be restoring it. */
if (opts.verbose /* fork_process is set */) {
opts.old_stderr = dup(STDERR_FILENO);
if (opts.old_stderr < 0) {
error_report("Could not dup original stderr: %s",
strerror(errno));
exit(EXIT_FAILURE);
}
}
ret = qemu_daemon(1, 0); ret = qemu_daemon(1, 0);
saved_errno = errno; /* dup2 will overwrite error below */ saved_errno = errno; /* dup2 will overwrite error below */
@ -1002,9 +1025,9 @@ int main(int argc, char **argv)
#endif /* WIN32 */ #endif /* WIN32 */
} }
if (device != NULL && sockpath == NULL) { if (opts.device != NULL && sockpath == NULL) {
sockpath = g_malloc(128); sockpath = g_malloc(128);
snprintf(sockpath, 128, SOCKET_PATH, basename(device)); snprintf(sockpath, 128, SOCKET_PATH, basename(opts.device));
} }
server = qio_net_listener_new(); server = qio_net_listener_new();
@ -1023,8 +1046,8 @@ int main(int argc, char **argv)
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
#endif #endif
saddr = nbd_build_socket_address(sockpath, bindto, port); opts.saddr = nbd_build_socket_address(sockpath, bindto, port);
if (qio_net_listener_open_sync(server, saddr, backlog, if (qio_net_listener_open_sync(server, opts.saddr, backlog,
&local_err) < 0) { &local_err) < 0) {
object_unref(OBJECT(server)); object_unref(OBJECT(server));
error_report_err(local_err); error_report_err(local_err);
@ -1059,19 +1082,19 @@ int main(int argc, char **argv)
bdrv_init(); bdrv_init();
atexit(qemu_nbd_shutdown); atexit(qemu_nbd_shutdown);
srcpath = argv[optind]; opts.srcpath = argv[optind];
if (imageOpts) { if (imageOpts) {
QemuOpts *opts; QemuOpts *o;
if (fmt) { if (fmt) {
error_report("--image-opts and -f are mutually exclusive"); error_report("--image-opts and -f are mutually exclusive");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
opts = qemu_opts_parse_noisily(&file_opts, srcpath, true); o = qemu_opts_parse_noisily(&file_opts, opts.srcpath, true);
if (!opts) { if (!o) {
qemu_opts_reset(&file_opts); qemu_opts_reset(&file_opts);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
options = qemu_opts_to_qdict(opts, NULL); options = qemu_opts_to_qdict(o, NULL);
qemu_opts_reset(&file_opts); qemu_opts_reset(&file_opts);
blk = blk_new_open(NULL, NULL, options, flags, &local_err); blk = blk_new_open(NULL, NULL, options, flags, &local_err);
} else { } else {
@ -1079,7 +1102,7 @@ int main(int argc, char **argv)
options = qdict_new(); options = qdict_new();
qdict_put_str(options, "driver", fmt); qdict_put_str(options, "driver", fmt);
} }
blk = blk_new_open(srcpath, NULL, options, flags, &local_err); blk = blk_new_open(opts.srcpath, NULL, options, flags, &local_err);
} }
if (!blk) { if (!blk) {
@ -1145,15 +1168,9 @@ int main(int argc, char **argv)
blk_exp_add(export_opts, &error_fatal); blk_exp_add(export_opts, &error_fatal);
qapi_free_BlockExportOptions(export_opts); qapi_free_BlockExportOptions(export_opts);
if (device) { if (opts.device) {
#if HAVE_NBD_DEVICE #if HAVE_NBD_DEVICE
int ret; int ret;
opts = (struct NbdClientOpts) {
.device = device,
.fork_process = fork_process,
.verbose = verbose,
};
ret = pthread_create(&client_thread, NULL, nbd_client_thread, &opts); ret = pthread_create(&client_thread, NULL, nbd_client_thread, &opts);
if (ret != 0) { if (ret != 0) {
error_report("Failed to create client thread: %s", strerror(ret)); error_report("Failed to create client thread: %s", strerror(ret));
@ -1179,12 +1196,8 @@ int main(int argc, char **argv)
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
if (fork_process) { if (opts.fork_process) {
if (dup2(STDOUT_FILENO, STDERR_FILENO) < 0) { nbd_client_release_pipe(opts.old_stderr);
error_report("Could not set stderr to /dev/null: %s",
strerror(errno));
exit(EXIT_FAILURE);
}
} }
state = RUNNING; state = RUNNING;
@ -1203,7 +1216,7 @@ int main(int argc, char **argv)
qemu_opts_del(sn_opts); qemu_opts_del(sn_opts);
if (device) { if (opts.device) {
void *ret; void *ret;
pthread_join(client_thread, &ret); pthread_join(client_thread, &ret);
exit(ret != NULL); exit(ret != NULL);

View File

@ -735,8 +735,7 @@ static void coroutine_fn prh_co_entry(void *opaque)
qio_channel_set_blocking(QIO_CHANNEL(client->ioc), qio_channel_set_blocking(QIO_CHANNEL(client->ioc),
false, NULL); false, NULL);
qio_channel_attach_aio_context(QIO_CHANNEL(client->ioc), qio_channel_set_follow_coroutine_ctx(QIO_CHANNEL(client->ioc), true);
qemu_get_aio_context());
/* A very simple negotiation for future extensibility. No features /* A very simple negotiation for future extensibility. No features
* are defined so write 0. * are defined so write 0.
@ -796,7 +795,6 @@ static void coroutine_fn prh_co_entry(void *opaque)
} }
out: out:
qio_channel_detach_aio_context(QIO_CHANNEL(client->ioc));
object_unref(OBJECT(client->ioc)); object_unref(OBJECT(client->ioc));
g_free(client); g_free(client);
} }

View File

@ -136,18 +136,18 @@ IMGPROTO=file IMGFMT=qcow2 TEST_IMG_FILE="$TEST_WRAP" \
$QEMU_IO -c "write -P 0xaa 0 64k" "$TEST_IMG" | _filter_qemu_io $QEMU_IO -c "write -P 0xaa 0 64k" "$TEST_IMG" | _filter_qemu_io
# Allocate individual subclusters in the top image, and not the whole cluster # Allocate individual subclusters in the top image, and not the whole cluster
$QEMU_IO -c "write -P 0xbb 28K 2K" -c "write -P 0xcc 34K 2K" "$TEST_WRAP" \ $QEMU_IO -f qcow2 -c "write -P 0xbb 28K 2K" -c "write -P 0xcc 34K 2K" "$TEST_WRAP" \
| _filter_qemu_io | _filter_qemu_io
# Only 2 subclusters should be allocated in the top image at this point # Only 2 subclusters should be allocated in the top image at this point
$QEMU_IMG map "$TEST_WRAP" | _filter_qemu_img_map $QEMU_IO -f qcow2 -c map "$TEST_WRAP"
# Actual copy-on-read operation # Actual copy-on-read operation
$QEMU_IO -C -c "read -P 0xaa 30K 4K" "$TEST_WRAP" | _filter_qemu_io $QEMU_IO -f qcow2 -C -c "read -P 0xaa 30K 4K" "$TEST_WRAP" | _filter_qemu_io
# And here we should have 4 subclusters allocated right in the middle of the # And here we should have 4 subclusters allocated right in the middle of the
# top image. Make sure the whole cluster remains unallocated # top image. Make sure the whole cluster remains unallocated
$QEMU_IMG map "$TEST_WRAP" | _filter_qemu_img_map $QEMU_IO -f qcow2 -c map "$TEST_WRAP"
_check_test_img _check_test_img

View File

@ -42,17 +42,15 @@ wrote 2048/2048 bytes at offset 28672
2 KiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec) 2 KiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
wrote 2048/2048 bytes at offset 34816 wrote 2048/2048 bytes at offset 34816
2 KiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec) 2 KiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
Offset Length File 28 KiB (0x7000) bytes not allocated at offset 0 bytes (0x0)
0 0x7000 TEST_DIR/t.IMGFMT 2 KiB (0x800) bytes allocated at offset 28 KiB (0x7000)
0x7000 0x800 TEST_DIR/t.wrap.IMGFMT 4 KiB (0x1000) bytes not allocated at offset 30 KiB (0x7800)
0x7800 0x1000 TEST_DIR/t.IMGFMT 2 KiB (0x800) bytes allocated at offset 34 KiB (0x8800)
0x8800 0x800 TEST_DIR/t.wrap.IMGFMT 28 KiB (0x7000) bytes not allocated at offset 36 KiB (0x9000)
0x9000 0x7000 TEST_DIR/t.IMGFMT
read 4096/4096 bytes at offset 30720 read 4096/4096 bytes at offset 30720
4 KiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec) 4 KiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
Offset Length File 28 KiB (0x7000) bytes not allocated at offset 0 bytes (0x0)
0 0x7000 TEST_DIR/t.IMGFMT 8 KiB (0x2000) bytes allocated at offset 28 KiB (0x7000)
0x7000 0x2000 TEST_DIR/t.wrap.IMGFMT 28 KiB (0x7000) bytes not allocated at offset 36 KiB (0x9000)
0x9000 0x7000 TEST_DIR/t.IMGFMT
No errors were found on the image. No errors were found on the image.
*** done *** done

View File

@ -571,7 +571,7 @@ static int sortelem_cmp_src_index(const void *a, const void *b)
*/ */
void qemu_iovec_clone(QEMUIOVector *dest, const QEMUIOVector *src, void *buf) void qemu_iovec_clone(QEMUIOVector *dest, const QEMUIOVector *src, void *buf)
{ {
IOVectorSortElem sortelems[src->niov]; g_autofree IOVectorSortElem *sortelems = g_new(IOVectorSortElem, src->niov);
void *last_end; void *last_end;
int i; int i;

View File

@ -127,7 +127,14 @@ vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg)
if (rc < 0) { if (rc < 0) {
if (rc == QIO_CHANNEL_ERR_BLOCK) { if (rc == QIO_CHANNEL_ERR_BLOCK) {
assert(local_err == NULL); assert(local_err == NULL);
if (server->ctx) {
server->in_qio_channel_yield = true;
qio_channel_yield(ioc, G_IO_IN); qio_channel_yield(ioc, G_IO_IN);
server->in_qio_channel_yield = false;
} else {
/* Wait until attached to an AioContext again */
qemu_coroutine_yield();
}
continue; continue;
} else { } else {
error_report_err(local_err); error_report_err(local_err);
@ -278,7 +285,7 @@ set_watch(VuDev *vu_dev, int fd, int vu_evt,
vu_fd_watch->fd = fd; vu_fd_watch->fd = fd;
vu_fd_watch->cb = cb; vu_fd_watch->cb = cb;
qemu_socket_set_nonblock(fd); qemu_socket_set_nonblock(fd);
aio_set_fd_handler(server->ioc->ctx, fd, kick_handler, aio_set_fd_handler(server->ctx, fd, kick_handler,
NULL, NULL, NULL, vu_fd_watch); NULL, NULL, NULL, vu_fd_watch);
vu_fd_watch->vu_dev = vu_dev; vu_fd_watch->vu_dev = vu_dev;
vu_fd_watch->pvt = pvt; vu_fd_watch->pvt = pvt;
@ -299,7 +306,7 @@ static void remove_watch(VuDev *vu_dev, int fd)
if (!vu_fd_watch) { if (!vu_fd_watch) {
return; return;
} }
aio_set_fd_handler(server->ioc->ctx, fd, NULL, NULL, NULL, NULL, NULL); aio_set_fd_handler(server->ctx, fd, NULL, NULL, NULL, NULL, NULL);
QTAILQ_REMOVE(&server->vu_fd_watches, vu_fd_watch, next); QTAILQ_REMOVE(&server->vu_fd_watches, vu_fd_watch, next);
g_free(vu_fd_watch); g_free(vu_fd_watch);
@ -344,6 +351,8 @@ static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
/* TODO vu_message_write() spins if non-blocking! */ /* TODO vu_message_write() spins if non-blocking! */
qio_channel_set_blocking(server->ioc, false, NULL); qio_channel_set_blocking(server->ioc, false, NULL);
qio_channel_set_follow_coroutine_ctx(server->ioc, true);
server->co_trip = qemu_coroutine_create(vu_client_trip, server); server->co_trip = qemu_coroutine_create(vu_client_trip, server);
aio_context_acquire(server->ctx); aio_context_acquire(server->ctx);
@ -399,13 +408,12 @@ void vhost_user_server_attach_aio_context(VuServer *server, AioContext *ctx)
return; return;
} }
qio_channel_attach_aio_context(server->ioc, ctx);
QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) { QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
aio_set_fd_handler(ctx, vu_fd_watch->fd, kick_handler, NULL, aio_set_fd_handler(ctx, vu_fd_watch->fd, kick_handler, NULL,
NULL, NULL, vu_fd_watch); NULL, NULL, vu_fd_watch);
} }
assert(!server->in_qio_channel_yield);
aio_co_schedule(ctx, server->co_trip); aio_co_schedule(ctx, server->co_trip);
} }
@ -419,11 +427,16 @@ void vhost_user_server_detach_aio_context(VuServer *server)
aio_set_fd_handler(server->ctx, vu_fd_watch->fd, aio_set_fd_handler(server->ctx, vu_fd_watch->fd,
NULL, NULL, NULL, NULL, vu_fd_watch); NULL, NULL, NULL, NULL, vu_fd_watch);
} }
qio_channel_detach_aio_context(server->ioc);
} }
server->ctx = NULL; server->ctx = NULL;
if (server->ioc) {
if (server->in_qio_channel_yield) {
/* Stop receiving the next vhost-user message */
qio_channel_wake_read(server->ioc);
}
}
} }
bool vhost_user_server_start(VuServer *server, bool vhost_user_server_start(VuServer *server,