"""SQL statements for a Redshift data-warehouse ETL: drop/create the staging
and analytics (star-schema) tables, COPY raw JSON data from S3 into the
staging tables, and insert transformed rows into the final tables."""
import configparser
# CONFIG: read AWS credentials and S3 source locations once, at import time,
# from dwh.cfg (sections [AWS] and [S3]).
config = configparser.ConfigParser()
config.read('dwh.cfg')
# AWS key pair used by the COPY commands below.
access_key = config.get("AWS", "ACCESS_KEY")
secret_key = config.get("AWS", "SECRET_KEY")
# S3 URIs of the raw log/song JSON data and their JSONPaths mapping files.
log_data_source = config.get("S3", "LOG_DATA")
log_jsonpath = config.get("S3", "LOG_JSONPATH")
song_data_source = config.get("S3", "SONG_DATA")
song_jsonpath = config.get("S3", "SONG_JSONPATH")
  11. # Queries to drop tables
  12. staging_events_table_drop = "DROP TABLE IF EXISTS staging_events_table;"
  13. staging_songs_table_drop = "DROP TABLE IF EXISTS staging_songs_table;"
  14. songplay_table_drop = "DROP TABLE IF EXISTS songplays;"
  15. user_table_drop = "DROP TABLE IF EXISTS users;"
  16. song_table_drop = "DROP TABLE IF EXISTS songs;"
  17. artist_table_drop = "DROP TABLE IF EXISTS artists"
  18. time_table_drop = "DROP TABLE IF EXISTS time;"
