Skip to content

Execution

Code execution and caching module for MLIR benchmarks.

This module handles the execution of transformed MLIR code, including bufferization, lowering, and performance measurement. It manages an execution cache to avoid redundant computations and interfaces with the MLIR execution engine to measure actual execution times.

OutputsStructure

Bases: Protocol

Placeholder for structure used as output of MLIR execution.

Note

Used for type hinting only. The actual structure is defined inside create_params().

Attributes:

Name Type Description
delta int

Execution time in nanoseconds.

get_results()

Returns the output arrays as numpy arrays

Returns:

Type Description
list[ndarray]

List of numpy arrays

Source code in mlir_rl_artifact/execution.py
def get_results(self) -> list[np.ndarray]:
    """Returns the output arrays as numpy arrays

    Returns:
        List of numpy arrays
    """
    ...

free_outputs()

Frees the output arrays

Source code in mlir_rl_artifact/execution.py
def free_outputs(self):
    """Frees the output arrays"""
    ...

Execution(exec_data_file=None, main_exec_data=None)

Execution()
Execution(exec_data_file: str)
Execution(
    exec_data_file: str,
    main_exec_data: dict[str, dict[str, int]],
)

Class that deals with code execution and cache management

Attributes:

Name Type Description
exec_data_file str

Path to the local file where exec data is cached

main_exec_data dict[str, dict[str, int]] | None

External exec data that was read at the beginning of training

Parameters:

Name Type Description Default
exec_data_file str | None

Path to the local file where exec data is cached

None
main_exec_data dict[str, dict[str, int]] | None

External exec data that was read at the beginning of training

None
Source code in mlir_rl_artifact/execution.py
def __init__(self, exec_data_file: Optional[str] = None, main_exec_data: Optional[dict[str, dict[str, int]]] = None):
    """Initialize a new instance

    Args:
        exec_data_file: Path to the local file where exec data is cached
        main_exec_data: External exec data that was read at the beginning of training
    """
    if exec_data_file is None:
        raise Exception("No existing instance of class Execution has been found")

    self.exec_data_file = exec_data_file
    self.main_exec_data = main_exec_data

execute_code(module, bench_name, seq)

Executes the given MLIR module and measures execution time.

Checks the execution cache first for code matching this sequence. If not found, applies bufferization and lowering transforms before executing the code.

Parameters:

Name Type Description Default
module Module

The MLIR module to execute.

required
bench_name str

The benchmark name for cache management.

required
seq list[list[Action]]

The sequence of transformations applied to reach this code.

required

Returns:

Type Description
int

Execution time in nanoseconds.

bool

Boolean indicating if execution succeeded.

bool

Boolean indicating if this is a cache miss (True if executed, False if cached).

Source code in mlir_rl_artifact/execution.py
def execute_code(self, module: Module, bench_name: str, seq: list[list['Action']]) -> tuple[int, bool, bool]:
    """Executes the given MLIR module and measures execution time.

    Checks the execution cache first for code matching this sequence. If not found,
    applies bufferization and lowering transforms before executing the code.

    Args:
        module: The MLIR module to execute.
        bench_name: The benchmark name for cache management.
        seq: The sequence of transformations applied to reach this code.

    Returns:
        Execution time in nanoseconds.
        Boolean indicating if execution succeeded.
        Boolean indicating if this is a cache miss (True if executed, False if cached).
    """
    code_cache_key = self.get_code_cache_key(seq)
    cache_exec_time = self.__check_execution_cache(bench_name, code_cache_key)
    if cache_exec_time is not None:
        return cache_exec_time, True, False

    transform_bufferize_and_lower_v(module)
    real_exec_time, success = self.__execute_bufferized_code_wrapper(module)
    return real_exec_time, success, True

update_execution_cache(new_data)

Update the temp execution cache with the new data.

Parameters:

Name Type Description Default
new_data dict[str, dict[str, int]]

The new data to update.

required
Source code in mlir_rl_artifact/execution.py
def update_execution_cache(self, new_data: dict[str, dict[str, int]]):
    """Update the temp execution cache with the new data.

    Args:
        new_data: The new data to update.
    """
    if not self.exec_data_file:
        raise Exception("Execution data file not provided")

    with open(self.exec_data_file, "r") as file:
        data: dict[str, dict[str, int]] = json.load(file)

    for bench_name, bench_data in new_data.items():
        if bench_name not in data:
            data[bench_name] = {}
        data[bench_name].update(bench_data)

    try:
        with open(self.exec_data_file + ".tmp", "w") as file:
            json.dump(data, file, indent=2)
            file.flush()
            os.fsync(file.fileno())
        os.replace(self.exec_data_file + ".tmp", self.exec_data_file)
    finally:
        if os.path.exists(self.exec_data_file + ".tmp"):
            os.remove(self.exec_data_file + ".tmp")

get_code_cache_key(seq)

Get the code cache key for the given operation state.

