consumer.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. import psycopg2
  2. import sys
  3. import os
  4. import json
  5. import time
  6. import kafka
  7. import decimal
  8. from kafka import KafkaConsumer
  9. class SalesConsumer():
  10. def __init__(self, topic):
  11. #connection
  12. self.topic = topic
  13. conn_string = "dbname='salesbi' user='salesbi'"
  14. self.conn = psycopg2.connect(conn_string)
  15. self.cur = self.conn.cursor()
  16. self.sales = {}
  17. self.reps = {}
  18. self.prods = {}
  19. def getrefs(self):
  20. #get data from the database into a list
  21. self.cur.execute("""SELECT sales_rep_id,sales_rep_territory,daily_gallon_plan from sales_rep""")
  22. self.sales_rows = self.cur.fetchall()
  23. for sr in self.sales_rows:
  24. self.reps[sr[0]] = {'TerritoryId':sr[1], 'Plan':sr[2]}
  25. self.cur.execute("""SELECT product_id,gallons from product""")
  26. self.prod_rows = self.cur.fetchall()
  27. for p in self.prod_rows:
  28. self.prods[p[0]] = {'Gallons': p[1]}
  29. self.cur.execute(
  30. """
  31. SELECT territory_id,territory_name, SUM(daily_gallon_plan) plan
  32. from territory t, sales_rep r
  33. WHERE r.sales_rep_territory = t.territory_id
  34. GROUP BY t.territory_id, t.territory_name
  35. """
  36. )
  37. self.terr_rows = self.cur.fetchall()
  38. for t in self.terr_rows:
  39. self.sales[t[0]] = {'Name':t[1], 'Plan':t[2], 'Act':0.0, 'Ave':0.0, 'Stat':0, 'Tstamp':''}
  40. self.cur.close()
  41. self.conn.close()
  42. def getconsumer(self):
  43. #read message from kafka
  44. self.consumer = KafkaConsumer(self.topic)
  45. print( self.topic, 'consumer created')
  46. return self.consumer
  47. def getsales(self, message):
  48. #print len(self.sales)
  49. event1 = json.loads(message.value)
  50. prod_id = event1['product_id'].encode('ascii', 'ignore')
  51. units= event1['unit_sold'].encode('ascii', 'ignore')
  52. sales_rep = event1['sales_rep_id'].encode('ascii', 'ignore')
  53. tstamp = event1['timestamp'].encode('ascii','ignore')
  54. territory = self.reps[sales_rep]['TerritoryId']
  55. gallons = float(self.prods[prod_id]['Gallons'])*float(units)
  56. self.sales[territory]['Tstamp'] = tstamp
  57. actual = self.sales[territory]['Act'] + gallons
  58. self.sales[territory]['Act'] = actual
  59. ave = (self.sales[territory]['Ave'] + gallons)/2
  60. self.sales[territory]['Ave'] = ave
  61. plan = float(self.sales[territory]['Plan'])
  62. if actual > plan:
  63. self.sales[territory]['Stat'] = 1
  64. elif actual == plan:
  65. self.sales[territory]['Stat'] = 0
  66. elif actual < plan:
  67. self.sales[territory]['Stat'] = -1
  68. if __name__ == '__main__':
  69. sc = SalesConsumer('transaction_slot')
  70. sc.getrefs()
  71. for msg in sc.getconsumer():
  72. if len(msg.value) == 0:
  73. continue
  74. sc.getsales(msg)
  75. print(json.dumps(sc.sales))