# Create staging tables to be used to copy data to.
# Columns of staging_events_table are identical to the fields of the log
# (event) JSON files; no NOT NULL/key constraints, so the raw COPY never
# rejects rows on data quality.
# ts is the event time in epoch milliseconds (BIGINT) and is the sort key;
# data is distributed across the cluster slices on "user_id" (distkey).
staging_events_table_create= ("""
CREATE TABLE IF NOT EXISTS staging_events_table (
artist VARCHAR,
auth VARCHAR,
first_name VARCHAR,
gender VARCHAR,
item_in_session INTEGER,
last_name VARCHAR,
length DECIMAL(9,5),
level VARCHAR,
location VARCHAR,
method VARCHAR,
page VARCHAR,
registration BIGINT,
session_id INTEGER,
song VARCHAR,
status INTEGER,
ts BIGINT sortkey,
user_agent VARCHAR,
user_id INTEGER distkey
);
""")
# Columns of staging_songs_table are identical to the fields of the song
# data JSON files.
# Distributed on "artist_id" (distkey), sorted on "song_id" (sortkey).
# NOTE(review): DECIMAL(9,5) caps duration/latitude/longitude at 9999.99999 —
# fine for coordinates and typical track lengths, but confirm the source data
# never exceeds it.
staging_songs_table_create = ("""
CREATE TABLE IF NOT EXISTS staging_songs_table(
num_songs BIGINT,
artist_id VARCHAR distkey,
artist_latitude DECIMAL(9,5),
artist_longitude DECIMAL(9,5),
artist_location VARCHAR,
artist_name VARCHAR,
song_id VARCHAR sortkey,
title VARCHAR,
duration DECIMAL(9,5),
year INTEGER
);
""")
# Create tables for the final results.
# The fact table of the star schema; songplay_id is auto-generated by
# IDENTITY(0,1).
# The data is distributed on "user_id" (distkey) and the rows are sorted on
# "start_time" (sortkey).
# NOTE(review): PRIMARY KEY / NOT NULL are informational only in Redshift —
# uniqueness is not enforced by the cluster.
songplay_table_create = ("""
CREATE TABLE IF NOT EXISTS songplays (
songplay_id BIGINT IDENTITY(0,1) NOT NULL PRIMARY KEY,
start_time TIMESTAMP NOT NULL sortkey,
user_id INTEGER NOT NULL distkey,
level VARCHAR,
song_id VARCHAR NOT NULL,
artist_id VARCHAR NOT NULL,
session_id INTEGER NOT NULL,
location VARCHAR,
user_agent VARCHAR
);
""")
# Dimension table for users.
# Small table, so it is replicated to every node ("diststyle all") and
# sorted on its key, user_id.
user_table_create = ("""
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER NOT NULL PRIMARY KEY sortkey,
first_name VARCHAR,
last_name VARCHAR,
gender VARCHAR,
level VARCHAR
) diststyle all;
""")
# Dimension table for songs.
# The rows are distributed across the cluster on "artist_id" (distkey) and
# sorted on "song_id" (sortkey).
song_table_create = ("""
CREATE TABLE IF NOT EXISTS songs (
song_id VARCHAR NOT NULL PRIMARY KEY sortkey,
title VARCHAR,
artist_id VARCHAR NOT NULL distkey,
year INTEGER,
duration DECIMAL(9,5)
);
""")
# Dimension table for artists.
# Small table: replicated to every node ("diststyle all") and sorted on its
# key, artist_id.
artist_table_create = ("""
CREATE TABLE IF NOT EXISTS artists (
artist_id VARCHAR NOT NULL PRIMARY KEY sortkey,
name VARCHAR,
location VARCHAR,
latitude DECIMAL(9,5),
longitude DECIMAL(9,5))
diststyle all;
""")
# Dimension table for timestamps, broken out into calendar parts.
# Replicated to every node ("diststyle all") and sorted on its key,
# start_time.
time_table_create = ("""
CREATE TABLE IF NOT EXISTS time(
start_time TIMESTAMP NOT NULL PRIMARY KEY sortkey,
hour INTEGER,
day INTEGER,
week INTEGER,
month INTEGER,
year INTEGER,
weekday INTEGER)
diststyle all;
""")
# Copy data to the staging tables from existing files.
# Copy from the log data files to staging_events_table, mapping JSON fields
# to columns via the JSONPaths file.
# NOTE(review): the {} placeholders are interpolated unquoted, so the cfg
# values (S3 URI, access/secret keys, JSONPaths URI) must already contain
# their surrounding single quotes — confirm how dwh.cfg stores them.
staging_events_copy = ("""
COPY staging_events_table
FROM {}
access_key_id {}
secret_access_key {}
json {}
region 'us-west-2';
""").format(log_data_source, access_key, secret_key, log_jsonpath)
# Copy from the song data files to staging_songs_table.
# NOTE(review): same unquoted-{} caveat as staging_events_copy — the cfg
# values must already include their single quotes.
staging_songs_copy = ("""
COPY staging_songs_table
FROM {}
access_key_id {}
secret_access_key {}
json {}
region 'us-west-2';
""").format(song_data_source, access_key, secret_key, song_jsonpath)
  142. # Select data from staging tables
  143. # Insert data to the final tables
  144. # Join staging_songs_table and staging_events_table to get the "song_id" and "artist_id"
  145. # Insert the rest of data from staging_events_table into songplays table
  146. songplay_table_insert = ("""
  147. INSERT INTO songplays (start_time, user_id, level,
  148. song_id, artist_id, session_id, location, user_agent)
  149. SELECT DISTINCT dateadd(hr, EXTRACT(hr FROM se.tm),
  150. dateadd(day, EXTRACT(day FROM se.tm),
  151. dateadd(month, EXTRACT(month FROM se.tm),
  152. dateadd(year, EXTRACT(year FROM se.tm) -
  153. EXTRACT(year FROM 'epoch'::timestamp), 'epoch')))),
  154. se.user_id, se.level, ss.song_id, ss.artist_id,
  155. se.session_id, se.location, se.user_agent
  156. FROM (
  157. SELECT artist_id, artist_name, song_id, title, duration
  158. FROM staging_songs_table
  159. WHERE artist_id IS NOT NULL
  160. AND song_id IS NOT NULL
  161. ) AS ss
  162. JOIN (
  163. SELECT dateadd(ms,ts,'epoch') AS tm, user_id, level, song, artist,
  164. length, session_id, location, user_agent
  165. FROM staging_events_table
  166. WHERE page = 'NextSong'
  167. AND ts IS NOT NULL
  168. AND user_id IS NOT NULL
  169. ) AS se
  170. ON ss.artist_name = se.artist
  171. AND ss.title = se.song
  172. AND ss.duration = se.length
  173. ;
  174. """)
