# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Efficient nms.
"""
import itertools
import logging
import os
import re
from collections import OrderedDict
from operator import itemgetter
from typing import List
import jinja2
from aitemplate import backend
from aitemplate.backend import registry
from aitemplate.compiler.base import IntImm, Operator, Tensor
from aitemplate.utils import shape_utils
# pylint: disable=C0103,W0221,W0102,W0223
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# TODO: change to column last
# Statements that derive the output dimensions NO, CO and HO from the four
# input dimensions (x_dim0..x_dim3) and the `nmsMaxOut` setting; NB and NC are
# assigned but do not feed into any output dimension.
# `efficient_nms._infer_shape` renders this template with empty `indent` and
# `dtype` and runs the result through `exec`, so the rendered text has to stay
# valid Python: the statement lines inside the string must keep starting at
# column 0.
# NOTE(review): the `indent`/`dtype` placeholders presumably allow a backend to
# render the same statements as typed C++ -- confirm against the backend code.
SHAPE_FUNC_TEMPLATE = jinja2.Template(
"""
{{indent}}{{dtype}}BS = {{x_dim0}};
{{indent}}{{dtype}}NB = {{x_dim1}};
{{indent}}{{dtype}}NC = {{x_dim2}};
{{indent}}{{dtype}}SZ = {{x_dim3}};
{{indent}}{{dtype}}NO = BS;
{{indent}}{{dtype}}CO = {{nmsMaxOut}};
{{indent}}{{dtype}}HO = SZ;
"""
)
# Textual key identifying one concrete input-shape combination.
# `efficient_nms._gen_exec_key` renders it and strips the newlines; the result
# is used as a key of `_attrs["exec_path"]`. `efficient_nms._invert_exec_key`
# later recovers the three integers by extracting every run of digits, so the
# fixed text of this template must not contain any digits of its own.
EXEC_KEY_TEMPLATE = jinja2.Template(
"""
num_batch == {{x_dim0}} && num_rois == {{x_dim1}} && num_classes == {{x_dim2}}
"""
)
class efficient_nms(Operator):
    r"""
    Performs non-maximum suppression (NMS) on the boxes according to their
    intersection-over-union (IoU).

    NMS iteratively removes lower scoring boxes which have an IoU greater than
    iou_threshold with another (higher scoring) box.

    Note: if multiple boxes have the exact same score and satisfy the IoU
    criterion with respect to a reference box, the selected box is not
    guaranteed to be the same for different backends.

    * :attr:`preNmsTop` identifies the maximum number of boxes to take.

    * :attr:`nmsMaxOut` identifies the maximum number of boxes to reserve after
      the operation.

    * :attr:`iouThreshold` identifies the intersection-over-union (IoU)
      threshold which is used to discard all overlapping boxes with
      IoU > iouThreshold.

    * :attr:`minBoxSize` identifies the minimum box size, if a box has size
      less than this value, it will be removed before the non-maximum
      suppression.

    Args:
        boxes (Tensor): 4-D tensor of boxes to perform NMS on. The shape
            template names its dimensions BS, NB, NC and SZ (presumably batch,
            boxes, classes and box coordinates -- confirm against the backend
            kernel). Boxes are expected to be in ``(x1, y1, x2, y2)`` format
            with ``0 <= x1 < x2`` and ``0 <= y1 < y2``.
        scores (Tensor): scores for the boxes; must have at least two
            dimensions.

    Returns:
        Tuple[Tensor, Tensor, Tensor, Tensor]: ``(num_detections,
        detection_boxes, detection_scores, detection_classes)`` with shapes
        ``[BS, 1]``, ``[BS, nmsMaxOut, SZ]``, ``[BS, nmsMaxOut]`` and
        ``[BS, nmsMaxOut]``. ``num_detections`` and ``detection_classes`` are
        int64; the other two use the dtype of ``boxes``.
    """

    def __init__(
        self, preNmsTop=2000, nmsMaxOut=200, iouThreshold=0.5, minBoxSize=0
    ) -> None:
        """Initializes efficient_nms and records its settings in ``_attrs``."""
        super().__init__()
        self._attrs["op"] = "efficient_nms"
        self._attrs["preNmsTop"] = preNmsTop
        self._attrs["nmsMaxOut"] = nmsMaxOut
        self._attrs["iouThreshold"] = iouThreshold
        self._attrs["minBoxSize"] = minBoxSize
        self._attrs["has_profiler"] = True
        # Grown by profile() to the largest workspace reported by the profiler.
        self._attrs["workspace"] = 0
        self.exec_key_template = EXEC_KEY_TEMPLATE
        self.shape_eval_template = SHAPE_FUNC_TEMPLATE

    def _infer_shape(self, x: List[int], w: List[int]):
        """Infer the output shape for one concrete input shape.

        Args:
            x: the four concrete dimensions of the boxes tensor.
            w: concrete dimensions of the scores tensor (not used here).

        Returns:
            ``[NO, CO, HO]`` as evaluated from the shape template, i.e.
            ``[x[0], nmsMaxOut, x[3]]`` with the default template.
        """
        eval_func = self.shape_eval_template.render(
            indent="",
            dtype="",
            div="//",
            nmsMaxOut=self._attrs["nmsMaxOut"],
            x_dim0=x[0],
            x_dim1=x[1],
            x_dim2=x[2],
            x_dim3=x[3],
        )
        # The rendered template is plain Python assignments built only from the
        # shape values and nmsMaxOut; executing it fills `namespace` with the
        # computed dimensions.
        namespace = {}
        exec(eval_func, namespace)  # noqa: P204
        return [int(namespace[dim]) for dim in ("NO", "CO", "HO")]

    def _infer_shapes(self, x: Tensor, w: Tensor):
        """Infer the (possibly dynamic) output shape for the nms op.

        Every combination of the candidate values of the dimensions of ``x``
        is evaluated, and each output dimension becomes an int var over the
        sorted set of values seen for it.
        """
        x_shape_values = [var._attrs["values"] for var in x._attrs["shape"]]
        w_shape = [var._attrs["values"][0] for var in w._attrs["shape"]]
        # NOTE(review): KH/KW look carried over from a convolution op; nothing
        # in this class reads them. Kept because other code may rely on them.
        self._attrs["KH"] = w_shape[0]
        self._attrs["KW"] = w_shape[1]
        # Run shape inference for each concrete input shape.
        y_shapes = [
            self._infer_shape(x_shape, w_shape)
            for x_shape in itertools.product(*x_shape_values)
        ]
        return [
            shape_utils.gen_int_var(sorted({y_shape[axis] for y_shape in y_shapes}))
            for axis in range(3)
        ]

    def __call__(self, boxes: Tensor, scores: Tensor) -> Tensor:
        """Performs shape inference and returns the four output tensors.

        Returns:
            The tuple ``(num_detections, detection_boxes, detection_scores,
            detection_classes)``; the same tensors are stored, as a list, in
            ``_attrs["outputs"]``.
        """
        self._attrs["inputs"] = [boxes, scores]
        self._set_depth()
        self._extract_exec_path(boxes)
        output_shape = self._infer_shapes(boxes, scores)
        box_dtype = boxes._attrs["dtype"]

        num_detections = Tensor(
            [output_shape[0], IntImm(1)], dtype="int64", src_ops={self}
        )
        detection_boxes = Tensor(
            output_shape,
            src_ops={self},
            dtype=box_dtype,
        )
        detection_scores = Tensor(
            output_shape[:-1],
            src_ops={self},
            dtype=box_dtype,
        )
        detection_classes = Tensor(output_shape[:-1], dtype="int64", src_ops={self})

        output = (num_detections, detection_boxes, detection_scores, detection_classes)
        self._attrs["outputs"] = list(output)
        return output

    def _get_op_attributes(self):
        """Return the constructor arguments of this op as a dict."""
        return {
            "iouThreshold": self._attrs["iouThreshold"],
            "minBoxSize": self._attrs["minBoxSize"],
            "nmsMaxOut": self._attrs["nmsMaxOut"],
            "preNmsTop": self._attrs["preNmsTop"],
        }

    def _gen_exec_key(self, shape):
        """Render the exec key for one concrete boxes shape.

        The key carries ``shape[0]``, ``shape[1] * shape[2]`` and ``shape[2]``
        as num_batch, num_rois and num_classes, on a single line.
        """
        return self.exec_key_template.render(
            x_dim0=shape[0],
            x_dim1=shape[1] * shape[2],
            x_dim2=shape[2],
        ).replace("\n", "")

    def _extract_exec_path(self, x: Tensor):
        """Populate ``_attrs["exec_path"]`` with one key per concrete shape of ``x``."""
        x_shape_values = [var._attrs["values"] for var in x._attrs["shape"]]
        exec_path = OrderedDict()
        self._attrs["exec_path"] = exec_path
        for x_shape in itertools.product(*x_shape_values):
            exec_path[self._gen_exec_key(x_shape)] = ""

    def gen_function(self) -> str:
        """Look up the backend ``gen_function`` for the current target and call it."""
        target = backend.target.Target.current()
        func_key = f"{target.name()}.{self._attrs['op']}.gen_function"
        func = registry.get(func_key)
        return func(self._attrs)

    def gen_profiler(self, workdir: str = None, dynamic_profiling_strategy=None):
        """Look up the backend ``gen_profiler`` for the current target and call it.

        Args:
            workdir: forwarded to the backend function.
            dynamic_profiling_strategy: accepted for interface compatibility;
                not used by this op.

        Returns:
            Whatever the backend function returns.
        """
        target = backend.target.Target.current()
        func_key = f"{target.name()}.{self._attrs['op']}.gen_profiler"
        func = registry.get(func_key)
        return func(self._attrs, workdir)

    def _invert_exec_key(self, key):
        """Return every integer found in ``key``, in order of appearance.

        For a key made by ``_gen_exec_key`` this is
        ``[num_batch, num_rois, num_classes]``.
        """
        return [int(token) for token in re.findall(r"(\d+)", key)]

    def _gen_profile_cmd(self, profiler_prefix, cfg, x_shape):
        """Build the profiler command line as a list of strings.

        Args:
            profiler_prefix: directory containing the profiler executable.
            cfg: file name of the profiler executable.
            x_shape: three integers as returned by ``_invert_exec_key``.

        Raises:
            RuntimeError: if the profiler file is not executable.
        """
        exe_path = os.path.join(profiler_prefix, cfg)
        if not os.access(exe_path, os.X_OK):
            raise RuntimeError(f"Profiler {exe_path} is not executable")
        # NOTE(review): x_shape comes from _invert_exec_key, so x_shape[1] was
        # already multiplied by x_shape[2] in _gen_exec_key; the second
        # argument below therefore includes that factor twice. Kept as-is
        # because changing it alters the profiled workspace size -- confirm
        # whether this is intended.
        args = [
            exe_path,
            x_shape[0],
            x_shape[1] * x_shape[2],
            x_shape[2],
        ]
        command = [str(arg) for arg in args]
        _LOGGER.info("profiling cmd: %s", command)
        return command

    def _profile_single_workload(self, profiler_prefix, exec_key, devices):
        """Run the profiler for one exec key and return the workspace it reports.

        Raises:
            RuntimeError: if the runner produced no result.
        """
        runner = backend.profiler_runner.Runner(devices, self._attrs["name"])
        cfg = self._attrs["op"]
        x_shape = self._invert_exec_key(exec_key)
        command = self._gen_profile_cmd(profiler_prefix, cfg, x_shape)
        runner.push(cfg, command)
        runner.join()
        result = runner.pull()
        if not result:
            raise RuntimeError(
                f"Profile workload: {exec_key} failed. Results: {result}."
            )
        # Each entry is indexable; the second item is the profiling record,
        # which is both the sort key and the carrier of `workspace`.
        best = min(result, key=itemgetter(1))
        return best[1].workspace

    def profile(
        self,
        workdir="./",
        devices=None,
        dynamic_profiling_strategy=None,
    ):
        """Profile to compute the NMS Op workspace size.

        Runs the profiler once per key in ``_attrs["exec_path"]`` and keeps the
        largest reported workspace in ``_attrs["workspace"]``.

        Args:
            workdir: base directory; the profiler is looked up under
                ``<workdir>/profiler/<op>``.
            devices: device ids handed to the profiler runner; defaults to
                ``[0]``.
            dynamic_profiling_strategy: accepted for interface compatibility;
                not used by this op.
        """
        if devices is None:
            devices = [0]
        workloads = list(self._attrs["exec_path"])
        profiler_prefix = os.path.join(workdir, "profiler", self._attrs["op"])
        for wkl in workloads:
            _LOGGER.info("Profile: %s: %s", self._attrs["name"], wkl)
            workspace = self._profile_single_workload(profiler_prefix, wkl, devices)
            self._attrs["workspace"] = max(self._attrs["workspace"], workspace)