I store time-series
data in HBase
. The rowkey is composed from user_id
and timestamp
, like this:
{
"userid1-1428364800" : {
"columnFamily1" : {
"val" : "1"
}
}
}
"userid1-1428364803" : {
"columnFamily1" : {
"val" : "2"
}
}
}
"userid2-1428364812" : {
"columnFamily1" : {
"val" : "abc"
}
}
}
}
Now I need to perform per-user analysis. Here is the initialization of hbase_rdd
(from here)
sc = SparkContext(appName="HBaseInputFormat")
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD(
"org.apache.hadoop.hbase.mapreduce.TableInputFormat",
"org.apache.hadoop.hbase.io.ImmutableBytesWritable",
"org.apache.hadoop.hbase.client.Result",
keyConverter=keyConv,
valueConverter=valueConv,
conf=conf)
The natural mapreduce-like way to process would be:
hbase_rdd
.map(lambda row: (row[0].split('-')[0], (row[0].split('-')[1], row[1]))) # shift timestamp from key to value
.groupByKey()
.map(processUserData) # process user's data
While executing first map (shift timestamp from key to value) it is crucial to know when the time-series data of the current user is finished and therefore groupByKey transformation could be started. Thus we do not need to map over all table and store all the temporary data. It is possible because hbase stores row-keys in a sorted order.
With hadoop streaming it could be done in such way:
import sys
current_user_data = []
last_userid = None
for line in sys.stdin:
k, v = line.split('\t')
userid, timestamp = k.split('-')
if userid != last_userid and current_user_data:
print processUserData(last_userid, current_user_data)
last_userid = userid
current_user_data = [(timestamp, v)]
else:
current_user_data.append((timestamp, v))
The question is: how to utilize the sorted order of hbase keys within Spark?