omniverse/replicator

옴니버스 레플리케이터 예제 11탄

luke12 2022. 6. 1. 02:49

Custom Writer

 

Custom Writer — Omniverse Extensions documentation

To implement a custom writer we will need to implement the class Writer. For the specifics of the implementation, a detailed description is below. End to end example Following the set up steps in Setting up the Script Editor, you can copy the script below

docs.omniverse.nvidia.com

커스텀 작성기(writer)

학습 목표

레플리케이터는 레이블이 지정된 출력을 제공하는 기본 작성기와 함께 제공됩니다. 기본 작성기가 일반적이지만 레플리케이터는 사용자 정의 출력 형식의 요구 사항을 해결하기 위해 커스텀 작성기를 작성할 수 있는 방법을 제공합니다. 이 예제은 커스텀 작성기를 만드는 과정을 보여줍니다.

커스텀 작성기 만들기

사용자 지정 작성기를 구현하려면 Writer 클래스를 구현해야 합니다. 구현에 대한 자세한 설명은 다음과 같습니다.

작성기를 구현

사용자 지정 Write를 만들려면 먼저 API 설명서에 자세히 나와 있는 Writer 클래스를 구현해야 합니다.

이 예에서는 원하는 클래스에 대해서만 바운딩 상자 정보를 작성하는 2D 바운딩 상자 작성기를 만듭니다. 이 경우 worker의 사용 사례에 따라 관심 있는 클래스가 다를 수 있습니다.

먼저 작성기를 초기화합니다. 이 경우 필수 매개 변수는 출력 디렉터리뿐입니다. 여기서 annotators의 사용에 주목하십시오. 주석은 복제자가 작성한 데이터 원본 주석의 유형입니다.

def __init__(
    self,
    output_dir,
    rgb: bool = True,
    bounding_box_2d_tight: bool = False,
    image_output_format="png",
):
    self._output_dir = output_dir
    self._backend = BackendDispatch({"paths": {"out_dir": output_dir}})
    self._frame_id = 0
    self._image_output_format = image_output_format

    self.annotators = []

    # RGB
    if rgb:
        self.annotators.append(AnnotatorRegistry.get_annotator("rgb"))

    # Bounding Box 2D
    if bounding_box_2d_tight:
        self.annotators.append(AnnotatorRegistry.get_annotator("bounding_box_2d_tight",
                                                            init_params={"semanticTypes": ["class"]}))

그러면 도우미 기능이 있습니다. 여기서 경계 상자가 최소 크기 기준을 충족하는지 확인합니다.

def check_bbox_area(self, bbox_data, size_limit):
    length = abs(bbox_data['x_min'] - bbox_data['x_max'])
    width = abs(bbox_data['y_min'] - bbox_data['y_max'])

    area = length * width
    if area > size_limit:
        return True
    else:
        return False

마지막으로 가장 중요한 기능인 write가 있습니다. 이 경우 에서는 data 사전이 rgb이고 bounding_box_2d_tight인지 확인합니다. 여기에 있는 경우 경계 상자 정보와 레이블을 추출합니다. 그런 다음 각 레이블을 살펴보고 레이블이 worker일 경우 크기를 확인한 후 코코와 같은 형식으로 데이터를 작성합니다.

def write(self, data):
    if "rgb" in data and "bounding_box_2d_tight" in data:
        bbox_data = data["bounding_box_2d_tight"]["data"]
        id_to_labels = data["bounding_box_2d_tight"]["info"]["idToLabels"]

        for id, labels in id_to_labels.items():
            id = int(id)

            if 'worker' in labels:

                target_bbox_data = {'x_min': bbox_data['x_min'], 'y_min': bbox_data['y_min'],
                                    'x_max': bbox_data['x_max'], 'y_max': bbox_data['y_max']}

                if self.check_bbox_area(target_bbox_data, 0.5):
                    width = int(abs(target_bbox_data["x_max"][0] - target_bbox_data["x_min"][0]))
                    height = int(abs(target_bbox_data["y_max"][0] - target_bbox_data["y_min"][0]))

                    if width != 2147483647 and height != 2147483647:
                        filepath = f"rgb_{self._frame_id}.{self._image_output_format}"
                        self._backend.write_image(filepath, data["rgb"])

                        bbox_filepath = f"bbox_{self._frame_id}.json"

                        coco_bbox_data = {"x": int(target_bbox_data["x_max"][0]),
                                        "y": int(target_bbox_data["y_max"][0]),
                                        "width": width,
                                        "height": height}

                        buf = io.BytesIO()
                        buf.write(json.dumps(coco_bbox_data).encode())
                        self._backend.write_blob(bbox_filepath, buf.getvalue())

