 285a8f209a
			
		
	
	
		285a8f209a
		
	
	
	
	
		
			
			For union types, the tag member is known only after .check(). We used to code this in a simple way: QAPISchemaVariants attribute .tag_member was None for union types until .check(). Since this complicated typing, recent commit "qapi/schema: fix typing for QAPISchemaVariants.tag_member" hid it behind a property. The previous commit lets us treat .tag_member just like the other attributes that become known only in .check(): declare, but don't initialize it in .__init__(). Signed-off-by: Markus Armbruster <armbru@redhat.com>
		
			
				
	
	
		
			1493 lines
		
	
	
		
			50 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1493 lines
		
	
	
		
			50 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # -*- coding: utf-8 -*-
 | |
| #
 | |
| # QAPI schema internal representation
 | |
| #
 | |
| # Copyright (c) 2015-2019 Red Hat Inc.
 | |
| #
 | |
| # Authors:
 | |
| #  Markus Armbruster <armbru@redhat.com>
 | |
| #  Eric Blake <eblake@redhat.com>
 | |
| #  Marc-André Lureau <marcandre.lureau@redhat.com>
 | |
| #
 | |
| # This work is licensed under the terms of the GNU GPL, version 2.
 | |
| # See the COPYING file in the top-level directory.
 | |
| 
 | |
| # pylint: disable=too-many-lines
 | |
| 
 | |
| # TODO catching name collisions in generated code would be nice
 | |
| 
 | |
| from __future__ import annotations
 | |
| 
 | |
| from abc import ABC, abstractmethod
 | |
| from collections import OrderedDict
 | |
| import os
 | |
| import re
 | |
| from typing import (
 | |
|     Any,
 | |
|     Callable,
 | |
|     Dict,
 | |
|     List,
 | |
|     Optional,
 | |
|     Union,
 | |
|     cast,
 | |
| )
 | |
| 
 | |
| from .common import (
 | |
|     POINTER_SUFFIX,
 | |
|     c_name,
 | |
|     cgen_ifcond,
 | |
|     docgen_ifcond,
 | |
|     gen_endif,
 | |
|     gen_if,
 | |
| )
 | |
| from .error import QAPIError, QAPISemError, QAPISourceError
 | |
| from .expr import check_exprs
 | |
| from .parser import QAPIDoc, QAPIExpression, QAPISchemaParser
 | |
| from .source import QAPISourceInfo
 | |
| 
 | |
| 
 | |
| class QAPISchemaIfCond:
 | |
|     def __init__(
 | |
|         self,
 | |
|         ifcond: Optional[Union[str, Dict[str, object]]] = None,
 | |
|     ) -> None:
 | |
|         self.ifcond = ifcond
 | |
| 
 | |
|     def _cgen(self) -> str:
 | |
|         return cgen_ifcond(self.ifcond)
 | |
| 
 | |
|     def gen_if(self) -> str:
 | |
|         return gen_if(self._cgen())
 | |
| 
 | |
|     def gen_endif(self) -> str:
 | |
|         return gen_endif(self._cgen())
 | |
| 
 | |
|     def docgen(self) -> str:
 | |
|         return docgen_ifcond(self.ifcond)
 | |
| 
 | |
|     def is_present(self) -> bool:
 | |
|         return bool(self.ifcond)
 | |
| 
 | |
| 
 | |
| class QAPISchemaEntity:
 | |
|     """
 | |
|     A schema entity.
 | |
| 
 | |
|     This is either a directive, such as include, or a definition.
 | |
|     The latter uses sub-class `QAPISchemaDefinition`.
 | |
|     """
 | |
|     def __init__(self, info: Optional[QAPISourceInfo]):
 | |
|         self._module: Optional[QAPISchemaModule] = None
 | |
|         # For explicitly defined entities, info points to the (explicit)
 | |
|         # definition.  For builtins (and their arrays), info is None.
 | |
|         # For implicitly defined entities, info points to a place that
 | |
|         # triggered the implicit definition (there may be more than one
 | |
|         # such place).
 | |
|         self.info = info
 | |
|         self._checked = False
 | |
| 
 | |
|     def __repr__(self) -> str:
 | |
|         return "<%s at 0x%x>" % (type(self).__name__, id(self))
 | |
| 
 | |
|     def check(self, schema: QAPISchema) -> None:
 | |
|         # pylint: disable=unused-argument
 | |
|         self._checked = True
 | |
| 
 | |
|     def connect_doc(self, doc: Optional[QAPIDoc] = None) -> None:
 | |
|         pass
 | |
| 
 | |
|     def _set_module(
 | |
|         self, schema: QAPISchema, info: Optional[QAPISourceInfo]
 | |
|     ) -> None:
 | |
|         assert self._checked
 | |
|         fname = info.fname if info else QAPISchemaModule.BUILTIN_MODULE_NAME
 | |
|         self._module = schema.module_by_fname(fname)
 | |
|         self._module.add_entity(self)
 | |
| 
 | |
|     def set_module(self, schema: QAPISchema) -> None:
 | |
|         self._set_module(schema, self.info)
 | |
| 
 | |
|     def visit(self, visitor: QAPISchemaVisitor) -> None:
 | |
|         # pylint: disable=unused-argument
 | |
|         assert self._checked
 | |
| 
 | |
| 
 | |
| class QAPISchemaDefinition(QAPISchemaEntity):
 | |
|     meta: str
 | |
| 
 | |
|     def __init__(
 | |
|         self,
 | |
|         name: str,
 | |
|         info: Optional[QAPISourceInfo],
 | |
|         doc: Optional[QAPIDoc],
 | |
|         ifcond: Optional[QAPISchemaIfCond] = None,
 | |
|         features: Optional[List[QAPISchemaFeature]] = None,
 | |
|     ):
 | |
|         super().__init__(info)
 | |
|         for f in features or []:
 | |
|             f.set_defined_in(name)
 | |
|         self.name = name
 | |
|         self.doc = doc
 | |
|         self._ifcond = ifcond or QAPISchemaIfCond()
 | |
|         self.features = features or []
 | |
| 
 | |
|     def __repr__(self) -> str:
 | |
|         return "<%s:%s at 0x%x>" % (type(self).__name__, self.name,
 | |
|                                     id(self))
 | |
| 
 | |
|     def c_name(self) -> str:
 | |
|         return c_name(self.name)
 | |
| 
 | |
|     def check(self, schema: QAPISchema) -> None:
 | |
|         assert not self._checked
 | |
|         super().check(schema)
 | |
|         seen: Dict[str, QAPISchemaMember] = {}
 | |
|         for f in self.features:
 | |
|             f.check_clash(self.info, seen)
 | |
| 
 | |
|     def connect_doc(self, doc: Optional[QAPIDoc] = None) -> None:
 | |
|         super().connect_doc(doc)
 | |
|         doc = doc or self.doc
 | |
|         if doc:
 | |
|             for f in self.features:
 | |
|                 doc.connect_feature(f)
 | |
| 
 | |
|     @property
 | |
|     def ifcond(self) -> QAPISchemaIfCond:
 | |
|         assert self._checked
 | |
|         return self._ifcond
 | |
| 
 | |
|     def is_implicit(self) -> bool:
 | |
|         return not self.info
 | |
| 
 | |
|     def describe(self) -> str:
 | |
|         return "%s '%s'" % (self.meta, self.name)
 | |
| 
 | |
| 
 | |
| class QAPISchemaVisitor:
 | |
|     def visit_begin(self, schema: QAPISchema) -> None:
 | |
|         pass
 | |
| 
 | |
|     def visit_end(self) -> None:
 | |
|         pass
 | |
| 
 | |
|     def visit_module(self, name: str) -> None:
 | |
|         pass
 | |
| 
 | |
|     def visit_needed(self, entity: QAPISchemaEntity) -> bool:
 | |
|         # pylint: disable=unused-argument
 | |
|         # Default to visiting everything
 | |
|         return True
 | |
| 
 | |
|     def visit_include(self, name: str, info: Optional[QAPISourceInfo]) -> None:
 | |
|         pass
 | |
| 
 | |
|     def visit_builtin_type(
 | |
|         self, name: str, info: Optional[QAPISourceInfo], json_type: str
 | |
|     ) -> None:
 | |
|         pass
 | |
| 
 | |
