123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- import psycopg2
- import sys
- import os
- import json
- import time
- import kafka
- import decimal
- from kafka import KafkaConsumer
- class SalesConsumer():
- def __init__(self, topic):
- #connection
- self.topic = topic
- conn_string = "dbname='salesbi' user='salesbi'"
- self.conn = psycopg2.connect(conn_string)
- self.cur = self.conn.cursor()
- self.sales = {}
- self.reps = {}
- self.prods = {}
-
- def getrefs(self):
- #get data from the database into a list
- self.cur.execute("""SELECT sales_rep_id,sales_rep_territory,daily_gallon_plan from sales_rep""")
- self.sales_rows = self.cur.fetchall()
- for sr in self.sales_rows:
- self.reps[sr[0]] = {'TerritoryId':sr[1], 'Plan':sr[2]}
- self.cur.execute("""SELECT product_id,gallons from product""")
- self.prod_rows = self.cur.fetchall()
- for p in self.prod_rows:
- self.prods[p[0]] = {'Gallons': p[1]}
- self.cur.execute(
- """
- SELECT territory_id,territory_name, SUM(daily_gallon_plan) plan
- from territory t, sales_rep r
- WHERE r.sales_rep_territory = t.territory_id
- GROUP BY t.territory_id, t.territory_name
- """
- )
- self.terr_rows = self.cur.fetchall()
-
- for t in self.terr_rows:
- self.sales[t[0]] = {'Name':t[1], 'Plan':t[2], 'Act':0.0, 'Ave':0.0, 'Stat':0, 'Tstamp':''}
-
- self.cur.close()
- self.conn.close()
- def getconsumer(self):
- #read message from kafka
- self.consumer = KafkaConsumer(self.topic)
- print( self.topic, 'consumer created')
- return self.consumer
-
- def getsales(self, message):
- #print len(self.sales)
- event1 = json.loads(message.value)
- prod_id = event1['product_id'].encode('ascii', 'ignore')
- units= event1['unit_sold'].encode('ascii', 'ignore')
- sales_rep = event1['sales_rep_id'].encode('ascii', 'ignore')
- tstamp = event1['timestamp'].encode('ascii','ignore')
- territory = self.reps[sales_rep]['TerritoryId']
- gallons = float(self.prods[prod_id]['Gallons'])*float(units)
- self.sales[territory]['Tstamp'] = tstamp
- actual = self.sales[territory]['Act'] + gallons
- self.sales[territory]['Act'] = actual
- ave = (self.sales[territory]['Ave'] + gallons)/2
- self.sales[territory]['Ave'] = ave
-
- plan = float(self.sales[territory]['Plan'])
-
- if actual > plan:
- self.sales[territory]['Stat'] = 1
- elif actual == plan:
- self.sales[territory]['Stat'] = 0
- elif actual < plan:
- self.sales[territory]['Stat'] = -1
-
- if __name__ == '__main__':
- sc = SalesConsumer('transaction_slot')
- sc.getrefs()
- for msg in sc.getconsumer():
- if len(msg.value) == 0:
- continue
- sc.getsales(msg)
- print(json.dumps(sc.sales))
-
|