다음은 구현된 전체 코드입니다.

import time
import asyncio
import json
import io

import omni.kit
import omni.usd
import omni.replicator.core as rep

from omni.replicator import Writer, AnnotatorRegistry, BackendDispatch

class WorkerWriter(Writer):
    def __init__(
        self,
        output_dir,
        rgb: bool = True,
        bounding_box_2d_tight: bool = False,
        image_output_format="png",
    ):
        self._output_dir = output_dir
        self._backend = BackendDispatch({"paths": {"out_dir": output_dir}})
        self._frame_id = 0
        self._image_output_format = image_output_format

        self.annotators = []

        # RGB
        if rgb:
            self.annotators.append(AnnotatorRegistry.get_annotator("rgb"))

        # Bounding Box 2D
        if bounding_box_2d_tight:
            self.annotators.append(AnnotatorRegistry.get_annotator("bounding_box_2d_tight",
                                                                init_params={"semanticTypes": ["class"]}))

    def check_bbox_area(self, bbox_data, size_limit):
        length = abs(bbox_data['x_min'] - bbox_data['x_max'])
        width = abs(bbox_data['y_min'] - bbox_data['y_max'])

        area = length * width
        if area > size_limit:
            return True
        else:
            return False

    def write(self, data):
        if "rgb" in data and "bounding_box_2d_tight" in data:
            bbox_data = data["bounding_box_2d_tight"]["data"]
            id_to_labels = data["bounding_box_2d_tight"]["info"]["idToLabels"]

            for id, labels in id_to_labels.items():
                id = int(id)

                if 'worker' in labels:

                    target_bbox_data = {'x_min': bbox_data['x_min'], 'y_min': bbox_data['y_min'],
                                        'x_max': bbox_data['x_max'], 'y_max': bbox_data['y_max']}

                    if self.check_bbox_area(target_bbox_data, 0.5):
                        width = int(abs(target_bbox_data["x_max"][0] - target_bbox_data["x_min"][0]))
                        height = int(abs(target_bbox_data["y_max"][0] - target_bbox_data["y_min"][0]))

                        if width != 2147483647 and height != 2147483647:
                            filepath = f"rgb_{self._frame_id}.{self._image_output_format}"
                            self._backend.write_image(filepath, data["rgb"])

                            bbox_filepath = f"bbox_{self._frame_id}.json"

                            coco_bbox_data = {"x": int(target_bbox_data["x_max"][0]),
                                            "y": int(target_bbox_data["y_max"][0]),
                                            "width": width,
                                            "height": height}

                            buf = io.BytesIO()
                            buf.write(json.dumps(coco_bbox_data).encode())
                            self._backend.write_blob(bbox_filepath, buf.getvalue())

        self._frame_id += 1

처음부터 끝까지의 예제

스크립트 편집기 설정의 설정 단계에 따라 아래 스크립트를 편집기에서 복사할 수 있습니다. 편집기에서 스크립트를 실행한 후 레플리케이터 실행 및 미리보기에 따라 레플리케이터를 실행합니다. 스크립트의 랜덤화에 대한 자세한 내용은 내장된 작성기를 사용하여 기존 3D 자산의 모양, 배치 및 방향을 랜덤화하십시오. 작성기의 더 많은 예를 보려면 레플리케이터 스크립트 폴더에서 BasicWriter 및 기타 작성기를 정의하는 스크립트를 확인할 수 있습니다. 해당 폴더를 찾으려면 레플리케이터용 스크립트에 표시된 단계를 수행하십시오.

import time
import asyncio
import json
import io

import omni.kit
import omni.usd
import omni.replicator.core as rep

from omni.replicator import Writer, AnnotatorRegistry, BackendDispatch


WORKER = 'omniverse://localhost/NVIDIA/Assets/Characters/Reallusion/Worker/Worker.usd'
PROPS = 'omniverse://localhost/NVIDIA/Assets/Vegetation/Shrub'
ENVS = 'omniverse://localhost/NVIDIA/Assets/Scenes/Templates/Outdoor/Puddles.usd'
SURFACE = 'omniverse://localhost/NVIDIA/Assets/Scenes/Templates/Basic/display_riser.usd'