|     def visit_enum_type(
 | |
|         self,
 | |
|         name: str,
 | |
|         info: Optional[QAPISourceInfo],
 | |
|         ifcond: QAPISchemaIfCond,
 | |
|         features: List[QAPISchemaFeature],
 | |
|         members: List[QAPISchemaEnumMember],
 | |
|         prefix: Optional[str],
 | |
|     ) -> None:
 | |
|         pass
 | |
| 
 | |
|     def visit_array_type(
 | |
|         self,
 | |
|         name: str,
 | |
|         info: Optional[QAPISourceInfo],
 | |
|         ifcond: QAPISchemaIfCond,
 | |
|         element_type: QAPISchemaType,
 | |
|     ) -> None:
 | |
|         pass
 | |
| 
 | |
|     def visit_object_type(
 | |
|         self,
 | |
|         name: str,
 | |
|         info: Optional[QAPISourceInfo],
 | |
|         ifcond: QAPISchemaIfCond,
 | |
|         features: List[QAPISchemaFeature],
 | |
|         base: Optional[QAPISchemaObjectType],
 | |
|         members: List[QAPISchemaObjectTypeMember],
 | |
|         branches: Optional[QAPISchemaBranches],
 | |
|     ) -> None:
 | |
|         pass
 | |
| 
 | |
|     def visit_object_type_flat(
 | |
|         self,
 | |
|         name: str,
 | |
|         info: Optional[QAPISourceInfo],
 | |
|         ifcond: QAPISchemaIfCond,
 | |
|         features: List[QAPISchemaFeature],
 | |
|         members: List[QAPISchemaObjectTypeMember],
 | |
|         branches: Optional[QAPISchemaBranches],
 | |
|     ) -> None:
 | |
|         pass
 | |
| 
 | |
|     def visit_alternate_type(
 | |
|         self,
 | |
|         name: str,
 | |
|         info: Optional[QAPISourceInfo],
 | |
|         ifcond: QAPISchemaIfCond,
 | |
|         features: List[QAPISchemaFeature],
 | |
|         alternatives: QAPISchemaAlternatives,
 | |
|     ) -> None:
 | |
|         pass
 | |
| 
 | |
|     def visit_command(
 | |
|         self,
 | |
|         name: str,
 | |
|         info: Optional[QAPISourceInfo],
 | |
|         ifcond: QAPISchemaIfCond,
 | |
|         features: List[QAPISchemaFeature],
 | |
|         arg_type: Optional[QAPISchemaObjectType],
 | |
|         ret_type: Optional[QAPISchemaType],
 | |
|         gen: bool,
 | |
|         success_response: bool,
 | |
|         boxed: bool,
 | |
|         allow_oob: bool,
 | |
|         allow_preconfig: bool,
 | |
|         coroutine: bool,
 | |
|     ) -> None:
 | |
|         pass
 | |
| 
 | |
|     def visit_event(
 | |
|         self,
 | |
|         name: str,
 | |
|         info: Optional[QAPISourceInfo],
 | |
|         ifcond: QAPISchemaIfCond,
 | |
|         features: List[QAPISchemaFeature],
 | |
|         arg_type: Optional[QAPISchemaObjectType],
 | |
|         boxed: bool,
 | |
|     ) -> None:
 | |
|         pass
 | |
| 
 | |
| 
 | |
| class QAPISchemaModule:
 | |
| 
 | |
|     BUILTIN_MODULE_NAME = './builtin'
 | |
| 
 | |
|     def __init__(self, name: str):
 | |
|         self.name = name
 | |
|         self._entity_list: List[QAPISchemaEntity] = []
 | |
| 
 | |
|     @staticmethod
 | |
|     def is_system_module(name: str) -> bool:
 | |
|         """
 | |
|         System modules are internally defined modules.
 | |
| 
 | |
|         Their names start with the "./" prefix.
 | |
|         """
 | |
|         return name.startswith('./')
 | |
| 
 | |
|     @classmethod
 | |
|     def is_user_module(cls, name: str) -> bool:
 | |
|         """
 | |
|         User modules are those defined by the user in qapi JSON files.
 | |
| 
 | |
|         They do not start with the "./" prefix.
 | |
|         """
 | |
|         return not cls.is_system_module(name)
 | |
| 
 | |
|     @classmethod
 | |
|     def is_builtin_module(cls, name: str) -> bool:
 | |
|         """
 | |
|         The built-in module is a single System module for the built-in types.
 | |
| 
 | |
|         It is always "./builtin".
 | |
|         """
 | |
|         return name == cls.BUILTIN_MODULE_NAME
 | |
| 
 | |
|     def add_entity(self, ent: QAPISchemaEntity) -> None:
 | |
|         self._entity_list.append(ent)
 | |
| 
 | |
|     def visit(self, visitor: QAPISchemaVisitor) -> None:
 | |
|         visitor.visit_module(self.name)
 | |
|         for entity in self._entity_list:
 | |
|             if visitor.visit_needed(entity):
 | |
|                 entity.visit(visitor)
 | |
| 
 | |
| 
 | |
| class QAPISchemaInclude(QAPISchemaEntity):
 | |
|     def __init__(self, sub_module: QAPISchemaModule, info: QAPISourceInfo):
 | |
|         super().__init__(info)
 | |
|         self._sub_module = sub_module
 | |
| 
 | |
|     def visit(self, visitor: QAPISchemaVisitor) -> None:
 | |
|         super().visit(visitor)
 | |
|         visitor.visit_include(self._sub_module.name, self.info)
 | |
| 
 | |
| 
 | |
| class QAPISchemaType(QAPISchemaDefinition, ABC):
 | |
|     # Return the C type for common use.
 | |
|     # For the types we commonly box, this is a pointer type.
 | |
|     @abstractmethod
 | |
|     def c_type(self) -> str:
 | |
|         pass
 | |
| 
 | |
|     # Return the C type to be used in a parameter list.
 | |
|     def c_param_type(self) -> str:
 | |
|         return self.c_type()
 | |
| 
 | |
|     # Return the C type to be used where we suppress boxing.
 | |
|     def c_unboxed_type(self) -> str:
 | |
|         return self.c_type()
 | |
| 
 | |
|     @abstractmethod
 | |
|     def json_type(self) -> str:
 | |
|         pass
 | |
| 
 | |
|     def alternate_qtype(self) -> Optional[str]:
 | |
|         json2qtype = {
 | |
|             'null':    'QTYPE_QNULL',
 | |
|             'string':  'QTYPE_QSTRING',
 | |
|             'number':  'QTYPE_QNUM',
 | |
|             'int':     'QTYPE_QNUM',
 | |
|             'boolean': 'QTYPE_QBOOL',
 | |
|             'array':   'QTYPE_QLIST',
 | |
|             'object':  'QTYPE_QDICT'
 | |
|         }
 | |
|         return json2qtype.get(self.json_type())
 | |
| 
 | |
|     def doc_type(self) -> Optional[str]:
 | |
|         if self.is_implicit():
 | |
|             return None
 | |
|         return self.name
 | |
| 
 | |
|     def need_has_if_optional(self) -> bool:
 | |
|         # When FOO is a pointer, has_FOO == !!FOO, i.e. has_FOO is redundant.
 | |
|         # Except for arrays; see QAPISchemaArrayType.need_has_if_optional().
 | |
|         return not self.c_type().endswith(POINTER_SUFFIX)
 | |
| 
 | |
|     def check(self, schema: QAPISchema) -> None:
 | |
|         super().check(schema)
 | |
|         for feat in self.features:
 | |
|             if feat.is_special():
 | |
|                 raise QAPISemError(
 | |
|                     self.info,
 | |
|                     f"feature '{feat.name}' is not supported for types")
 | |
| 
 | |
|     def describe(self) -> str:
 | |
|         return "%s type '%s'" % (self.meta, self.name)
 | |
| 
 | |
| 
 | |
| class QAPISchemaBuiltinType(QAPISchemaType):
 | |
|     meta = 'built-in'
 | |
| 
 | |
|     def __init__(self, name: str, json_type: str, c_type: str):
 | |
|         super().__init__(name, None, None)
 | |
|         assert json_type in ('string', 'number', 'int', 'boolean', 'null',
 | |
|                              'value')
 | |
|         self._json_type_name = json_type
 | |
|         self._c_type_name = c_type
 | |
| 
 | |
|     def c_name(self) -> str:
 | |
|         return self.name
 | |
| 
 | |
|     def c_type(self) -> str:
 | |
|         return self._c_type_name
 | |
| 
 | |
|     def c_param_type(self) -> str:
 | |
|         if self.name == 'str':
 | |
