Here is one possible solution. I assume you have a Java client library for the key-value store (RocksDB in your case) that you want to access.
KeyValuePair is a bean class that represents a single key-value pair from your key-value store.
Classes
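import java.util.Iterator;
import org.apache.spark.api.java.function.FlatMapFunction;

/* Lazy iterator that reads from the key-value store */
class KeyValueIterator implements Iterator<KeyValuePair> {
    public KeyValueIterator() {
        // TODO: initialize your custom reader using the Java client library
    }

    @Override
    public boolean hasNext() {
        // TODO: return true while the store has more entries
        throw new UnsupportedOperationException("not implemented yet");
    }

    @Override
    public KeyValuePair next() {
        // TODO: read and return the next entry from the store
        throw new UnsupportedOperationException("not implemented yet");
    }
}

/* Spark 2.x and later: call() returns an Iterator (Spark 1.x expected an Iterable) */
class KeyValueReader implements FlatMapFunction<KeyValuePair, KeyValuePair> {
    @Override
    public Iterator<KeyValuePair> call(KeyValuePair keyValuePair) throws Exception {
        // The dummy 'keyValuePair' argument only triggers the read; its content is ignored
        return new KeyValueIterator();
    }
}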
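To make the TODOs above more concrete, here is a minimal sketch of how the lazy iterator could be filled in. It does not use any real store client: StoreCursor is a hypothetical interface standing in for whatever seek-style iterator your client library exposes, InMemoryCursor is a test double backed by a TreeMap, and the String key/value fields on KeyValuePair are my assumption. In the Spark version you would open the real cursor inside the KeyValueIterator constructor, so that it is created on the executor rather than passed in.

```java
import java.io.Serializable;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;

/* Hypothetical cursor interface; a stand-in for the iterator of your store's client. */
interface StoreCursor {
    boolean isValid();  // true while the cursor points at an entry
    String key();
    String value();
    void next();        // move to the following entry
}

/* Bean for a single key-value pair (String fields are an assumption). */
class KeyValuePair implements Serializable {
    private String key;
    private String value;

    public KeyValuePair() { }

    public KeyValuePair(String key, String value) {
        this.key = key;
        this.value = value;
    }

    public String getKey() { return key; }
    public String getValue() { return value; }
}

/* Lazy iterator: builds one pair per next() call instead of loading everything. */
class KeyValueIterator implements Iterator<KeyValuePair> {
    private final StoreCursor cursor;

    KeyValueIterator(StoreCursor cursor) {
        this.cursor = cursor;
    }

    @Override
    public boolean hasNext() {
        return cursor.isValid();
    }

    @Override
    public KeyValuePair next() {
        if (!cursor.isValid()) {
            throw new NoSuchElementException();
        }
        KeyValuePair pair = new KeyValuePair(cursor.key(), cursor.value());
        cursor.next();  // advance only after the current entry has been copied
        return pair;
    }
}

/* In-memory cursor over a sorted map, used here only to exercise the iterator. */
class InMemoryCursor implements StoreCursor {
    private final Iterator<Map.Entry<String, String>> entries;
    private Map.Entry<String, String> current;

    InMemoryCursor(TreeMap<String, String> data) {
        this.entries = data.entrySet().iterator();
        next();  // position the cursor on the first entry, if any
    }

    @Override public boolean isValid() { return current != null; }
    @Override public String key() { return current.getKey(); }
    @Override public String value() { return current.getValue(); }
    @Override public void next() { current = entries.hasNext() ? entries.next() : null; }
}

class LazyIteratorDemo {
    public static void main(String[] args) {
        TreeMap<String, String> data = new TreeMap<>();
        data.put("b", "2");
        data.put("a", "1");
        Iterator<KeyValuePair> it = new KeyValueIterator(new InMemoryCursor(data));
        while (it.hasNext()) {
            KeyValuePair pair = it.next();
            System.out.println(pair.getKey() + "=" + pair.getValue());  // a=1, then b=2
        }
    }
}
```

Because next() copies the current entry before advancing the cursor, only one pair is held in memory at a time, which is the property the flatMap approach relies on.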
Create KeyValue RDD
/*list with a dummy KeyValuePair instance*/
ArrayList<KeyValuePair> keyValuePairs = new ArrayList<>();
keyValuePairs.add(new KeyValuePair());
JavaRDD<KeyValuePair> keyValuePairRDD = javaSparkContext.parallelize(keyValuePairs);
/*Read one key-value pair at a time lazily*/
keyValuePairRDD = keyValuePairRDD.flatMap(new KeyValueReader());
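Note:
Because the source list holds a single element, only one partition of the resulting RDD contains data, however many partitions parallelize creates by default (often two, with the other one empty). The read itself therefore runs in a single task. Increase the number of partitions before applying any further transformation on keyValuePairRDD so that the subsequent processing is distributed across executors.
Ways to increase the number of partitions: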
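// repartition returns a new RDD, so keep the result
keyValuePairRDD = keyValuePairRDD.repartition(partitionCounts);
//OR, after converting to a JavaPairRDD with mapToPair (partitionBy is only defined on pair RDDs)
keyValuePairRDD.mapToPair(...).partitionBy(...)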