3
votes

I am using Cloud Tasks. I need to trigger Task C only after Task A and Task B have both completed successfully, so I need some way of reading, or being notified of, the status of the tasks I trigger. I see no way of doing this in GCP's documentation. In case it helps: I create the tasks with the Node.js SDK and use Cloud Functions as the task handlers.

Edit:

As requested, here is more info on what we are doing:

Tasks 1 - 10 each make HTTP requests, fetch data, and update individual collections in Firestore based on this data. These 10 tasks can run in parallel and in any order, as they have no dependency on each other. All of these tasks are implemented as Cloud Functions (GCF).

Task 11 depends on the Firestore collection data updated by Tasks 1 - 10, so it can only run after Tasks 1 - 10 have all completed successfully.

We do issue a RunID as a common identifier to group a particular run of all tasks (1 - 11).

2
If you add more context on what the tasks do and how they depend on each other we will be able to point you better. - Juancki
Question edited to give you more info - Raj Chaudhary

2 Answers

3
votes

Cloud Tasks only triggers tasks; the only condition you can define is a time (when to run). You have to code the check yourself, at the point where Task C runs.

Here is an example of the process:

  • Task A runs and, at the end, writes to Firestore that it has completed
  • Task B runs and, at the end, writes to Firestore that it has completed
  • Task C starts and checks in Firestore whether A and B have completed.
    • If not, the task exits with an error
    • If yes, it continues processing
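
The process above could be sketched roughly as follows. This is only an illustration: an in-memory dict stands in for the Firestore collection, the names `mark_completed`, `missing_tasks` and `handle_task_c` are hypothetical, and returning 503 is just one possible way to signal an error. The sketch relies on Cloud Tasks treating any non-2xx response from an HTTP handler as a failed attempt.

```python
from typing import Dict, Iterable, Set, Tuple

# In-memory stand-in for the Firestore collection that records completions.
# Hypothetical layout: run_id -> set of task names that finished successfully.
_completed: Dict[str, Set[str]] = {}


def mark_completed(run_id: str, task_name: str) -> None:
    """Last step of Task A / Task B, called only after their real work succeeded."""
    _completed.setdefault(run_id, set()).add(task_name)


def missing_tasks(run_id: str, required: Iterable[str]) -> Set[str]:
    """Return the prerequisite tasks that have not reported completion yet."""
    return set(required) - _completed.get(run_id, set())


def handle_task_c(run_id: str, required: Iterable[str] = ("A", "B")) -> Tuple[str, int]:
    """Handler body for Task C, returning (response body, HTTP status)."""
    missing = missing_tasks(run_id, required)
    if missing:
        # Non-2xx response: the attempt counts as failed and can be retried later.
        return f"waiting for: {', '.join(sorted(missing))}", 503
    # Prerequisites are done; the real Task C work would go here.
    return "task C executed", 200


if __name__ == "__main__":
    mark_completed("run-1", "A")
    print(handle_task_c("run-1"))  # ('waiting for: B', 503)
    mark_completed("run-1", "B")
    print(handle_task_c("run-1"))  # ('task C executed', 200)
```

Keying the completion markers by run ID keeps separate runs from interfering with each other. For the eleven-task case in the question, `required` would list Tasks 1 - 10.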

You have to configure the retry settings of Task C's queue so that the task is retried when it exits with an error.
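
To get a feel for how long Task C might wait between attempts, the sketch below approximates the schedule produced by the queue's retry settings (`min_backoff`, `max_backoff`, `max_doublings`). It follows the behaviour described in the Cloud Tasks RetryConfig documentation: the interval starts at the minimum, doubles `max_doublings` times, then grows linearly, and is capped at the maximum. The function name `retry_intervals` is hypothetical, and the real service may differ in details, so treat the result as an estimate.

```python
from typing import List


def retry_intervals(min_backoff: float, max_backoff: float,
                    max_doublings: int, attempts: int) -> List[float]:
    """Approximate the wait (in seconds) before each of the first `attempts` retries."""
    intervals = []
    interval = min_backoff
    # After the doubling phase, the interval grows by this fixed step.
    linear_step = min_backoff * (2 ** max_doublings)
    for retry in range(attempts):
        # Never wait longer than the configured maximum.
        intervals.append(min(interval, max_backoff))
        if retry < max_doublings:
            interval *= 2
        else:
            interval += linear_step
    return intervals


if __name__ == "__main__":
    print(retry_intervals(10, 300, 3, 8))
    # [10, 20, 40, 80, 160, 240, 300, 300]
```

With these example values the first eight retries span about 19 minutes (1150 seconds). The queue's maximum number of attempts and retry duration should therefore be generous enough to cover the longest time Tasks A and B are expected to take.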

Another, more expensive, solution is to use Cloud Composer to handle this workflow.

For now, there is no other solution for workflow management.

1
votes

Cloud Tasks is not the tool you want to use in this case. Take a look at Cloud Composer, which is built on top of Apache Airflow for GCP.

Edit: You could create a single Cloud Function (GCF) that tracks the state of those requests itself:

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

################ TASK A
taskA_list = [
    "https://via.placeholder.com/400",
    "https://via.placeholder.com/410",
    "https://via.placeholder.com/420",
    "https://via.placeholder.com/430",
    "https://via.placeholder.com/440",
    "https://via.placeholder.com/450",
    "https://via.placeholder.com/460",
    "https://via.placeholder.com/470",
    "https://via.placeholder.com/480",
    "https://via.placeholder.com/490",
]

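def call2TaskA(url):
    # Call one Task A endpoint and return (url, HTTP status code).
    # The timeout keeps a hung request from blocking the whole run.
    response = requests.get(url, timeout=30)
    return (url, response.status_code)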


# Submit all Task A calls to a thread pool so they run in parallel.
processes = []
results = []
with ThreadPoolExecutor(max_workers=10) as executor:
    for url in taskA_list:
        processes.append(executor.submit(call2TaskA, url))

# Collect every result; a single failed call blocks Task B.
isOkayToDoTaskB = True
for taskA in as_completed(processes):
    result = taskA.result()
    if result[1] != 200: # your validation on taskA
        isOkayToDoTaskB = False
    results.append(result)

if not isOkayToDoTaskB:
    raise ValueError('Problems: {}'.format(results))

################ TASK B
def doTaskB():
    pass

doTaskB()