重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
这篇文章主要介绍“spark读取hbase的数据实例代码”,在日常操作中,相信很多人在spark读取hbase的数据实例代码问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”spark读取hbase的数据实例代码”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
目前创新互联公司已为近千家的企业提供了网站建设、域名、虚拟主机、网站托管、服务器托管、企业网站设计、双河网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。
package hgs.spark.hbase //https://blog.csdn.net/mlljava1111/article/details/52675901 import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.client.Scan import org.apache.hadoop.hbase.filter.FilterList import org.apache.hadoop.hbase.filter.FilterList.Operator import org.apache.hadoop.hbase.filter.RowFilter import org.apache.hadoop.hbase.filter.RegexStringComparator import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp import org.apache.hadoop.hbase.protobuf.ProtobufUtil import org.apache.hadoop.hbase.util.Base64 import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.filter.LongComparator object HbaseToSpark { def main(args: Array[String]): Unit = { //System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); val conf = new SparkConf conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.setMaster("local").setAppName("hbasedata") val context = new SparkContext(conf) //hbase配置 val hconf = new HBaseConfiguration hconf.set("hbase.zookeeper.quorum", "bigdata00:2181,bigdata01:2181,bigdata02:2181") hconf.set("hbase.zookeeper.property.clientPort", "2181") hconf.set(TableInputFormat.INPUT_TABLE, "test") val scan = new Scan //扫描的表rowkey的开始和结束 scan.setStartRow("1991".getBytes) scan.setStopRow("3000".getBytes) //val list = new FilterList(Operator.MUST_PASS_ALL) //val filter1 = new RowFilter(CompareOp.GREATER_OR_EQUAL,new LongComparator(1991)) //val filter2 = new RowFilter(CompareOp.LESS_OR_EQUAL,new RegexStringComparator("3000*")) // list.addFilter(filter1) // list.addFilter(filter2) //scan.setFilter(list) //添加scan hconf.set(TableInputFormat.SCAN, convertScanToString(scan)) val hrdd = context.newAPIHadoopRDD(hconf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) val resultrdd = hrdd.repartition(2) //打印结果 resultrdd.foreach{case(_,value)=>{ val key = Bytes.toString(value.getRow) val name = Bytes.toString(value.getValue("cf1".getBytes, "name".getBytes)) val age = Bytes.toString(value.getValue("cf1".getBytes, "age".getBytes)) println("rowkey:"+key+" "+"name:"+name+" "+"age:"+age) } } context.stop() } def convertScanToString(scan: Scan) = { val proto = ProtobufUtil.toScan(scan) Base64.encodeBytes(proto.toByteArray) } }
到此,关于“spark读取hbase的数据实例代码”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!