|             return 'const ' + self._c_type_name
 | |
|         return self._c_type_name
 | |
| 
 | |
|     def json_type(self) -> str:
 | |
|         return self._json_type_name
 | |
| 
 | |
|     def doc_type(self) -> str:
 | |
|         return self.json_type()
 | |
| 
 | |
|     def visit(self, visitor: QAPISchemaVisitor) -> None:
 | |
|         super().visit(visitor)
 | |
|         visitor.visit_builtin_type(self.name, self.info, self.json_type())
 | |
| 
 | |
| 
 | |
| class QAPISchemaEnumType(QAPISchemaType):
 | |
|     meta = 'enum'
 | |
| 
 | |
|     def __init__(
 | |
|         self,
 | |
|         name: str,
 | |
|         info: Optional[QAPISourceInfo],
 | |
|         doc: Optional[QAPIDoc],
 | |
|         ifcond: Optional[QAPISchemaIfCond],
 | |
|         features: Optional[List[QAPISchemaFeature]],
 | |
|         members: List[QAPISchemaEnumMember],
 | |
|         prefix: Optional[str],
 | |
|     ):
 | |
|         super().__init__(name, info, doc, ifcond, features)
 | |
|         for m in members:
 | |
|             m.set_defined_in(name)
 | |
|         self.members = members
 | |
|         self.prefix = prefix
 | |
| 
 | |
|     def check(self, schema: QAPISchema) -> None:
 | |
|         super().check(schema)
 | |
|         seen: Dict[str, QAPISchemaMember] = {}
 | |
|         for m in self.members:
 | |
|             m.check_clash(self.info, seen)
 | |
| 
 | |
|     def connect_doc(self, doc: Optional[QAPIDoc] = None) -> None:
 | |
|         super().connect_doc(doc)
 | |
|         doc = doc or self.doc
 | |
|         for m in self.members:
 | |
|             m.connect_doc(doc)
 | |
| 
 | |
|     def is_implicit(self) -> bool:
 | |
|         # See QAPISchema._def_predefineds()
 | |
|         return self.name == 'QType'
 | |
| 
 | |
|     def c_type(self) -> str:
 | |
|         return c_name(self.name)
 | |
| 
 | |
|     def member_names(self) -> List[str]:
 | |
|         return [m.name for m in self.members]
 | |
| 
 | |
|     def json_type(self) -> str:
 | |
|         return 'string'
 | |
| 
 | |
|     def visit(self, visitor: QAPISchemaVisitor) -> None:
 | |
|         super().visit(visitor)
 | |
|         visitor.visit_enum_type(
 | |
|             self.name, self.info, self.ifcond, self.features,
 | |
|             self.members, self.prefix)
 | |
| 
 | |
| 
 | |
| class QAPISchemaArrayType(QAPISchemaType):
 | |
|     meta = 'array'
 | |
| 
 | |
|     def __init__(
 | |
|         self, name: str, info: Optional[QAPISourceInfo], element_type: str
 | |
|     ):
 | |
|         super().__init__(name, info, None)
 | |
|         self._element_type_name = element_type
 | |
|         self.element_type: QAPISchemaType
 | |
| 
 | |
|     def need_has_if_optional(self) -> bool:
 | |
|         # When FOO is an array, we still need has_FOO to distinguish
 | |
|         # absent (!has_FOO) from present and empty (has_FOO && !FOO).
 | |
|         return True
 | |
| 
 | |
|     def check(self, schema: QAPISchema) -> None:
 | |
|         super().check(schema)
 | |
|         self.element_type = schema.resolve_type(
 | |
|             self._element_type_name, self.info,
 | |
|             self.info.defn_meta if self.info else None)
 | |
|         assert not isinstance(self.element_type, QAPISchemaArrayType)
 | |
| 
 | |
|     def set_module(self, schema: QAPISchema) -> None:
 | |
|         self._set_module(schema, self.element_type.info)
 | |
| 
 | |
|     @property
 | |
|     def ifcond(self) -> QAPISchemaIfCond:
 | |
|         assert self._checked
 | |
|         return self.element_type.ifcond
 | |
| 
 | |
|     def is_implicit(self) -> bool:
 | |
|         return True
 | |
| 
 | |
|     def c_type(self) -> str:
 | |
|         return c_name(self.name) + POINTER_SUFFIX
 | |
| 
 | |
|     def json_type(self) -> str:
 | |
|         return 'array'
 | |
| 
 | |
|     def doc_type(self) -> Optional[str]:
 | |
|         elt_doc_type = self.element_type.doc_type()
 | |
|         if not elt_doc_type:
 | |
|             return None
 | |
|         return 'array of ' + elt_doc_type
 | |
| 
 | |
|     def visit(self, visitor: QAPISchemaVisitor) -> None:
 | |
|         super().visit(visitor)
 | |
|         visitor.visit_array_type(self.name, self.info, self.ifcond,
 | |
|                                  self.element_type)
 | |
| 
 | |
|     def describe(self) -> str:
 | |
|         return "%s type ['%s']" % (self.meta, self._element_type_name)
 | |
| 
 | |
| 
 | |
| class QAPISchemaObjectType(QAPISchemaType):
 | |
|     def __init__(
 | |
|         self,
 | |
|         name: str,
 | |
|         info: Optional[QAPISourceInfo],
 | |
|         doc: Optional[QAPIDoc],
 | |
|         ifcond: Optional[QAPISchemaIfCond],
 | |
|         features: Optional[List[QAPISchemaFeature]],
 | |
|         base: Optional[str],
 | |
|         local_members: List[QAPISchemaObjectTypeMember],
 | |
|         branches: Optional[QAPISchemaBranches],
 | |
|     ):
 | |
|         # struct has local_members, optional base, and no branches
 | |
|         # union has base, branches, and no local_members
 | |
|         super().__init__(name, info, doc, ifcond, features)
 | |
|         self.meta = 'union' if branches else 'struct'
 | |
|         for m in local_members:
 | |
|             m.set_defined_in(name)
 | |
|         if branches is not None:
 | |
|             branches.set_defined_in(name)
 | |
|         self._base_name = base
 | |
|         self.base = None
 | |
|         self.local_members = local_members
 | |
|         self.branches = branches
 | |
|         self.members: List[QAPISchemaObjectTypeMember]
 | |
|         self._check_complete = False
 | |
| 
 | |
|     def check(self, schema: QAPISchema) -> None:
 | |
|         # This calls another type T's .check() exactly when the C
 | |
|         # struct emitted by gen_object() contains that T's C struct
 | |
|         # (pointers don't count).
 | |
|         if self._check_complete:
 | |
|             # A previous .check() completed: nothing to do
 | |
|             return
 | |
|         if self._checked:
 | |
|             # Recursed: C struct contains itself
 | |
|             raise QAPISemError(self.info,
 | |
|                                "object %s contains itself" % self.name)
 | |
| 
 | |
|         super().check(schema)
 | |
|         assert self._checked and not self._check_complete
 | |
| 
 | |
|         seen = OrderedDict()
 | |
|         if self._base_name:
 | |
|             self.base = schema.resolve_type(self._base_name, self.info,
 | |
|                                             "'base'")
 | |
|             if (not isinstance(self.base, QAPISchemaObjectType)
 | |
|                     or self.base.branches):
 | |
|                 raise QAPISemError(
 | |
|                     self.info,
 | |
|                     "'base' requires a struct type, %s isn't"
 | |
|                     % self.base.describe())
 | |
|             self.base.check(schema)
 | |
|             self.base.check_clash(self.info, seen)
 | |
|         for m in self.local_members:
 | |
|             m.check(schema)
 | |
|             m.check_clash(self.info, seen)
 | |
| 
 | |
|         # self.check_clash() works in terms of the supertype, but
 | |
|         # self.members is declared List[QAPISchemaObjectTypeMember].
 | |
|         # Cast down to the subtype.
 | |
|         members = cast(List[QAPISchemaObjectTypeMember], list(seen.values()))
 | |
| 
 | |
|         if self.branches:
 | |
|             self.branches.check(schema, seen)
 | |
|             self.branches.check_clash(self.info, seen)
 | |
| 
 | |
|         self.members = members
 | |
|         self._check_complete = True  # mark completed
 | |
| 
 | |
|     # Check that the members of this type do not cause duplicate JSON members,
 | |
|     # and update seen to track the members seen so far. Report any errors
 | |
|     # on behalf of info, which is not necessarily self.info
 | |
