Spark消费Kafka如何实现精准一次性消费?-程序员宅基地

技术标签: spark  kafka  Spark  大数据  

在这里插入图片描述

1.定义

  • 精确一次消费(Exactly-once) 是指消息一定会被处理且只会被处理一次。不多不少就一次处理。

如果达不到精确一次消费,可能会达到另外两种情况:

  • 至少一次消费(at least once),主要是保证数据不会丢失,但有可能存在数据重复问题。

  • 最多一次消费 (at most once),主要是保证数据不会重复,但有可能存在数据丢失问题。

如果同时解决了数据丢失和数据重复的问题,那么就实现了精确一次消费的语义了。

2. 问题如何产生

数据何时会丢失: 比如实时计算任务进行计算,到数据结果存盘之前,进程崩溃,假设在进程崩溃前kafka调整了偏移量,那么kafka就会认为数据已经被处理过,即使进程重启,kafka也会从新的偏移量开始,所以之前没有保存的数据就被丢失掉了。

数据何时会重复: 如果数据计算结果已经存盘了,在kafka调整偏移量之前,进程崩溃,那么kafka会认为数据没有被消费,进程重启,会重新从旧的偏移量开始,那么数据就会被2次消费,又会被存盘,数据就被存了2遍,造成数据重复。

3.解决方案

方案一:利用关系型数据库的事务进行处理

出现丢失或者重复的问题,核心就是偏移量的提交与数据的保存,不是原子性的。如果能做成要么数据保存和偏移量都成功,要么两个失败。那么就不会出现丢失或者重复了。

这样的话可以把存数据和偏移量放到一个事务里。这样就做到前面的成功,如果后面做失败了,就回滚前面那么就达成了原子性。

问题与限制

  1. 数据库选型受限, 只能使用支持事务的关系型数据库 ,如mysql, oracle ,无法使用其他功能强大的nosql数据库。

  2. 如果保存的数据量较大一个数据库节点不够,多个节点的话,还要考虑分布式事务的问题。做分布式事务,结构复杂,拖慢性能。

  3. 如果做本地事务 ,只能把分区数据提取到driver中进行保存,降低并发 ,增加executor到driver的数据传递io。

方案二:手动提交偏移量+幂等性处理

咱们知道如果能够同时解决数据丢失和数据重复问题,就等于做到了精确一次消费。

那咱们就各个击破。

首先解决数据丢失问题,办法就是要等数据保存成功后再提交偏移量,所以就必须手工来控制偏移量的提交时机。

但是如果数据保存了,没等偏移量提交进程挂了,数据会被重复消费。怎么办?那就要把数据的保存做成幂等性保存。即同一批数据反复保存多次,数据不会翻倍,保存一次和保存一百次的效果是一样的。如果能做到这个,就达到了幂等性保存,就不用担心数据会重复了。

难点

话虽如此,在实际的开发中手动提交偏移量其实不难,难的是幂等性的保存,有的时候并不一定能保证。所以有的时候只能优先保证的数据不丢失。数据重复难以避免。即只保证了至少一次消费的语义

文章已收录于https://github.com/wangsl123/dzblog,欢迎start。微信关注【鄙人王道长】,也可第一时间看到文章。

在这里插入图片描述

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/wangsl754/article/details/107479977

智能推荐

Pandas对行/列求和_pandas对具体列求和-程序员宅基地

文章浏览阅读5.3k次,点赞3次,收藏18次。Pandas对行/列求和_pandas对具体列求和

Visio--用例图、类图、顺序图、活动图_visio画用例图-程序员宅基地

文章浏览阅读1.9w次,点赞32次,收藏228次。做个小结。_visio画用例图

程序员的路----程序员一定要仔细看_程序员的道路-程序员宅基地

文章浏览阅读2.2k次。在网上看到的恶搞程序员的图片,实在是搞笑。看一次笑一次!程序猿的十年他不是乞丐,请尊称他为程序猿。对,他就是程序猿!其实,你们看到的不是僵尸,他们都是苦逼的程序猿!这不是恶搞的,是一个真实的故事!看他的拐杖,其实也挺可怜的!不知是不是敲代码敲多了,把脚敲颓了!_程序员的道路

从Atlas到Microsoft ASP.NET AJAX(6) - Networking, Application Services-程序员宅基地

