5
votes

I want to encrypt values in one column of my Pandas (or py/spark) dataframe, e.g. to take the the column mobno in the following dataframe, encrypt it and put the result in the encrypted_value column:

encrypted-dataframe

I want to use AWS KMS encryption key. My question is: what is the most elegant way how to achieve this?

I am thinking about using UDF, which will call the boto3's KMS client. Something like:

@udf
def encrypt(plaintext):
  response = kms_client.encrypt(
    KeyId=aws_kms_key_id,
    Plaintext=plaintext
  )
  ciphertext = response['CiphertextBlob']
  return ciphertext

and then applying this udf on the whole column.

But I am not quite confident this is the right way. This stems from the fact that I am an encryption-rookie - first, I don't even know this kms_client_encrypt function is meant for encrypting values (from the columns) or it is meant for manipulate the keys. Maybe the better way is to obtain the key and then use some python encryption library (such as hashlib).

I would like to have some clarification on the encryption process and also recommendation what the best approach to column encryption is.

1

1 Answers

0
votes

To avoid many calls to the KMS service in a UDF, use AWS Secrets Manager instead to retrieve your encryption key and pycrypto to encrypt the column. The following works:

from pyspark.sql.functions import udf, col
from Crypto.Cipher import AES

region_name = "eu-west-1"
session = boto3.session.Session()
client = session.client(service_name='secretsmanager', region_name=region_name)
get_secret_value_response = client.get_secret_value(SecretId=secret_name)
secret_key = json.loads(get_secret_value_response['SecretString'])
clear_text_column = 'mobo'

def encrypt(key, text):
    obj = AES.new(key, AES.MODE_CFB, 'This is an IV456')
    return obj.encrypt(text)

def udf_encrypt(key):
    return udf(lambda text: encrypt(key, text))

df.withColumn("encrypted", udf_encrypt(secret_key)(col(clear_text_column))).show()

Or alternatively, using more efficient Pandas UDF as suggested by @Vektor88 (PySpark 3 syntax):

from functools import partial

encrypt_with_key = partial(encrypt, secret_key)

@pandas_udf(BinaryType())
def pandas_udf_encrypt(clear_strings: pd.Series) -> pd.Series:
    return clear_strings.apply(encrypt_with_key)

df.withColumn('encrypted', pandas_udf_encrypt(clear_text_column)).show()