kafka-flink-hbase_bug搬运攻城狮的博客-程序员秘密

技术标签: flink  

前言

flink现在是越来越火了,我最近也有在看flink官网,上周五一个朋友叫我写个flink读取kafka,还有读取mysql,本人比较苦逼,没有实验集群,样例是写出来了,能不能成事,你们用自己的集群测试一下

  • flink-1.6.2
  • kafka-0.10
Maven pom.xml
 <!--hadoop-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${
    hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${
    hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${
    hadoop.version}</version>
        </dependency>

        <!-- flink-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>${
    flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${
    flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>${
    flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>${
    flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc</artifactId>
            <version>${
    flink.version}</version>
        </dependency>
        <!--Hbase lib库 -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${
    hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${
    hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${
    hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-protocol</artifactId>
            <version>${
    hbase.version}</version>
        </dependency>
        <!--JDBC-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.40</version>
        </dependency>

可能会用到上面的依赖jar包

flink与kafka

先看一下官网给的kafka 集成flink示例吧

Kafka010Example
官网示例地址

package com.learn.Flink.kafka

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{
    FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.api.scala._

/**
  * @Author: king
  * @Datetime: 2018/10/16
  * @Desc: TODO
  *
  */
object Kafka010Example {
    
  def main(args: Array[String]): Unit = {
    

    // 解析输入参数
    val params = ParameterTool.fromArgs(args)

    if (params.getNumberOfParameters < 4) {
    
      println("Missing parameters!\n"
        + "Usage: Kafka --input-topic <topic> --output-topic <topic> "
        + "--bootstrap.servers <kafka brokers> "
        + "--zookeeper.connect <zk quorum> --group.id <some id> [--prefix <prefix>]")
      return
    }

    val prefix = params.get("prefix", "PREFIX:")


    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.disableSysoutLogging
    env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
    // 每隔5秒创建一个检查点
    env.enableCheckpointing(5000)
    // 在Web界面中提供参数
    env.getConfig.setGlobalJobParameters(params)

    // 为卡夫卡0.10 x创建一个卡夫卡流源用户
    val kafkaConsumer = new FlinkKafkaConsumer010(
      params.getRequired("input-topic"),
      new SimpleStringSchema,
      params.getProperties)
    //消费kafka数据
    /*val transaction = env
      .addSource(
        new FlinkKafkaConsumer010[String](
          params.getRequired("input-topic"),
          new SimpleStringSchema,
          params.getProperties))
    transaction.print()*/

    //消费kafka数据
    val messageStream = env
      .addSource(kafkaConsumer)
      .map(in => prefix + in)
    messageStream.print()
    // 为卡夫卡0.10 X创建一个生产者
    val kafkaProducer = new FlinkKafkaProducer010(
      params.getRequired("output-topic"),
      new SimpleStringSchema,
      params.getProperties)

    // 将数据写入kafka
    messageStream.addSink(kafkaProducer)

    env.execute("Kafka 0.10 Example")
  }

}

自己写了一个flink消费kafka里面的数据

ReadingFromKafka

package com.learn.Flink.kafka

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.{
    CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.api.scala._

/**
  * @Author: king
  * @Datetime: 2018/11/26 
  * @Desc: TODO
  *
  */
object ReadingFromKafka {
    
  private val ZOOKEEPER_HOST = "master:2181,worker1:2181,worker2:2181"
  private val KAFKA_BROKER = "master:9092,worker1:9092,worker2:9092"
  private val TRANSACTION_GROUP = "transaction"

  def main(args: Array[String]) {
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.enableCheckpointing(1000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // configure Kafka consumer
    val kafkaProps = new Properties()
    kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
    kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER)
    kafkaProps.setProperty("group.id", TRANSACTION_GROUP)

    //topicd的名字是new,schema默认使用SimpleStringSchema()即可
    val transaction = env
      .addSource(
        new FlinkKafkaConsumer010[String]("new", new SimpleStringSchema, kafkaProps)
      )

    transaction.print()

    env.execute()

  }

}

然后觉得消费了总要写到哪里去吧,那就写到hbase里面去(自己验证一下,我没有实验集群去验证代码)

package com.learn.Flink.kafka

import java.text.SimpleDateFormat
import java.util.{
    Date, Properties}

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.hadoop.hbase.client.{
    ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{
    HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.flink.api.scala._

/**
  * @Author: king
  * @Datetime: 2018/11/23
  * @Desc: TODO
  *
  */
object Flink2hbase {
    
  val ZOOKEEPER_URL = "hostname1:port,hostname2:port,hostname3:port"
  val KAFKA_URL = "hostname1:port,hostname2:port,hostname3:port"
  val columnFamily = "info"
  val tableName = TableName.valueOf("Flink2HBase")

  def main(args: Array[String]): Unit = {
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //设置启动检查点(很重要)
    env.enableCheckpointing(1000)
    // 设置为TimeCharacteristic.EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val props = new Properties()
    props.setProperty("zookeeper.connect", ZOOKEEPER_URL)
    props.setProperty("bootstrap.servers", KAFKA_URL)
    props.setProperty("grou.idp", "flink-kafka")
    val transction = env.addSource(
      new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema, props))
    transction.rebalance.map {
     value =>
      print(value)
      writeIntoHBase(value)
    }
    env.execute()


  }

  def writeIntoHBase(m: String): Unit = {
    
    val hbaseconf = HBaseConfiguration.create
    hbaseconf.set("hbase.zookeeper.quorum", ZOOKEEPER_URL)
    hbaseconf.set("hbase.defaults.for.version.skip", "ture")
    val connection = ConnectionFactory.createConnection(hbaseconf)
    val admin = connection.getAdmin

    if (!admin.tableExists(tableName)) {
    
      admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(columnFamily)))
    }
    val table = connection.getTable(tableName)
    val df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val put = new Put(Bytes.toBytes(df.format(new Date())))
    put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("test"), Bytes.toBytes(m))
    table.put(put)
    table.close()

  }

}

flink与JDBC

package com.learn.Flink.demo

import java.sql.Types

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.{
    JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.Row

/**
  * @Author: king
  * @Datetime: 2018/11/23 
  * @Desc: TODO
  *
  */
object JDBCSouce {
    
  val driverClass = "com.mysql.jdbc.Driver"
  val dbUrl = "jdbc:mysql://172.17.17.89:3306/test"
  val userNmae = "usr_test"
  val passWord = "usr_test"

  def main(args: Array[String]): Unit = {
    
    // 运行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    // 插入一组数据
    // 准备数据
    val row1 = new Row(3)
    row1.setField(0, 1)
    row1.setField(1, "shabi")
    row1.setField(2, 20)

    val row2 = new Row(3)
    row2.setField(0, 2)
    row2.setField(1, "doubi")
    row2.setField(2, 22)

    val rows: Array[Row] = Array(row1, row2)

    // 插入数据
    //insertRows(rows)

    // 查看所有数据
    selectAllFields(env)

    // 更新某行
    val row22 = new Row(3)
    row22.setField(0, 2)
    row22.setField(1, "")
    row22.setField(2, 25)
    //updateRow(row22)

  }


  /**
    * 插入数据
    */
  def insertRows(rows: Array[Row]): Unit = {
    
    // 准备输出格式
    val jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
      .setDrivername(driverClass)
      .setDBUrl(dbUrl)
      .setUsername(userNmae)
      .setPassword(passWord)
      .setQuery("insert into flink_test values(?,?,?)")
      // 需要对应到表中的字段
      .setSqlTypes(Array[Int](Types.INTEGER, Types.VARCHAR, Types.INTEGER))
      .finish()

    // 连接到目标数据库,并初始化preparedStatement
    jdbcOutputFormat.open(0, 1)

    // 添加记录到 preparedStatement,此时jdbcOutputFormat需要确保是开启的
    // 未指定列类型时,此操作可能会失败
    for (row <- rows) {
    
      jdbcOutputFormat.writeRecord(row)
    }

    // 执行preparedStatement,并关闭此实例的所有资源
    jdbcOutputFormat.close()
  }


  /* /**
     * 更新某行数据(官网没给出更新示例,不知道实际是不是这样更新的)
     *
     * @param row 更新后的数据
     */
   def updateRow(row: Row): Unit = {
    
     // 准备输出格式
     val jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
       .setDrivername(driverClass)
       .setDBUrl(dbUrl)
       .setUsername(userNmae)
       .setPassword(passWord)
       .setQuery("update flink_test set name = ?, password = ? where id = ?")
       // 需要对应到行rowComb中的字段类型
       .setSqlTypes(Array[Int](Types.VARCHAR, Types.VARCHAR, Types.INTEGER))
       .finish()
 
     // 连接到目标数据库,并初始化preparedStatement
     jdbcOutputFormat.open(0, 1)
 
     // 组装sql中对应的字段,rowComb中的字段个数及类型需要与sql中的问号一致
     val rowComb = new Row(3)
     rowComb.setField(0, row.getField(1).asInstanceOf[String])
     rowComb.setField(1, row.getField(2).asInstanceOf[Int])
     rowComb.setField(2, row.getField(0).asInstanceOf[Int])
 
     // 添加记录到 preparedStatement,此时jdbcOutputFormat需要确保是开启的
     // 未指定列类型时,此操作可能会失败
     jdbcOutputFormat.writeRecord(rowComb)
 
     // 执行preparedStatement,并关闭此实例的所有资源
     jdbcOutputFormat.close()
   }
 */
  /**
    * 查询所有字段
    *
    * @return
    */
  def selectAllFields(env: ExecutionEnvironment) = {
    
    val inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
      .setDrivername(driverClass)
      .setDBUrl(dbUrl)
      .setUsername(userNmae)
      .setPassword(passWord)
      .setQuery("select * from flink_test;")
      // 这里第一个字段类型写int会报类型转换异常。
      .setRowTypeInfo(new RowTypeInfo(
      BasicTypeInfo.INT_TYPE_INFO,
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO))

    val source = env.createInput(inputBuilder.finish)
    source.print()
  }

}

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/TXBSW/article/details/84553311

智能推荐

tomcat 单机多实例_tlk20071的博客-程序员秘密

环境tomcat:/home/tomcat-6.0.45oms-client站点程序:/home/oms-client/oms-book-client站点tomcat实例:/home/oms-client/tomcat6-sever1端口:38080bms-client 站点程序:/home/bms-client/bms-book-clien

修复被病毒破坏的Winsock_weixin_34376562的博客-程序员秘密

近来的病毒和恶意软件是越来越招人烦,即使你把它清除掉它还是让你的电脑受伤并痛着。最近单位的多台电脑感染了病毒,在清除病毒后,却发生IE无法浏览网页的现象,笔者在处理问题的过程中积累了一些经验,在这里与大家一同分享。故障现象(可能满足其中一条或多条)1、在病毒被查杀之前IE能够浏览网页,一旦病毒被查杀,IE将无法浏览网页;2、检查网络状况,发现电脑能够p...

linux时间戳导出,LinuxPTP时间戳的生成_weixin_39630744的博客-程序员秘密

ingress timestamp下面是收到message时如何生成ingress timestamp的相关内容。以下内容出自sk_receive函数,ts和sw分别表示以硬件时间戳和软件时间戳生成的ingress timestamp。struct cmsghdr *cm;struct msghdr msg;recvmsg(fd, &amp;msg, flags);for (cm = CMSG_F...

delphi SimpleGraph控件的部分注释_cml2030的博客-程序员秘密

TSimpleGraph是一个delphi下的可视化图形开源控件,有很强编辑态和运行态编辑功能,是学习开发DELPHI控件的一个很好的例子,控件本身有很强的扩展性,完全可以自定义绘制各种图形.控件的地址为http://www.delphiarea.com/products/delphi-components/simplegraph/完全读懂这个控件,还是要读源码,吃透以下为零几年时首次...

使用ContentProvider_YuanHangDU的博客-程序员秘密

转载请注明出处: http://write.blog.csdn.net/mdeditor#!postId=51610936一、简介: ContentProvider 在Android中的作用是对外共享数据,也就是说可以通过ContentProvider把应用中的数据共享给其他应用访问,其他应用可以通过ContentProvider对应用中的数据进行增、删、该、查。使用ContentPr

Pisces的属性配置文件加载_hangwen0305的博客-程序员秘密

系统中访问properties文件中定义的配置项是很常见的需求,jdk提供java.util.Properties类加载指定的配置文件(当然不一定从本地文件、任意来源的流对象也可以)。在一个大型系统中,我们有可能需要定义多个配置文件,如果每个文件都需要声明一个Properties与之对应,那么代码会显得很啰嗦。方法一:统一写一个加载工具类,可以,但是不见得最好。方法二:借助spring fr...

随便推点

B/S-软件发展的方向?_socoolfj的博客-程序员秘密

B/S-软件发展的方向?  一、什么是C/S和B/S   第一、什么是C/S结构。C/S (Client/Server)结构,即大家熟知的客户机和服务器结构。它是软件系统体系结构,通过它可以充分利用两端硬件环境的优势,将任务合理分配到Client端和Server端来实现,降低了系统的通讯开销。目前大多数应用软件系统都是Client/Server形式的两层结构,由于现在的软件应用系统正在向分布式的W

云计算框架下基于信用的调度算法分析_zc_ft的博客-程序员秘密

整个程序是在Cloudsim3.0的框架下进行仿真的。

详解sprintf()&sprintf_s()_m0_37346206的博客-程序员秘密_sprintf_s

sprintf函数功能:把格式化的数据写入某个字符串头文件:stdio.h函数原型:int sprintf( char *buffer, const char *format [, argument] … );返回值:字符串长度(strlen)在将各种类型的数据构造成字符串时,sprintf 的功能很强大。sprintf 与printf 在用法上几乎一样,只是打印的目的地不同...

CSS之元素的显示与隐藏、CSS用户界面样式、溢出文字处理_zlinger1的博客-程序员秘密

元素显示隐藏的目的:主要目的是让一个元素在页面中消失,但是不是在文档源码中删除。最常见的是网站广告,当我们点击类似关闭不见了,但是我们重新刷新页面,依然会出现。display显示display设置或检索对象是否及如何显示。display:none 隐藏对象,与它相反的是display:block,除了转换为块级元素之外,同时还有显示元素的意思。特点:隐藏之后不再保留位置。visibil...

华为云数据中心建设全面复工进行中,预计2021年投入使用_NicolasLearner的博客-程序员秘密_华为江西云数据中心

华为云数据中心建设全面复工进行中,预计2021年投入使用(华为云贵州)推荐2020-04-13http://3g.donews.com/News/donews_detail/3089566.html位于贵州省贵安新区华为云数据中心A区项目施工现场,1000余名工人正在对工程进行紧张施工。华为云数据中心是由华为技术有限公司投资,中建四局第一建筑有限公司承建。该数据中心是集数据中心、生产交付中心、员工休息楼于一体的综合性建筑群。数据中心建设完工后,将有超过160个国家的重要数据机库在这里投入运...

推荐文章

热门文章

相关标签