|     def check_clash(
 | |
|         self,
 | |
|         info: Optional[QAPISourceInfo],
 | |
|         seen: Dict[str, QAPISchemaMember],
 | |
|     ) -> None:
 | |
|         assert self._checked
 | |
|         for m in self.members:
 | |
|             m.check_clash(info, seen)
 | |
|         if self.branches:
 | |
|             self.branches.check_clash(info, seen)
 | |
| 
 | |
|     def connect_doc(self, doc: Optional[QAPIDoc] = None) -> None:
 | |
|         super().connect_doc(doc)
 | |
|         doc = doc or self.doc
 | |
|         if self.base and self.base.is_implicit():
 | |
|             self.base.connect_doc(doc)
 | |
|         for m in self.local_members:
 | |
|             m.connect_doc(doc)
 | |
| 
 | |
|     def is_implicit(self) -> bool:
 | |
|         # See QAPISchema._make_implicit_object_type(), as well as
 | |
|         # _def_predefineds()
 | |
|         return self.name.startswith('q_')
 | |
| 
 | |
|     def is_empty(self) -> bool:
 | |
|         return not self.members and not self.branches
 | |
| 
 | |
|     def has_conditional_members(self) -> bool:
 | |
|         return any(m.ifcond.is_present() for m in self.members)
 | |
| 
 | |
|     def c_name(self) -> str:
 | |
|         assert self.name != 'q_empty'
 | |
|         return super().c_name()
 | |
| 
 | |
|     def c_type(self) -> str:
 | |
|         assert not self.is_implicit()
 | |
|         return c_name(self.name) + POINTER_SUFFIX
 | |
| 
 | |
|     def c_unboxed_type(self) -> str:
 | |
|         return c_name(self.name)
 | |
| 
 | |
|     def json_type(self) -> str:
 | |
|         return 'object'
 | |
| 
 | |
|     def visit(self, visitor: QAPISchemaVisitor) -> None:
 | |
|         super().visit(visitor)
 | |
|         visitor.visit_object_type(
 | |
|             self.name, self.info, self.ifcond, self.features,
 | |
|             self.base, self.local_members, self.branches)
 | |
|         visitor.visit_object_type_flat(
 | |
|             self.name, self.info, self.ifcond, self.features,
 | |
|             self.members, self.branches)
 | |
| 
 | |
| 
 | |
| class QAPISchemaAlternateType(QAPISchemaType):
 | |
|     meta = 'alternate'
 | |
| 
 | |
|     def __init__(
 | |
|         self,
 | |
|         name: str,
 | |
|         info: QAPISourceInfo,
 | |
|         doc: Optional[QAPIDoc],
 | |
|         ifcond: Optional[QAPISchemaIfCond],
 | |
|         features: List[QAPISchemaFeature],
 | |
|         alternatives: QAPISchemaAlternatives,
 | |
|     ):
 | |
|         super().__init__(name, info, doc, ifcond, features)
 | |
|         assert alternatives.tag_member
 | |
|         alternatives.set_defined_in(name)
 | |
|         alternatives.tag_member.set_defined_in(self.name)
 | |
|         self.alternatives = alternatives
 | |
| 
 | |
|     def check(self, schema: QAPISchema) -> None:
 | |
|         super().check(schema)
 | |
|         self.alternatives.tag_member.check(schema)
 | |
|         # Not calling self.alternatives.check_clash(), because there's
 | |
|         # nothing to clash with
 | |
|         self.alternatives.check(schema, {})
 | |
|         # Alternate branch names have no relation to the tag enum values;
 | |
|         # so we have to check for potential name collisions ourselves.
 | |
|         seen: Dict[str, QAPISchemaMember] = {}
 | |
|         types_seen: Dict[str, str] = {}
 | |
|         for v in self.alternatives.variants:
 | |
|             v.check_clash(self.info, seen)
 | |
|             qtype = v.type.alternate_qtype()
 | |
|             if not qtype:
 | |
|                 raise QAPISemError(
 | |
|                     self.info,
 | |
|                     "%s cannot use %s"
 | |
|                     % (v.describe(self.info), v.type.describe()))
 | |
|             conflicting = set([qtype])
 | |
|             if qtype == 'QTYPE_QSTRING':
 | |
|                 if isinstance(v.type, QAPISchemaEnumType):
 | |
|                     for m in v.type.members:
 | |
|                         if m.name in ['on', 'off']:
 | |
|                             conflicting.add('QTYPE_QBOOL')
 | |
|                         if re.match(r'[-+0-9.]', m.name):
 | |
|                             # lazy, could be tightened
 | |
|                             conflicting.add('QTYPE_QNUM')
 | |
|                 else:
 | |
|                     conflicting.add('QTYPE_QNUM')
 | |
|                     conflicting.add('QTYPE_QBOOL')
 | |
|             for qt in conflicting:
 | |
|                 if qt in types_seen:
 | |
|                     raise QAPISemError(
 | |
|                         self.info,
 | |
|                         "%s can't be distinguished from '%s'"
 | |
|                         % (v.describe(self.info), types_seen[qt]))
 | |
|                 types_seen[qt] = v.name
 | |
| 
 | |
|     def connect_doc(self, doc: Optional[QAPIDoc] = None) -> None:
 | |
|         super().connect_doc(doc)
 | |
|         doc = doc or self.doc
 | |
|         for v in self.alternatives.variants:
 | |
|             v.connect_doc(doc)
 | |
| 
 | |
|     def c_type(self) -> str:
 | |
|         return c_name(self.name) + POINTER_SUFFIX
 | |
| 
 | |
|     def json_type(self) -> str:
 | |
|         return 'value'
 | |
| 
 | |
|     def visit(self, visitor: QAPISchemaVisitor) -> None:
 | |
|         super().visit(visitor)
 | |
|         visitor.visit_alternate_type(
 | |
|             self.name, self.info, self.ifcond, self.features,
 | |
|             self.alternatives)
 | |
| 
 | |
| 
 | |
| class QAPISchemaVariants:
 | |
|     def __init__(
 | |
|         self,
 | |
|         info: QAPISourceInfo,
 | |
|         variants: List[QAPISchemaVariant],
 | |
|     ):
 | |
|         self.info = info
 | |
|         self.tag_member: QAPISchemaObjectTypeMember
 | |
|         self.variants = variants
 | |
| 
 | |
|     def set_defined_in(self, name: str) -> None:
 | |
|         for v in self.variants:
 | |
|             v.set_defined_in(name)
 | |
| 
 | |
|     def check(
 | |
|             self, schema: QAPISchema, seen: Dict[str, QAPISchemaMember]
 | |
|     ) -> None:
 | |
|         for v in self.variants:
 | |
|             v.check(schema)
 | |
| 
 | |
| 
 | |
| class QAPISchemaBranches(QAPISchemaVariants):
 | |
|     def __init__(self,
 | |
|                  info: QAPISourceInfo,
 | |
|                  variants: List[QAPISchemaVariant],
 | |
|                  tag_name: str):
 | |
|         super().__init__(info, variants)
 | |
|         self._tag_name = tag_name
 | |
| 
 | |
|     def check(
 | |
|             self, schema: QAPISchema, seen: Dict[str, QAPISchemaMember]
 | |
|     ) -> None:
 | |
|         # We need to narrow the member type:
 | |
|         tag_member = seen.get(c_name(self._tag_name))
 | |
|         assert (tag_member is None
 | |
|                 or isinstance(tag_member, QAPISchemaObjectTypeMember))
 | |
| 
 | |
|         base = "'base'"
 | |
|         # Pointing to the base type when not implicit would be
 | |
|         # nice, but we don't know it here
 | |
|         if not tag_member or self._tag_name != tag_member.name:
 | |
|             raise QAPISemError(
 | |
|                 self.info,
 | |
|                 "discriminator '%s' is not a member of %s"
 | |
|                 % (self._tag_name, base))
 | |
|         self.tag_member = tag_member
 | |
|         # Here we do:
 | |
|         assert tag_member.defined_in
 | |
|         base_type = schema.lookup_type(tag_member.defined_in)
 | |
|         assert base_type
 | |
|         if not base_type.is_implicit():
 | |
|             base = "base type '%s'" % tag_member.defined_in
 | |
|         if not isinstance(tag_member.type, QAPISchemaEnumType):
 | |
|             raise QAPISemError(
 | |
|                 self.info,
 | |
|                 "discriminator member '%s' of %s must be of enum type"
 | |
|                 % (self._tag_name, base))
 | |