class WorkerWriter(Writer):
    def __init__(
        self,
        output_dir,
        rgb: bool = True,
        bounding_box_2d_tight: bool = False,
        image_output_format="png",
    ):
        self._output_dir = output_dir
        self._backend = BackendDispatch({"paths": {"out_dir": output_dir}})
        self._frame_id = 0
        self._image_output_format = image_output_format

        self.annotators = []

        # RGB
        if rgb:
            self.annotators.append(AnnotatorRegistry.get_annotator("rgb"))

        # Bounding Box 2D
        if bounding_box_2d_tight:
            self.annotators.append(AnnotatorRegistry.get_annotator("bounding_box_2d_tight",
                                                                init_params={"semanticTypes": ["class"]}))

    def check_bbox_area(self, bbox_data, size_limit):
        length = abs(bbox_data['x_min'] - bbox_data['x_max'])
        width = abs(bbox_data['y_min'] - bbox_data['y_max'])

        area = length * width
        if area > size_limit:
            return True
        else:
            return False

    def write(self, data):
        if "rgb" in data and "bounding_box_2d_tight" in data:
            bbox_data = data["bounding_box_2d_tight"]["data"]
            id_to_labels = data["bounding_box_2d_tight"]["info"]["idToLabels"]

            for id, labels in id_to_labels.items():
                id = int(id)

                if 'worker' in labels:

                    target_bbox_data = {'x_min': bbox_data['x_min'], 'y_min': bbox_data['y_min'],
                                        'x_max': bbox_data['x_max'], 'y_max': bbox_data['y_max']}

                    if self.check_bbox_area(target_bbox_data, 0.5):
                        width = int(abs(target_bbox_data["x_max"][0] - target_bbox_data["x_min"][0]))
                        height = int(abs(target_bbox_data["y_max"][0] - target_bbox_data["y_min"][0]))

                        if width != 2147483647 and height != 2147483647:
                            filepath = f"rgb_{self._frame_id}.{self._image_output_format}"
                            self._backend.write_image(filepath, data["rgb"])

                            bbox_filepath = f"bbox_{self._frame_id}.json"

                            coco_bbox_data = {"x": int(target_bbox_data["x_max"][0]),
                                            "y": int(target_bbox_data["y_max"][0]),
                                            "width": width,
                                            "height": height}

                            buf = io.BytesIO()
                            buf.write(json.dumps(coco_bbox_data).encode())
                            self._backend.write_blob(bbox_filepath, buf.getvalue())

        self._frame_id += 1


rep.WriterRegistry.register(WorkerWriter)

with  rep.new_layer():

    def env_props(size=50):
        instances = rep.randomizer.instantiate(rep.utils.get_usd_files(PROPS), size=size, mode='point_instance')
        with instances:
            rep.modify.pose(
                position=rep.distribution.uniform((-500, 0, -500), (500, 0, 500)),
                rotation=rep.distribution.uniform((-90, -180, 0), (-90, 180, 0)),
            )
        return instances.node


    def worker():
        worker = rep.create.from_usd(WORKER, semantics=[('class', 'worker')])

        with worker:
            rep.modify.semantics([('class', 'worker')])
            rep.modify.pose(
                position=rep.distribution.uniform((-500, 0, -500), (500, 0, 500)),
                rotation=rep.distribution.uniform((-90, -45, 0), (-90, 45, 0)),
            )
        return worker


    rep.randomizer.register(env_props)
    rep.randomizer.register(worker)

    # Setup the static elements
    env = rep.create.from_usd(ENVS)
    table = rep.create.from_usd(SURFACE)

    # Setup camera and attach it to render product
    camera = rep.create.camera(
        focus_distance=800,
        f_stop=0.5
    )

    # Setup randomization
    with rep.trigger.on_frame(num_frames=10):
        rep.randomizer.env_props(10)
        rep.randomizer.worker()
        with camera:
            rep.modify.pose(position=rep.distribution.uniform((-500, 200, 1000), (500, 500, 1500)), look_at=table)

    render_product = rep.create.render_product(camera, resolution=(1024, 1024))

    # Initialize and attach writer
    # writer = rep.WriterRegistry.get("OmniWriter")
    writer = rep.WriterRegistry.get("WorkerWriter")
    out_dir = "custom_writer_output"
    writer.initialize(output_dir=out_dir, rgb=True, bounding_box_2d_tight=True)
    writer.attach([render_product])