# app-backup.py
"""Example Kafka consumer that routes transactions to a legit or fraud topic."""
import json
import os

from kafka import KafkaConsumer, KafkaProducer
  5. KAFKA_BROKER_URL = os.environ.get('KAFKA_BROKER_URL')
  6. TRANSACTIONS_TOPIC = os.environ.get('TRANSACTIONS_TOPIC')
  7. LEGIT_TOPIC = os.environ.get('LEGIT_TOPIC')
  8. FRAUD_TOPIC = os.environ.get('FRAUD_TOPIC')
  9. def is_suspicious(transaction: dict) -> bool:
  10. """Determine whether a transaction is suspicious."""
  11. return transaction['amount'] >= 900
  12. if __name__ == '__main__':
  13. consumer = KafkaConsumer(
  14. TRANSACTIONS_TOPIC,
  15. bootstrap_servers=KAFKA_BROKER_URL,
  16. value_deserializer= json.loads(value),
  17. )
  18. producer = KafkaProducer(
  19. bootstrap_servers=KAFKA_BROKER_URL,
  20. value_serializer= json.dumps(value).encode(),
  21. )
  22. for message in consumer:
  23. transaction: dict = message.value
  24. topic = FRAUD_TOPIC if is_suspicious(transaction) else LEGIT_TOPIC
  25. producer.send(topic, value=transaction)
  26. print(topic, transaction) # DEBUG