3
votes

I'm trying to execute a PySpark statement that writes to BigTable inside a Python for loop, which leads to the error below (job submitted using Dataproc). Is some client not being closed properly (as suggested here), and if so, is there a way to close it in PySpark?

Note that manually re-executing the script each time with a new Dataproc job works fine, so the job itself is correct.
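
The only explicit cleanup I know of on the PySpark side is stopping the SparkContext once the work is done, as in the sketch below (run_writes is a hypothetical stand-in for the write loop shown further down); I'm not sure whether this also shuts down the underlying BigTable channel:

try:
    run_writes()  # hypothetical wrapper around the write loop below
finally:
    sc.stop()     # release Spark resources when the script ends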

Thanks for your support!

PySpark script


from pyspark import SparkContext
from pyspark.sql import SQLContext
import json

sc = SparkContext()
sqlc = SQLContext(sc)

def create_df(n_start, n_stop):
    """Build a two-row DataFrame with a rowkey column plus columns
    col{n_start}..col{n_stop-1}, and the matching SHC catalog."""

    # Data: a rowkey value followed by one value per column

    row_1 = ['a'] + ['{}'.format(i) for i in range(n_start, n_stop)]
    row_2 = ['b'] + ['{}'.format(i) for i in range(n_start, n_stop)]

    # Spark schema

    ls = [row_1, row_2]
    schema = ['col0'] + ['col{}'.format(i) for i in range(n_start, n_stop)]

    # Catalog: map col0 to the rowkey and the rest to column family "cf"

    first_col = {"col0": {"cf": "rowkey", "col": "key", "type": "string"}}
    other_cols = {"col{}".format(i): {"cf": "cf", "col": "col{}".format(i), "type": "string"}
                  for i in range(n_start, n_stop)}

    first_col.update(other_cols)
    columns = first_col

    d_catalogue = {}
    d_catalogue["table"] = {"namespace": "default", "name": "testtable"}
    d_catalogue["rowkey"] = "key"
    d_catalogue["columns"] = columns

    catalog = json.dumps(d_catalogue)

    # Dataframe

    df = sc.parallelize(ls, numSlices=1000).toDF(schema=schema)

    return df, catalog

# Column-window counters, advanced at the end of each iteration
N_step = 100
N_start = 1
N_stop = N_start + N_step

data_source_format = "org.apache.spark.sql.execution.datasources.hbase"

for i in range(0, 2):

    df, catalog = create_df(N_start, N_stop)

    df.write \
        .options(catalog=catalog, newTable="5") \
        .format(data_source_format) \
        .save()

    # Advance the column window for the next write
    N_start += N_step
    N_stop += N_step

Dataproc job

gcloud dataproc jobs submit pyspark <my_script>.py \
    --cluster $SPARK_CLUSTER \
    --jars <path_to_jar>/bigtable-dataproc-spark-shc-assembly-0.1.jar \
    --region=us-east1

Error

...
ERROR com.google.bigtable.repackaged.io.grpc.internal.ManagedChannelOrphanWrapper: *~*~*~ Channel ManagedChannelImpl{logId=41, target=bigtable.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
...
Did you try other languages? Like the sample here: github.com/GoogleCloudPlatform/cloud-bigtable-examples/blob/… - Dagang
Could you provide the full PySpark code? I can help reproduce. - Dagang
@Dagang: Thanks for offering to help. Yes, I've run the entire tutorial in Scala, though I haven't tested the loop: stackoverflow.com/questions/65483442/… - py-r
@Dagang: I've added the full PySpark code. Thanks! - py-r
Are you using the latest version of gcloud? If not, try updating; this looks similar to an issue that was fixed recently. github.com/googleapis/java-bigtable-hbase/issues/2504 cloud.google.com/sdk/gcloud/reference/components/update - Oliver Aragon

1 Answer

1
vote

If you are not using the latest version of gcloud, try updating to it. This looks similar to an issue that was fixed recently. The error message may still show up, but the job now finishing suggests the fix is partial and the support team is still working on it; hopefully it will be fully resolved in the next release.
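
For reference, updating is a one-liner, assuming the SDK was installed with the standalone installer rather than a system package manager (in which case components update is disabled):

gcloud components update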