
Query I'm using to create the table:

CREATE TABLE counts_tbl ( id VARCHAR PRIMARY KEY, counts BINARY )
WITH "template=partitioned"

Apache Spark user-defined function:

import scala.collection.mutable.{HashMap, WrappedArray}
import org.apache.spark.sql.functions.udf

val countListUdf = udf((countList: WrappedArray[String]) => {
    val countMap: HashMap[String, Long] = HashMap.empty[String, Long]
    countList.foreach(c => {
        // Increment the running count for this key, starting from 0 if absent
        countMap(c) = countMap.getOrElse(c, 0L) + 1
    })
    countMap
})

Storing the DataFrame to Apache Ignite:

df.
  agg(
    countListUdf(collect_list("count")).as("counts")
  ).
  withColumn("id", udf(() => java.util.UUID.randomUUID().toString)()).
  write.
  format(IgniteDataFrameSettings.FORMAT_IGNITE).
  option(IgniteDataFrameSettings.OPTION_CONFIG_FILE,
    "/usr/local/ignite/examples/config/example-ignite.xml").
  option(IgniteDataFrameSettings.OPTION_TABLE, "counts_tbl").
  option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id").
  option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS, "template=replicated").
  mode(SaveMode.Append).
  save()

Warning while saving the DataFrame:

WARN BinaryContext:576 - Class "scala.collection.immutable.HashMap$SerializationProxy" cannot be serialized using BinaryMarshaller because it either implements Externalizable interface or have writeObject/readObject methods. OptimizedMarshaller will be used instead and class instances will be deserialized on the server. Please ensure that all nodes have this class in classpath. To enable binary serialization either implement Binarylizable interface or set explicit serializer using BinaryTypeConfiguration.setSerializer() method.
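
As far as I understand, the BinaryTypeConfiguration hook this warning mentions would be wired up roughly like this (a sketch only; MyHashMapSerializer is a hypothetical BinarySerializer implementation, not something I have written):

import org.apache.ignite.binary.BinaryTypeConfiguration
import org.apache.ignite.configuration.{BinaryConfiguration, IgniteConfiguration}

// Register an explicit serializer for the problematic class, as the warning suggests
val typeCfg = new BinaryTypeConfiguration("scala.collection.immutable.HashMap")
// typeCfg.setSerializer(new MyHashMapSerializer()) // hypothetical BinarySerializer impl

val binCfg = new BinaryConfiguration()
binCfg.setTypeConfigurations(java.util.Collections.singletonList(typeCfg))

val igniteCfg = new IgniteConfiguration()
igniteCfg.setBinaryConfiguration(binCfg)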

Querying the table:

var queryResult = cache.query(new SqlFieldsQuery("SELECT * FROM counts_tbl")).getAll()

Result set:

java.util.List[java.util.List[_]] = [[012415a4-9f12-454b-b800-098fac56842f, [B@1600e52d], [046845b4-96f3-46e0-93fc-d8735c00a8f6, [B@16bb3fc]]

Could you please guide me on how to deserialize the counts field in the result set?

Also, what's the right way to read/write a DataFrame containing a HashMap?

Methods I've tried to deserialize:

  1. Using ObjectInputStream
import java.io.{ByteArrayInputStream, ObjectInputStream}
import scala.collection.JavaConversions._
var biValue = resultSet(0)(1).asInstanceOf[Array[Byte]]
var ois = new ObjectInputStream(new ByteArrayInputStream(biValue))

Error:
java.io.StreamCorruptedException: invalid stream header: FE920800
  at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:866)
  at java.io.ObjectInputStream.<init>(ObjectInputStream.java:358)
  ... 55 elided
  2. Using OptimizedMarshaller
import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller
import scala.collection.JavaConversions._
var marshaller = new OptimizedMarshaller
var emptyMap = scala.collection.mutable.HashMap.empty[String, Long]
var biValue = resultSet(0)(1).asInstanceOf[Array[Byte]]
marshaller.unmarshal(biValue, emptyMap.getClass.getClassLoader())

Error:
org.apache.ignite.IgniteCheckedException: Failed to deserialize object with given class loader: sun.misc.Launcher$AppClassLoader@4534b60d
  at org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller.unmarshal0(OptimizedMarshaller.java:266)
  at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.unmarshal(AbstractNodeNameAwareMarshaller.java:82)
  ... 55 elided

Thanks


1 Answer


You can serialize this map into JSON and then store it as VARCHAR.
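
For example, a minimal sketch of that approach, assuming the counts column is redefined as VARCHAR and the JSON string is built by hand (keys are assumed to contain no characters needing escaping; names are illustrative):

import org.apache.spark.sql.functions.{collect_list, udf}

// Count occurrences and render them as a flat JSON object string, e.g. {"a":2,"b":1}
val countsAsJsonUdf = udf((countList: Seq[String]) => {
  val counts = countList.groupBy(identity).mapValues(_.size.toLong)
  counts.map { case (k, v) => s""""$k":$v""" }.mkString("{", ",", "}")
})

df.agg(countsAsJsonUdf(collect_list("count")).as("counts"))

Reading it back is then just parsing a string with whatever JSON library you already use, so the BinaryMarshaller never gets involved.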

Alternatively, you can go the indexedTypes route, where you can store objects of arbitrary composition, they will not be available with JDBC, but with SqlFieldsQuery they would. However, I'm not sure how this will work with Spark.