2 votes

I have a Cassandra table with around 500+ million records (across 6 nodes), and I'm trying to insert data into it using the spark-cassandra-connector from Amazon EMR.
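
The write itself goes through the connector's Java API, roughly like this (a minimal sketch; the UserProfile bean and the way the records RDD is built are simplified stand-ins for the actual job code):

    import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
    import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    JavaSparkContext sc = new JavaSparkContext(conf);

    // profiles: the records to write, built from the input data
    JavaRDD<UserProfile> profiles = sc.parallelize(records);

    // mapToRow maps the UserProfile bean's fields to the table columns by
    // name; saveToCassandra performs the distributed write.
    javaFunctions(profiles)
        .writerBuilder("dmp", "dmp_user_profiles_latest", mapToRow(UserProfile.class))
        .saveToCassandra();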

Table Structure

  CREATE TABLE dmp.dmp_user_profiles_latest (
        pid text PRIMARY KEY,
        xnid int,
        day_count map<text, int>,
        first_seen map<text, timestamp>,
        last_seen map<text, timestamp>,
        usage_count map<text, int>,
        city text,
        country text,
        lid set<text>
    ) WITH bloom_filter_fp_chance = 0.01
    AND caching = '{"keys":"NONE", "rows_per_partition":"ALL"}'
    AND comment = ''
    AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'max_threshold': '32'}
    AND compression = {'chunk_length_kb': '256', 'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 172800
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.1
    AND speculative_retry = '99.0PERCENTILE';
    CREATE INDEX dmp_user_profiles_latest_app_day_count_idx ON dmp.dmp_user_profiles_latest (day_count);
    CREATE INDEX dmp_user_profiles_latest_country_idx ON dmp.dmp_user_profiles_latest (country);

The following are my spark-submit options:

--class com.mobi.vserv.driver.Query5kPids1
--conf spark.dynamicAllocation.enabled=true  
--conf spark.yarn.executor.memoryOverhead=1024    
--conf spark.yarn.driver.memoryOverhead=1024 
--executor-memory 1g
--executor-cores 2
--driver-memory 4g

But in the logs I have seen that writing to Cassandra takes around 4-5 minutes to load 2 lakh (200,000) records, while the total execution time is 6+ minutes.

I have also added the following to the Spark conf:

conf.set("spark.cassandra.output.batch.size.rows", "auto");
conf.set("spark.cassandra.output.concurrent.writes", "500");
conf.set("spark.cassandra.output.batch.size.bytes", "100000");
conf.set("spark.cassandra.output.throughput_mb_per_sec","1");

But still there is no performance improvement, and increasing the number of cores in Amazon EMR doesn't help either.

Please note that in my Cassandra table we have not used any composite partition key or clustering columns (pid alone is the primary key); could this be the reason for such slow performance?

Please note that the network speed is 30 MB/s and that the primary key is an alphanumeric value, e.g. a9be3eb4-751f-48ee-b593-b3f89e18622d.

cassandra.yaml

cluster_name: 'dmp Cluster'
num_tokens: 100
hinted_handoff_enabled: true
max_hint_window_in_ms: 10800000 # 3 hours
hinted_handoff_throttle_in_kb: 1024
max_hints_delivery_threads: 2
batchlog_replay_throttle_in_kb: 1024
authenticator: AllowAllAuthenticator
authorizer: AllowAllAuthorizer
permissions_validity_in_ms: 2000
partitioner: org.apache.cassandra.dht.Murmur3Partitioner
data_file_directories:
     - /data/cassandra/data
disk_failure_policy: stop
commit_failure_policy: stop

key_cache_size_in_mb:

key_cache_save_period: 14400
row_cache_size_in_mb: 0
row_cache_save_period: 0
counter_cache_size_in_mb:
counter_cache_save_period: 7200
saved_caches_directory: /data/cassandra/saved_caches
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
seed_provider:
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          - seeds: "10.142.76.97,10.182.19.301"

concurrent_reads: 256
concurrent_writes: 128
concurrent_counter_writes: 32

memtable_allocation_type: heap_buffers
memtable_flush_writers: 8
index_summary_capacity_in_mb:
index_summary_resize_interval_in_minutes: 60
trickle_fsync: false
trickle_fsync_interval_in_kb: 10240
storage_port: 7000
ssl_storage_port: 7001
listen_address: 10.142.76.97
start_rpc: true
rpc_address: 10.23.244.172
rpc_port: 9160
rpc_keepalive: true
rpc_server_type: sync
thrift_framed_transport_size_in_mb: 15
incremental_backups: false
snapshot_before_compaction: false
auto_snapshot: true
tombstone_warn_threshold: 1000
tombstone_failure_threshold: 100000
column_index_size_in_kb: 64
batch_size_warn_threshold_in_kb: 5
concurrent_compactors: 4
compaction_throughput_mb_per_sec: 64
sstable_preemptive_open_interval_in_mb: 50
read_request_timeout_in_ms: 500000
range_request_timeout_in_ms: 1000000
write_request_timeout_in_ms: 200000
counter_write_request_timeout_in_ms: 500000
cas_contention_timeout_in_ms: 100000
endpoint_snitch: Ec2Snitch
dynamic_snitch_update_interval_in_ms: 100
dynamic_snitch_reset_interval_in_ms: 600000
dynamic_snitch_badness_threshold: 0.1
request_scheduler: org.apache.cassandra.scheduler.NoScheduler

server_encryption_options:
    internode_encryption: none
    keystore: conf/.keystore
    keystore_password: cassandra
    truststore: conf/.truststore
    truststore_password: cassandra

client_encryption_options:
    enabled: false
    keystore: conf/.keystore
    keystore_password: cassandra

internode_compression: all

inter_dc_tcp_nodelay: false
Can we have your database structure? – Whitefret
Do you have access to the nodes, to see how your data is scattered across the cluster? Maybe all your records go to the same node (in which case increasing the number of nodes is useless). – Whitefret
Yes, I do have access. How do I check that? nodetool status shows 6 nodes up and running, each of them with 100 tokens. – Rahul Koshaley
I don't really know, I don't use Amazon EMR :/ I just wanted to know if by chance all your pid hashes fall in the same range (which would be very unfortunate). – Whitefret
I'm using Amazon EMR for processing, while Cassandra is on EC2 (6 nodes). – Rahul Koshaley

1 Answer

1 vote

As discussed in the comments, it seems your problem comes from the secondary index on day_count.

As seen on this page, indexes are not efficient when they must be updated all the time, and that happens here whenever an insert writes a different value into day_count (which is possibly every time).

You need to rework your data model, but as this is your production environment you can't just DROP INDEX IF EXISTS keyspace.index_name if that index is necessary. Instead, you could create a second table that uses day_count as part of the primary key, or use it as a clustering (ordering) column.
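
A sketch of what that rework could look like (the new table name and key layout below are illustrative assumptions, not a drop-in design; they would have to match your actual query pattern):

    -- Only once the index is confirmed unused in production:
    DROP INDEX IF EXISTS dmp.dmp_user_profiles_latest_app_day_count_idx;

    -- Hypothetical query table: the per-app day count is flattened out of
    -- the map and promoted into the primary key, so lookups by app and
    -- day count need no secondary index.
    CREATE TABLE dmp.dmp_user_profiles_by_day_count (
        app text,
        day_count int,
        pid text,
        PRIMARY KEY ((app), day_count, pid)
    ) WITH CLUSTERING ORDER BY (day_count DESC, pid ASC);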