/ DBT

Running Models in Parallel with dbt-trino

dbt trino에서 파티션이 있는 모델을 병렬 실행하기 위한 매크로 수정 방법

dbt-trino

dbt-trino를 이용하면, dbt를 통해 trino 쿼리를 실행할 수 있습니다.

파티션 단위 모델링

일반적으로 dbt-trino를 사용할 때, 모델 작성을 파티션 단위로 하게 됩니다. 예를 들면 아래와 같습니다.


-- models/model_A
SELECT column_a
    , column_b
    , column_c
    , date '{{ var("date") }}' AS partition_date
FROM {{ ref("model_B" )}}
WHERE partition_date = date '{{ var("date") }}'

위 model_A는 model_B로부터 특정 파티션(logical_date)만을 참조하여 model_A에 적재합니다. 1개 파티션만을 참조하고, 1개 파티션에만 적재합니다.

동시 실행 불가능

dbt-trino에서는 위와 같이 파티션 단위의 모델을 작성한 경우 여러 개의 파티션에 대해서 동시에 모델 적재(dbt run/build)를 실행할 수 없습니다. 이는 dbt-trino가 view를 이용하여 insert를 실행하기 때문입니다.

-- dbt를 실행하면, dbt-trino는 먼저 적재하려는 데이터에 대응하는 view를 생성한다
create or replace view
    "hive"."default"."model_A_dbt_tmp"
  security definer
  as
SELECT column_a
    , column_b
    , column_c
    , date '2024-06-01' AS partition_date
FROM "hive"."default"."model_B"
WHERE partition_date = date '2024-06-01'
;

-- 위에서 생성된 view를 적재 대상 테이블에 insert한다
insert into "hive"."default"."model_A" ("column_a", "column_b", "column_c", "partition_date")
    (
        select "column_a", "column_b", "column_c", "partition_date"
        from "hive"."default"."model_A_dbt_tmp"
    )

dbt-trino는 적재 대상 테이블(model_A)에 적재하기 위한 데이터를 view(model_A_dbt_tmp)로 먼저 생성하고, 해당 view를 테이블에 insert into로 적재합니다.

이 과정에서 중간에 사용되는 view의 이름이 {table_name}_dbt_tmp로 고정되어있습니다. 이 부분이 문제가 됩니다.

만약 아래처럼 3개 모델을 동시에 실행한다고 가정해보겠습니다.

dbt run -m model_A --vars "{'date': '2024-06-01'}"
dbt run -m model_A --vars "{'date': '2024-06-02'}"
dbt run -m model_A --vars "{'date': '2024-06-03'}"

# 3개의 커맨드가 모두 동일한 view_name을 사용함 ("hive"."default"."model_A__dbt_tmp")

동시에 위 3개 명령을 실행하게 되면, model_A_dbt_tmp view는 create or replace에 의해 마지막 실행에 해당하는 데이터(2024-06-03)만 담고 있게 됩니다.
따라서 insert into가 실행되어도, 2024-06-01, 2024-06-02에 해당하는 데이터에 대응하는 view가 없기 때문에 해당 파티션은 적재할 수 없습니다.

해결책: temp view 이름을 변경하자

중간에 생성하는 temp view를 파티션 의존적으로 변경하면 이를 해결할 수 있습니다.

dbt-core v1.7.5 기준으로, temp view 이름을 생성하는 부분은 아래와 같습니다.(incremental(dbt-core))

macros/materializations/incremental.sql

