123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334 |
- import configparser
- # CONFIG, get the parameters
- config = configparser.ConfigParser()
- config.read('dwh.cfg')
- access_key = config.get("AWS", "ACCESS_KEY")
- secret_key = config.get("AWS", "SECRET_KEY")
- log_data_source = config.get("S3", "LOG_DATA")
- log_jsonpath = config.get("S3", "LOG_JSONPATH")
- song_data_source = config.get("S3", "SONG_DATA")
- song_jsonpath = config.get("S3", "SONG_JSONPATH")
- # Queries to drop tables
- staging_events_table_drop = "DROP TABLE IF EXISTS staging_events_table;"
- staging_songs_table_drop = "DROP TABLE IF EXISTS staging_songs_table;"
- songplay_table_drop = "DROP TABLE IF EXISTS songplays;"
- user_table_drop = "DROP TABLE IF EXISTS users;"
- song_table_drop = "DROP TABLE IF EXISTS songs;"
- artist_table_drop = "DROP TABLE IF EXISTS artists"
- time_table_drop = "DROP TABLE IF EXISTS time;"
- # Create staging tables to be used to copy data to
- # Columns of staging_events_table are identical as those from log data files
- # Data is distributed across the cloud using the column "user_id"
- staging_events_table_create= ("""
- CREATE TABLE IF NOT EXISTS staging_events_table (
- artist VARCHAR,
- auth VARCHAR,
- first_name VARCHAR,
- gender VARCHAR,
- item_in_session INTEGER,
- last_name VARCHAR,
- length DECIMAL(9,5),
- level VARCHAR,
- location VARCHAR,
- method VARCHAR,
- page VARCHAR,
- registration BIGINT,
- session_id INTEGER,
- song VARCHAR,
- status INTEGER,
- ts BIGINT sortkey,
- user_agent VARCHAR,
- user_id INTEGER distkey
- );
- """)
- # Columns of staging_songs_table are identical as those from song data files
- # Data is distributed using the column "artist_id"
- staging_songs_table_create = ("""
- CREATE TABLE IF NOT EXISTS staging_songs_table(
- num_songs BIGINT,
- artist_id VARCHAR distkey,
- artist_latitude DECIMAL(9,5),
- artist_longitude DECIMAL(9,5),
- artist_location VARCHAR,
- artist_name VARCHAR,
- song_id VARCHAR sortkey,
- title VARCHAR,
- duration DECIMAL(9,5),
- year INTEGER
- );
- """)
- # Create tables for the final results
- # The fact table of the schema
- # The data is distributed across the cloud using "user_id"
- # The rows are sorted using start_time
- songplay_table_create = ("""
- CREATE TABLE IF NOT EXISTS songplays (
- songplay_id BIGINT IDENTITY(0,1) NOT NULL PRIMARY KEY,
- start_time TIMESTAMP NOT NULL sortkey,
- user_id INTEGER NOT NULL distkey,
- level VARCHAR,
- song_id VARCHAR NOT NULL,
- artist_id VARCHAR NOT NULL,
- session_id INTEGER NOT NULL,
- location VARCHAR,
- user_agent VARCHAR
- );
- """)
- # Dimension table
- # The table uses "all" distribution style
- user_table_create = ("""
- CREATE TABLE IF NOT EXISTS users (
- user_id INTEGER NOT NULL PRIMARY KEY sortkey,
- first_name VARCHAR,
- last_name VARCHAR,
- gender VARCHAR,
- level VARCHAR
- ) diststyle all;
- """)
- # Dimension table
- # The rows are distributed across the cloud using the column "artist_id"
- song_table_create = ("""
- CREATE TABLE IF NOT EXISTS songs (
- song_id VARCHAR NOT NULL PRIMARY KEY sortkey,
- title VARCHAR,
- artist_id VARCHAR NOT NULL distkey,
- year INTEGER,
- duration DECIMAL(9,5)
- );
- """)
- # Dimension table
- # The table uses "all" distribution style
- artist_table_create = ("""
- CREATE TABLE IF NOT EXISTS artists (
- artist_id VARCHAR NOT NULL PRIMARY KEY sortkey,
- name VARCHAR,
- location VARCHAR,
- latitude DECIMAL(9,5),
- longitude DECIMAL(9,5))
- diststyle all;
- """)
- # Dimension table
- # The table uses "all" distribution style
- time_table_create = ("""
- CREATE TABLE IF NOT EXISTS time(
- start_time TIMESTAMP NOT NULL PRIMARY KEY sortkey,
- hour INTEGER,
- day INTEGER,
- week INTEGER,
- month INTEGER,
- year INTEGER,
- weekday INTEGER)
- diststyle all;
- """)
- # Copy data to the staging tables from existing files
- # Copy from log data files to staging_events_table
- staging_events_copy = ("""
- COPY staging_events_table
- FROM {}
- access_key_id {}
- secret_access_key {}
- json {}
- region 'us-west-2';
- """).format(log_data_source, access_key, secret_key, log_jsonpath)
- # Copy from song data files to staging_songs_table
- staging_songs_copy = ("""
- COPY staging_songs_table
- FROM {}
- access_key_id {}
- secret_access_key {}
- json {}
- region 'us-west-2';
- """).format(song_data_source, access_key, secret_key, song_jsonpath)
- # Select data from staging tables
- # Insert data to the final tables
- # Join staging_songs_table and staging_events_table to get the "song_id" and "artist_id"
- # Insert the rest of data from staging_events_table into songplays table
- songplay_table_insert = ("""
- INSERT INTO songplays (start_time, user_id, level,
- song_id, artist_id, session_id, location, user_agent)
- SELECT DISTINCT dateadd(hr, EXTRACT(hr FROM se.tm),
- dateadd(day, EXTRACT(day FROM se.tm),
- dateadd(month, EXTRACT(month FROM se.tm),
- dateadd(year, EXTRACT(year FROM se.tm) -
- EXTRACT(year FROM 'epoch'::timestamp), 'epoch')))),
- se.user_id, se.level, ss.song_id, ss.artist_id,
- se.session_id, se.location, se.user_agent
- FROM (
- SELECT artist_id, artist_name, song_id, title, duration
- FROM staging_songs_table
- WHERE artist_id IS NOT NULL
- AND song_id IS NOT NULL
- ) AS ss
- JOIN (
- SELECT dateadd(ms,ts,'epoch') AS tm, user_id, level, song, artist,
- length, session_id, location, user_agent
- FROM staging_events_table
- WHERE page = 'NextSong'
- AND ts IS NOT NULL
- AND user_id IS NOT NULL
- ) AS se
- ON ss.artist_name = se.artist
- AND ss.title = se.song
- AND ss.duration = se.length
- ;
- """)
- # Select data from staging_events_table
- # Insert into users table
- # Limit "user_id" to be NOT NULL
- # Assuming the record with larger "ts" is newer
- # Need to communicate with the team about the actual details in real life
- user_table_insert = ("""
- INSERT INTO users (user_id, first_name, last_name, gender, level)
- SELECT DISTINCT id_only_table.user_id, first_name, last_name, gender, level
- FROM
- (
- SELECT user_id, MAX(ts) AS mts
- FROM staging_events_table
- WHERE page = 'NextSong'
- AND user_id IS NOT NULL
- AND ts IS NOT NULL
- GROUP BY user_id
- ) AS id_only_table
- JOIN
- (
- SELECT user_id, first_name, last_name, gender, level, ts
- FROM staging_events_table
- WHERE page = 'NextSong'
- AND user_id IS NOT NULL
- AND ts IS NOT NULL
- ) AS all_columns_table
- ON id_only_table.user_id = all_columns_table.user_id
- AND id_only_table.mts = all_columns_table.ts
- ;
- """)
- # Select data from staging_songs_table
- # Insert into songs table
- # Limit "song_id" and "artist_id" to be NOT NULL
- # Assuming "song_id" and "num_songs" uniquely identify each record
- # And the record with larger "num_songs" is newer
- # Need to communicate with the team in real life
- song_table_insert = ("""
- INSERT INTO songs (song_id, title, artist_id, year, duration)
- SELECT DISTINCT id_only_table.song_id, title, artist_id, year, duration
- FROM
- (
- SELECT song_id, MAX(num_songs) AS m_num
- FROM staging_songs_table
- WHERE song_id IS NOT NULL
- AND artist_id IS NOT NULL
- AND num_songs IS NOT NULL
- GROUP BY song_id
- ) AS id_only_table
- JOIN
- (
- SELECT song_id, title, artist_id, year, duration, num_songs
- FROM staging_songs_table
- WHERE song_id IS NOT NULL
- AND artist_id IS NOT NULL
- AND num_songs IS NOT NULL
- ) all_columns_table
- ON id_only_table.song_id = all_columns_table.song_id
- AND id_only_table.m_num = all_columns_table.num_songs
- ;
- """)
- # Select data from stagging_songs_table
- # Insert into artists table
- # Limit the "song_id" and "artist_id" to be NOT NULL
- # Assuming "song_id" and "num_songs" uniquely identify each record
- # And record with larger "num_songs" is newer
- # Actual communication needed
- artist_table_insert = ("""
- INSERT INTO artists (artist_id, name, location, latitude, longitude)
- SELECT DISTINCT artist_id,
- artist_name,
- artist_location,
- artist_latitude,
- artist_longitude
- FROM
- (
- SELECT song_id, MAX(num_songs) AS m_num
- FROM staging_songs_table
- WHERE song_id IS NOT NULL
- AND artist_id IS NOT NULL
- AND num_songs IS NOT NULL
- GROUP BY song_id
- ) AS id_only_table
- JOIN
- (
- SELECT song_id, num_songs, artist_id, artist_name, artist_location,
- artist_latitude, artist_longitude, year
- FROM staging_songs_table
- WHERE song_id IS NOT NULL
- AND artist_id IS NOT NULL
- AND num_songs IS NOT NULL
- ) AS all_columns_table
- ON id_only_table.song_id = all_columns_table.song_id
- AND id_only_table.m_num = all_columns_table.num_songs
- ;
- """)
- # Select ts (BIGINT) from staging_events_table and limit the data to be NOT NULL
- # Convert ts to TIMESTAMP and truncate the data to date and hour
- # Extract hour, day, week, month, year, weekday part from the truncated timestamp
- # Insert the result in time table
- # For weekdays, Sunday is 0, Monday is 1, Tuesday is 2, and so on
- time_table_insert = ("""
- INSERT INTO time (start_time, hour, day, week, month, year, weekday)
- SELECT trunc_time,
- EXTRACT(hour FROM trunc_time),
- EXTRACT(day FROM trunc_time),
- EXTRACT(week FROM trunc_time),
- EXTRACT(month FROM trunc_time),
- EXTRACT(year FROM trunc_time),
- EXTRACT(weekday FROM trunc_time)
- FROM
- (
- SELECT DISTINCT dateadd(hr, EXTRACT(hr FROM tm),
- dateadd(day, EXTRACT(day FROM tm),
- dateadd(month, EXTRACT(month FROM tm),
- dateadd(year, EXTRACT(year FROM tm) -
- EXTRACT(year FROM 'epoch'::timestamp), 'epoch'))))
- AS trunc_time
- FROM
- (
- SELECT dateadd(ms, ts, 'epoch') AS tm
- FROM staging_events_table
- WHERE page = 'NextSong'
- AND ts IS NOT NULL
- )
- );
- """)
- # QUERY LISTS
- create_table_queries = [staging_events_table_create, staging_songs_table_create, \
- songplay_table_create, user_table_create, song_table_create, \
- artist_table_create, time_table_create]
- drop_table_queries = [staging_events_table_drop, staging_songs_table_drop, \
- songplay_table_drop, user_table_drop, song_table_drop, \
- artist_table_drop, time_table_drop]
- copy_table_queries = [staging_events_copy, staging_songs_copy]
- insert_table_queries = [songplay_table_insert, user_table_insert, \
- song_table_insert, artist_table_insert, time_table_insert]
|