|         if tag_member.optional:
 | |
|             raise QAPISemError(
 | |
|                 self.info,
 | |
|                 "discriminator member '%s' of %s must not be optional"
 | |
|                 % (self._tag_name, base))
 | |
|         if tag_member.ifcond.is_present():
 | |
|             raise QAPISemError(
 | |
|                 self.info,
 | |
|                 "discriminator member '%s' of %s must not be conditional"
 | |
|                 % (self._tag_name, base))
 | |
|         # branches that are not explicitly covered get an empty type
 | |
|         assert tag_member.defined_in
 | |
|         cases = {v.name for v in self.variants}
 | |
|         for m in tag_member.type.members:
 | |
|             if m.name not in cases:
 | |
|                 v = QAPISchemaVariant(m.name, self.info,
 | |
|                                       'q_empty', m.ifcond)
 | |
|                 v.set_defined_in(tag_member.defined_in)
 | |
|                 self.variants.append(v)
 | |
|         if not self.variants:
 | |
|             raise QAPISemError(self.info, "union has no branches")
 | |
|         for v in self.variants:
 | |
|             v.check(schema)
 | |
|             # Union names must match enum values; alternate names are
 | |
|             # checked separately. Use 'seen' to tell the two apart.
 | |
|             if seen:
 | |
|                 if v.name not in tag_member.type.member_names():
 | |
|                     raise QAPISemError(
 | |
|                         self.info,
 | |
|                         "branch '%s' is not a value of %s"
 | |
|                         % (v.name, tag_member.type.describe()))
 | |
|                 if not isinstance(v.type, QAPISchemaObjectType):
 | |
|                     raise QAPISemError(
 | |
|                         self.info,
 | |
|                         "%s cannot use %s"
 | |
|                         % (v.describe(self.info), v.type.describe()))
 | |
|                 v.type.check(schema)
 | |
| 
 | |
|     def check_clash(
 | |
|         self,
 | |
|         info: Optional[QAPISourceInfo],
 | |
|         seen: Dict[str, QAPISchemaMember],
 | |
|     ) -> None:
 | |
|         for v in self.variants:
 | |
|             # Reset seen map for each variant, since qapi names from one
 | |
|             # branch do not affect another branch.
 | |
|             #
 | |
|             # v.type's typing is enforced in check() above.
 | |
|             assert isinstance(v.type, QAPISchemaObjectType)
 | |
|             v.type.check_clash(info, dict(seen))
 | |
| 
 | |
| 
 | |
| class QAPISchemaAlternatives(QAPISchemaVariants):
 | |
|     def __init__(self,
 | |
|                  info: QAPISourceInfo,
 | |
|                  variants: List[QAPISchemaVariant],
 | |
|                  tag_member: QAPISchemaObjectTypeMember):
 | |
|         super().__init__(info, variants)
 | |
|         self.tag_member = tag_member
 | |
| 
 | |
|     def check(
 | |
|             self, schema: QAPISchema, seen: Dict[str, QAPISchemaMember]
 | |
|     ) -> None:
 | |
|         super().check(schema, seen)
 | |
|         assert isinstance(self.tag_member.type, QAPISchemaEnumType)
 | |
|         assert not self.tag_member.optional
 | |
|         assert not self.tag_member.ifcond.is_present()
 | |
| 
 | |
| 
 | |
| class QAPISchemaMember:
 | |
|     """ Represents object members, enum members and features """
 | |
|     role = 'member'
 | |
| 
 | |
|     def __init__(
 | |
|         self,
 | |
|         name: str,
 | |
|         info: Optional[QAPISourceInfo],
 | |
|         ifcond: Optional[QAPISchemaIfCond] = None,
 | |
|     ):
 | |
|         self.name = name
 | |
|         self.info = info
 | |
|         self.ifcond = ifcond or QAPISchemaIfCond()
 | |
|         self.defined_in: Optional[str] = None
 | |
| 
 | |
|     def set_defined_in(self, name: str) -> None:
 | |
|         assert not self.defined_in
 | |
|         self.defined_in = name
 | |
| 
 | |
|     def check_clash(
 | |
|         self,
 | |
|         info: Optional[QAPISourceInfo],
 | |
|         seen: Dict[str, QAPISchemaMember],
 | |
|     ) -> None:
 | |
|         cname = c_name(self.name)
 | |
|         if cname in seen:
 | |
|             raise QAPISemError(
 | |
|                 info,
 | |
|                 "%s collides with %s"
 | |
|                 % (self.describe(info), seen[cname].describe(info)))
 | |
|         seen[cname] = self
 | |
| 
 | |
|     def connect_doc(self, doc: Optional[QAPIDoc]) -> None:
 | |
|         if doc:
 | |
|             doc.connect_member(self)
 | |
| 
 | |
|     def describe(self, info: Optional[QAPISourceInfo]) -> str:
 | |
|         role = self.role
 | |
|         meta = 'type'
 | |
|         defined_in = self.defined_in
 | |
|         assert defined_in
 | |
| 
 | |
|         if defined_in.startswith('q_obj_'):
 | |
|             # See QAPISchema._make_implicit_object_type() - reverse the
 | |
|             # mapping there to create a nice human-readable description
 | |
|             defined_in = defined_in[6:]
 | |
|             if defined_in.endswith('-arg'):
 | |
|                 # Implicit type created for a command's dict 'data'
 | |
|                 assert role == 'member'
 | |
|                 role = 'parameter'
 | |
|                 meta = 'command'
 | |
|                 defined_in = defined_in[:-4]
 | |
|             elif defined_in.endswith('-base'):
 | |
|                 # Implicit type created for a union's dict 'base'
 | |
|                 role = 'base ' + role
 | |
|                 defined_in = defined_in[:-5]
 | |
|             else:
 | |
|                 assert False
 | |
| 
 | |
|         assert info is not None
 | |
|         if defined_in != info.defn_name:
 | |
|             return "%s '%s' of %s '%s'" % (role, self.name, meta, defined_in)
 | |
|         return "%s '%s'" % (role, self.name)
 | |
| 
 | |
| 
 | |
| class QAPISchemaEnumMember(QAPISchemaMember):
 | |
|     role = 'value'
 | |
| 
 | |
|     def __init__(
 | |
|         self,
 | |
|         name: str,
 | |
|         info: Optional[QAPISourceInfo],
 | |
|         ifcond: Optional[QAPISchemaIfCond] = None,
 | |
|         features: Optional[List[QAPISchemaFeature]] = None,
 | |
|     ):
 | |
|         super().__init__(name, info, ifcond)
 | |
|         for f in features or []:
 | |
|             f.set_defined_in(name)
 | |
|         self.features = features or []
 | |
| 
 | |
|     def connect_doc(self, doc: Optional[QAPIDoc]) -> None:
 | |
|         super().connect_doc(doc)
 | |
|         if doc:
 | |
|             for f in self.features:
 | |
|                 doc.connect_feature(f)
 | |
| 
 | |
| 
 | |
| class QAPISchemaFeature(QAPISchemaMember):
 | |
|     role = 'feature'
 | |
| 
 | |
|     def is_special(self) -> bool:
 | |
|         return self.name in ('deprecated', 'unstable')
 | |
| 
 | |
| 
 | |
| class QAPISchemaObjectTypeMember(QAPISchemaMember):
 | |
|     def __init__(
 | |
|         self,
 | |
|         name: str,
 | |
|         info: QAPISourceInfo,
 | |
|         typ: str,
 | |
|         optional: bool,
 | |
|         ifcond: Optional[QAPISchemaIfCond] = None,
 | |
|         features: Optional[List[QAPISchemaFeature]] = None,
 | |
|     ):
 | |
|         super().__init__(name, info, ifcond)
 | |
|         for f in features or []:
 | |
|             f.set_defined_in(name)
 | |
|         self._type_name = typ
 | |
|         self.type: QAPISchemaType  # set during check()
 | |
|         self.optional = optional
 | |
|         self.features = features or []
 | |
| 
 | |
|     def need_has(self) -> bool:
 | |
|         return self.optional and self.type.need_has_if_optional()
 | |
| 
 | |
|     def check(self, schema: QAPISchema) -> None:
 | |
|         assert self.defined_in
 | |
|         self.type = schema.resolve_type(self._type_name, self.info,
 | |
|                                         self.describe)
 | |
|         seen: Dict[str, QAPISchemaMember] = {}
 | |
|         for f in self.features:
 | |
