重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
本篇内容介绍了“flink将数据录入数据库”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
创新互联建站是一家集网站设计、成都网站设计、网站页面设计、网站优化SEO优化为一体的专业的建站公司,已为成都等多地近百家企业提供网站建设服务。追求良好的浏览体验,以探求精品塑造与理念升华,设计最适合用户的网站页面。 合作只是第一步,服务才是根本,我们始终坚持讲诚信,负责任的原则,为您进行细心、贴心、认真的服务,与众多客户在蓬勃发展的市场环境中,互促共生。
//主类 package flink.streaming import java.util.Properties import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.sink.RichSinkFunction import org.apache.flink.streaming.api.CheckpointingMode object StreamingTest { def main(args: Array[String]): Unit = { val kafkaProps = new Properties() //kafka的一些属性 kafkaProps.setProperty("bootstrap.servers", "bigdata01:9092") //所在的消费组 kafkaProps.setProperty("group.id", "group2") //获取当前的执行环境 val evn = StreamExecutionEnvironment.getExecutionEnvironment //evn.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //kafka的consumer,test1是要消费的topic val kafkaSource = new FlinkKafkaConsumer[String]("test1",new SimpleStringSchema,kafkaProps) //kafkaSource.assignTimestampsAndWatermarks(assigner) //设置从最新的offset开始消费 //kafkaSource.setStartFromGroupOffsets() kafkaSource.setStartFromLatest() //自动提交offset kafkaSource.setCommitOffsetsOnCheckpoints(true) //flink的checkpoint的时间间隔 //evn.enableCheckpointing(2000) //添加consumer val stream = evn.addSource(kafkaSource) evn.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE) //stream.setParallelism(3) val text = stream.flatMap{ _.toLowerCase().split(" ")filter { _.nonEmpty} } .map{(_,1)} .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1) .map(x=>{(x._1,(new Integer(x._2)))}) //text.print() //启动执行 text.addSink(new Ssinks()) evn.execute("kafkawd") } }
//自定义sink package flink.streaming import java.sql.Connection import java.sql.PreparedStatement import java.sql.DriverManager import org.apache.flink.streaming.api.functions.sink.RichSinkFunction import org.apache.flink.configuration.Configuration class Ssinks extends RichSinkFunction[(String,Integer)]{ var conn:Connection=_; var pres:PreparedStatement = _; var username = "root"; var password = "123456"; var dburl = "jdbc:MySQL://192.168.6.132:3306/hgs?useUnicode=true&characterEncoding=utf-8&useSSL=false"; var sql = "insert into words(word,count) values(?,?)"; override def invoke(value:(String, Integer) ) { pres.setString(1, value._1); pres.setInt(2,value._2); pres.executeUpdate(); System.out.println("values :" +value._1+"--"+value._2); } override def open( parameters:Configuration) { Class.forName("com.mysql.jdbc.Driver"); conn = DriverManager.getConnection(dburl, username, password); pres = conn.prepareStatement(sql); super.close() } override def close() { pres.close(); conn.close(); } }
“flink将数据录入数据库”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注创新互联网站,小编将为大家输出更多高质量的实用文章!