etl.py

import configparser

import psycopg2

from sql_queries import copy_table_queries, insert_table_queries


def load_staging_tables(cur, conn):
    """Load data from files into the staging tables.

    Parameter: cur, a cursor object used to execute commands.
    Parameter: conn, a database connection object.
    """
    for query in copy_table_queries:
        cur.execute(query)
        conn.commit()


def insert_tables(cur, conn):
    """Insert data from the staging tables.

    Parameter: cur, a cursor object used to execute commands.
    Parameter: conn, a database connection object.
    """
    for query in insert_table_queries:
        cur.execute(query)
        conn.commit()


def main():
    # Read configuration information to establish a connection
    config = configparser.ConfigParser()
    config.read('dwh.cfg')

    # Create a connection object to the AWS Redshift cluster
    conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
    cur = conn.cursor()

    # Load data from files into the staging tables
    load_staging_tables(cur, conn)

    # Insert data from the staging tables into the analytics tables
    insert_tables(cur, conn)

    cur.close()
    conn.close()


if __name__ == "__main__":
    main()
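
etl.py imports copy_table_queries and insert_table_queries from a sql_queries module that is not shown here. A minimal sketch of the shape that module needs to have follows; the table names, S3 paths, and IAM role ARN are hypothetical placeholders, not the project's actual values.

sql_queries.py (sketch)

# Hypothetical sketch of sql_queries.py: etl.py only requires two lists of
# SQL strings. Table names, bucket paths, and the role ARN are placeholders.

# COPY statements load raw files from S3 into the Redshift staging tables.
staging_events_copy = """
    COPY staging_events FROM 's3://my-bucket/log_data'
    CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789012:role/myRedshiftRole'
    FORMAT AS JSON 'auto' REGION 'us-west-2';
"""

# INSERT ... SELECT statements move data from staging into analytics tables.
songplay_table_insert = """
    INSERT INTO songplays (start_time, user_id, song_id, artist_id)
    SELECT e.ts, e.user_id, s.song_id, s.artist_id
    FROM staging_events e
    JOIN staging_songs s ON e.song = s.title;
"""

# etl.py iterates over these lists in order, committing after each query.
copy_table_queries = [staging_events_copy]
insert_table_queries = [songplay_table_insert]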
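
Because the connection string unpacks config['CLUSTER'].values() positionally, the keys in the [CLUSTER] section of dwh.cfg must appear in host, dbname, user, password, port order. A plausible dwh.cfg is sketched below; the key names and values are illustrative assumptions, and only their order is actually required by the code above.

dwh.cfg (sketch)

[CLUSTER]
HOST=my-cluster.abc123xyz.us-west-2.redshift.amazonaws.com
DB_NAME=dwh
DB_USER=dwhuser
DB_PASSWORD=Passw0rd
DB_PORT=5439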