java – 如何使用spark来处理一系列hbase行?

前端之家收集整理的这篇文章主要介绍了java – 如何使用spark来处理一系列hbase行?前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我试图用HBase作为火花的数据源.所以第一步就是从HBase表创建一个RDD.由于Spark使用hadoop输入格式,我可以通过创建rdd http://www.vidyasource.com/blog/Programming/Scala/Java/Data/Hadoop/Analytics/2014/01/25/lighting-a-spark-with-hbase找到一种方法来使用所有行.但是如何创建一个范围扫描的RDD?

欢迎所有的建议.

解决方法

以下是使用Spark中的扫描的示例:
import java.io.{DataOutputStream,ByteArrayOutputStream}
import java.lang.String
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Base64

def convertScanToString(scan: Scan): String = {
  val out: ByteArrayOutputStream = new ByteArrayOutputStream
  val dos: DataOutputStream = new DataOutputStream(out)
  scan.write(dos)
  Base64.encodeBytes(out.toByteArray)
}

val conf = HBaseConfiguration.create()
val scan = new Scan()
scan.setCaching(500)
scan.setCacheBlocks(false)
conf.set(TableInputFormat.INPUT_TABLE,"table_name")
conf.set(TableInputFormat.SCAN,convertScanToString(scan))
val rdd = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])
rdd.count

您需要将相关的库添加到Spark类路径,并确保它们与您的Spark兼容.提示:您可以使用hbase classpath找到它们.

猜你在找的Java相关文章