1
votes

everybody! I'm coding a spark project mixing scala and java languages. Some classes were coding in Java, while i'm handling the Spark in Scala. I need use custom accumulator class (BagAccum extends AccumulatorV2[Int, Bag]) with Int as type in and Bag as type out. The Bag type is a java class in the project. The compiler show me the error:

BagAccum needs to be abstract, since method value in class AccumulatorV2 of type => structure.Bag is not defined main.scala /BagGraph/src line 75 Scala Problem

In this case, how i can define the Bag class? Below, the complete code for BagAccum scala class.

This is my code, declaring the BagAccum class.

class BagAccum extends AccumulatorV2[Int, Bag] { 
    private var bag:Bag = new Bag
    override def isZero: Boolean = bag.empty()
    override def copy(): BagAccum = {
      val newBag = new BagAccum
      newBag.bag = this.bag
      newBag
    }
    override def reset(): Unit = bag = null
    override def add(v: Int): Unit ={
        bag.insert(v)
    }
    override def merge(other: AccumulatorV2[Int, Bag]): Unit = bag.merge(other.value)
    def size():Int = bag.size()
}
1
you should add some code to show you triedBenjamin Breton
I posted as answer. The stack showed me error.Fernando de Sá
I post as answer below. Thanks!Fernando de Sá
@FernandodeSá you can remove your answer now :)koiralo
Not all methods are implemented in your classsj li

1 Answers

0
votes

I tried different solutions, but the more simple is solely coding the accumulator in Java, like this:

{
    public  class BagAccumulator extends AccumulatorV2<Integer,Bag>
    {
        private static final long serialVersionUID = -3739727823287550826L;

        private Bag _value = new Bag();

        public BagAccumulator() {
        }

        public BagAccumulator(Bag arg0) {
            _value = arg0;
        }

        @Override
        public void add(Integer arg0) {
            _value.insert(arg0);
        }

        @Override
        public AccumulatorV2<Integer, Bag> copy() {
            return new BagAccumulator(_value);
        }

        @Override
        public boolean isZero() {
            return _value.empty();
        }

        @Override
        public void merge(AccumulatorV2<Integer, Bag> arg0) {
            _value.merge(arg0.value());
        }

        @Override
        public void reset() {
            _value.clear();
        }

        @Override
        public Bag value() {
            return _value;
        }
    }
}