{% materialization incremental, default -%}

  -- relations
  {%- set existing_relation = load_cached_relation(this) -%}
  {%- set target_relation = this.incorporate(type='table') -%}
  {%- set temp_relation = make_temp_relation(target_relation)-%}
  {%- set intermediate_relation = make_intermediate_relation(target_relation)-%}
  {%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
  {%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}

dbt-trino v1.7.0에서는 아래와 같은 매크로로 오버라이딩되어있습니다.(incremental(dbt-trino))


{% materialization incremental, adapter='trino', supported_languages=['sql'] -%}
  {#-- Set vars --#}
  {%- set full_refresh_mode = (should_full_refresh()) -%}
  {%- set language = model['language'] -%}
  {% set target_relation = this.incorporate(type='table') %}
  {% set existing_relation = load_relation(this) %}

  {#-- The temp relation will be a view (faster) or temp table, depending on upsert/merge strategy --#}
  {%- set unique_key = config.get('unique_key') -%}
  {% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
  {% set tmp_relation_type = get_incremental_tmp_relation_type(incremental_strategy, unique_key, language) %}
  {% set tmp_relation = make_temp_relation(this).incorporate(type=tmp_relation_type) %}

tmp_relation이 view name으로 사용되는데, make_temp_relation은 아래와 같습니다.(make_temp_relation)

-- dbt-core

{% macro make_temp_relation(base_relation, suffix='__dbt_tmp') %}
  {{ return(adapter.dispatch('make_temp_relation', 'dbt')(base_relation, suffix)) }}
{% endmacro %}


adapter라는 처음보는 객체가 등장했습니다. adapter에 대한 설명을 읽어보면, adapter는 dbt가 접근하는 데이터베이스에 따라 동작을 다르게 하기 위해 만들어진 인터페이스입니다. 여기서 adapter.disaptch는 입력받은 macro_name(make_temp_relation)을 실행할 때, 현재 dbt가 접근 중인 데이터베이스에 해당하는 매크로를 반환해줍니다. trino용 매크로는 trino__make_temp_relation으로 작성되어 있어야하고, 만약 작성된 매크로가 없다면 default__make_temp_relation를 반환합니다.

trino__make_temp_relation는 존재하지 않으므로, default__make_temp_relation를 사용합니다.(default__make_temp_relation)


{% macro default__make_temp_relation(base_relation, suffix) %}
    {%- set temp_identifier = base_relation.identifier ~ suffix -%}
    {%- set temp_relation = base_relation.incorporate(
                                path={"identifier": temp_identifier}) -%}

    {{ return(temp_relation) }}
{% endmacro %}

결론적으로, 적재를 위해 생성되는 view name을 결정하는 부분은 make_temp_relation의 suffix입니다. suffix가 기본값이 __dbt_tmp이기 때문에 항상 같은 view name을 생성합니다. 그렇다면 make_temp_realtion를 오버라이딩해보겠습니다. dispatch에 의해 default 매크로를 호출할 때 사용하는 suffix를 수정합니다.


{% macro make_temp_relation(base_relation, suffix='__dbt_tmp') %}
    {%- set ymd = var('date') | replace('-', '') -%} {# date에서 -를 제거함 #}
    {%- set suffix = suffix + '_' + ymd -%} {# 기존 suffix에 ymd를 붙임 #}
  {{ return(adapter.dispatch('make_temp_relation', 'dbt')(base_relation, suffix)) }}
{% endmacro %}

make_temp_relation에서 suffix로 전달되는 값을 변경했습니다. 기존에는 __dbt_tmp로 고정되어있었으나, 위 매크로에서는 dbt 실행 시마다 입력되는 var('date')에 따라 suffix 값이 변경됩니다.

따라서 입력받은 vars 값에 따라 각기 다른 temp view를 생성하게 됩니다.

dbt run -m model_A --vars "{'date': '2024-06-01'}"
# view_name: "hive"."default"."model_A__dbt_tmp_20240601"
---
dbt run -m model_A --vars "{'date': '2024-06-02'}"
# view_name: "hive"."default"."model_A__dbt_tmp_20240602"
---
dbt run -m model_A --vars "{'date': '2024-06-03'}"
# view_name: "hive"."default"."model_A__dbt_tmp_20240603"

1개 모델 테이블에 대해서 여러개의 dbt run을 실행해도 각각 서로 다른 view를 생성하므로 병렬 실행이 가능해집니다.

위에서 정리한 macro들간의 관계를 정리하면 아래 그림과 같습니다.

tmp_relation

[참고]