I am able to connect to AWS Managed Cassandra Service (MCS) using the code snippet below.
CassandraSink.addSink(cassandraEntityStream)
    .setClusterBuilder(
        new ClusterBuilder() {
            private static final long serialVersionUID = 2793938419775311824L;

            @Override
            public Cluster buildCluster(Cluster.Builder builder) {
                return builder
                    .addContactPoint("cassandra.ap-northeast-1.amazonaws.com")
                    .withPort(9142)
                    .withSSL()
                    .withCredentials(
                        "username",
                        "password")
                    .withLoadBalancingPolicy(
                        DCAwareRoundRobinPolicy
                            .builder()
                            .withLocalDc("ap-northeast-1")
                            .build())
                    //.withQueryOptions(option)
                    .build();
            }
        })
    .setMapperOptions(() -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)})
    .build()
    .name("Write to Cassandra")
    .uid("cassandra_sink");
However, I get the exception below when writing the stream of POJOs to Cassandra:
com.datastax.driver.core.exceptions.InvalidQueryException: Consistency level LOCAL_ONE is not supported for this operation. Supported consistency levels are: LOCAL_QUORUM
I was able to solve this issue in another project (without Flink) by setting the consistency level to LOCAL_QUORUM through QueryOptions, as in the snippet below.
QueryOptions option = new QueryOptions();
option.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);

final Cluster cluster =
    Cluster.builder()
        .addContactPoint("cassandra.ap-northeast-1.amazonaws.com")
        .withPort(9142)
        .withSSL()
        .withQueryOptions(option) // NOTE
        .withAuthProvider(
            new PlainTextAuthProvider(
                "username",
                "password"))
        .withLoadBalancingPolicy(
            DCAwareRoundRobinPolicy.builder().withLocalDc("ap-northeast-1").build())
        .build();
final Session session = cluster.connect("test");
When I tried the same approach in Flink (passing the QueryOptions object into the ClusterBuilder), I got the error below:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: com.datastax.driver.core.QueryOptions@130161f7 is not serializable. The object probably contains or references non serializable fields.
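For context, my understanding is that Flink serializes the ClusterBuilder, together with everything the anonymous class captures, before shipping the job, which would explain why a captured QueryOptions triggers this error. The sketch below reproduces the effect with plain Java serialization only. `Options`, `Builder`, and `CaptureDemo` are hypothetical stand-ins for QueryOptions and ClusterBuilder, not driver or Flink classes. It also suggests that creating the object inside the method body avoids the capture.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for QueryOptions: deliberately NOT Serializable.
class Options {
    String consistency = "LOCAL_ONE";
}

// Hypothetical stand-in for ClusterBuilder: Serializable so it can be shipped.
abstract class Builder implements Serializable {
    private static final long serialVersionUID = 1L;

    abstract String build();
}

class CaptureDemo {
    // Returns true if plain Java serialization accepts the object.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // The anonymous class captures a non-serializable local variable,
    // which becomes a hidden field of the anonymous class.
    static Builder capturing() {
        final Options captured = new Options();
        captured.consistency = "LOCAL_QUORUM";
        return new Builder() {
            private static final long serialVersionUID = 1L;

            @Override
            String build() {
                return captured.consistency;
            }
        };
    }

    // The options object is created inside build(), so nothing is captured.
    static Builder selfContained() {
        return new Builder() {
            private static final long serialVersionUID = 1L;

            @Override
            String build() {
                Options local = new Options();
                local.consistency = "LOCAL_QUORUM";
                return local.consistency;
            }
        };
    }

    public static void main(String[] args) {
        System.out.println("capturing serializable?      " + isSerializable(capturing()));
        System.out.println("self-contained serializable? " + isSerializable(selfContained()));
    }
}
```

If the same reasoning carries over, constructing the QueryOptions inside buildCluster(...) instead of outside the anonymous ClusterBuilder might sidestep the InvalidProgramException, but I have not confirmed this against the Flink connector.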
Is there anything I am missing? Could someone explain how to connect and write to MCS using the Flink Cassandra connector?
PS:
- I used the command below to create the keyspace:
    CREATE KEYSPACE "test"
    WITH REPLICATION = {'class': 'SingleRegionStrategy'}
- I didn't use AmazonRootCA1.pem in my code.
- I am not using cassandra_truststore.jks in my code or environment.
- I installed the temp_file.der certificate, which was created by following these steps.
- I am using Flink 1.8.2, since that is the environment version available in Kinesis Data Analytics.
UPDATE 07-04-2020
I was able to fix the serialization issue by creating a Serializable subclass of QueryOptions. Please find the code snippet below:
import com.datastax.driver.core.QueryOptions;
import java.io.Serializable;

public class QueryOptionsSerializable extends QueryOptions implements Serializable {
    private static final long serialVersionUID = 2793938419775311824L;
}
With this change, I can set the consistency level to LOCAL_QUORUM in the code, and the job starts without the serialization exception.
// Setting consistency level
QueryOptionsSerializable option = new QueryOptionsSerializable();
option.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);

CassandraSink.addSink(entityStream)
    .setClusterBuilder(
        new ClusterBuilder() {
            private static final long serialVersionUID = 2793938419775311824L;

            @Override
            public Cluster buildCluster(Cluster.Builder builder) {
                Cluster.Builder tempBuilder = builder.addContactPoint(host).withPort(port);
                if (isSSLEnabled) {
                    // enable SSL config if isSSLEnabled flag is ON.
                    tempBuilder.withSSL();
                }
                if (username != null && password != null) {
                    // if username & password is provided, use it for connection.
                    tempBuilder.withCredentials(username, password);
                }
                tempBuilder.withQueryOptions(option);
                return tempBuilder.build();
            }
        })
    .setMapperOptions(() -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)})
    .setDefaultKeyspace(keyspace)
    .build()
    .name("Write to Cassandra")
    .uid("cassandra_sink");
But when writing to MCS, I still get the same error as before:
com.datastax.driver.core.exceptions.InvalidQueryException: Consistency level LOCAL_ONE is not supported for this operation. Supported consistency levels are: LOCAL_QUORUM
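One thing I have not ruled out (an assumption, not a confirmed diagnosis): standard Java serialization does not write the fields of a non-serializable superclass. On deserialization it re-runs that superclass's no-arg constructor, so any value set before serialization is replaced by the default. If QueryOptions keeps the consistency level in its own field and defaults to LOCAL_ONE, the subclass above would arrive on the workers with LOCAL_ONE again. The sketch below shows the mechanism with plain Java only. `BaseOptions`, `SerializableOptions`, and `RoundTripDemo` are hypothetical stand-ins, not driver classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for QueryOptions: NOT Serializable, with a default value.
class BaseOptions {
    private String consistency = "LOCAL_ONE";

    void setConsistency(String consistency) {
        this.consistency = consistency;
    }

    String getConsistency() {
        return consistency;
    }
}

// Mirrors the QueryOptionsSerializable approach: only the subclass is Serializable.
class SerializableOptions extends BaseOptions implements Serializable {
    private static final long serialVersionUID = 1L;
}

class RoundTripDemo {
    // Serializes the value to bytes and reads it back, like shipping it to a worker.
    static <T extends Serializable> T roundTrip(T value)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        SerializableOptions options = new SerializableOptions();
        options.setConsistency("LOCAL_QUORUM");

        SerializableOptions copy = roundTrip(options);

        System.out.println("before: " + options.getConsistency()); // LOCAL_QUORUM
        System.out.println("after:  " + copy.getConsistency());    // LOCAL_ONE
    }
}
```

The superclass field is never written to the stream, so the copy falls back to the default assigned by the BaseOptions constructor.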
Any help would be deeply appreciated!