-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathactivity.sql
92 lines (72 loc) · 2.6 KB
/
activity.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
{% macro activity(
    relationship,
    activity_name,
    included_columns=var(
        "included_columns",
        var("dbt_activity_schema", {}).get(
            "included_columns",
            dbt_activity_schema.columns().values() | list
        )
    ),
    additional_join_condition="true"
) %}
    {# Public entry point for describing an activity.

    Looks up the "activity" implementation through `adapter.dispatch` in the
    "dbt_activity_schema" namespace and returns whatever that implementation
    returns. The arguments are forwarded positionally and unchanged.

    Default resolution for `included_columns`, in order:
        1. the `included_columns` var,
        2. the "included_columns" key of the `dbt_activity_schema` var,
        3. the list of all values of `dbt_activity_schema.columns()`.

    `additional_join_condition` defaults to the string "true".
    #}
    {% set dispatched_activity = adapter.dispatch("activity", "dbt_activity_schema") %}
    {% do return(dispatched_activity(
        relationship,
        activity_name,
        included_columns,
        additional_join_condition
    )) %}
{% endmacro %}
{% macro default__activity(
    relationship,
    activity_name,
    included_columns,
    additional_join_condition
) %}
    {# Describe one activity to include in the dataset.

    params:

        relationship: relationship
            The relationship that defines how the appended activity is joined
            to the primary activity.

        activity_name: str
            The string identifier of the activity in the Activity Stream to
            join to the primary activity.

        included_columns: List[str]
            List of columns to join to the primary activity. When called via
            the `activity` macro this defaults to the `included_columns` var
            if it is set, otherwise to the columns defined in columns.sql.

        additional_join_condition: str
            A valid sql boolean to condition the join of the appended
            activity. Can optionally contain the python f-string placeholders
            "{primary}" and "{appended}"; these will be compiled with the
            correct aliases.

            Eg:
                "json_extract({primary}.feature_json, 'dim1')
                    = json_extract({appended}.feature_json, 'dim1')"

            The "{primary}" and "{appended}" placeholders are compiled
            according to the position of the joined activity in the
            `appended_activities` list argument to `dataset.sql`.

            Compiled:
                "json_extract(stream.feature_json, 'dim1')
                    = json_extract(stream_3.feature_json, 'dim1')"

            Given that the appended activity was 3rd in the
            `appended_activities` list argument.

    returns:
        A namespace with the attributes `name`, `included_columns`,
        `required_columns`, `relationship` and `additional_join_condition`.
    #}

    {% set schema_columns = dbt_activity_schema.columns() %}

    {# Columns needed for the joins, but not necessarily part of the final
       result. Starts as the full set and is trimmed below. #}
    {% set join_only_columns = [
        schema_columns.activity_id,
        schema_columns.activity,
        schema_columns.ts,
        schema_columns.customer,
        schema_columns.activity_occurrence,
        schema_columns.activity_repeated_at
    ] %}

    {# Drop every column the caller already asked for, so that
       `required_columns` only lists what is missing from `included_columns`.
       The membership test is re-evaluated per item, so a column repeated in
       `included_columns` is removed only once. #}
    {% for selected in included_columns if selected in join_only_columns %}
        {% do join_only_columns.remove(selected) %}
    {% endfor %}

    {{ return(namespace(
        name = activity_name,
        included_columns = included_columns,
        required_columns = join_only_columns,
        relationship = relationship,
        additional_join_condition = additional_join_condition
    )) }}
{% endmacro %}