qmp: Move dispatcher to a coroutine
This moves the QMP dispatcher to a coroutine and runs all QMP command handlers that declare 'coroutine': true in coroutine context so they can avoid blocking the main loop while doing I/O or waiting for other events. For commands that are not declared safe to run in a coroutine, the dispatcher drops out of coroutine context by calling the QMP command handler from a bottom half. Signed-off-by: Kevin Wolf <kwolf@redhat.com> Reviewed-by: Markus Armbruster <armbru@redhat.com> Message-Id: <20201005155855.256490-10-kwolf@redhat.com> Reviewed-by: Markus Armbruster <armbru@redhat.com> Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com> Signed-off-by: Markus Armbruster <armbru@redhat.com>
This commit is contained in:
		
							parent
							
								
									04f22362f1
								
							
						
					
					
						commit
						9ce44e2ce2
					
				| @ -31,6 +31,7 @@ typedef enum QmpCommandOptions | ||||
| typedef struct QmpCommand | ||||
| { | ||||
|     const char *name; | ||||
|     /* Runs in coroutine context if QCO_COROUTINE is set */ | ||||
|     QmpCommandFunc *fn; | ||||
|     QmpCommandOptions options; | ||||
|     QTAILQ_ENTRY(QmpCommand) node; | ||||
|  | ||||
| @ -155,7 +155,9 @@ static inline bool monitor_is_qmp(const Monitor *mon) | ||||
| 
 | ||||
| typedef QTAILQ_HEAD(MonitorList, Monitor) MonitorList; | ||||
| extern IOThread *mon_iothread; | ||||
| extern QEMUBH *qmp_dispatcher_bh; | ||||
| extern Coroutine *qmp_dispatcher_co; | ||||
| extern bool qmp_dispatcher_co_shutdown; | ||||
| extern bool qmp_dispatcher_co_busy; | ||||
| extern QmpCommandList qmp_commands, qmp_cap_negotiation_commands; | ||||
| extern QemuMutex monitor_lock; | ||||
| extern MonitorList mon_list; | ||||
| @ -173,7 +175,7 @@ void monitor_fdsets_cleanup(void); | ||||
| 
 | ||||
| void qmp_send_response(MonitorQMP *mon, const QDict *rsp); | ||||
| void monitor_data_destroy_qmp(MonitorQMP *mon); | ||||
| void monitor_qmp_bh_dispatcher(void *data); | ||||
| void coroutine_fn monitor_qmp_dispatcher_co(void *data); | ||||
| 
 | ||||
| int get_monitor_def(int64_t *pval, const char *name); | ||||
| void help_cmd(Monitor *mon, const char *name); | ||||
|  | ||||
| @ -55,8 +55,32 @@ typedef struct { | ||||
| /* Shared monitor I/O thread */ | ||||
| IOThread *mon_iothread; | ||||
| 
 | ||||
| /* Bottom half to dispatch the requests received from I/O thread */ | ||||
| QEMUBH *qmp_dispatcher_bh; | ||||
| /* Coroutine to dispatch the requests received from I/O thread */ | ||||
| Coroutine *qmp_dispatcher_co; | ||||
| 
 | ||||
| /* Set to true when the dispatcher coroutine should terminate */ | ||||
| bool qmp_dispatcher_co_shutdown; | ||||
| 
 | ||||
| /*
 | ||||
|  * qmp_dispatcher_co_busy is used for synchronisation between the | ||||
|  * monitor thread and the main thread to ensure that the dispatcher | ||||
|  * coroutine never gets scheduled a second time when it's already | ||||
|  * scheduled (scheduling the same coroutine twice is forbidden). | ||||
|  * | ||||
|  * It is true if the coroutine is active and processing requests. | ||||
|  * Additional requests may then be pushed onto mon->qmp_requests, | ||||
|  * and @qmp_dispatcher_co_shutdown may be set without further ado. | ||||
|  * @qmp_dispatcher_co_busy must not be woken up in this case. | ||||
|  * | ||||
|  * If false, you also have to set @qmp_dispatcher_co_busy to true and | ||||
|  * wake up @qmp_dispatcher_co after pushing the new requests. | ||||
|  * | ||||
|  * The coroutine will automatically change this variable back to false | ||||
|  * before it yields.  Nobody else may set the variable to false. | ||||
|  * | ||||
|  * Access must be atomic for thread safety. | ||||
|  */ | ||||
| bool qmp_dispatcher_co_busy; | ||||
| 
 | ||||
| /*
 | ||||
|  * Protects mon_list, monitor_qapi_event_state, coroutine_mon, | ||||
| @ -623,9 +647,24 @@ void monitor_cleanup(void) | ||||
|     } | ||||
|     qemu_mutex_unlock(&monitor_lock); | ||||
| 
 | ||||
|     /* QEMUBHs needs to be deleted before destroying the I/O thread */ | ||||
|     qemu_bh_delete(qmp_dispatcher_bh); | ||||
|     qmp_dispatcher_bh = NULL; | ||||
|     /*
 | ||||
|      * The dispatcher needs to stop before destroying the I/O thread. | ||||
|      * | ||||
|      * We need to poll both qemu_aio_context and iohandler_ctx to make | ||||
|      * sure that the dispatcher coroutine keeps making progress and | ||||
|      * eventually terminates.  qemu_aio_context is automatically | ||||
|      * polled by calling AIO_WAIT_WHILE on it, but we must poll | ||||
|      * iohandler_ctx manually. | ||||
|      */ | ||||
|     qmp_dispatcher_co_shutdown = true; | ||||
|     if (!qatomic_xchg(&qmp_dispatcher_co_busy, true)) { | ||||
|         aio_co_wake(qmp_dispatcher_co); | ||||
|     } | ||||
| 
 | ||||
|     AIO_WAIT_WHILE(qemu_get_aio_context(), | ||||
|                    (aio_poll(iohandler_get_aio_context(), false), | ||||
|                     qatomic_mb_read(&qmp_dispatcher_co_busy))); | ||||
| 
 | ||||
|     if (mon_iothread) { | ||||
|         iothread_destroy(mon_iothread); | ||||
|         mon_iothread = NULL; | ||||
| @ -649,9 +688,9 @@ void monitor_init_globals_core(void) | ||||
|      * have commands assuming that context.  It would be nice to get | ||||
|      * rid of those assumptions. | ||||
|      */ | ||||
|     qmp_dispatcher_bh = aio_bh_new(iohandler_get_aio_context(), | ||||
|                                    monitor_qmp_bh_dispatcher, | ||||
|                                    NULL); | ||||
|     qmp_dispatcher_co = qemu_coroutine_create(monitor_qmp_dispatcher_co, NULL); | ||||
|     qatomic_mb_set(&qmp_dispatcher_co_busy, true); | ||||
|     aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co); | ||||
| } | ||||
| 
 | ||||
| int monitor_init(MonitorOptions *opts, bool allow_hmp, Error **errp) | ||||
|  | ||||
							
								
								
									
										122
									
								
								monitor/qmp.c
									
									
									
									
									
								
							
							
						
						
									
										122
									
								
								monitor/qmp.c
									
									
									
									
									
								
							| @ -133,6 +133,10 @@ static void monitor_qmp_respond(MonitorQMP *mon, QDict *rsp) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| /*
 | ||||
|  * Runs outside of coroutine context for OOB commands, but in | ||||
|  * coroutine context for everything else. | ||||
|  */ | ||||
| static void monitor_qmp_dispatch(MonitorQMP *mon, QObject *req) | ||||
| { | ||||
|     QDict *rsp; | ||||
| @ -206,43 +210,99 @@ static QMPRequest *monitor_qmp_requests_pop_any_with_lock(void) | ||||
|     return req_obj; | ||||
| } | ||||
| 
 | ||||
| void monitor_qmp_bh_dispatcher(void *data) | ||||
| void coroutine_fn monitor_qmp_dispatcher_co(void *data) | ||||
| { | ||||
|     QMPRequest *req_obj = monitor_qmp_requests_pop_any_with_lock(); | ||||
|     QMPRequest *req_obj = NULL; | ||||
|     QDict *rsp; | ||||
|     bool need_resume; | ||||
|     MonitorQMP *mon; | ||||
| 
 | ||||
|     if (!req_obj) { | ||||
|         return; | ||||
|     } | ||||
|     while (true) { | ||||
|         assert(qatomic_mb_read(&qmp_dispatcher_co_busy) == true); | ||||
| 
 | ||||
|     mon = req_obj->mon; | ||||
|     /*  qmp_oob_enabled() might change after "qmp_capabilities" */ | ||||
|     need_resume = !qmp_oob_enabled(mon) || | ||||
|         mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1; | ||||
|     qemu_mutex_unlock(&mon->qmp_queue_lock); | ||||
|     if (req_obj->req) { | ||||
|         QDict *qdict = qobject_to(QDict, req_obj->req); | ||||
|         QObject *id = qdict ? qdict_get(qdict, "id") : NULL; | ||||
|         trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: ""); | ||||
|         monitor_qmp_dispatch(mon, req_obj->req); | ||||
|     } else { | ||||
|         assert(req_obj->err); | ||||
|         rsp = qmp_error_response(req_obj->err); | ||||
|         req_obj->err = NULL; | ||||
|         monitor_qmp_respond(mon, rsp); | ||||
|         qobject_unref(rsp); | ||||
|     } | ||||
|         /*
 | ||||
|          * Mark the dispatcher as not busy already here so that we | ||||
|          * don't miss any new requests coming in the middle of our | ||||
|          * processing. | ||||
|          */ | ||||
|         qatomic_mb_set(&qmp_dispatcher_co_busy, false); | ||||
| 
 | ||||
|     if (need_resume) { | ||||
|         /* Pairs with the monitor_suspend() in handle_qmp_command() */ | ||||
|         monitor_resume(&mon->common); | ||||
|     } | ||||
|     qmp_request_free(req_obj); | ||||
|         while (!(req_obj = monitor_qmp_requests_pop_any_with_lock())) { | ||||
|             /*
 | ||||
|              * No more requests to process.  Wait to be reentered from | ||||
|              * handle_qmp_command() when it pushes more requests, or | ||||
|              * from monitor_cleanup() when it requests shutdown. | ||||
|              */ | ||||
|             if (!qmp_dispatcher_co_shutdown) { | ||||
|                 qemu_coroutine_yield(); | ||||
| 
 | ||||
|     /* Reschedule instead of looping so the main loop stays responsive */ | ||||
|     qemu_bh_schedule(qmp_dispatcher_bh); | ||||
|                 /*
 | ||||
|                  * busy must be set to true again by whoever | ||||
|                  * rescheduled us to avoid double scheduling | ||||
|                  */ | ||||
|                 assert(qatomic_xchg(&qmp_dispatcher_co_busy, false) == true); | ||||
|             } | ||||
| 
 | ||||
|             /*
 | ||||
|              * qmp_dispatcher_co_shutdown may have changed if we | ||||
|              * yielded and were reentered from monitor_cleanup() | ||||
|              */ | ||||
|             if (qmp_dispatcher_co_shutdown) { | ||||
|                 return; | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         if (qatomic_xchg(&qmp_dispatcher_co_busy, true) == true) { | ||||
|             /*
 | ||||
|              * Someone rescheduled us (probably because a new requests | ||||
|              * came in), but we didn't actually yield. Do that now, | ||||
|              * only to be immediately reentered and removed from the | ||||
|              * list of scheduled coroutines. | ||||
|              */ | ||||
|             qemu_coroutine_yield(); | ||||
|         } | ||||
| 
 | ||||
|         /*
 | ||||
|          * Move the coroutine from iohandler_ctx to qemu_aio_context for | ||||
|          * executing the command handler so that it can make progress if it | ||||
|          * involves an AIO_WAIT_WHILE(). | ||||
|          */ | ||||
|         aio_co_schedule(qemu_get_aio_context(), qmp_dispatcher_co); | ||||
|         qemu_coroutine_yield(); | ||||
| 
 | ||||
|         mon = req_obj->mon; | ||||
|         /* qmp_oob_enabled() might change after "qmp_capabilities" */ | ||||
|         need_resume = !qmp_oob_enabled(mon) || | ||||
|             mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1; | ||||
|         qemu_mutex_unlock(&mon->qmp_queue_lock); | ||||
|         if (req_obj->req) { | ||||
|             QDict *qdict = qobject_to(QDict, req_obj->req); | ||||
|             QObject *id = qdict ? qdict_get(qdict, "id") : NULL; | ||||
|             trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: ""); | ||||
|             monitor_qmp_dispatch(mon, req_obj->req); | ||||
|         } else { | ||||
|             assert(req_obj->err); | ||||
|             rsp = qmp_error_response(req_obj->err); | ||||
|             req_obj->err = NULL; | ||||
|             monitor_qmp_respond(mon, rsp); | ||||
|             qobject_unref(rsp); | ||||
|         } | ||||
| 
 | ||||
|         if (need_resume) { | ||||
|             /* Pairs with the monitor_suspend() in handle_qmp_command() */ | ||||
|             monitor_resume(&mon->common); | ||||
|         } | ||||
|         qmp_request_free(req_obj); | ||||
| 
 | ||||
|         /*
 | ||||
|          * Yield and reschedule so the main loop stays responsive. | ||||
|          * | ||||
|          * Move back to iohandler_ctx so that nested event loops for | ||||
|          * qemu_aio_context don't start new monitor commands. | ||||
|          */ | ||||
|         aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co); | ||||
|         qemu_coroutine_yield(); | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| static void handle_qmp_command(void *opaque, QObject *req, Error *err) | ||||
| @ -303,7 +363,9 @@ static void handle_qmp_command(void *opaque, QObject *req, Error *err) | ||||
|     qemu_mutex_unlock(&mon->qmp_queue_lock); | ||||
| 
 | ||||
|     /* Kick the dispatcher routine */ | ||||
|     qemu_bh_schedule(qmp_dispatcher_bh); | ||||
|     if (!qatomic_xchg(&qmp_dispatcher_co_busy, true)) { | ||||
|         aio_co_wake(qmp_dispatcher_co); | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size) | ||||
|  | ||||
| @ -12,12 +12,16 @@ | ||||
|  */ | ||||
| 
 | ||||
| #include "qemu/osdep.h" | ||||
| 
 | ||||
| #include "block/aio.h" | ||||
| #include "qapi/error.h" | ||||
| #include "qapi/qmp/dispatch.h" | ||||
| #include "qapi/qmp/qdict.h" | ||||
| #include "qapi/qmp/qjson.h" | ||||
| #include "sysemu/runstate.h" | ||||
| #include "qapi/qmp/qbool.h" | ||||
| #include "qemu/coroutine.h" | ||||
| #include "qemu/main-loop.h" | ||||
| 
 | ||||
| static QDict *qmp_dispatch_check_obj(QDict *dict, bool allow_oob, | ||||
|                                      Error **errp) | ||||
| @ -88,6 +92,30 @@ bool qmp_is_oob(const QDict *dict) | ||||
|         && !qdict_haskey(dict, "execute"); | ||||
| } | ||||
| 
 | ||||
| typedef struct QmpDispatchBH { | ||||
|     const QmpCommand *cmd; | ||||
|     Monitor *cur_mon; | ||||
|     QDict *args; | ||||
|     QObject **ret; | ||||
|     Error **errp; | ||||
|     Coroutine *co; | ||||
| } QmpDispatchBH; | ||||
| 
 | ||||
| static void do_qmp_dispatch_bh(void *opaque) | ||||
| { | ||||
|     QmpDispatchBH *data = opaque; | ||||
| 
 | ||||
|     assert(monitor_cur() == NULL); | ||||
|     monitor_set_cur(qemu_coroutine_self(), data->cur_mon); | ||||
|     data->cmd->fn(data->args, data->ret, data->errp); | ||||
|     monitor_set_cur(qemu_coroutine_self(), NULL); | ||||
|     aio_co_wake(data->co); | ||||
| } | ||||
| 
 | ||||
| /*
 | ||||
|  * Runs outside of coroutine context for OOB commands, but in coroutine | ||||
|  * context for everything else. | ||||
|  */ | ||||
| QDict *qmp_dispatch(const QmpCommandList *cmds, QObject *request, | ||||
|                     bool allow_oob, Monitor *cur_mon) | ||||
| { | ||||
| @ -153,12 +181,39 @@ QDict *qmp_dispatch(const QmpCommandList *cmds, QObject *request, | ||||
|         qobject_ref(args); | ||||
|     } | ||||
| 
 | ||||
|     assert(!(oob && qemu_in_coroutine())); | ||||
|     assert(monitor_cur() == NULL); | ||||
|     monitor_set_cur(qemu_coroutine_self(), cur_mon); | ||||
|     if (!!(cmd->options & QCO_COROUTINE) == qemu_in_coroutine()) { | ||||
|         monitor_set_cur(qemu_coroutine_self(), cur_mon); | ||||
|         cmd->fn(args, &ret, &err); | ||||
|         monitor_set_cur(qemu_coroutine_self(), NULL); | ||||
|     } else { | ||||
|        /*
 | ||||
|         * Actual context doesn't match the one the command needs. | ||||
|         * | ||||
|         * Case 1: we are in coroutine context, but command does not | ||||
|         * have QCO_COROUTINE.  We need to drop out of coroutine | ||||
|         * context for executing it. | ||||
|         * | ||||
|         * Case 2: we are outside coroutine context, but command has | ||||
|         * QCO_COROUTINE.  Can't actually happen, because we get here | ||||
|         * outside coroutine context only when executing a command | ||||
|         * out of band, and OOB commands never have QCO_COROUTINE. | ||||
|         */ | ||||
|         assert(!oob && qemu_in_coroutine() && !(cmd->options & QCO_COROUTINE)); | ||||
| 
 | ||||
|     cmd->fn(args, &ret, &err); | ||||
| 
 | ||||
|     monitor_set_cur(qemu_coroutine_self(), NULL); | ||||
|         QmpDispatchBH data = { | ||||
|             .cur_mon    = cur_mon, | ||||
|             .cmd        = cmd, | ||||
|             .args       = args, | ||||
|             .ret        = &ret, | ||||
|             .errp       = &err, | ||||
|             .co         = qemu_coroutine_self(), | ||||
|         }; | ||||
|         aio_bh_schedule_oneshot(qemu_get_aio_context(), do_qmp_dispatch_bh, | ||||
|                                 &data); | ||||
|         qemu_coroutine_yield(); | ||||
|     } | ||||
|     qobject_unref(args); | ||||
|     if (err) { | ||||
|         /* or assert(!ret) after reviewing all handlers: */ | ||||
|  | ||||
| @ -20,6 +20,9 @@ void qmp_register_command(QmpCommandList *cmds, const char *name, | ||||
| { | ||||
|     QmpCommand *cmd = g_malloc0(sizeof(*cmd)); | ||||
| 
 | ||||
|     /* QCO_COROUTINE and QCO_ALLOW_OOB are incompatible for now */ | ||||
|     assert(!((options & QCO_COROUTINE) && (options & QCO_ALLOW_OOB))); | ||||
| 
 | ||||
|     cmd->name = name; | ||||
|     cmd->fn = fn; | ||||
|     cmd->enabled = true; | ||||
|  | ||||
| @ -15,6 +15,7 @@ | ||||
| 
 | ||||
| #include "qemu/osdep.h" | ||||
| #include "block/block.h" | ||||
| #include "qemu/main-loop.h" | ||||
| #include "qemu/rcu.h" | ||||
| #include "qemu/rcu_queue.h" | ||||
| #include "qemu/sockets.h" | ||||
| @ -558,8 +559,13 @@ bool aio_poll(AioContext *ctx, bool blocking) | ||||
|      * There cannot be two concurrent aio_poll calls for the same AioContext (or | ||||
|      * an aio_poll concurrent with a GSource prepare/check/dispatch callback). | ||||
|      * We rely on this below to avoid slow locked accesses to ctx->notify_me. | ||||
|      * | ||||
|      * aio_poll() may only be called in the AioContext's thread. iohandler_ctx | ||||
|      * is special in that it runs in the main thread, but that thread's context | ||||
|      * is qemu_aio_context. | ||||
|      */ | ||||
|     assert(in_aio_context_home_thread(ctx)); | ||||
|     assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ? | ||||
|                                       qemu_get_aio_context() : ctx)); | ||||
| 
 | ||||
|     qemu_lockcnt_inc(&ctx->list_lock); | ||||
| 
 | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user
	 Kevin Wolf
						Kevin Wolf