|             f.check_clash(self.info, seen)
 | |
| 
 | |
|     def connect_doc(self, doc: Optional[QAPIDoc]) -> None:
 | |
|         super().connect_doc(doc)
 | |
|         if doc:
 | |
|             for f in self.features:
 | |
|                 doc.connect_feature(f)
 | |
| 
 | |
| 
 | |
| class QAPISchemaVariant(QAPISchemaObjectTypeMember):
 | |
|     role = 'branch'
 | |
| 
 | |
|     def __init__(
 | |
|         self,
 | |
|         name: str,
 | |
|         info: QAPISourceInfo,
 | |
|         typ: str,
 | |
|         ifcond: QAPISchemaIfCond,
 | |
|     ):
 | |
|         super().__init__(name, info, typ, False, ifcond)
 | |
| 
 | |
| 
 | |
| class QAPISchemaCommand(QAPISchemaDefinition):
 | |
|     meta = 'command'
 | |
| 
 | |
|     def __init__(
 | |
|         self,
 | |
|         name: str,
 | |
|         info: QAPISourceInfo,
 | |
|         doc: Optional[QAPIDoc],
 | |
|         ifcond: QAPISchemaIfCond,
 | |
|         features: List[QAPISchemaFeature],
 | |
|         arg_type: Optional[str],
 | |
|         ret_type: Optional[str],
 | |
|         gen: bool,
 | |
|         success_response: bool,
 | |
|         boxed: bool,
 | |
|         allow_oob: bool,
 | |
|         allow_preconfig: bool,
 | |
|         coroutine: bool,
 | |
|     ):
 | |
|         super().__init__(name, info, doc, ifcond, features)
 | |
|         self._arg_type_name = arg_type
 | |
|         self.arg_type: Optional[QAPISchemaObjectType] = None
 | |
|         self._ret_type_name = ret_type
 | |
|         self.ret_type: Optional[QAPISchemaType] = None
 | |
|         self.gen = gen
 | |
|         self.success_response = success_response
 | |
|         self.boxed = boxed
 | |
|         self.allow_oob = allow_oob
 | |
|         self.allow_preconfig = allow_preconfig
 | |
|         self.coroutine = coroutine
 | |
| 
 | |
|     def check(self, schema: QAPISchema) -> None:
 | |
|         assert self.info is not None
 | |
|         super().check(schema)
 | |
|         if self._arg_type_name:
 | |
|             arg_type = schema.resolve_type(
 | |
|                 self._arg_type_name, self.info, "command's 'data'")
 | |
|             if not isinstance(arg_type, QAPISchemaObjectType):
 | |
|                 raise QAPISemError(
 | |
|                     self.info,
 | |
|                     "command's 'data' cannot take %s"
 | |
|                     % arg_type.describe())
 | |
|             self.arg_type = arg_type
 | |
|             if self.arg_type.branches and not self.boxed:
 | |
|                 raise QAPISemError(
 | |
|                     self.info,
 | |
|                     "command's 'data' can take %s only with 'boxed': true"
 | |
|                     % self.arg_type.describe())
 | |
|             self.arg_type.check(schema)
 | |
|             if self.arg_type.has_conditional_members() and not self.boxed:
 | |
|                 raise QAPISemError(
 | |
|                     self.info,
 | |
|                     "conditional command arguments require 'boxed': true")
 | |
|         if self._ret_type_name:
 | |
|             self.ret_type = schema.resolve_type(
 | |
|                 self._ret_type_name, self.info, "command's 'returns'")
 | |
|             if self.name not in self.info.pragma.command_returns_exceptions:
 | |
|                 typ = self.ret_type
 | |
|                 if isinstance(typ, QAPISchemaArrayType):
 | |
|                     typ = typ.element_type
 | |
|                 if not isinstance(typ, QAPISchemaObjectType):
 | |
|                     raise QAPISemError(
 | |
|                         self.info,
 | |
|                         "command's 'returns' cannot take %s"
 | |
|                         % self.ret_type.describe())
 | |
| 
 | |
|     def connect_doc(self, doc: Optional[QAPIDoc] = None) -> None:
 | |
|         super().connect_doc(doc)
 | |
|         doc = doc or self.doc
 | |
|         if doc:
 | |
|             if self.arg_type and self.arg_type.is_implicit():
 | |
|                 self.arg_type.connect_doc(doc)
 | |
| 
 | |
|     def visit(self, visitor: QAPISchemaVisitor) -> None:
 | |
|         super().visit(visitor)
 | |
|         visitor.visit_command(
 | |
|             self.name, self.info, self.ifcond, self.features,
 | |
|             self.arg_type, self.ret_type, self.gen, self.success_response,
 | |
|             self.boxed, self.allow_oob, self.allow_preconfig,
 | |
|             self.coroutine)
 | |
| 
 | |
| 
 | |
| class QAPISchemaEvent(QAPISchemaDefinition):
 | |
|     meta = 'event'
 | |
| 
 | |
|     def __init__(
 | |
|         self,
 | |
|         name: str,
 | |
|         info: QAPISourceInfo,
 | |
|         doc: Optional[QAPIDoc],
 | |
|         ifcond: QAPISchemaIfCond,
 | |
|         features: List[QAPISchemaFeature],
 | |
|         arg_type: Optional[str],
 | |
|         boxed: bool,
 | |
|     ):
 | |
|         super().__init__(name, info, doc, ifcond, features)
 | |
|         self._arg_type_name = arg_type
 | |
|         self.arg_type: Optional[QAPISchemaObjectType] = None
 | |
|         self.boxed = boxed
 | |
| 
 | |
|     def check(self, schema: QAPISchema) -> None:
 | |
|         super().check(schema)
 | |
|         if self._arg_type_name:
 | |
|             typ = schema.resolve_type(
 | |
|                 self._arg_type_name, self.info, "event's 'data'")
 | |
|             if not isinstance(typ, QAPISchemaObjectType):
 | |
|                 raise QAPISemError(
 | |
|                     self.info,
 | |
|                     "event's 'data' cannot take %s"
 | |
|                     % typ.describe())
 | |
|             self.arg_type = typ
 | |
|             if self.arg_type.branches and not self.boxed:
 | |
|                 raise QAPISemError(
 | |
|                     self.info,
 | |
|                     "event's 'data' can take %s only with 'boxed': true"
 | |
|                     % self.arg_type.describe())
 | |
|             self.arg_type.check(schema)
 | |
|             if self.arg_type.has_conditional_members() and not self.boxed:
 | |
|                 raise QAPISemError(
 | |
|                     self.info,
 | |
|                     "conditional event arguments require 'boxed': true")
 | |
| 
 | |
|     def connect_doc(self, doc: Optional[QAPIDoc] = None) -> None:
 | |
|         super().connect_doc(doc)
 | |
|         doc = doc or self.doc
 | |
|         if doc:
 | |
|             if self.arg_type and self.arg_type.is_implicit():
 | |
|                 self.arg_type.connect_doc(doc)
 | |
| 
 | |
|     def visit(self, visitor: QAPISchemaVisitor) -> None:
 | |
|         super().visit(visitor)
 | |
|         visitor.visit_event(
 | |
|             self.name, self.info, self.ifcond, self.features,
 | |
|             self.arg_type, self.boxed)
 | |
| 
 | |
| 
 | |
| class QAPISchema:
 | |
|     def __init__(self, fname: str):
 | |
|         self.fname = fname
 | |
| 
 | |
|         try:
 | |
|             parser = QAPISchemaParser(fname)
 | |
|         except OSError as err:
 | |
|             raise QAPIError(
 | |
|                 f"can't read schema file '{fname}': {err.strerror}"
 | |
|             ) from err
 | |
| 
 | |
|         exprs = check_exprs(parser.exprs)
 | |
|         self.docs = parser.docs
 | |
|         self._entity_list: List[QAPISchemaEntity] = []
 | |
|         self._entity_dict: Dict[str, QAPISchemaDefinition] = {}
 | |
|         self._module_dict: Dict[str, QAPISchemaModule] = OrderedDict()
 | |
|         self._schema_dir = os.path.dirname(fname)
 | |
|         self._make_module(QAPISchemaModule.BUILTIN_MODULE_NAME)
 | |
|         self._make_module(fname)
 | |
|         self._predefining = True
 | |
|         self._def_predefineds()
 | |
|         self._predefining = False
 | |
|         self._def_exprs(exprs)
 | |
|         self.check()
 | |
| 
 | |
