forked from arnoN7/dbt-incremental-stream
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconso_client_multiple_streams.sql
32 lines (32 loc) · 1.3 KB
/
conso_client_multiple_streams.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
{{-
config(
materialized='incremental_stream',
unique_key=['ID', 'SOURCE']
)
-}}
with client_web as (
SELECT ID, 'WEB' as SOURCE, FIRST_NAME, LAST_NAME, BIRTHDATE, LOADED_AT {{ incr_stream.get_stream_metadata_columns() }}
FROM {{incr_stream.stream_ref('add_clients')}}
),
client_retail as (
SELECT ID, 'RETAIL' as SOURCE, FIRST_NAME, LAST_NAME, BIRTHDATE, LOADED_AT {{ incr_stream.get_stream_metadata_columns() }}
FROM {{incr_stream.stream_source('SOURCE_CRM', 'SOURCE_CLIENTS')}}
), client_web_2 as (
SELECT ID, 'WEB2' as SOURCE, FIRST_NAME, LAST_NAME, BIRTHDATE, LOADED_AT {{ incr_stream.get_stream_metadata_columns() }}
FROM {{incr_stream.stream_ref('add_clients_')}}
),
union_clients as (
SELECT * from client_web
UNION
SELECT * FROM client_retail
UNION
SELECT * FROM client_web_2
),
client_with_dup as (
SELECT row_number() OVER (PARTITION BY ID, SOURCE ORDER BY LOADED_AT DESC) as NB_DUP,
ID, SOURCE, FIRST_NAME, LAST_NAME, BIRTHDATE, LOADED_AT {{ incr_stream.get_stream_metadata_columns() }}
FROM union_clients
), clients_without_dup as (
SELECT ID, SOURCE, FIRST_NAME, LAST_NAME, BIRTHDATE, LOADED_AT {{ incr_stream.get_stream_metadata_columns() }} FROM client_with_dup WHERE NB_DUP = 1 ORDER BY LOADED_AT
)
SELECT * FROM clients_without_dup