123456789101112131415161718192021222324252627282930313233 |
- """Example Kafka consumer."""
- import json
- import os
- from kafka import KafkaConsumer, KafkaProducer
# Runtime configuration is read from the process environment.
# Each value is None when the corresponding variable is unset.
KAFKA_BROKER_URL = os.getenv('KAFKA_BROKER_URL')

# Topic names: the input stream and the two output streams.
TRANSACTIONS_TOPIC = os.getenv('TRANSACTIONS_TOPIC')
LEGIT_TOPIC = os.getenv('LEGIT_TOPIC')
FRAUD_TOPIC = os.getenv('FRAUD_TOPIC')
def is_suspicious(transaction: dict) -> bool:
    """Report whether a transaction should be treated as suspicious.

    A transaction is suspicious when its ``'amount'`` entry is at least 900.
    Raises ``KeyError`` if the transaction has no ``'amount'`` key.
    """
    threshold = 900
    amount = transaction['amount']
    return amount >= threshold
if __name__ == '__main__':
    # Read from the transactions topic. value_deserializer must be a
    # callable: it is applied to each raw message value to decode the JSON
    # payload. (Calling json.loads(value) inline here would raise
    # NameError, since no `value` exists at construction time.)
    consumer = KafkaConsumer(
        TRANSACTIONS_TOPIC,
        bootstrap_servers=KAFKA_BROKER_URL,
        value_deserializer=lambda value: json.loads(value),
    )
    # value_serializer is likewise a callable: it turns each outgoing value
    # into JSON text and encodes it to bytes before sending.
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BROKER_URL,
        value_serializer=lambda value: json.dumps(value).encode(),
    )
    # Route every consumed transaction to the fraud or legit topic.
    for message in consumer:
        transaction: dict = message.value
        topic = FRAUD_TOPIC if is_suspicious(transaction) else LEGIT_TOPIC
        producer.send(topic, value=transaction)
        print(topic, transaction)  # DEBUG
|