|     def _def_entity(self, ent: QAPISchemaEntity) -> None:
 | |
|         self._entity_list.append(ent)
 | |
| 
 | |
|     def _def_definition(self, defn: QAPISchemaDefinition) -> None:
 | |
|         # Only the predefined types are allowed to not have info
 | |
|         assert defn.info or self._predefining
 | |
|         self._def_entity(defn)
 | |
|         # TODO reject names that differ only in '_' vs. '.'  vs. '-',
 | |
|         # because they're liable to clash in generated C.
 | |
|         other_defn = self._entity_dict.get(defn.name)
 | |
|         if other_defn:
 | |
|             if other_defn.info:
 | |
|                 where = QAPISourceError(other_defn.info, "previous definition")
 | |
|                 raise QAPISemError(
 | |
|                     defn.info,
 | |
|                     "'%s' is already defined\n%s" % (defn.name, where))
 | |
|             raise QAPISemError(
 | |
|                 defn.info, "%s is already defined" % other_defn.describe())
 | |
|         self._entity_dict[defn.name] = defn
 | |
| 
 | |
|     def lookup_entity(self,name: str) -> Optional[QAPISchemaEntity]:
 | |
|         return self._entity_dict.get(name)
 | |
| 
 | |
|     def lookup_type(self, name: str) -> Optional[QAPISchemaType]:
 | |
|         typ = self.lookup_entity(name)
 | |
|         if isinstance(typ, QAPISchemaType):
 | |
|             return typ
 | |
|         return None
 | |
| 
 | |
|     def resolve_type(
 | |
|         self,
 | |
|         name: str,
 | |
|         info: Optional[QAPISourceInfo],
 | |
|         what: Union[None, str, Callable[[QAPISourceInfo], str]],
 | |
|     ) -> QAPISchemaType:
 | |
|         typ = self.lookup_type(name)
 | |
|         if not typ:
 | |
|             assert info and what  # built-in types must not fail lookup
 | |
|             if callable(what):
 | |
|                 what = what(info)
 | |
|             raise QAPISemError(
 | |
|                 info, "%s uses unknown type '%s'" % (what, name))
 | |
|         return typ
 | |
| 
 | |
|     def _module_name(self, fname: str) -> str:
 | |
|         if QAPISchemaModule.is_system_module(fname):
 | |
|             return fname
 | |
|         return os.path.relpath(fname, self._schema_dir)
 | |
| 
 | |
|     def _make_module(self, fname: str) -> QAPISchemaModule:
 | |
|         name = self._module_name(fname)
 | |
|         if name not in self._module_dict:
 | |
|             self._module_dict[name] = QAPISchemaModule(name)
 | |
|         return self._module_dict[name]
 | |
| 
 | |
|     def module_by_fname(self, fname: str) -> QAPISchemaModule:
 | |
|         name = self._module_name(fname)
 | |
|         return self._module_dict[name]
 | |
| 
 | |
|     def _def_include(self, expr: QAPIExpression) -> None:
 | |
|         include = expr['include']
 | |
|         assert expr.doc is None
 | |
|         self._def_entity(
 | |
|             QAPISchemaInclude(self._make_module(include), expr.info))
 | |
| 
 | |
|     def _def_builtin_type(
 | |
|         self, name: str, json_type: str, c_type: str
 | |
|     ) -> None:
 | |
|         self._def_definition(QAPISchemaBuiltinType(name, json_type, c_type))
 | |
|         # Instantiating only the arrays that are actually used would
 | |
|         # be nice, but we can't as long as their generated code
 | |
|         # (qapi-builtin-types.[ch]) may be shared by some other
 | |
|         # schema.
 | |
|         self._make_array_type(name, None)
 | |
| 
 | |
|     def _def_predefineds(self) -> None:
 | |
|         for t in [('str',    'string',  'char' + POINTER_SUFFIX),
 | |
|                   ('number', 'number',  'double'),
 | |
|                   ('int',    'int',     'int64_t'),
 | |
|                   ('int8',   'int',     'int8_t'),
 | |
|                   ('int16',  'int',     'int16_t'),
 | |
|                   ('int32',  'int',     'int32_t'),
 | |
|                   ('int64',  'int',     'int64_t'),
 | |
|                   ('uint8',  'int',     'uint8_t'),
 | |
|                   ('uint16', 'int',     'uint16_t'),
 | |
|                   ('uint32', 'int',     'uint32_t'),
 | |
|                   ('uint64', 'int',     'uint64_t'),
 | |
|                   ('size',   'int',     'uint64_t'),
 | |
|                   ('bool',   'boolean', 'bool'),
 | |
|                   ('any',    'value',   'QObject' + POINTER_SUFFIX),
 | |
|                   ('null',   'null',    'QNull' + POINTER_SUFFIX)]:
 | |
|             self._def_builtin_type(*t)
 | |
|         self.the_empty_object_type = QAPISchemaObjectType(
 | |
|             'q_empty', None, None, None, None, None, [], None)
 | |
|         self._def_definition(self.the_empty_object_type)
 | |
| 
 | |
|         qtypes = ['none', 'qnull', 'qnum', 'qstring', 'qdict', 'qlist',
 | |
|                   'qbool']
 | |
|         qtype_values = self._make_enum_members(
 | |
|             [{'name': n} for n in qtypes], None)
 | |
| 
 | |
|         self._def_definition(QAPISchemaEnumType(
 | |
|             'QType', None, None, None, None, qtype_values, 'QTYPE'))
 | |
| 
 | |
|     def _make_features(
 | |
|         self,
 | |
|         features: Optional[List[Dict[str, Any]]],
 | |
|         info: Optional[QAPISourceInfo],
 | |
|     ) -> List[QAPISchemaFeature]:
 | |
|         if features is None:
 | |
|             return []
 | |
|         return [QAPISchemaFeature(f['name'], info,
 | |
|                                   QAPISchemaIfCond(f.get('if')))
 | |
|                 for f in features]
 | |
| 
 | |
|     def _make_enum_member(
 | |
|         self,
 | |
|         name: str,
 | |
|         ifcond: Optional[Union[str, Dict[str, Any]]],
 | |
|         features: Optional[List[Dict[str, Any]]],
 | |
|         info: Optional[QAPISourceInfo],
 | |
|     ) -> QAPISchemaEnumMember:
 | |
|         return QAPISchemaEnumMember(name, info,
 | |
|                                     QAPISchemaIfCond(ifcond),
 | |
|                                     self._make_features(features, info))
 | |
| 
 | |
|     def _make_enum_members(
 | |
|         self, values: List[Dict[str, Any]], info: Optional[QAPISourceInfo]
 | |
|     ) -> List[QAPISchemaEnumMember]:
 | |
|         return [self._make_enum_member(v['name'], v.get('if'),
 | |
|                                        v.get('features'), info)
 | |
|                 for v in values]
 | |
| 
 | |
|     def _make_array_type(
 | |
|         self, element_type: str, info: Optional[QAPISourceInfo]
 | |
|     ) -> str:
 | |
|         name = element_type + 'List'    # reserved by check_defn_name_str()
 | |
|         if not self.lookup_type(name):
 | |
|             self._def_definition(QAPISchemaArrayType(
 | |
|                 name, info, element_type))
 | |
|         return name
 | |
| 
 | |
|     def _make_implicit_object_type(
 | |
|         self,
 | |
|         name: str,
 | |
|         info: QAPISourceInfo,
 | |
|         ifcond: QAPISchemaIfCond,
 | |
|         role: str,
 | |
|         members: List[QAPISchemaObjectTypeMember],
 | |
|     ) -> Optional[str]:
 | |
|         if not members:
 | |
|             return None
 | |
|         # See also QAPISchemaObjectTypeMember.describe()
 | |
|         name = 'q_obj_%s-%s' % (name, role)
 | |
|         typ = self.lookup_entity(name)
 | |
|         if typ:
 | |
|             assert(isinstance(typ, QAPISchemaObjectType))
 | |
|             # The implicit object type has multiple users.  This can
 | |
|             # only be a duplicate definition, which will be flagged
 | |
|             # later.
 | |
|             pass
 | |
|         else:
 | |
|             self._def_definition(QAPISchemaObjectType(
 | |
|                 name, info, None, ifcond, None, None, members, None))
 | |
|         return name
 | |
| 
 | |
|     def _def_enum_type(self, expr: QAPIExpression) -> None:
 | |
|         name = expr['enum']
 | |
