While coding Spark in Java, I have been having trouble with the parameters of reduceByKey. I don't understand the parameters used in the reduceByKey call below. I know what reduceByKey means and how it works, but the code here is a little different from the basic Spark examples (e.g., the word count example).
As you can see, reduceByKey takes two parameters here: new KruskalReducer(numPoints) and numSubGraphs. numSubGraphs is an integer value and KruskalReducer is a Java class.
mstToBeMergedResult = mstToBeMerged.mapToPair(new SetPartitionIdFunction(K)).reduceByKey(
        new KruskalReducer(numPoints), numSubGraphs);
I don't understand why such an integer value is passed to reduceByKey. I tried to connect the two parameters to the concept of reduceByKey but couldn't work it out.
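To check that I understand the basic contract, here is a plain-Java sketch (no Spark involved; all names here are my own) of what I believe the one-argument form of reduceByKey does: merge every pair of values that share a key with the supplied function.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class ReduceByKeySketch {
    // Naive local stand-in for reduceByKey: fold all values that share a key
    // together using the supplied reducer, pair by pair.
    static <K, V> Map<K, V> reduceByKey(List<Map.Entry<K, V>> pairs,
                                        BinaryOperator<V> reducer) {
        Map<K, V> result = new HashMap<>();
        for (Map.Entry<K, V> pair : pairs) {
            result.merge(pair.getKey(), pair.getValue(), reducer);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> counts = List.of(
                Map.entry("spark", 1), Map.entry("java", 1), Map.entry("spark", 1));
        // Word count: the reducer simply adds the two values for a shared key.
        System.out.println(reduceByKey(counts, Integer::sum));
    }
}
```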
I have attached the Java class for reference:
public static final class KruskalReducer implements Function2<Iterable<Edge>, Iterable<Edge>, Iterable<Edge>> {
    private static final long serialVersionUID = 1L;
    private transient UnionFind uf = null;
    private final int numPoints;

    public KruskalReducer(int numPoints) {
        this.numPoints = numPoints;
    }

    // merge sort
    @Override
    public Iterable<Edge> call(Iterable<Edge> leftEdges, Iterable<Edge> rightEdges) throws Exception {
        uf = new UnionFind(numPoints);
        List<Edge> edges = Lists.newArrayList();
        Iterator<Edge> leftEdgesIterator = leftEdges.iterator();
        Iterator<Edge> rightEdgesIterator = rightEdges.iterator();
        Edge leftEdge = leftEdgesIterator.next();
        Edge rightEdge = rightEdgesIterator.next();
        Edge minEdge;
        boolean isLeft;
        Iterator<Edge> minEdgeIterator;
        final int numEdges = numPoints - 1;
        do {
            if (leftEdge.getWeight() < rightEdge.getWeight()) {
                minEdgeIterator = leftEdgesIterator;
                minEdge = leftEdge;
                isLeft = true;
            } else {
                minEdgeIterator = rightEdgesIterator;
                minEdge = rightEdge;
                isLeft = false;
            }
            if (uf.unify(minEdge.getLeft(), minEdge.getRight())) {
                edges.add(minEdge);
            }
            minEdge = minEdgeIterator.hasNext() ? minEdgeIterator.next() : null;
            if (isLeft) {
                leftEdge = minEdge;
            } else {
                rightEdge = minEdge;
            }
        } while (minEdge != null && edges.size() < numEdges);
        minEdge = isLeft ? rightEdge : leftEdge;
        minEdgeIterator = isLeft ? rightEdgesIterator : leftEdgesIterator;
        while (edges.size() < numEdges && minEdgeIterator.hasNext()) {
            if (uf.unify(minEdge.getLeft(), minEdge.getRight())) {
                edges.add(minEdge);
            }
            minEdge = minEdgeIterator.next();
        }
        return edges;
    }
}
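The UnionFind class used above is not included here; I assume it is a standard disjoint-set structure along these lines (my own minimal sketch, so the real class may differ):

```java
public class UnionFind {
    private final int[] parent;

    public UnionFind(int numPoints) {
        parent = new int[numPoints];
        for (int i = 0; i < numPoints; i++) {
            parent[i] = i; // every point starts in its own component
        }
    }

    private int find(int x) {
        while (parent[x] != x) {
            parent[x] = parent[parent[x]]; // path halving
            x = parent[x];
        }
        return x;
    }

    // Returns true if x and y were in different components (the edge is kept),
    // false if joining them would create a cycle (the edge is dropped).
    public boolean unify(int x, int y) {
        int rootX = find(x);
        int rootY = find(y);
        if (rootX == rootY) {
            return false;
        }
        parent[rootX] = rootY;
        return true;
    }
}
```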
Additionally, the full related code is shown below. (You can skip it if it gets confusing.)
JavaPairRDD<Integer, Iterable<Edge>> mstToBeMerged = partitions.combineByKey(new CreateCombiner(),
        new Merger(), new KruskalReducer(numPoints));
JavaPairRDD<Integer, Iterable<Edge>> mstToBeMergedResult = null;
while (numSubGraphs > 1) {
    numSubGraphs = (numSubGraphs + (K - 1)) / K;
    mstToBeMergedResult = mstToBeMerged.mapToPair(new SetPartitionIdFunction(K)).reduceByKey(
            new KruskalReducer(numPoints), numSubGraphs);
    mstToBeMerged = mstToBeMergedResult;
    displayResults(mstToBeMerged);
}
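One thing I did notice: the loop shrinks numSubGraphs by ceiling division each round, which I replayed with this small stand-alone sketch (the starting value 32 and K = 4 are just my own example numbers):

```java
public class RoundsSketch {
    // Replays the numSubGraphs update from the loop above:
    // ceiling division by k until only one subgraph remains.
    static int countRounds(int numSubGraphs, int k) {
        int rounds = 0;
        while (numSubGraphs > 1) {
            numSubGraphs = (numSubGraphs + (k - 1)) / k; // ceil(numSubGraphs / k)
            rounds++;
        }
        return rounds;
    }

    public static void main(String[] args) {
        // 32 -> 8 -> 2 -> 1 with K = 4, so three merge rounds.
        System.out.println(countRounds(32, 4)); // prints 3
    }
}
```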
private static class CreateCombiner implements Function<Edge, Iterable<Edge>> {
    private static final long serialVersionUID = 1L;

    @Override
    public Iterable<Edge> call(Edge edge) throws Exception {
        List<Edge> edgeList = Lists.newArrayListWithCapacity(1);
        edgeList.add(edge);
        return edgeList;
    }
}

private static class Merger implements Function2<Iterable<Edge>, Edge, Iterable<Edge>> {
    private static final long serialVersionUID = 1L;

    @Override
    public Iterable<Edge> call(Iterable<Edge> list, Edge edge) throws Exception {
        List<Edge> mergeList = Lists.newArrayList(list);
        mergeList.add(edge);
        return mergeList;
    }
}