I am trying to create azure data factory pipelines and resources using python. I was successful with certain ADF activities like Lookup, Copy .. but the problem I am facing here is I am trying to copy few tables from SQL to blob using FOR Each activity and it is throwing the below error
How would you create activities inside for each activity? Any inputs is greatly appreciated. Thanks!
Error Message
TypeError: 'CopyActivity' object is not iterable
Code Block
## Lookup Activity
ls_sql_name = 'ls_'+project_name+'_'+src_svr_type+'_dev'
linked_service_name =LinkedServiceReference(reference_name=ls_sql_name)
lkp_act_name ='Get Table Names'
sql_reader_query = "SELECT top 3 name from sys.tables where name like '%dim'"
source = SqlSource(sql_reader_query= sql_reader_query)
dataset= {"referenceName": "ds_sql_Dim_input","type": "DatasetReference"}
LookupActivity_ = LookupActivity(name =lkp_act_name, linked_service_name= linked_service_name, source = source, dataset = dataset
,first_row_only =False)
#create copy activity
ds_name = 'ds_sql_dim_input' #these datasets already created
dsOut_name ='ds_blob_dim_output' #these datasets already created
copy_act_name = 'Copy SQL to Blob(parquet)'
sql_reader_query = {"value": "@item().name","type": "Expression"}
sql_source = SqlSource(sql_reader_query=sql_reader_query)
blob_sink = ParquetSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=copy_act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=sql_source, sink=blob_sink)
## For Each Activity
pl_name ='pl_Test'
items= {"value": "@activity('Get Table Names').output.value","type": "Expression"}
dependsOn = [{"activity": "Get Table Names","dependencyConditions": ["Succeeded"]}]
ForEachActivity_= ForEachActivity(name = 'Copy tables in loop',items=items,depends_on=dependsOn ,activities =copy_activity)
params_for_pipeline = {}
p_obj = PipelineResource(activities=[LookupActivity_,ForEachActivity_], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, pl_name, p_obj)