cassandra_rw.py 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
import csv
import logging
import uuid
from datetime import datetime

import pandas as pd
from cassandra.cqlengine import connection
from cassandra.cqlengine.management import sync_table

from TxInfo import TxInfoModel
  8. class CassandraReadWriteDb:
  9. def __init__(self, ip_addrs, keyspace):
  10. connection.setup( ip_addrs, keyspace, protocol_version=3)
  11. def sync_class_table(self, typeOfClass):
  12. self.typeOfClass = typeOfClass
  13. sync_table(typeOfClass)
  14. #Write CSV to cassandra
  15. def write_file_table(self, credit_logs):
  16. with open(credit_logs) as csv_file:
  17. csv_reader = csv.DictReader(csv_file, delimiter=',')
  18. for row in csv_reader:
  19. self.typeOfClass.create(**dict(row))
  20. #Read Cassandra data to pandas
  21. def get_pandas_from_cassandra(self):
  22. tx_info = pd.DataFrame()
  23. for q in TxInfoModel.objects():
  24. d = pd.DataFrame.from_records([q.values()])
  25. tx_info = tx_info.append(d)
  26. tx_info.columns = q.keys()
  27. return tx_info
  28. def write_json_table(self, data):
  29. print (data)
  30. self.typeOfClass.create(**dict(data))
  31. if __name__ == '__main__':
  32. cwd = CassandraReadWriteDb(ip_addrs=['172.17.0.2'], keyspace="emp")
  33. cwd.sync_class_table(TxInfoModel)
  34. #cwd.write_file_table('credit.csv')
  35. print(cwd.get_pandas_from_cassandra())