example.py

import random
import time
import argparse
import json
from datetime import datetime

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.utils import StreamingQueryException
from pyspark.sql import DataFrame
from pyspark.sql import types as T
from pyspark.sql import functions as F

spark_check_point_dir = "checkpoint"

WATERMARK_DELAY_THRESHOLD_SECONDS = 5  # watermark delay
WINDOW_DURATION_SECONDS = 60  # window duration
WINDOW_SLIDE_DURATION_SECONDS = 5  # window slide interval
WATERMARK_DELAY_THRESHOLD = "{} seconds".format(WATERMARK_DELAY_THRESHOLD_SECONDS)
WINDOW_DURATION = "{} seconds".format(WINDOW_DURATION_SECONDS)
WINDOW_SLIDE_DURATION = "{} seconds".format(WINDOW_SLIDE_DURATION_SECONDS)
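# With the defaults above, events are aggregated into 60-second windows that
# slide every 5 seconds; events arriving more than 5 seconds behind the maximum
# observed event time may be dropped by the watermark.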

transferSchema = T.StructType(
    [
        T.StructField("timestamp", T.TimestampType(), True),
        T.StructField("school", T.StringType(), True),
        T.StructField("major", T.StringType(), True),
        T.StructField("data", T.BinaryType(), True),
    ]
)


def _kafka_args():
    parser = argparse.ArgumentParser(add_help=False)
    args = parser.add_argument_group("kafka configurations")
    args.add_argument(
        "-t",
        "--source_topics",
        help="Kafka source topics, separated by commas",
        default="kafka-default",
    )
    args.add_argument(
        "-a", "--address", help="Kafka address.", default="localhost:9092"
    )
    args.add_argument(
        "-st", "--sink_topic", help="Kafka sink topic", default="kafka-default-sink"
    )
    args.add_argument("-u", "--username", help="Kafka server's username.")
    args.add_argument("-p", "--password", help="Kafka server's password.")
    return parser


def start_algorithm_logic(sdf: DataFrame) -> DataFrame:
    @F.udf(returnType=transferSchema)
    def transferDF(data):
        # Deserialize the incoming payload; it can be protobuf or JSON, depending
        # on what the Kafka producer writes, e.g. for protobuf:
        #   obj = ObjectInfo()
        #   obj.ParseFromString(pb_bytes)
        # Here the payload is treated as a JSON dict such as (see the producer
        # sketch at the end of this file):
        #   {"timestamp": "2006-01-02T03:04:05.000000", "school": "cambridge",
        #    "major": "computer science", "name": "hanmeimei", "extra": ""}
        obj = json.loads(data)
        return (
            datetime.strptime(obj["timestamp"], "%Y-%m-%dT%H:%M:%S.%f"),
            obj["school"],
            obj["major"],
            data,
        )

    @F.udf(T.ArrayType(T.BinaryType()))
    def handle_grouped_data(data_list, window):
        if not window or len(window) != 1:
            print("handle_grouped_data: should have exactly one window instance")
            return None
        if len(data_list) == 0:
            return None
        sample = data_list[0]
        sample_dict = json.loads(sample)
        print(
            f"data_list length: {len(data_list)}, school: {sample_dict['school']}, "
            f"major: {sample_dict['major']}, window start: {window[0].start}, "
            f"window end: {window[0].end}"
        )
        # Do whatever you want with the grouped data set here, then return the
        # result list, which is written to the Kafka sink topic.
        results = [sample]
        return results

    # Split timestamp, school and major out of the payload for grouping and windowing.
    sdf = sdf.select(transferDF(sdf.value).alias("window_data")).select(
        "window_data.timestamp",
        "window_data.school",
        "window_data.major",
        "window_data.data",
    )
    # Apply the watermark and group the sliding-window data by school and major.
    window_group = sdf.withWatermark("timestamp", WATERMARK_DELAY_THRESHOLD).groupBy(
        F.window(F.col("timestamp"), WINDOW_DURATION, WINDOW_SLIDE_DURATION),
        F.col("school"),
        F.col("major"),
    )
    # Process the grouped data, then explode the returned list so each element
    # becomes one output row; the Kafka sink reads the binary "value" column.
    result_df_set = window_group.agg(
        handle_grouped_data(F.collect_list("data"), F.collect_set("window")).alias(
            "value_set"
        )
    ).withColumn("value", F.explode(F.col("value_set")))
    # Return the result, dropping empty values.
    return result_df_set.filter(result_df_set.value != b"")


def start_query(sc: SparkContext, args: argparse.Namespace):
    spark = SparkSession(sparkContext=sc)
    kafka_address = args.address
    kafka_source_topics = args.source_topics
    kafka_sink_topic = args.sink_topic
    kafka_username = args.username
    kafka_password = args.password
    if kafka_username and kafka_password:
        kafka_jaas_config = (
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            f'username="{kafka_username}" password="{kafka_password}";'
        )
        # kafka source
        sdf = (
            spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers", kafka_address)
            .option("subscribe", kafka_source_topics)
            .option("kafka.security.protocol", "SASL_PLAINTEXT")
            .option("kafka.sasl.mechanism", "PLAIN")
            .option("kafka.sasl.jaas.config", kafka_jaas_config)
            .option("kafka.reconnect.backoff.ms", 2000)
            .option("kafka.reconnect.backoff.max.ms", 10000)
            .option("failOnDataLoss", False)
            .option("backpressure.enabled", True)
            .load()
        )
        sdf = start_algorithm_logic(sdf)
        # kafka sink
        query = (
            sdf.writeStream.format("kafka")
            .option("kafka.bootstrap.servers", kafka_address)
            .option("topic", kafka_sink_topic)
            .option("kafka.security.protocol", "SASL_PLAINTEXT")
            .option("kafka.sasl.mechanism", "PLAIN")
            .option("kafka.sasl.jaas.config", kafka_jaas_config)
            .option("kafka.reconnect.backoff.ms", 2000)
            .option("kafka.reconnect.backoff.max.ms", 10000)
            .option("checkpointLocation", spark_check_point_dir)
            .start()
        )
        return query
    else:
        # kafka source
        sdf = (
            spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers", kafka_address)
            .option("subscribe", kafka_source_topics)
            .option("kafka.reconnect.backoff.ms", 2000)
            .option("kafka.reconnect.backoff.max.ms", 10000)
            .option("failOnDataLoss", False)
            .option("backpressure.enabled", True)
            .load()
        )
        sdf = start_algorithm_logic(sdf)
        # kafka sink
        query = (
            sdf.writeStream.format("kafka")
            .option("kafka.bootstrap.servers", kafka_address)
            .option("topic", kafka_sink_topic)
            .option("kafka.reconnect.backoff.ms", 2000)
            .option("kafka.reconnect.backoff.max.ms", 10000)
            .option("checkpointLocation", spark_check_point_dir)
            .start()
        )
        return query


def main():
    parser = argparse.ArgumentParser(
        parents=[_kafka_args()],
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    args = parser.parse_args()
    spark_conf = (
        SparkConf()
        .setMaster("local")
        .set("spark.jars", "jars/*jar")
        .set("spark.driver.memory", "10240m")
    )
    sc = SparkContext(conf=spark_conf)
    # Restart the query after a streaming exception, waiting a random back-off
    # before each retry.
    while True:
        query = start_query(sc, args)
        try:
            query.awaitTermination()
        except StreamingQueryException as err:
            wait = random.randint(0, 60)
            print(
                f"Query exception: {err}. The query will be restarted after {wait} seconds."
            )
            time.sleep(wait)


if __name__ == "__main__":
    main()
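

# Example usage (addresses and topic names are illustrative; the Kafka connector
# jars are expected under jars/ as configured in main()):
#
#   python example.py -a localhost:9092 -t source-topic -st sink-topic
#
# A minimal producer sketch for generating matching test messages, assuming the
# kafka-python package is installed:
#
#   from kafka import KafkaProducer
#   producer = KafkaProducer(bootstrap_servers="localhost:9092")
#   msg = {
#       "timestamp": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%f"),
#       "school": "cambridge",
#       "major": "computer science",
#       "name": "hanmeimei",
#       "extra": "",
#   }
#   producer.send("source-topic", json.dumps(msg).encode("utf-8"))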