Spark Streaming + Kafka实战
内容整理自:
接收Kafka数据
接收数据的方式有两种:
- 利用Receiver接收数据
- 直接从Kafka读取数据
利用Receiver接收数据
从kafka接收来的数据会存储在spark的executor中,之后spark streaming提交的job会处理这些数据
import org.apache.spark.streaming.kafka.*;
JavaPairReceiverInputDStream<String, String> kafkaStream =
KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]);
需要注意的地方:
-
在Receiver的方式中,
Spark中的partition和kafka中的partition并不是相关的
,所以如果我们加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度 -
对于不同的Group和topic我们可以使用
多个Receiver
创建不同的Dstream来并行接收数据,之后可以利用union
来统一成一个Dstream -
如果我们启用了Write Ahead Logs复制到文件系统如HDFS,那么storage level需要设置成
StorageLevel.MEMORY_AND_DISK_SER
,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)
直接从kafka读取数据
在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有receiver这一层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,之后根据设定的maxRatePerPartition来处理每个batch
这种方法相较于Receiver方式的优势在于:
-
简化的并行:在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。
-
高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。
-
精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性
向Kafka写入数据
Spark没有提供统一的接口用于写数据入Kafka, 因此我们需要使用Kafka接口自定义包装
input.foreachRDD(rdd => {
// 不能在这里新建KafkaProducer,因为KafkaProducer是不可序列化的
rdd.foreachPartition(partition => {
partition.foreach{
case x: String=>{
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
println(x)
val producer = new KafkaProducer[String, String](props)
val message=new ProducerRecord[String, String]("KafkaPushTest1",null,x)
producer.send(message)
}
}
})
})
上面这种方法实现最简单,但缺点也很明显,我们并不能将KafkaProducer的新建任务放在foreachPartition外边,因为KafkaProducer是不可序列化的
因此对于每个partition的每条记录都需要创建KafkaProducer,相当于都要建立一次连接,低效且不灵活,优化方案:
- 懒加载方式定义KafkaProducer
class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
lazy val producer = createProducer()
def send(topic: String, key: K, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, key, value))
def send(topic: String, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, value))
}
object KafkaSink {
import scala.collection.JavaConversions._
def apply[K,V](config: Map[String,Object]):KafkaSink[K,V]= {
val createProducerFunc = () => {
val producer = new KafkaProducer[K,V](config)
sys.addShutdownHook{
producer.close()
}
producer
}
new KafkaSink(createProducerFunc)
}
def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
}
- 利用广播变量的形式,将KafkaProducer广播到每一个executor
// 广播KafkaSink
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
val kafkaProducerConfig = {
val p = new Properties()
p.setProperty("bootstrap.servers", "10.111.32.81:9092")
p.setProperty("key.serializer", classOf[StringSerializer].getName)
p.setProperty("value.serializer", classOf[StringSerializer].getName)
p
}
log.warn("kafka producer init done!")
ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}
- 在每个executor中数据写入Kafka
input.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
rdd.foreach(record => {
kafkaProducer.value.send("KafkaPushTest2","KafkaPushTest2", record)
})
}
})
Spark Streaming+Kafka调优
Spark streaming+Kafka的使用中,当数据量较小,很多时候默认配置和使用便能够满足情况,但是当数据量大的时候,就需要进行一定的调整和优化,而这种调整和优化本身也是不同的场景需要不同的配置
合理的批处理时间(batchDuration)
初始化StreamingContext
时设置,代表job处理的时间。不能过小,会导致Spark Streaming频繁提交作业,如果batchDuration所产生的Job并不能在这期间完成处理,那么就会造成数据不断堆积, 最终导致Spark Streaming发生阻塞.
合理的Kafka拉取量(maxRatePerPartition)
配置参数:spark.streaming.kafka.maxRatePerPartition
, 默认没有上线,即Kafka中有多少数据都会全部拉出。
这个参数需要结合上面的batchDuration
配置,使得每个partition拉取在每个batchDuration期间拉取的数据能够顺利的处理完毕,做到尽可能高的吞吐量
缓存反复使用的DStream(RDD)
对经常复用的RDD,我们会选择cache()将其缓存下来,同理在Spark Streaming我们也可以选择将DStream缓存下来,防止过度的调度资源造成的网络开销
设置合理的GC
设置合理的资源配置
num-executors
: 用于设置Spark作业总共要用多少个Executor进程来执行executor-memory
: 该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联executor-cores
: 该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力
设置合理的parallelism
partition和parallelism
-
partition
: partition指的就是数据分片的数量,每一次task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多executor的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低 -
parallelism
: parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的partition数量, 而在进行map类操作的时候,partition数量通常取自parent RDD中较大的一个,而且也不会涉及shuffle,因此这个parallelism的参数没有影响
通过spark.default.parallelism
可以设置默认的分片数量,也可以在创建RDD时指定. 在SparkStreaming+Kafka的使用中,如果采用Direct连接方式,Spark中的partition和Kafka中的Partition是一一对应的,一般默认设置为Kafka中Partition的数量