# Select data from staging_events_table and insert into the users table.
# Limit "user_id" to be NOT NULL.
# Deduplicate per user: keep the row whose ts equals that user's MAX(ts),
# assuming the record with the larger "ts" is the newer one.
# NOTE(review): two rows tied on MAX(ts) with differing attributes (e.g.
# level) would both survive the join and DISTINCT — confirm with the team
# whether that case occurs in real data.
user_table_insert = ("""
INSERT INTO users (user_id, first_name, last_name, gender, level)
SELECT DISTINCT id_only_table.user_id, first_name, last_name, gender, level
FROM
(
SELECT user_id, MAX(ts) AS mts
FROM staging_events_table
WHERE page = 'NextSong'
AND user_id IS NOT NULL
AND ts IS NOT NULL
GROUP BY user_id
) AS id_only_table
JOIN
(
SELECT user_id, first_name, last_name, gender, level, ts
FROM staging_events_table
WHERE page = 'NextSong'
AND user_id IS NOT NULL
AND ts IS NOT NULL
) AS all_columns_table
ON id_only_table.user_id = all_columns_table.user_id
AND id_only_table.mts = all_columns_table.ts
;
""")
# Select data from staging_songs_table and insert into the songs table.
# Limit "song_id" and "artist_id" to be NOT NULL.
# Deduplicate per song: keep the row whose num_songs equals that song_id's
# MAX(num_songs) — assuming ("song_id", "num_songs") uniquely identify each
# record and the record with the larger "num_songs" is newer.
# Need to communicate with the team in real life to confirm that assumption.
song_table_insert = ("""
INSERT INTO songs (song_id, title, artist_id, year, duration)
SELECT DISTINCT id_only_table.song_id, title, artist_id, year, duration
FROM
(
SELECT song_id, MAX(num_songs) AS m_num
FROM staging_songs_table
WHERE song_id IS NOT NULL
AND artist_id IS NOT NULL
AND num_songs IS NOT NULL
GROUP BY song_id
) AS id_only_table
JOIN
(
SELECT song_id, title, artist_id, year, duration, num_songs
FROM staging_songs_table
WHERE song_id IS NOT NULL
AND artist_id IS NOT NULL
AND num_songs IS NOT NULL
) all_columns_table
ON id_only_table.song_id = all_columns_table.song_id
AND id_only_table.m_num = all_columns_table.num_songs
;
""")
  234. # Select data from stagging_songs_table
  235. # Insert into artists table
  236. # Limit the "song_id" and "artist_id" to be NOT NULL
  237. # Assuming "song_id" and "num_songs" uniquely identify each record
  238. # And record with larger "num_songs" is newer
  239. # Actual communication needed
  240. artist_table_insert = ("""
  241. INSERT INTO artists (artist_id, name, location, latitude, longitude)
  242. SELECT DISTINCT artist_id,
  243. artist_name,
  244. artist_location,
  245. artist_latitude,
  246. artist_longitude
  247. FROM
  248. (
  249. SELECT song_id, MAX(num_songs) AS m_num
  250. FROM staging_songs_table
  251. WHERE song_id IS NOT NULL
  252. AND artist_id IS NOT NULL
  253. AND num_songs IS NOT NULL
  254. GROUP BY song_id
  255. ) AS id_only_table
  256. JOIN
  257. (
  258. SELECT song_id, num_songs, artist_id, artist_name, artist_location,
  259. artist_latitude, artist_longitude, year
  260. FROM staging_songs_table
  261. WHERE song_id IS NOT NULL
  262. AND artist_id IS NOT NULL
  263. AND num_songs IS NOT NULL
  264. ) AS all_columns_table
  265. ON id_only_table.song_id = all_columns_table.song_id
  266. AND id_only_table.m_num = all_columns_table.num_songs
  267. ;
  268. """)
  269. # Select ts (BIGINT) from staging_events_table and limit the data to be NOT NULL
  270. # Convert ts to TIMESTAMP and truncate the data to date and hour
  271. # Extract hour, day, week, month, year, weekday part from the truncated timestamp
  272. # Insert the result in time table
  273. # For weekdays, Sunday is 0, Monday is 1, Tuesday is 2, and so on
  274. time_table_insert = ("""
  275. INSERT INTO time (start_time, hour, day, week, month, year, weekday)
  276. SELECT trunc_time,
  277. EXTRACT(hour FROM trunc_time),
  278. EXTRACT(day FROM trunc_time),
  279. EXTRACT(week FROM trunc_time),
  280. EXTRACT(month FROM trunc_time),
  281. EXTRACT(year FROM trunc_time),
  282. EXTRACT(weekday FROM trunc_time)
  283. FROM
  284. (
  285. SELECT DISTINCT dateadd(hr, EXTRACT(hr FROM tm),
  286. dateadd(day, EXTRACT(day FROM tm),
  287. dateadd(month, EXTRACT(month FROM tm),
  288. dateadd(year, EXTRACT(year FROM tm) -
  289. EXTRACT(year FROM 'epoch'::timestamp), 'epoch'))))
  290. AS trunc_time
  291. FROM
  292. (
  293. SELECT dateadd(ms, ts, 'epoch') AS tm
  294. FROM staging_events_table
  295. WHERE page = 'NextSong'
  296. AND ts IS NOT NULL
  297. )
  298. );
  299. """)
  300. # QUERY LISTS
  301. create_table_queries = [staging_events_table_create, staging_songs_table_create, \
  302. songplay_table_create, user_table_create, song_table_create, \
  303. artist_table_create, time_table_create]
  304. drop_table_queries = [staging_events_table_drop, staging_songs_table_drop, \
  305. songplay_table_drop, user_table_drop, song_table_drop, \
  306. artist_table_drop, time_table_drop]
  307. copy_table_queries = [staging_events_copy, staging_songs_copy]
  308. insert_table_queries = [songplay_table_insert, user_table_insert, \
  309. song_table_insert, artist_table_insert, time_table_insert]