Fetch data from HBase table in Spark

Tags:
Big Data
HBase
Spark
We have a very large HBase table named UserAction with three column families. We want to load all of the data from one column family ("song") into a JavaRDD. We tried the code below, but it does not work. What are we doing wrong, or what else can we try?
static SparkConf sparkConf = new SparkConf().setAppName("test").setMaster(
        "local[4]");
static JavaSparkContext jsc = new JavaSparkContext(sparkConf);

static void getRatings() {

    Configuration conf = HBaseConfiguration.create();
    conf.set(TableInputFormat.INPUT_TABLE, "UserAction");
    conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "song");

    JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc
            .newAPIHadoopRDD(
                    conf,
                    TableInputFormat.class,
                    org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
                    org.apache.hadoop.hbase.client.Result.class);

    JavaRDD count = hBaseRDD
            .map(new Function<Tuple2<ImmutableBytesWritable, Result>, JavaRDD>() {

                @Override
                public JavaRDD call(
                        Tuple2<ImmutableBytesWritable, Result> t)
                        throws Exception {
                    Result r = t._2;
                    int user = Integer.parseInt(Bytes.toString(r.getRow()));
                    ArrayList ra = new ArrayList<>();

                    for (Cell c : r.rawCells()) {

                        int product = Integer.parseInt(Bytes
                                .toString(CellUtil.cloneQualifier(c)));
                        double rating = Double.parseDouble(Bytes
                                .toString(CellUtil.cloneValue(c)));

                        ra.add(new Rating(user, product, rating));
                    }

                    return jsc.parallelize(ra);
                }
            })
            .reduce(new Function2<JavaRDD, JavaRDD, JavaRDD>() {
                @Override
                public JavaRDD call(JavaRDD r1,
                        JavaRDD r2) throws Exception {
                    return r1.union(r2);
                }
            });
    jsc.stop();
}
Thank you.

Answer Wiki
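
One likely cause, offered as a sketch rather than a confirmed diagnosis: the function passed to map calls jsc.parallelize(...) and returns a JavaRDD. Spark does not support creating RDDs inside a transformation, because the JavaSparkContext exists only on the driver. A common alternative is to have each row return a plain list of Rating objects and let flatMap flatten them into one JavaRDD<Rating>. The plain-Java example below shows only that per-row conversion. The RowToRatings class, its Rating and Row types, and the Map-based cells are hypothetical stand-ins for MLlib's Rating and HBase's Result, and Stream.flatMap stands in for the RDD flatMap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class RowToRatings {

    /** Hypothetical stand-in for org.apache.spark.mllib.recommendation.Rating. */
    public static final class Rating {
        public final int user;
        public final int product;
        public final double rating;

        public Rating(int user, int product, double rating) {
            this.user = user;
            this.product = product;
            this.rating = rating;
        }
    }

    /** Hypothetical stand-in for one HBase Result: row key plus qualifier -> value cells. */
    public static final class Row {
        public final String key;
        public final Map<String, String> cells;

        public Row(String key, Map<String, String> cells) {
            this.key = key;
            this.cells = cells;
        }
    }

    // Converts one row into plain Rating objects. No SparkContext is used here,
    // so the same logic could run inside a flatMap on the executors.
    public static List<Rating> toRatings(Row row) {
        int user = Integer.parseInt(row.key);
        List<Rating> out = new ArrayList<>();
        for (Map.Entry<String, String> cell : row.cells.entrySet()) {
            int product = Integer.parseInt(cell.getKey());
            double rating = Double.parseDouble(cell.getValue());
            out.add(new Rating(user, product, rating));
        }
        return out;
    }

    // Flattens all rows into one list; Stream.flatMap plays the role of RDD flatMap.
    public static List<Rating> allRatings(List<Row> rows) {
        return rows.stream()
                .flatMap(r -> toRatings(r).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> first = new LinkedHashMap<>();
        first.put("10", "4.5");
        first.put("11", "3.0");
        Map<String, String> second = new LinkedHashMap<>();
        second.put("10", "5.0");

        List<Rating> ratings = allRatings(
                Arrays.asList(new Row("1", first), new Row("2", second)));
        System.out.println(ratings.size() + " ratings");
    }
}
```

In the original job, the equivalent change would be to replace the map/reduce pair with a single flatMap over hBaseRDD whose function builds the list from r.rawCells() exactly as the question already does and returns it directly. On Spark 2.x the FlatMapFunction returns an Iterator (for example, ra.iterator()), while 1.x releases expect an Iterable, so the exact return type depends on the Spark version in use.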