Parameters:

Name Type Description Default
seq list[list[Action]]

The sequence of transformations applied to reach this code.

required

Returns:

Type Description
str

the code cache key.

Source code in mlir_rl_artifact/execution.py
def get_code_cache_key(self, seq: list[list['Action']]) -> str:
    """Get the code cache key for the given operation state.

    Args:
        seq: The sequence of transformations applied to reach this code.

    Returns:
        the code cache key.
    """
    ops_codes = []
    for op_seq in seq:
        # TODO: There might be edge cases where part of a seq is invalid `env.py:301`
        ops_codes.append(''.join(map(str, op_seq)))

    return '|'.join(ops_codes)

__execute_bufferized_code(module)

Lowers and runs the given MLIR code using Python bindings, then returns the execution time and assertion result (if the executed code returns the correct result).

Parameters:

Name Type Description Default
module Module

The MLIR module to execute.

required

Returns:

Type Description
int

The execution time in nanoseconds.

bool

The assertion result.

Source code in mlir_rl_artifact/execution.py
def __execute_bufferized_code(self, module: Module) -> tuple[int, bool]:
    """Lowers and runs the given MLIR code using Python bindings, then returns the execution time and assertion
    result (if the executed code returns the correct result).

    Args:
        module: The MLIR module to execute.

    Returns:
        The execution time in nanoseconds.
        The assertion result.
    """

    pass_pipeline = """builtin.module(
        canonicalize,
        buffer-deallocation-pipeline,
        convert-bufferization-to-memref,
        convert-linalg-to-loops,
        scf-forall-to-parallel,
        convert-scf-to-openmp,
        expand-strided-metadata,
        finalize-memref-to-llvm,
        convert-scf-to-cf,
        lower-affine,

        convert-openmp-to-llvm,
        convert-vector-to-llvm,
        convert-math-to-llvm,
        convert-math-to-libm,
        finalize-memref-to-llvm,
        convert-func-to-llvm,
        convert-index-to-llvm,
        convert-arith-to-llvm,
        convert-cf-to-llvm,

        reconcile-unrealized-casts,
        canonicalize,
        cse
    )"""

    pm = PassManager.parse(pass_pipeline, module.context)

    inputs, outs_struct = self.__create_params(module)
    args = self.__convert_to_args(inputs, outs_struct)

    pm.run(module.operation)
    execution_engine = ExecutionEngine(
        module,
        opt_level=3,
        shared_libs=os.getenv("MLIR_SHARED_LIBS", "").split(","),
    )

    try:
        times = []
        for _ in range(5):
            execution_engine.invoke("main", *args)
            # If output tensors are needed call `get_results` before `free_outputs`
            outs_struct.free_outputs()
            times.append(outs_struct.delta)
    finally:
        outs_struct.free_outputs()

    return median(times), True

__check_execution_cache(bench_name, cache_key)

Check the execution cache for the given operation state.

Parameters:

Name Type Description Default
bench_name str

The benchmark name to check.

required
cache_key str

The cache key to check.

required

Returns:

Type Description
int | None

the execution time in nanoseconds if the operation is found in the cache, otherwise None.

Source code in mlir_rl_artifact/execution.py
def __check_execution_cache(self, bench_name: str, cache_key: str) -> Optional[int]:
    """Check the execution cache for the given operation state.

    Args:
        bench_name: The benchmark name to check.
        cache_key: The cache key to check.

    Returns:
        the execution time in nanoseconds if the operation is found in the cache, otherwise None.
    """
    # Start by checking the main execution data
    if self.main_exec_data and bench_name in self.main_exec_data and cache_key in self.main_exec_data[bench_name]:
        return self.main_exec_data[bench_name][cache_key]

    # If no hit in the main cache file, check the temporary cache file
    if not self.exec_data_file:
        return None

    with open(self.exec_data_file, "r") as file:
        data: dict[str, dict[str, int]] = json.load(file)

    if bench_name in data and cache_key in data[bench_name]:
        return data[bench_name][cache_key]

    # No hit in both cache files
    return None

__create_params(module) staticmethod

Creates the input and output parameters for the given MLIR module.

Parameters:

Name Type Description Default
module Module

The MLIR module to create the parameters for.

required

Returns:

Type Description
list[ndarray]

The list of inputs as numpy arrays

OutputsStructure

The outputs structure (output arrays + delta)

