Problem description:

My Spark Streaming job is consuming data from Kafka:

    KafkaUtils.createStream(jssc, prop.getProperty(Config.ZOOKEEPER_QUORUM),
            prop.getProperty(Config.KAFKA_CONSUMER_GROUP), topicMap);

Whenever I restart my job, it starts consuming from the last stored offset (I am assuming this because it takes a long time to send the processed data, and if I change the consumer group it works instantly with new messages).

I am on Kafka 8.1.1, where auto.offset.reset defaults to largest, which means that whenever I restart, Kafka will send data from where I left off.

My use case requires me to ignore this data and process only newly arriving data. How can I achieve this?

Any suggestions?

Answer:

There are two ways you can achieve this:

  1. Create a unique consumer group each time on restart, and it will consume from the latest offset (see the sketch at the end of this answer).

  2. Use the direct approach instead of the receiver-based one; here you have more control over how you consume, but you would have to update ZooKeeper manually to store your offsets. In the example below it will always start at the latest offset; a sketch of reading back the consumed offsets follows it.

    import org.apache.spark.streaming.kafka._
    import kafka.serializer.StringDecoder
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
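
The direct stream also exposes the Kafka offsets of each batch, which is what you would persist to ZooKeeper (or any other store) yourself. A minimal sketch, assuming the messages stream created in the snippet above; saveOffsets is a hypothetical placeholder for your own storage code, not a Spark or Kafka API:

    // HasOffsetRanges comes from the org.apache.spark.streaming.kafka._ import above.
    // The cast only works on the RDD that comes straight from createDirectStream.
    messages.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { o =>
        println(s"${o.topic} ${o.partition}: ${o.fromOffset} -> ${o.untilOffset}")
      }
      // saveOffsets(offsetRanges)  // hypothetical: write the ranges to ZooKeeper
    }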
    

Documentation on the direct approach is here: https://spark.apache.org/docs/latest/streaming-kafka-integration.html
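
For option 1, a minimal sketch of the receiver-based approach with a consumer group that is unique per run; ssc, zkQuorum, baseGroup and topicMap are placeholders standing in for your own configuration:

    import java.util.UUID
    import org.apache.spark.streaming.kafka._

    // A group id that has never been used has no committed offsets in ZooKeeper,
    // so with the default auto.offset.reset = largest the stream starts at the
    // latest offset on every restart.
    val groupId = baseGroup + "-" + UUID.randomUUID()
    val stream = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap)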