|         data = expr['data']
 | |
|         prefix = expr.get('prefix')
 | |
|         ifcond = QAPISchemaIfCond(expr.get('if'))
 | |
|         info = expr.info
 | |
|         features = self._make_features(expr.get('features'), info)
 | |
|         self._def_definition(QAPISchemaEnumType(
 | |
|             name, info, expr.doc, ifcond, features,
 | |
|             self._make_enum_members(data, info), prefix))
 | |
| 
 | |
|     def _make_member(
 | |
|         self,
 | |
|         name: str,
 | |
|         typ: Union[List[str], str],
 | |
|         ifcond: QAPISchemaIfCond,
 | |
|         features: Optional[List[Dict[str, Any]]],
 | |
|         info: QAPISourceInfo,
 | |
|     ) -> QAPISchemaObjectTypeMember:
 | |
|         optional = False
 | |
|         if name.startswith('*'):
 | |
|             name = name[1:]
 | |
|             optional = True
 | |
|         if isinstance(typ, list):
 | |
|             assert len(typ) == 1
 | |
|             typ = self._make_array_type(typ[0], info)
 | |
|         return QAPISchemaObjectTypeMember(name, info, typ, optional, ifcond,
 | |
|                                           self._make_features(features, info))
 | |
| 
 | |
|     def _make_members(
 | |
|         self,
 | |
|         data: Dict[str, Any],
 | |
|         info: QAPISourceInfo,
 | |
|     ) -> List[QAPISchemaObjectTypeMember]:
 | |
|         return [self._make_member(key, value['type'],
 | |
|                                   QAPISchemaIfCond(value.get('if')),
 | |
|                                   value.get('features'), info)
 | |
|                 for (key, value) in data.items()]
 | |
| 
 | |
|     def _def_struct_type(self, expr: QAPIExpression) -> None:
 | |
|         name = expr['struct']
 | |
|         base = expr.get('base')
 | |
|         data = expr['data']
 | |
|         info = expr.info
 | |
|         ifcond = QAPISchemaIfCond(expr.get('if'))
 | |
|         features = self._make_features(expr.get('features'), info)
 | |
|         self._def_definition(QAPISchemaObjectType(
 | |
|             name, info, expr.doc, ifcond, features, base,
 | |
|             self._make_members(data, info),
 | |
|             None))
 | |
| 
 | |
|     def _make_variant(
 | |
|         self,
 | |
|         case: str,
 | |
|         typ: str,
 | |
|         ifcond: QAPISchemaIfCond,
 | |
|         info: QAPISourceInfo,
 | |
|     ) -> QAPISchemaVariant:
 | |
|         if isinstance(typ, list):
 | |
|             assert len(typ) == 1
 | |
|             typ = self._make_array_type(typ[0], info)
 | |
|         return QAPISchemaVariant(case, info, typ, ifcond)
 | |
| 
 | |
|     def _def_union_type(self, expr: QAPIExpression) -> None:
 | |
|         name = expr['union']
 | |
|         base = expr['base']
 | |
|         tag_name = expr['discriminator']
 | |
|         data = expr['data']
 | |
|         assert isinstance(data, dict)
 | |
|         info = expr.info
 | |
|         ifcond = QAPISchemaIfCond(expr.get('if'))
 | |
|         features = self._make_features(expr.get('features'), info)
 | |
|         if isinstance(base, dict):
 | |
|             base = self._make_implicit_object_type(
 | |
|                 name, info, ifcond,
 | |
|                 'base', self._make_members(base, info))
 | |
|         variants = [
 | |
|             self._make_variant(key, value['type'],
 | |
|                                QAPISchemaIfCond(value.get('if')),
 | |
|                                info)
 | |
|             for (key, value) in data.items()]
 | |
|         members: List[QAPISchemaObjectTypeMember] = []
 | |
|         self._def_definition(
 | |
|             QAPISchemaObjectType(name, info, expr.doc, ifcond, features,
 | |
|                                  base, members,
 | |
|                                  QAPISchemaBranches(
 | |
|                                      info, variants, tag_name)))
 | |
| 
 | |
|     def _def_alternate_type(self, expr: QAPIExpression) -> None:
 | |
|         name = expr['alternate']
 | |
|         data = expr['data']
 | |
|         assert isinstance(data, dict)
 | |
|         ifcond = QAPISchemaIfCond(expr.get('if'))
 | |
|         info = expr.info
 | |
|         features = self._make_features(expr.get('features'), info)
 | |
|         variants = [
 | |
|             self._make_variant(key, value['type'],
 | |
|                                QAPISchemaIfCond(value.get('if')),
 | |
|                                info)
 | |
|             for (key, value) in data.items()]
 | |
|         tag_member = QAPISchemaObjectTypeMember('type', info, 'QType', False)
 | |
|         self._def_definition(
 | |
|             QAPISchemaAlternateType(
 | |
|                 name, info, expr.doc, ifcond, features,
 | |
|                 QAPISchemaAlternatives(info, variants, tag_member)))
 | |
| 
 | |
|     def _def_command(self, expr: QAPIExpression) -> None:
 | |
|         name = expr['command']
 | |
|         data = expr.get('data')
 | |
|         rets = expr.get('returns')
 | |
|         gen = expr.get('gen', True)
 | |
|         success_response = expr.get('success-response', True)
 | |
|         boxed = expr.get('boxed', False)
 | |
|         allow_oob = expr.get('allow-oob', False)
 | |
|         allow_preconfig = expr.get('allow-preconfig', False)
 | |
|         coroutine = expr.get('coroutine', False)
 | |
|         ifcond = QAPISchemaIfCond(expr.get('if'))
 | |
|         info = expr.info
 | |
|         features = self._make_features(expr.get('features'), info)
 | |
|         if isinstance(data, OrderedDict):
 | |
|             data = self._make_implicit_object_type(
 | |
|                 name, info, ifcond,
 | |
|                 'arg', self._make_members(data, info))
 | |
|         if isinstance(rets, list):
 | |
|             assert len(rets) == 1
 | |
|             rets = self._make_array_type(rets[0], info)
 | |
|         self._def_definition(
 | |
|             QAPISchemaCommand(name, info, expr.doc, ifcond, features, data,
 | |
|                               rets, gen, success_response, boxed, allow_oob,
 | |
|                               allow_preconfig, coroutine))
 | |
| 
 | |
|     def _def_event(self, expr: QAPIExpression) -> None:
 | |
|         name = expr['event']
 | |
|         data = expr.get('data')
 | |
|         boxed = expr.get('boxed', False)
 | |
|         ifcond = QAPISchemaIfCond(expr.get('if'))
 | |
|         info = expr.info
 | |
|         features = self._make_features(expr.get('features'), info)
 | |
|         if isinstance(data, OrderedDict):
 | |
|             data = self._make_implicit_object_type(
 | |
|                 name, info, ifcond,
 | |
|                 'arg', self._make_members(data, info))
 | |
|         self._def_definition(QAPISchemaEvent(name, info, expr.doc, ifcond,
 | |
|                                              features, data, boxed))
 | |
| 
 | |
|     def _def_exprs(self, exprs: List[QAPIExpression]) -> None:
 | |
|         for expr in exprs:
 | |
|             if 'enum' in expr:
 | |
|                 self._def_enum_type(expr)
 | |
|             elif 'struct' in expr:
 | |
|                 self._def_struct_type(expr)
 | |
|             elif 'union' in expr:
 | |
|                 self._def_union_type(expr)
 | |
|             elif 'alternate' in expr:
 | |
|                 self._def_alternate_type(expr)
 | |
|             elif 'command' in expr:
 | |
|                 self._def_command(expr)
 | |
|             elif 'event' in expr:
 | |
|                 self._def_event(expr)
 | |
|             elif 'include' in expr:
 | |
|                 self._def_include(expr)
 | |
|             else:
 | |
|                 assert False
 | |
| 
 | |
|     def check(self) -> None:
 | |
|         for ent in self._entity_list:
 | |
|             ent.check(self)
 | |
|             ent.connect_doc()
 | |
|         for ent in self._entity_list:
 | |
|             ent.set_module(self)
 | |
|         for doc in self.docs:
 | |
|             doc.check()
 | |
| 
 | |
|     def visit(self, visitor: QAPISchemaVisitor) -> None:
 | |
|         visitor.visit_begin(self)
 | |
|         for mod in self._module_dict.values():
 | |
|             mod.visit(visitor)
 | |
|         visitor.visit_end()
 |