Source code in mlir_rl_artifact/execution.py
@staticmethod
def __create_params(module: Module) -> tuple[list[np.ndarray], OutputsStructure]:
    """Creates the input and output parameters for the given MLIR module.

    Args:
        module: The MLIR module to create the parameters for.

    Returns:
        The list of inputs as numpy arrays
        The outputs structure (output arrays + delta)
    """
    def __get_dtype(memref_type: MemRefType):
        et = memref_type.element_type
        match et:
            case F32Type():
                np_dtype = np.float32
            case F64Type():
                np_dtype = np.float64
            case IntegerType():
                match et.width:
                    case 32:
                        np_dtype = np.int32
                    case 64:
                        np_dtype = np.int64
                    case _:
                        raise Exception(f'unexpected element type {et}')
            case _:
                raise Exception(f'unexpected element type {et}')
        return np_dtype

    # Get the main function
    main_func = next(op for op in module.body.operations if isinstance(op, FuncOp) and (op.name.value == 'main'))

    # Create input params
    inputs: list[np.ndarray] = []
    for input_type in main_func.type.inputs:
        assert isinstance(input_type, MemRefType), f'unexpected input type {input_type}'
        in_arr = np.zeros(input_type.shape, dtype=__get_dtype(input_type))
        inputs.append(in_arr)

    # Create results arg
    res_types = main_func.type.results

    exec_time_type = res_types[-1]
    if not (isinstance(exec_time_type, IntegerType) and exec_time_type.width == 64):
        raise Exception(f'unexpected exec time type {exec_time_type}')

    out_fields: list[tuple[str, type[ctypes.Structure]]] = []
    for i, out_type in enumerate(res_types[:-1]):
        assert isinstance(out_type, MemRefType), f'unexpected output type {out_type}'
        descriptor_type = make_nd_memref_descriptor(out_type.rank, as_ctype(__get_dtype(out_type)))
        out_fields.append((f'out_{i}', descriptor_type))

    class _OutputsStructure(ctypes.Structure):
        _fields_ = [
            *out_fields,
            ("delta", ctypes.c_int64)
        ]
        delta: int

        def get_results(self):
            res: list[np.ndarray] = []
            for field_name, _ in out_fields:
                out_array = ranked_memref_to_numpy([getattr(self, field_name)])
                res.append(out_array.copy())
            return res

        def free_outputs(self):
            for field_name, mem_desc_T in out_fields:
                memref_descriptor: ctypes.Structure = getattr(self, field_name)
                allocated_ptr: Optional[ctypes.c_longlong] = getattr(memref_descriptor, 'allocated', None)

                if allocated_ptr:
                    address = ctypes.cast(allocated_ptr, ctypes.c_void_p)
                    if address.value:
                        Execution.free_pointer(address)
                        setattr(self, field_name, mem_desc_T())

    outputs_structure = _OutputsStructure()
    for i, (field_name, field_type) in enumerate(out_fields):
        out_arg = field_type()
        setattr(outputs_structure, field_name, out_arg)

    return inputs, outputs_structure

__convert_to_args(inputs, outputs_structure) staticmethod

Converts input arrays and output structure into ctypes arguments for MLIR execution.

Prepares arguments in the format required by the MLIR execution engine. Each argument is a double pointer (pointer to pointer) to allow proper handling in the C calling convention.

Parameters:

Name Type Description Default
inputs list[ndarray]

List of input numpy arrays to be passed to the MLIR kernel.

required
outputs_structure OutputsStructure

ctypes Structure containing output memref descriptors and execution time.

required

Returns:

Type Description
list

List of double pointers to ctypes Structures suitable for passing to ExecutionEngine.invoke().

Source code in mlir_rl_artifact/execution.py
@staticmethod
def __convert_to_args(inputs: list[np.ndarray], outputs_structure: OutputsStructure) -> list:
    """Converts input arrays and output structure into ctypes arguments for MLIR execution.

    Prepares arguments in the format required by the MLIR execution engine. Each argument
    is a double pointer (pointer to pointer) to allow proper handling in the C calling
    convention.

    Args:
        inputs: List of input numpy arrays to be passed to the MLIR kernel.
        outputs_structure: ctypes Structure containing output memref descriptors and
            execution time.

    Returns:
        List of double pointers to ctypes Structures suitable for passing to ExecutionEngine.invoke().
    """
    args: list[ctypes._Pointer[ctypes._Pointer[ctypes.Structure]]] = []
    args.append(ctypes.pointer(ctypes.pointer(outputs_structure)))
    for in_arr in inputs:
        args.append(ctypes.pointer(ctypes.pointer(
            get_ranked_memref_descriptor(in_arr)
        )))
    return args

free_pointer(ptr) staticmethod

Free the memory pointed to by the given pointer using the C standard library.

Parameters:

Name Type Description Default
ptr c_void_p

The pointer to free.

required
Source code in mlir_rl_artifact/execution.py
@staticmethod
def free_pointer(ptr: ctypes.c_void_p):
    """Free the memory pointed to by the given pointer using the C standard library.

    Args:
        ptr: The pointer to free.
    """
    # Find the C standard library
    libc_path = ctypes.util.find_library('c')
    if not libc_path:
        raise RuntimeError("C standard library not found.")
    libc = ctypes.CDLL(libc_path)

    # Define the signature for free
    free = libc.free
    free.argtypes = [ctypes.c_void_p]
    free.restype = None

    # Call free
    free(ptr)