I am trying to create Azure Data Factory pipelines and resources using Python. I was successful with certain ADF activities such as Lookup and Copy, but when I try to copy a few tables from SQL to blob storage using a ForEach activity, it throws the error below.

How would you create activities inside a ForEach activity? Any input is greatly appreciated. Thanks!

Ref: https://docs.microsoft.com/en-us/python/api/azure-mgmt-datafactory/azure.mgmt.datafactory.models.foreachactivity?view=azure-python

Error Message

TypeError: 'CopyActivity' object is not iterable

Code Block

## Lookup Activity
ls_sql_name = 'ls_'+project_name+'_'+src_svr_type+'_dev'
linked_service_name =LinkedServiceReference(reference_name=ls_sql_name)
lkp_act_name ='Get Table Names'
sql_reader_query = "SELECT top 3 name from sys.tables where name like '%dim'"
source = SqlSource(sql_reader_query= sql_reader_query)
dataset= {"referenceName": "ds_sql_Dim_input","type": "DatasetReference"}
LookupActivity_ = LookupActivity(name =lkp_act_name, linked_service_name= linked_service_name, source = source, dataset = dataset
                                ,first_row_only =False)


#create copy activity
ds_name = 'ds_sql_dim_input' #these datasets already created
dsOut_name ='ds_blob_dim_output' #these datasets already created

copy_act_name = 'Copy SQL to Blob(parquet)'
sql_reader_query =  {"value": "@item().name","type": "Expression"}
sql_source = SqlSource(sql_reader_query=sql_reader_query)
blob_sink = ParquetSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=copy_act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=sql_source, sink=blob_sink)

## For Each Activity
pl_name ='pl_Test'
items= {"value": "@activity('Get Table Names').output.value","type": "Expression"}
dependsOn = [{"activity": "Get Table Names","dependencyConditions": ["Succeeded"]}]
ForEachActivity_= ForEachActivity(name = 'Copy tables in loop',items=items,depends_on=dependsOn ,activities =copy_activity)

params_for_pipeline = {}
p_obj = PipelineResource(activities=[LookupActivity_,ForEachActivity_], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, pl_name, p_obj)

1 Answer

The activities parameter needs to be a list of Activity objects, and you are passing a single CopyActivity. Create a list, add the copy activity to it, and then pass that list as the activities parameter.
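
As a rough illustration of why this fails, here is a minimal sketch that does not use the Azure SDK at all. StubActivity and activity_names are hypothetical stand-ins invented for this example; the assumption is that the SDK walks the activities value as a list at some point (for instance while serializing the pipeline), which a bare object does not support.

```python
from dataclasses import dataclass


@dataclass
class StubActivity:
    """Hypothetical stand-in for an SDK activity such as CopyActivity."""
    name: str


def activity_names(activities):
    """Walk the activities the way code expecting a list would."""
    return [activity.name for activity in activities]


copy_activity = StubActivity(name="Copy SQL to Blob(parquet)")

# Passing the bare object raises the same kind of TypeError as in the question.
try:
    activity_names(copy_activity)
    error_message = None
except TypeError as exc:
    error_message = str(exc)

# Wrapping the single activity in a list makes it iterable.
names = activity_names([copy_activity])

# Applied to the question's code, the change would presumably be:
# ForEachActivity_ = ForEachActivity(name='Copy tables in loop', items=items,
#                                    depends_on=dependsOn,
#                                    activities=[copy_activity])
```

The only change needed in the original code is the square brackets around copy_activity; the rest of the ForEachActivity call can stay as it is.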