Source code for aitemplate.backend.codegen

#  Copyright (c) Meta Platforms, Inc. and affiliates.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
"""
This module is for generating the final C++
source code in files from Tensor and Operators.
Functions in this module will be used for generating
function source code files, profiler source code files,
and model driver source code files.
"""

from __future__ import annotations

import io
import json
import logging
import os
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import jinja2

from aitemplate.backend import registry

from aitemplate.backend.main_templates import MODEL_CONTAINER_TEMPLATE, MODEL_TEMPLATE
from aitemplate.backend.target import Target

from aitemplate.compiler.base import IntImm, IntVar, IntVarTensor, Operator, Tensor
from aitemplate.compiler.dtype import dtype_to_enumerator, get_dtype_size
from aitemplate.compiler.tensor_accessor import TensorAccessor

from aitemplate.compiler.transform.memory_planning import Workspace
from aitemplate.utils.debug_settings import AITDebugSettings
from aitemplate.utils.environ import (
    multistream_additional_streams,
    multistream_max_mem_parallel_ops,
    multistream_mode,
)
from aitemplate.utils.graph_utils import split_simple_multistream_parallel_ops
from aitemplate.utils.misc import is_debug

# pylint: disable=C0103,W0613,C0301


_LOGGER = logging.getLogger(__name__)

DTYPE_TO_POINTERTYPE: Dict[str, str] = {
    "float32": "float*",
    "float": "float*",
    "int": "int32_t*",
    "int32": "int32_t*",
    "int64": "int64_t*",
    "bool": "bool*",
}


CONSTANT_FOLDER_MODEL_NAME = "ConstantFolder"
MODEL_NAME = "Model"


def gen_profiler(sorted_graph: List[Tensor], workdir: str, dynamic_profiling_strategy):
    """Generate operator profiler source code files for the given graph.

    Parameters
    ----------
    sorted_graph : List[Tensor]
        The network after running toposort transformation
    workdir : str
        Target directory for generated C++ source code files
    dynamic_profiling_strategy : DynamicProfileStrategy, optional
        A dynamic profiling strategy, used to filter generated profiles at compile time.
        Passed through to the gen_profiler kernels of nodes in the graph.
        See also: :func:`~aitemplate.compiler.transform.profile.profile`
    """
    results = []
    for node in sorted_graph:
        for func in node.src_ops():
            if "has_profiler" in func._attrs and func._attrs["has_profiler"]:
                results.append(func.gen_profiler(workdir, dynamic_profiling_strategy))
    return results

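# Illustrative usage sketch (not part of the library; `sorted_graph`, `strategy`, and
# "./tmp" are hypothetical). gen_profiler is normally driven by the AITemplate compile
# pipeline after the graph has been topologically sorted:
#
#     profiler_srcs = gen_profiler(sorted_graph, "./tmp", strategy)
#
# Only ops whose _attrs carry a truthy "has_profiler" flag contribute profiler sources;
# all other ops are skipped.
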
def gen_function_src(
    sorted_graph: List[Tensor], workdir: str, model_name: str = ""
) -> List[Tuple[str, str]]:
    """Generate function source code files for the given graph.

    Parameters
    ----------
    sorted_graph : List[Tensor]
        The network after running toposort transformation
    workdir : str
        Target directory for generated C++ source code files
    model_name : str, optional
        Sub working directory in the workdir for the given model, by default ""

    Returns
    -------
    List[Tuple[str, str]]
        List of tuples (source file path, object file path)
    """
    target = Target.current()
    file_pairs = []
    exist_func = set()
    prefix = os.path.join(workdir, model_name)
    for node in sorted_graph:
        for func in node.src_ops():
            fname = func._attrs["name"]
            if fname not in exist_func:
                src_path = os.path.join(prefix, fname + target.src_extension())
                obj_path = os.path.join(prefix, fname + ".obj")
                file_pairs.append((src_path, obj_path))
                with open(src_path, "w") as fo:
                    fo.write(func.gen_function())
                exist_func.add(fname)
    _LOGGER.info(f"generated {len(file_pairs)} function srcs")
    return file_pairs

def map_set(
    map_name: str,
    key_name: str,
    value_name: Optional[str] = None,
    indent: str = "    ",
) -> str:
    """Generate a string setting a value in a map.

    If value_name is given, sets map_name["key_name"] = value_name.
    Else, sets map_name["key_name"] = key_name. Special maps like dim_map
    may make additional modifications to the LHS of this expression.

    Parameters
    ----------
    map_name : str
        The map to use
    key_name : str
        The key to set. Will be put into quotes.
    value_name : Optional[str]
        If set, force map_name["key_name"] = value_name
    indent : str
        For formatting

    Returns
    -------
    str
        The formatted map set statement.
    """
    if value_name is not None:
        value = value_name
    else:
        value = key_name

    if map_name == "dim_map":
        # Because the ROCM backend uses int64_t while CUDA uses int,
        # this is a temporary workaround to cast int64_t* to int*.
        # FIXME: After we have unified the two backends,
        # reinterpret_cast<int *> should be removed.
        value = f"reinterpret_cast<int64_t *>(&{value})"

    return f'{indent}{map_name}["{key_name}"] = {value};'

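# Illustrative examples of the emitted C++ (hypothetical names):
#
#     map_set("input_map", "x")           ->  input_map["x"] = x;
#     map_set("input_map", "x", "x_ptr")  ->  input_map["x"] = x_ptr;
#     map_set("dim_map", "batch_size")    ->  dim_map["batch_size"] = reinterpret_cast<int64_t *>(&batch_size);
#
# (The returned strings are prefixed with `indent`, omitted here for readability.)
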
def set_value(lhs: Any, rhs: Any, indent: str = "    ") -> str:
    return f"{indent}{lhs} = {rhs};"

def set_value_from_map(map_name: Any, var_name: Any, indent: str = "    ") -> str:
    """Generate a string that sets a value to something stored in a map.

    Parameters
    ----------
    map_name : str
        The map to use
    var_name : str
        The var_name, used as the name of the value and the key.
    indent : str
        For formatting

    Returns
    -------
    str
        The formatted statement.
    """
    key = var_name
    value = var_name
    return f'{indent}{value} = static_cast<decltype({value})>({map_name}["{key}"]);'

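# Illustrative example (hypothetical names): set_value_from_map("dim_map", "batch_size")
# returns the C++ statement
#
#     batch_size = static_cast<decltype(batch_size)>(dim_map["batch_size"]);
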
def count_inputs_outputs(graph):
    n_inputs = n_outputs = 0
    for node in graph:
        if node._attrs["is_input"]:
            n_inputs += 1
        if node._attrs["is_output"]:
            n_outputs += 1
    return n_inputs, n_outputs

def check_not_null(
    tensor: Tensor,
    tensor_idx: Optional[int] = None,
    skip_if_lower_bound_is_zero: bool = False,
) -> str:
    """
    Generate a nullptr check to be used by pointer initialization code.

    If skip_if_lower_bound_is_zero == True, no code will be generated
    when the Tensor has at least one dynamic dim with a lower bound of zero.
    This is most useful for outputs; we put the nullptr checks at the start
    of the inference, but we won't know output shapes until after Run()
    finishes. We therefore just relax the check for these outputs - only
    allow them to be null if their lower bound is zero, otherwise never
    allow them to be null.
    """
    name = tensor._attrs["name"]
    if tensor_idx is None:
        check = name
    else:
        check = f"params_[{tensor_idx}].ptr"

    shape = ["1"]
    lower_bound_is_zero = False
    for dim in tensor._attrs["shape"]:
        lower_bound_is_zero |= dim.lower_bound() == 0
        if skip_if_lower_bound_is_zero and lower_bound_is_zero:
            return ""
        if isinstance(dim, IntImm):
            shape.append(str(dim._attrs["values"][0]))
        else:
            shape.append(dim._attrs["name"])

    nullptr_check = f"{check} == nullptr"
    condition = (
        nullptr_check
        # If the lower bound of the shape is positive, never allow
        # the tensor to be null.
        if not lower_bound_is_zero
        # Otherwise, allow it to be null only if the (possibly dynamic)
        # size is zero.
        else f"{nullptr_check} && {'*'.join(shape)} != 0"
    )
    return f"""
if ({condition}) {{
    throw std::runtime_error("Constant {name} was not set! Set the value with set_constant.");
}}
"""

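# Illustrative example (hypothetical tensor): for an output named "y" stored at
# params_[1] with shape (batch_size, 16) where batch_size has a lower bound of 0,
# check_not_null(y, 1) produces roughly
#
#     if (params_[1].ptr == nullptr && 1*batch_size*16 != 0) {
#         throw std::runtime_error("Constant y was not set! Set the value with set_constant.");
#     }
#
# With skip_if_lower_bound_is_zero=True the same tensor yields an empty string, since
# a zero-sized output is allowed to stay null.
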
def extract_input_output_shapes(func_attrs):
    if "input_accessors" in func_attrs:
        input_shape = [
            [v.pseudo_code() for v in acc.original_shapes]
            for acc in func_attrs["input_accessors"]
        ]
    else:
        input_shape = [
            [v.pseudo_code() for v in t.shape()] for t in func_attrs["inputs"]
        ]
    if "output_accessors" in func_attrs:
        output_shape = [
            [v.pseudo_code() for v in acc.original_shapes]
            for acc in func_attrs["output_accessors"]
        ]
    else:
        output_shape = [
            [v.pseudo_code() for v in t.shape()] for t in func_attrs["outputs"]
        ]
    return input_shape, output_shape


def device_copy(dst_tensor: Tensor, src_tensor: Tensor, dst_idx: int) -> str:
    src_name = src_tensor._attrs["name"]
    dst_ptr = f"params_[{dst_idx}].ptr"
    shape = ["1"]
    for dim in dst_tensor._attrs["shape"]:
        if isinstance(dim, IntImm):
            shape.append(str(dim._attrs["values"][0]))
        else:
            shape.append(dim._attrs["name"])
    shape = "*".join(shape)
    size = f"{shape} * {get_dtype_size(dst_tensor._attrs['dtype'])}"
    return f"DEVICE_CHECK(DeviceToDeviceCopy({dst_ptr}, {src_name}, {size}, stream));"


def _construct_output_name_to_index_map(
    sorted_graph: List[Tensor], output_tensors: List[Tensor]
) -> Dict[str, int]:
    """
    Use the given output ordering to construct a name -> index map
    to be used for constructing an internal ordering during codegen.

    The indices in the map are propagated to an output's entire alias set.
    If two outputs are part of the same alias set, only one of them
    propagates its output index.
    """
    result = {tensor._attrs["name"]: i for i, tensor in enumerate(output_tensors)}

    # Mark alias sets
    for tensor in reversed(sorted_graph):
        name = tensor._attrs["name"]
        orig = tensor._attrs["is_view_of"]
        if orig is None:
            continue
        orig_name = orig._attrs["name"]
        if name in result and orig_name not in result:
            result[orig_name] = result[name]

    return result


class ModelContainerGenerator:
    def __init__(
        self,
        max_blob_size: int,
        max_constant_blob_size: int,
        workspace: Workspace,
        constants_data_file: Optional[io.BytesIO],
        graph: List[Tensor],
        output_tensors: List[Tensor],
        model_name: str = MODEL_NAME,
        additional_unbound_constants: Optional[List[Tensor]] = None,
        debug_settings: Optional[AITDebugSettings] = None,
        model_dir: Optional[str] = None,
    ):
        self.target = Target.current()
        self.f_var_decl = registry.get(self.target.name() + ".lib.var_decl")
        self.f_ptr_decl = registry.get(self.target.name() + ".lib.void_ptr_decl")
        self.dtype_to_backend_type = registry.get(
            self.target.name() + ".lib.dtype_to_backend_type"
        )

        self.constants_data_file = constants_data_file

        self.exist_funcs = set()
        self.func_decl = []
        self.tensor_slice = []
        self.tensor_map_set = []
        self.set_inputs = []
        self.func_name_seq = []
        self.func_seq = []
        self._input_shape_seq = []
        self._output_shape_seq = []
        self.func_prop_seq = []
        self.tensor_decl = []
        self.dim_decl = []
        self.jagged_decl = []
        self.device_to_device_copies = []
        self.function_state = []
        self.set_up_constants = []
        self.set_up_param_names = []
        self.set_up_param_dtypes = []
        self.set_up_bound_constant_dtypes = []
        self.set_up_bound_constant_size = []
        self.set_up_output_shapes = []
        self.set_up_param_dynamic_shapes = []
        self.state_record = set()
        self.visited_func = set()
        self.visited_dims = set()
        self.set_up_constant_names = []
        self.set_up_constant_original_names = []
        self.num_constants = 0
        self.constants_data_size = 0
        self.owned_constants_init = []
        self.reset_constants = []
        self.set_up_bound_constant_offsets = []
        self.set_up_constant_folding_outputs_offsets = []

        self.input_idx = 0
        self.bound_constant_idx = 0
        self.unbound_constant_idx = 0

        self.output_name_to_idx = _construct_output_name_to_index_map(
            graph, output_tensors
        )

        self.graph = graph
        self.num_inputs, self.num_outputs = count_inputs_outputs(graph)
        self.max_blob_size = max_blob_size
        self.max_constant_blob_size = max_constant_blob_size
        self.workspace = workspace

        self.debug_settings = (
            AITDebugSettings() if debug_settings is None else debug_settings
        )
        # This records whether or not we should debug header.
        self.debug_header = False

        self.model_name = model_name

        # This is needed for logging activities only.
        self.model_dir = model_dir

        # additional_unbound_constants stores tensors that are used in constant folding
        # but are not used in the main graph. We need this info so we can codegen SetConstant
        # correctly; when we call SetConstant for one of these special names, we want to forward
        # to constant_folder_->SetConstant().
        self.additional_unbound_constants = additional_unbound_constants
        self.set_up_constant_folding_inputs = []

        # This is used to handle a corner case; if we have an owned tensor that is used as an input
        # for constant folding, we need to allocate space for it in our constant buffer, but its
        # size won't be found during memory planning.
        self.extra_owned_constant_size = 0

        # This is a temporary dictionary that holds the rendered C++ code for operators.
        self._rendered_func_code: Dict[Operator, str] = {}
        # This is a temporary list that holds rendered C++ code for checks.
        self._rendered_checks_func_code: List[str] = []

    def _tensor_slice_func(
        self,
        node: Tensor,
        blob_name: str,
        indent="    ",
    ) -> str:
        offset = node._attrs["offset"]
        name = node._attrs["name"]
        return f"{indent}{name} = reinterpret_cast<decltype({name})>({blob_name} + {offset});"

    def _record_param_tensor_info(self, tensor: Tensor, idx: int) -> None:
        def max_value(var_or_imm):
            if isinstance(var_or_imm, IntImm):
                return var_or_imm.value()
            else:
                assert isinstance(var_or_imm, IntVar)
                return var_or_imm.upper_bound()

        shape_init = ", ".join(str(max_value(dim)) for dim in tensor._attrs["shape"])
        param_shape_init = ", ".join(
            f'&{dim._attrs["name"]}' for dim in tensor._attrs["shape"]
        )
        self.set_up_output_shapes.append(
            set_value(f"max_param_shapes_[{idx}]", f"{{{shape_init}}}")
        )
        param_shape_init = ", ".join(
            f'ParamDim({dim.lower_bound()}, {dim.upper_bound()}, &{dim._attrs["name"]})'
            for dim in tensor._attrs["shape"]
        )
        self.set_up_param_dynamic_shapes.append(
            set_value(f"params_[{idx}].shape_ptrs", f"{{{param_shape_init}}}")
        )
        name = tensor._attrs["name"]
        self.set_up_param_names.append(set_value(f"param_names_[{idx}]", f'"{name}"'))
        self.set_up_param_dtypes.append(
            set_value(
                f"param_dtypes_[{idx}]",
                dtype_to_enumerator(tensor.dtype()),
            )
        )

    def _add_owned_constant(self, tensor: Tensor) -> None:
        """
        Add an owned constant, e.g. one with a bound "data" attribute.
        Here, we codegen some extra logic to load it into memory from the .so.
        """
        assert (
            self.constants_data_file is not None
        ), "Cannot add owned constants without a data file"
        name = tensor._attrs["name"]
        data = tensor._attrs["data"]
        assert (
            tensor._attrs["offset"] >= 0
        ), f"Constant node '{name}' must have non-negative offset"
        num_bytes = len(data)
        self.constants_data_file.write(data.to_bytes())
        constant_info = f'ConstantInfo{{"{name}", {self.constants_data_size}, {tensor._attrs["offset"]}, {num_bytes}}}'
        self.owned_constants_init.append(constant_info)
        self.constants_data_size += num_bytes
        self.num_constants += 1

    def _codegen_bound_constant(self, tensor: Tensor) -> None:
        if tensor._attrs.get("is_internal_constant", False):
            return
        name = tensor._attrs["name"]
        self.set_up_constant_names.append(
            set_value(
                f'bound_constant_name_to_idx_["{name}"]',
                self.bound_constant_idx,
            )
        )
        original_name = tensor._attrs.get("original_name")
        if original_name is not None:
            self.set_up_constant_original_names.append(
                set_value(
                    f'constant_name_to_original_name_["{name}"]',
                    f'"{original_name}"',
                )
            )
        self.set_up_bound_constant_dtypes.append(
            set_value(
                f"bound_constant_dtypes_[{self.bound_constant_idx}]",
                dtype_to_enumerator(tensor.dtype()),
            )
        )
        self.set_up_bound_constant_size.append(
            set_value(
                f"bound_constant_size_[{self.bound_constant_idx}]",
                len(tensor._attrs["data"]),
            )
        )
        self.set_up_bound_constant_offsets.append(
            set_value(
                f"bound_constant_offsets_[{self.bound_constant_idx}]",
                tensor._attrs["offset"],
            )
        )
        self.bound_constant_idx += 1

    def _codegen_param_setup(
        self,
        tensor: Tensor,
    ) -> None:
        """
        Generate code needed for setting up a constant in Model/ModelContainer.
        """
        name = tensor._attrs["name"]
        data = tensor._attrs["data"]
        const_slice = self._tensor_slice_func(tensor, "constants")
        if data is not None:
            # Owned constant. Set up logic for copying the constant in from *.so.
            self.set_up_constants.append(const_slice)
            self.set_up_constants.append(
                set_value(
                    f'constant_name_to_ptr_["{name}"]',
                    f"const_cast<const void**>(reinterpret_cast<void**>(&{name}))",
                )
            )
            self._codegen_bound_constant(tensor)
            if not tensor._attrs.get("is_internal_constant", False):
                self.reset_constants.append(const_slice)
            if self.constants_data_file is not None:
                self._add_owned_constant(tensor)
        elif tensor._attrs["constant_folding_output_idx"] is not None:
            self.set_up_constant_folding_outputs_offsets.append(
                set_value(
                    f'constant_folding_outputs_offsets_[{tensor._attrs["constant_folding_output_idx"]}]',
                    tensor._attrs["offset"],
                )
            )
            self.tensor_slice.append(const_slice)
            if not tensor._attrs.get("is_internal_constant", False):
                self.reset_constants.append(const_slice)
        elif not isinstance(tensor, IntVarTensor):
            # Unbound constant. We will expect the user to set this via SetConstant.
            self.set_up_constant_names.append(
                set_value(
                    f'unbound_constant_name_to_idx_["{name}"]',
                    self.unbound_constant_idx,
                )
            )
            original_name = tensor._attrs.get("original_name")
            if original_name is not None:
                self.set_up_constant_original_names.append(
                    set_value(
                        f'constant_name_to_original_name_["{name}"]',
                        f'"{original_name}"',
                    )
                )
            self._record_param_tensor_info(
                tensor,
                self.unbound_constant_idx + self.num_inputs + self.num_outputs,
            )
            self.unbound_constant_idx += 1
            self.set_inputs.append(check_not_null(tensor))
            self.set_up_constants.append(
                set_value(
                    f'constant_name_to_ptr_["{name}"]',
                    f"const_cast<const void**>(reinterpret_cast<void**>(&{name}))",
                )
            )

    def _codegen_input_tensor(self, tensor: Tensor) -> None:
        name = tensor._attrs["name"]
        view = tensor._attrs["is_view_of"]
        assert (
            view is None
        ), f"_codegen_input_tensor cannot be called with a view; expected a non-view tensor with is_input=True, got: {tensor}"
        self.set_inputs.append(
            set_value(
                name,
                f"static_cast<decltype({name})>(params_[{self.input_idx}].ptr)",
            )
        )
        self.set_inputs.append(check_not_null(tensor))
        self._record_param_tensor_info(tensor, self.input_idx)
        self.input_idx += 1

    def _get_output_idx(self, name: str) -> int:
        assert (
            name in self.output_name_to_idx
        ), f"Tensor {name} was marked as an output, but its index was not found in output_name_to_index"
        # Add num_inputs since we internally store outputs in the same array as inputs w/
        # inputs first
        return self.output_name_to_idx[name] + self.num_inputs

    def _codegen_output_aliases_tensor(self, tensor: Tensor) -> None:
        name = tensor._attrs["name"]
        view = tensor._attrs["is_view_of"]
        external_tensor = tensor._attrs["external_tensor"]
        if external_tensor is not None:
            assert not external_tensor._attrs[
                "is_param"
            ], "Views of constants should be folded."
            self.set_inputs.append(set_value(name, view._attrs["name"]))
            return
        if view:
            # View is already initialized, assign to view.
            self.set_inputs.append(set_value(name, view._attrs["name"]))
        else:
            # Original tensor, initialize it.
            output_idx = self._get_output_idx(name)
            self.set_inputs.append(
                set_value(
                    name,
                    f"static_cast<decltype({name})>(params_[{output_idx}].ptr)",
                )
            )

    def _codegen_output_tensor(self, tensor: Tensor) -> None:
        is_param = tensor._attrs["is_param"]
        is_input = tensor._attrs["is_input"]
        view = tensor._attrs["is_view_of"]
        is_view = view is not None
        external_tensor = tensor._attrs["external_tensor"]
        name = tensor._attrs["name"]

        output_idx = self._get_output_idx(name)

        if is_param:
            self._codegen_param_setup(tensor)
            self.device_to_device_copies.append(device_copy(tensor, tensor, output_idx))
        elif external_tensor is not None:
            # Special view cases for outputs; we can hit this case if the output
            # is a view of a constant, input, or another output.
            assert (
                is_view
            ), f"orig_tensor is not None, but node {name} is not marked as a view! Node: {tensor}"
            self.set_inputs.append(
                check_not_null(tensor, output_idx, skip_if_lower_bound_is_zero=True)
            )
            self.set_inputs.append(set_value(name, view._attrs["name"]))
            self.device_to_device_copies.append(
                device_copy(tensor, external_tensor, output_idx)
            )
        elif is_input:
            # Inputs that are also outputs require an extra copy
            self.set_inputs.append(
                set_value(
                    name,
                    f"static_cast<decltype({name})>(params_[{self.input_idx}].ptr)",
                )
            )
            self._record_param_tensor_info(tensor, self.input_idx)
            self.device_to_device_copies.append(device_copy(tensor, tensor, output_idx))
            self.input_idx += 1
        elif is_view:
            self.set_inputs.append(set_value(name, view._attrs["name"]))
            self.set_inputs.append(
                check_not_null(tensor, skip_if_lower_bound_is_zero=True)
            )
        else:
            self.set_inputs.append(
                set_value(
                    name,
                    f"static_cast<decltype({name})>(params_[{output_idx}].ptr)",
                )
            )
            self.set_inputs.append(
                check_not_null(tensor, skip_if_lower_bound_is_zero=True)
            )
            self._record_param_tensor_info(tensor, output_idx)

    def _process_dims(self, shape: List[IntVar]) -> None:
        for dim in shape:
            if dim._attrs["name"] in self.visited_dims:
                continue
            intimm = 0
            if len(dim._attrs["values"]) == 1:
                intimm = dim._attrs["values"][0]
            self.dim_decl.append(self.f_var_decl(dim._attrs["name"], intimm))
            self.visited_dims.add(dim._attrs["name"])

    def _process_jagged_dims(self, node: Tensor) -> None:
        # JaggedIntVars are processed separately here (besides being processed
        # like normal IntVars in _process_dims above), as they require adding
        # the offset structure declaration into the Model codegen, as well as
        # the batch_dim if it's not set when processing other tensors that
        # directly contain the batch_dim in their shapes
        jagged_int_var = node._attrs["shape"][0]
        name = jagged_int_var._attrs["name"]

        # we use the key with a prefix here, as the JaggedIntVar's name
        # is identical to the name of the total_length it is based on,
        # which might have been traversed already
        key = f"jagged_int_var_{name}"
        if key not in self.visited_dims:
            for i, jagged_dim in enumerate(jagged_int_var.jagged_dims()):
                if jagged_dim.offsets() is None:
                    raise RuntimeError(
                        f"No offsets Tensor is associated with the JaggedDim {i} in "
                        f"the JaggedIntVar {name}: can't generate offset-related code."
                    )
            self.jagged_decl.append(
                f"    {jagged_int_var.offsets_struct_type()} "
                f"{jagged_int_var.offsets_var_name()};"
            )
            self.visited_dims.add(key)

        batch_dim_name = jagged_int_var.batch_dim()._attrs["name"]
        if batch_dim_name not in self.visited_dims:
            batch_dim_value = (
                0
                if not isinstance(jagged_int_var.batch_dim(), IntImm)
                else jagged_int_var.batch_dim().value()
            )
            self.dim_decl.append(self.f_var_decl(batch_dim_name, batch_dim_value))
            self.visited_dims.add(batch_dim_name)

    def _process_dims_for_tensor(self, node: Tensor) -> None:
        self._process_dims(node._attrs["shape"])

    def _process_dims_for_tensor_accessors(
        self, tensor_accessors: List[TensorAccessor]
    ) -> None:
        if tensor_accessors is None:
            return
        for accessor in tensor_accessors:
            self._process_dims(accessor.original_shapes)

    def _process_dims_for_op(self, node: Operator) -> None:
        self._process_dims_for_tensor_accessors(node._attrs.get("input_accessors"))
        self._process_dims_for_tensor_accessors(node._attrs.get("output_accessors"))

    def _process_src_ops(self, node: Tensor) -> None:
        funcs = node.src_ops()
        if len(funcs) == 0:
            return

        for func in funcs:
            f_func_decl = registry.get(
                ".".join((self.target.name(), func._attrs["op"], "func_decl"))
            )
            f_func_call = registry.get(
                ".".join((self.target.name(), func._attrs["op"], "func_call"))
            )
            if func._attrs["name"] not in self.exist_funcs:
                self.func_decl.append(f_func_decl(func._attrs))
                self.exist_funcs.add(func._attrs["name"])

            # Only code gen func once for ops with multiple outputs
            # The func can get renamed during refine_graph pass.
            # We use original_name here because it's unique.
            if func._attrs["original_name"] not in self.visited_func:
                self.visited_func.add(func._attrs["original_name"])
                seq = f_func_call(func._attrs, indent="    ")
                if self.debug_settings.gen_profiler_annotation:
                    seq = f'  {{\n  RAII_ProfilerRange _raiiOpProfilerRange("{func._attrs["outputs"][0]._attrs["name"]}");\n{seq}\n  }}'
                self.func_name_seq.append(func._attrs["original_name"])
                self.func_seq.append(seq)
                input_shape, output_shape = extract_input_output_shapes(func._attrs)
                self._input_shape_seq.append(input_shape)
                self._output_shape_seq.append(output_shape)
                props = {}
                for item in func._args_for_pseudo_code():
                    res = item.split("=")
                    if len(res) == 2:
                        res[1] = res[1].replace("\n", " ")
                        props[res[0]] = res[1]
                self.func_prop_seq.append(props)

                # save the rendered code for the future
                self._rendered_func_code[func] = seq

            if "int_state_flag" in func._attrs:
                if func._attrs["name"] not in self.state_record:
                    self.function_state.append(
                        f'    int64_t {func._attrs["name"]}_state {{0}};'
                    )
                    self.state_record.add(func._attrs["name"])
            self._process_dims_for_op(func)

        if (
            self.debug_settings.check_all_nan_and_inf
            or node._attrs.get("check_nan_and_inf", False)
        ) and (not isinstance(node, IntVarTensor)):
            self._append_check_nan_and_inf(node)
        if (
            self.debug_settings.check_all_outputs
            or node._attrs.get("check_outputs", False)
        ) and (not isinstance(node, IntVarTensor)):
            self._append_check_outputs(node)

    def _append_check_nan_and_inf(self, node: Tensor):
        self.debug_header = True
        tensor_name = node._attrs["name"]
        elem_cnt = "*".join([shape.pseudo_code() for shape in node.shape()])
        self.func_name_seq.append("nan_and_inf_check")
        code_text = f'    InvokeInfAndNanChecker(reinterpret_cast<half*>({tensor_name}), "{tensor_name}", {elem_cnt}, stream);\n'
        self.func_seq.append(code_text)
        self._rendered_checks_func_code.append(code_text)

    def _append_check_outputs(self, node: Tensor):
        self.debug_header = True
        tensor_name = node._attrs["name"]
        elem_cnt = "*".join([shape.pseudo_code() for shape in node.shape()])
        self.func_name_seq.append("output_check")
        backend_type = self.dtype_to_backend_type(node._attrs["dtype"])
        code_text = (
            f"    InvokeOutputsChecker((const {backend_type}*)({tensor_name}), "
            f'"{tensor_name}", {elem_cnt}, stream);\n'
        )
        self.func_seq.append(code_text)
        self._rendered_checks_func_code.append(code_text)

    def append_tensor(self, node: Tensor) -> None:
        if node._attrs["nop"]:
            return
        name = node._attrs["name"]
        dtype = node._attrs["dtype"]
        if isinstance(node, IntVarTensor):
            # Check to prevent duplicate declaration in case IntVarTensor
            # is already declared from dims for another tensor
            if node._attrs["name"] in self.visited_dims:
                return
            int_var = node._attrs["int_var"]
            if isinstance(int_var, IntImm):
                self.tensor_decl.append(
                    self.f_var_decl(name=name, value=int_var._attrs["values"][0])
                )
            else:
                self.tensor_decl.append(self.f_var_decl(name=name))
            # IntVarTensor could be used as a dim too; add to visited to prevent
            # duplicated declaration.
            self.visited_dims.add(name)
        else:
            self.tensor_decl.append(self.f_ptr_decl(name=name, dtype=dtype))

        is_param = node._attrs["is_param"]
        is_output = node._attrs["is_output"]
        has_output_aliases = node._attrs["has_output_aliases"]
        is_input = node._attrs["is_input"]
        view = node._attrs["is_view_of"]
        is_view = view is not None
        if is_output:
            # Outputs have a ton of special cases that depend on
            # is_input, is_view, etc, so this condition needs to
            # be checked before all the others
            self._codegen_output_tensor(node)
        elif is_param:
            self._codegen_param_setup(node)
        elif is_input:
            self._codegen_input_tensor(node)
        elif has_output_aliases:
            # Special case: internal tensor that aliases an output.
            self._codegen_output_aliases_tensor(node)
        elif not is_view and not isinstance(node, IntVarTensor):
            # Normal, internal tensor that is not a view: point it to the
            # internal blob of memory
            assert (
                node._attrs["offset"] >= 0
            ), f"Non-parameter node '{name}' must have non-negative offset"
            self.tensor_slice.append(self._tensor_slice_func(node, "blob_ptr"))
        elif not isinstance(node, IntVarTensor):
            # Normal view, point it to the same memory as whatever it
            # aliases
            self.set_inputs.append(set_value(name, view._attrs["name"]))

        self._process_dims_for_tensor(node)
        self._process_src_ops(node)

        if node.is_jagged():
            self._process_jagged_dims(node)

    def _generate_simple_multistream_ops(
        self,
    ) -> List[List[Operator]]:
        from aitemplate.utils.graph_utils import track_graph_timings

        # track the sequence
        time_stats = track_graph_timings(self.graph, {})
        # sort all operators by parallel execution order
        ops_by_order = defaultdict(list)
        for op, tracking in time_stats.op_parallel_trackers.items():
            ops_by_order[tracking.execution_order].append(op)
        # convert Dict[int, List[Operator]] into List[List[Operator]]
        max_parallel_ops = multistream_max_mem_parallel_ops()
        ops = split_simple_multistream_parallel_ops(ops_by_order, max_parallel_ops)
        # done
        return ops

    def _write_simple_multistream_debug_info(
        self, par_ops_seq: List[List[Operator]]
    ) -> None:
        # store simple multistream information to log
        # render ops into names
        ops_names = [
            [op._attrs["original_name"] for op in par_ops] for par_ops in par_ops_seq
        ]
        # write text
        log_filename_txt = (
            Path(self.model_dir) / f"simple_multistream_{self.model_name}.txt"
        )
        with open(log_filename_txt, "w") as log_f:
            for idx, ops_list in enumerate(ops_names):
                ops_string = " ".join(ops_list)
                log_f.write(f"{idx}: {ops_string}\n")
        _LOGGER.info(f"Wrote text simple multistream info into {log_filename_txt}")
        # write json
        log_filename_json = (
            Path(self.model_dir) / f"simple_multistream_{self.model_name}.json"
        )
        with open(log_filename_json, "w") as log_f:
            log_f.write(f"{json.dumps(ops_names)}\n")
        _LOGGER.info(f"Wrote json simple multistream info into {log_filename_json}")

    def generate_model(self) -> str:
        # Disable graph mode on ROCM because the updating operations
        # are not supported
        target_has_graph_mode = "true" if self.target.name() == "cuda" else "false"

        run_impl_mode = multistream_mode()
        if run_impl_mode == 0:
            # no multistream mode is used
            n_additional_streams = 0
            n_additional_events = 0
            par_function_seq = None
            par_check_function_seq = []
        elif run_impl_mode == 1:
            # spawn additional streams. Total number of streams will be
            # n_additional_streams + 1.
            n_additional_streams = multistream_additional_streams()
            n_additional_events = n_additional_streams
            # generate List[List[Operator]]
            par_ops_seq = self._generate_simple_multistream_ops()
            for par_ops in par_ops_seq:
                _LOGGER.info(
                    f"Executing in parallel: {' '.join([op._attrs['original_name'] for op in par_ops])}"
                )
            # convert List[List[Operator]] into List[List[str]]
            par_function_seq = [
                [self._rendered_func_code[op] for op in par_ops]
                for par_ops in par_ops_seq
            ]
            # prepare after-ops checks
            par_check_function_seq = self._rendered_checks_func_code
            # dump info to files for further debugging, if needed
            if is_debug() and self.model_dir is not None:
                self._write_simple_multistream_debug_info(par_ops_seq)
        else:
            raise Exception(f"Unsupported multistream mode ({run_impl_mode})")

        per_op_profiler_seq = zip(
            self.func_name_seq,
            self.func_seq,
            self._input_shape_seq,
            self._output_shape_seq,
            self.func_prop_seq,
        )
        return MODEL_TEMPLATE.render(
            model_name=self.model_name,
            function_decl="\n".join(self.func_decl),
            set_inputs="\n".join(self.set_inputs),
            tensor_slice="\n".join(self.tensor_slice),
            tensor_map_set="\n".join(self.tensor_map_set),
            set_up_constants="\n".join(self.set_up_constants),
            device_to_device_copies="\n".join(self.device_to_device_copies),
            set_up_param_dynamic_shapes="\n".join(self.set_up_param_dynamic_shapes),
            function_seq=self.func_seq,
            per_op_profiler_seq=per_op_profiler_seq,
            tensor_decl="\n".join(self.tensor_decl),
            dim_decl="\n".join(self.dim_decl),
            jagged_decl="\n".join(self.jagged_decl),
            function_state="\n".join(self.function_state),
            target_has_graph_mode=target_has_graph_mode,
            unique_workspace_size=self.workspace.unique_size,
            debug_header=self.debug_header,
            blob_size=self.max_blob_size,
            workspace_size=self.workspace.total_size(),
            num_inputs=self.num_inputs,
            num_outputs=self.num_outputs,
            param_size=self.max_constant_blob_size + self.extra_owned_constant_size,
            num_unbound_constants=self.unbound_constant_idx,
            reset_constants="\n".join(self.reset_constants),
            profiler_annotation=self.debug_settings.gen_profiler_annotation,
            n_additional_streams=n_additional_streams,
            n_additional_events=n_additional_events,
            par_function_seq=par_function_seq,
            par_check_function_seq=par_check_function_seq,
            run_impl_mode=run_impl_mode,
        )

    def _create_set_up_constant_offsets(self) -> str:
        """
        bound_constant_offsets_ maps each bound constant to its offset in the
        constant buffer; constant_folding_outputs_offsets_ maps each output of
        constant folding to its offset inside the constant buffer. When the
        model is loaded, we use these offsets to wire up the constant folding
        output pointers to the outputs of the constant folder.
        """
        constant_offsets = ""
        if self.set_up_constant_folding_outputs_offsets:
            constant_offsets = jinja2.Template(
                """
    constant_folding_outputs_offsets_.resize({{num_constant_folding_outputs}});
{{set_up_statements}}
"""
            ).render(
                num_constant_folding_outputs=len(
                    self.set_up_constant_folding_outputs_offsets
                ),
                set_up_statements="\n".join(
                    self.set_up_constant_folding_outputs_offsets
                ),
            )
            constant_offsets += "\n"

        if self.set_up_bound_constant_offsets:
            constant_offsets += jinja2.Template(
                """
    bound_constant_offsets_.resize({{num_bound_constant_offsets}});
{{set_up_statements}}
"""
            ).render(
                num_bound_constant_offsets=len(self.set_up_bound_constant_offsets),
                set_up_statements="\n".join(self.set_up_bound_constant_offsets),
            )
        return constant_offsets

    def generate_source(self) -> Dict[str, str]:
        """
        Perform the codegen after adding all tensors.
        The dictionary returned is a map from filename -> contents.
        """
        device_functions_header_name = f"{self.target.name()}_device_functions.h"
        result = {}
        result["device_functions-generated.h"] = (
            f'#include "{device_functions_header_name}"'
        )

        result["model-generated.h"] = self.generate_model()

        model_container_src_fname = f"model_container_base{self.target.src_extension()}"
        model_container_base_src = MODEL_CONTAINER_TEMPLATE.render(
            num_inputs=self.num_inputs,
            num_outputs=self.num_outputs,
            param_size=self.max_constant_blob_size + self.extra_owned_constant_size,
            set_up_constant_names="\n".join(self.set_up_constant_names),
            set_up_constant_original_names="\n".join(
                self.set_up_constant_original_names
            ),
            set_up_param_dtypes="\n".join(self.set_up_param_dtypes),
            set_up_bound_constant_dtypes="\n".join(self.set_up_bound_constant_dtypes),
            set_up_bound_constant_size="\n".join(self.set_up_bound_constant_size),
            set_up_output_shapes="\n".join(self.set_up_output_shapes),
            set_up_param_names="\n".join(self.set_up_param_names),
            num_constants=self.num_constants,
            num_bound_constants=self.bound_constant_idx,
            num_unbound_constants=self.unbound_constant_idx,
            owned_constants_init=",".join(self.owned_constants_init),
            set_up_constant_offsets=self._create_set_up_constant_offsets(),
            set_up_constant_folding_inputs="\n".join(
                self.set_up_constant_folding_inputs
            ),
            # # todo: enable once this feature is fully available
            # is_windows=is_windows(),
        )
        result[model_container_src_fname] = model_container_base_src
        return result

    def add_constant_folding_input(self, tensor: Tensor):
        """
        Handle an input to constant folding, e.g. a constant that is
        no longer part of the main graph.
        """
        name = tensor._attrs["name"]
        if tensor._attrs["data"] is None:
            self.set_up_constant_names.append(
                set_value(
                    f'unbound_constant_name_to_idx_["{name}"]',
                    self.unbound_constant_idx,
                )
            )
            original_name = tensor._attrs.get("original_name")
            if original_name is not None:
                self.set_up_constant_original_names.append(
                    set_value(
                        f'constant_name_to_original_name_["{name}"]',
                        f'"{original_name}"',
                    )
                )
            self._record_param_tensor_info(
                tensor,
                self.unbound_constant_idx + self.num_inputs + self.num_outputs,
            )
            self.unbound_constant_idx += 1
            self.set_up_constant_folding_inputs.append(
                f'constant_folding_inputs_.insert("{name}");'
            )
        else:
            self._add_owned_constant(tensor)
            self._codegen_bound_constant(tensor)
            self.set_up_constant_folding_inputs.append(
                f'constant_folding_optional_inputs_.insert("{name}");'
            )
        self._process_dims_for_tensor(tensor)

    def append_all_tensors(self) -> None:
        if self.additional_unbound_constants is not None:
            for tensor in self.additional_unbound_constants:
                self.add_constant_folding_input(tensor)
                self.extra_owned_constant_size += tensor.size_bytes(alignment=64)

        for tensor in self.graph:
            if tensor._attrs["is_param"] and tensor._attrs["offset"] is not None:
                # Make sure we leave room for the tensors that constant folding
                # needs. These have been excluded from the final graph, so
                # the memory planning pass will not have known about them.
                tensor._attrs["offset"] += self.extra_owned_constant_size
            self.append_tensor(tensor)


_DEBUG_SETTINGS = AITDebugSettings()

def gen_library_src(  # noqa: C901
    sorted_graph: List[Tensor],
    max_blob_size: int,
    max_constant_blob_size: int,
    workspace: Workspace,
    workdir: str,
    output_tensors: List[Tensor],
    model_name: str = "",
    debug_settings: AITDebugSettings = _DEBUG_SETTINGS,
    additional_unbound_constants: Optional[List[Tensor]] = None,
) -> List[Tuple[str, str]]:
    """Generate model driver source code files for the given graph.

    Parameters
    ----------
    sorted_graph : List[Tensor]
        The network after running toposort transformation
    max_blob_size : int
        Total memory for input/output tensors and intermediate results,
        calculated by the memory planning transformation
    max_constant_blob_size : int
        Total memory required for constant (parameter) tensors,
        calculated by the memory planning transformation
    workspace : Workspace
        Workspace sizes, computed by memory planning
    workdir : str
        Target directory for generated C++ source code files
    output_tensors : List[Tensor]
        The output tensors of the graph, in their external output order
    model_name : str, optional
        Sub working directory in the workdir for the given model, by default ""
    debug_settings : AITDebugSettings
        Specifies debug settings such as where to dump the AITemplate model
        Python file, etc.
    additional_unbound_constants : Optional[List[Tensor]], optional
        Tensors used as inputs to constant folding that are no longer part of
        the main graph, by default None

    Returns
    -------
    List[Tuple[str, str]]
        List of tuples (source file path, object file path)
    """

    def to_obj_name(name: str):
        name, _ = os.path.splitext(name)
        return f"{name}.obj"

    prefix = os.path.join(workdir, model_name)
    constants_fname = os.path.join(prefix, "constants.bin")
    constants_data_file = open(constants_fname, "wb")

    model_container_generator = ModelContainerGenerator(
        max_blob_size,
        max_constant_blob_size,
        workspace,
        constants_data_file,
        sorted_graph,
        output_tensors,
        additional_unbound_constants=additional_unbound_constants,
        debug_settings=debug_settings,
        model_dir=prefix,
    )
    model_container_generator.append_all_tensors()
    constants_data_file.close()

    files = model_container_generator.generate_source()

    to_build = [(constants_fname, to_obj_name(constants_fname))]
    for fname, contents in files.items():
        fname_full = os.path.join(prefix, fname)
        with open(fname_full, "w") as fo:
            fo.write(contents)
        if not fname_full.endswith(".h"):
            to_build.append((fname_full, to_obj_name(fname_full)))

    # Copy over static csrc/headers
    sources = model_container_generator.target.copy_headers_and_csrc_to_workdir(prefix)
    for fname in sources:
        to_build.append((fname, to_obj_name(fname)))

    _LOGGER.info(f"generated {len(to_build)} library srcs")
    return to_build

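# Illustrative end-to-end sketch (hypothetical values; in practice the AITemplate
# compile_model front end runs memory planning first and then calls into this module):
#
#     blob_size, constant_blob_size, workspace = ...  # from memory planning
#     gen_function_src(sorted_graph, workdir="./tmp", model_name="test_model")
#     to_build = gen_library_src(
#         sorted_graph,
#         blob_size,
#         constant_blob_size,
#         workspace,
#         workdir="./tmp",
#         output_tensors=outputs,
#         model_name="test_model",
#     )
#
# The returned (source, object) path pairs, together with the per-op sources from
# gen_function_src and the static csrc/headers copied from the target, are then
# compiled and linked into the final model shared library.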