Files
mars-elt/dbt/macros/control_tables/control_run_end.sql
Grzegorz Michalski 2c225d68ac init
2026-03-02 09:47:35 +01:00

49 lines
3.2 KiB
SQL

{% macro control_run_end(results) %}
{% if flags.WHICH in ('run', 'seed', 'snapshot', 'build') %}
{{ elog("START macro -> control_run_end", "INFO", 0)}}
{% for result in results %}
{% if result.status != 'skipped' %}
{% set model_name = result.node.name %}
{% set rows_affected = result.adapter_response.get('rows_affected', 0) %}
{% if result.status == 'success' %} {% if rows_affected == 0 %}{% set status = 'W' %}{% else %}{% set status = 'Y' %}{% endif %} {% else %} {% set status = 'N' %} {% endif %}
{% set completed_at = result.timing[-1].completed_at.strftime("%Y-%m-%d %H:%M:%S") %}
{% set table_name = result.node.relation_name %}
{%set full_model_key = result.node.unique_id%}
{%set model = graph.nodes.get(full_model_key)%}
{%if model is not none and (result.node.path.startswith('mopdb') or result.node.path.startswith('rar')) %}
{%set model_tags = model.config.get('tags',[])%}
{%set input_service_name_var = var('input_service_name') %}
{%if 'A_TASK_HISTORY_TARGET_insert' not in model_tags and input_service_name_var == 'RAR'%}
{# Skip the following lines and go straight to the end #}
{{elog("RAR model without a A_TASK_HISTORY_TARGET_insert tag. Skip inserting into A_TASK_HISTORY_TARGET for" ~ model_name, "INFO", 1)}}
{%elif model.config.materialized == 'table_if_rows_exist' %}
{{ elog("Get subprocess_key for model " ~ model_name, "INFO", 1)}}
{% set a_task_history_key = get_task_history_key(model_name) %}
{%if a_task_history_key != null and rows_affected > 0%}
{{ elog("Run macro insert_A_TASK_HISTORY_TARGET for model " ~ model_name, "INFO", 1)}}
{{ insert_A_TASK_HISTORY_TARGET(a_task_history_key, table_name, status, rows_affected) }}
{% do run_query('COMMIT') %}
{% endif %}
{%else%}
{{ elog("Get subprocess_key for model " ~ model_name, "INFO", 1)}}
{% set a_task_history_key = get_task_history_key(model_name) %}
{{ check_null("a_task_history_key", a_task_history_key) }}
{{ elog("Run macro insert_A_TASK_HISTORY_TARGET for model " ~ model_name, "INFO", 1)}}
{{ insert_A_TASK_HISTORY_TARGET(a_task_history_key, table_name, status, rows_affected) }}
{% do run_query('COMMIT') %}
{% endif %}
{%endif%}
{#
{% if status == 'N' %}
{{ update_A_TASK_HISTORY(a_task_history_key, convert_timezone(completed_at,'YYYY-MM-DD HH24:MI:SS','UTC'), 'N') }}
{% endif %}
#}
{% endif %}
{% endfor %}
{{ elog("END macro -> control_run_end", "INFO", 0)}}
{% endif %}
{% endmacro %}