文章浏览阅读42次。NetworkingCalling Web Service Methods from Script  为了简化Web Services方法调用,客户端代理的设计被改变了,它在方法调用和回调函数设置方面提供了强大的灵活性。  下面的例子展示了CTP版本中Web Services方法的客户端调用,以及回调函数的使用方式。第一个例子展示了在CTP版本中Web ..._.net6 applicationservices

一文带你了解socket网络编程以及详解过程和原理_socket编程-程序员宅基地

文章浏览阅读1.1w次,点赞42次,收藏178次。Socket(套接字)是计算机网络编程中的一种抽象概念,它提供了在网络上进行通信的接口。通过使用 Socket,可以在不同计算机之间建立连接,并进行数据的传输和交换。通过 Socket,客户端可以与服务器建立连接并发送请求,服务器接收请求并返回响应。通过 Socket,可以在多个用户之间实现实时的文字、音频或视频通信。可以使用 Socket 在不同计算机之间传输文件,如上传和下载文件。可通过 Socket 在远程计算机上执行指令或操作。_socket编程

nodejs第三方模块 express框架 传参_nodejs express 模块间参数传递-程序员宅基地

文章浏览阅读413次。nodejs第三方模块 express框架 传参get接口动态参数以这个举例 http://localhost:8001/edit/124其中,/124传递的id值124就是动态参数服务器通过req.params获取app.get("/edit/:id",(req,res)=>{ console.log(req.params) res.send("发送成功")})这个在我们删除和编辑数据的时候最常用,根据指定的id删除内容,以及编辑内容时的数据回显。查询字符串参数_nodejs express 模块间参数传递

随便推点

计算机考研408每日一题 day78_命中率高且电路实现简单的cache内存映射方式-程序员宅基地

文章浏览阅读392次。对于二叉排序树,下面的说法 ___是正确的。(华南理工大学 2006年)设备驱动程序在读写磁盘数据时一般釆用下列哪种I/O方式?(中国科学院大学 2018)下列___交换技术是独占信道工作方式。(中南大学 2006年)命中率高且电路实现简单的Cache 与内存映射方式是___映射方式。(中国科学院大学 2015)_命中率高且电路实现简单的cache内存映射方式

Unhandled exceptions: java.lang.IllegalAccessException, java.lang.reflect.InvocationTargetException_unhandled exceptions: java.lang.instantiationexcep-程序员宅基地

文章浏览阅读3.2k次。可能是没有异常处理,只需要加上throws Exception就解决问题了。_unhandled exceptions: java.lang.instantiationexception, java.lang.illegalacc

webview ERROR_UNSUPPORTED_SCHEME ,errorcode=-10问题处理-程序员宅基地

文章浏览阅读1.5k次。webView.setWebViewClient(new WebViewClient(){ @Override public boolean shouldOverrideUrlLoading(WebView view, String url) { try{ if(url.startsWith("baidumap://")){ Intent _error_unsupported_scheme

embed标签-程序员宅基地

文章浏览阅读66次。Embed  (一)、基本语法:  embed src=url  说明:embed可以用来插入各种多媒体,格式可以是 Midi、Wav、AIFF、AU、MP3等等, Netscape及新版的IE 都支持。url为音频或视频文件及其路径,可以是相对路径或绝对路径。  示例:<embed src="your.mid">  (二)、属性设置:  1、自动播放...

php 上传文件漏洞,PHP -- 文件包含、文件上传漏洞-程序员宅基地

文章浏览阅读330次。PHP -- 文件包含、文件上传漏洞PHP -- 文件包含、文件上传漏洞文件包含文件引入漏洞,是由一个动态页面编译时引入另一个文件的操作。文件引入本身是没有问题,它是用于统一页面风格,减少代码冗余的一种技术。但是在特定的场景下就会形成漏洞jsp:include指令和include动作,其中include动作用于引入动态文件php:include(),include_once(),require()..._php文件上传4漏洞

配置NGINX同时运行 https 和 http_nginx 和 http无法同时启动-程序员宅基地

文章浏览阅读406次。SSL 是需要申请证书的,key和PEM文件要放到服务器路径。然后NGINX下要进行443端口和80端口的绑定。server { listen 80; server_name ietaiji.com www.ietaiji.com; root "D:/aaa/WWW/ietaiji"; index index.html_nginx 和 http无法同时启动