import random
import time
import argparse
import json
from datetime import datetime

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.utils import StreamingQueryException
from pyspark.sql import DataFrame
from pyspark.sql import types as T
from pyspark.sql import functions as F
spark_check_point_dir = "checkpoint"

WATERMARK_DELAY_THRESHOLD_SECONDS = 5  # watermark delay (how late data may arrive)
WINDOW_DURATION_SECONDS = 60  # window size
WINDOW_SLIDE_DURATION_SECONDS = 5  # window slide interval

WATERMARK_DELAY_THRESHOLD = "{} seconds".format(WATERMARK_DELAY_THRESHOLD_SECONDS)
WINDOW_DURATION = "{} seconds".format(WINDOW_DURATION_SECONDS)
WINDOW_SLIDE_DURATION = "{} seconds".format(WINDOW_SLIDE_DURATION_SECONDS)
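# With a 60-second window that slides every 5 seconds, the windows overlap:
# each event falls into WINDOW_DURATION_SECONDS / WINDOW_SLIDE_DURATION_SECONDS = 12
# concurrent windows and is therefore processed once per window by the logic below.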
transferSchema = T.StructType(
    [
        T.StructField("timestamp", T.TimestampType(), True),
        T.StructField("school", T.StringType(), True),
        T.StructField("major", T.StringType(), True),
        T.StructField("data", T.BinaryType(), True),
    ]
)
def _kafka_args():
    parser = argparse.ArgumentParser(add_help=False)
    args = parser.add_argument_group("kafka configurations")
    args.add_argument(
        "-t",
        "--source_topics",
        help="Kafka source topics, separated by commas",
        default="kafka-default",
    )
    args.add_argument(
        "-a", "--address", help="Kafka address.", default="localhost:9092"
    )
    args.add_argument(
        "-st", "--sink_topic", help="Kafka sink topic", default="kafka-default-sink"
    )
    args.add_argument("-u", "--username", help="Kafka server's username.")
    args.add_argument("-p", "--password", help="Kafka server's password.")
    return parser
def start_algorithm_logic(sdf: DataFrame) -> DataFrame:
    @F.udf(returnType=transferSchema)
    def transferDF(data):
        # Deserialize the incoming payload. It could be protobuf (pb) or JSON;
        # adjust this to match the structure the Kafka producer actually writes, e.g.:
        # obj = ObjectInfo()
        # obj.ParseFromString(pb_bytes)
        # Here the payload is treated as a JSON dict:
        # {"timestamp": "2006-01-02T03:04:05.000000", "school": "cambridge", "major": "computer science", "name": "hanmeimei", "extra": ""}
        obj = json.loads(data)
        return (
            datetime.strptime(obj["timestamp"], "%Y-%m-%dT%H:%M:%S.%f"),
            obj["school"],
            obj["major"],
            data,
        )

    @F.udf(T.ArrayType(T.BinaryType()))
    def handle_grouped_data(data_list, window) -> T.ArrayType(T.BinaryType()):
        if not window or len(window) != 1:
            print("handle_grouped_data: should have exactly one window instance")
            return None
        if len(data_list) == 0:
            return None
        sample = data_list[0]
        sample_dict = json.loads(sample)
        print(
            f"data_list length: {len(data_list)}, school: {sample_dict['school']}, "
            f"major: {sample_dict['major']}, window start: {window[0].start}, "
            f"window end: {window[0].end}"
        )
        # Do whatever you want with the data set here, then return the result
        # list that will be produced to the Kafka sink topic.
        results = [sample]
        return results

    # Extract timestamp, school and major from the raw payload for grouping and windowing.
    sdf = sdf.select(transferDF(sdf.value).alias("window_data")).select(
        "window_data.timestamp",
        "window_data.school",
        "window_data.major",
        "window_data.data",
    )
    # Set the watermark and group the sliding-window data by school and major.
    window_group = sdf.withWatermark("timestamp", WATERMARK_DELAY_THRESHOLD).groupBy(
        F.window(F.col("timestamp"), WINDOW_DURATION, WINDOW_SLIDE_DURATION),
        F.col("school"),
        F.col("major"),
    )
    # Process the grouped data over each window.
    result_df_set = window_group.agg(
        handle_grouped_data(F.collect_list("data"), F.collect_set("window")).alias(
            "value_set"
        )
    ).withColumn(
        "value", F.explode(F.col("value_set"))  # explode the result list into rows
    )
    # Drop empty payloads and return the result.
    return result_df_set.filter(result_df_set.value != b"")
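# A minimal sketch (not part of the original pipeline) of a producer that emits
# records in the JSON shape transferDF() expects. It assumes the third-party
# `kafka-python` package; the topic and broker address are placeholder defaults.
def produce_test_records(topic="kafka-default", address="localhost:9092", count=10):
    from kafka import KafkaProducer  # assumed dependency: pip install kafka-python

    producer = KafkaProducer(
        bootstrap_servers=address,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    for i in range(count):
        producer.send(
            topic,
            {
                "timestamp": datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f"),
                "school": "cambridge",
                "major": "computer science",
                "name": f"student-{i}",
                "extra": "",
            },
        )
    producer.flush()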
def start_query(sc: SparkContext, args: argparse.Namespace):
    spark = SparkSession(sparkContext=sc)
    kafka_address = args.address
    kafka_source_topics = args.source_topics
    kafka_sink_topic = args.sink_topic
    kafka_username = args.username
    kafka_password = args.password
    if kafka_username and kafka_password:
        # JAAS option values should be quoted.
        kafka_jaas_config = (
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            f'username="{kafka_username}" password="{kafka_password}";'
        )
        # kafka source
        sdf = (
            spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers", kafka_address)
            .option("subscribe", kafka_source_topics)
            .option("kafka.security.protocol", "SASL_PLAINTEXT")
            .option("kafka.sasl.mechanism", "PLAIN")
            .option("kafka.sasl.jaas.config", kafka_jaas_config)
            .option("kafka.reconnect.backoff.ms", 2000)
            .option("kafka.reconnect.backoff.max.ms", 10000)
            .option("failOnDataLoss", False)
            .option("backpressure.enabled", True)
            .load()
        )
        sdf = start_algorithm_logic(sdf)
        # kafka sink
        query = (
            sdf.writeStream.format("kafka")
            .option("kafka.bootstrap.servers", kafka_address)
            .option("topic", kafka_sink_topic)
            .option("kafka.security.protocol", "SASL_PLAINTEXT")
            .option("kafka.sasl.mechanism", "PLAIN")
            .option("kafka.sasl.jaas.config", kafka_jaas_config)
            .option("kafka.reconnect.backoff.ms", 2000)
            .option("kafka.reconnect.backoff.max.ms", 10000)
            .option("checkpointLocation", spark_check_point_dir)
            .start()
        )
        return query
    else:
        # kafka source
        sdf = (
            spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers", kafka_address)
            .option("subscribe", kafka_source_topics)
            .option("kafka.reconnect.backoff.ms", 2000)
            .option("kafka.reconnect.backoff.max.ms", 10000)
            .option("failOnDataLoss", False)
            .option("backpressure.enabled", True)
            .load()
        )
        sdf = start_algorithm_logic(sdf)
        # kafka sink
        query = (
            sdf.writeStream.format("kafka")
            .option("kafka.bootstrap.servers", kafka_address)
            .option("topic", kafka_sink_topic)
            .option("kafka.reconnect.backoff.ms", 2000)
            .option("kafka.reconnect.backoff.max.ms", 10000)
            .option("checkpointLocation", spark_check_point_dir)
            .start()
        )
        return query
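# A sketch (not in the original) of a console-sink variant of the query above,
# handy while developing the windowing logic locally without a sink topic.
def start_debug_query(sc: SparkContext, args: argparse.Namespace):
    spark = SparkSession(sparkContext=sc)
    sdf = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", args.address)
        .option("subscribe", args.source_topics)
        .option("failOnDataLoss", False)
        .load()
    )
    sdf = start_algorithm_logic(sdf)
    # With a watermark, "append" emits each window once the watermark passes its end.
    return (
        sdf.writeStream.format("console")
        .outputMode("append")
        .option("truncate", False)
        .start()
    )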
def main():
    parser = argparse.ArgumentParser(
        parents=[_kafka_args()],
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    args = parser.parse_args()
    spark_conf = (
        SparkConf()
        .setMaster("local")
        .set(
            "spark.jars",
            "jars/*jar",
        )
        .set("spark.driver.memory", "10240m")
    )
    sc = SparkContext(conf=spark_conf)
    while True:
        query = start_query(sc, args)
        try:
            query.awaitTermination()
        except StreamingQueryException as err:
            wait = random.randint(0, 60)
            print(
                f"Query exception: {err}. The query will be restarted in {wait} seconds."
            )
            time.sleep(wait)


if __name__ == "__main__":
    main()
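# Example invocation (a sketch: the connector version must match your Spark
# build, and "structured_streaming_kafka.py" is a placeholder file name):
#   spark-submit \
#       --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<spark-version> \
#       structured_streaming_kafka.py -a localhost:9092 \
#       -t kafka-default -st kafka-default-sink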