
I am a junior developer and I was in charge of integrating the Facebook API into an existing project. However, the business team found out that the Google Analytics results displayed in BigQuery are wrong, and they asked me to fix it. This is the architecture:

[architecture diagram]

What I have done is:

  • On BigQuery, I checked how close the results are to Google Analytics. I found a pattern: the values I get in BigQuery are always either 1, 2 or 3 times the original GA values (a query sketch I could use to check this is shown below, after the GCS screenshot).

  • I checked whether there were actually multiple cron jobs on the Compute Engine instance. There is only one cron job, and it runs once a day.

  • I verified the results on Google Cloud Storage. The results in Google Cloud Storage are correct, as you can see below:

[screenshot of the results in Google Cloud Storage]
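To quantify the duplication on the BigQuery side, a check along these lines could help (the dataset and table names are the ones used in the Cloud Function code below):

from google.cloud import bigquery

client = bigquery.Client()
query = """
    SELECT date, goal, source, medium, campaign, COUNT(*) AS copies
    FROM `attribution.google-analytics`
    GROUP BY date, goal, source, medium, campaign
    HAVING COUNT(*) > 1
    ORDER BY copies DESC
"""
for row in client.query(query).result():
    print(row.date, row.goal, row.campaign, row.copies)

Any key that comes back with copies equal to 2 or 3 would match the pattern described above.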

Based on this information, I strongly believe the issue is coming from the Cloud Function, as it is the only element between GCS and BQ. I have looked at the Cloud Function that is triggered by files arriving in GCS and I could not find any duplicate operations.

Do you know how I can find the issue?

Cloud Function

import os
from datetime import datetime

import pandas as pd
from google.cloud import bigquery, storage

BUCKET = "xxxx"
GOOGLE_PROJECT = "xxxx"
HEADER_MAPPING = {
    "Source/Medium": "source_medium",
    "Campaign": "campaign",
    "Last Non-Direct Click Conversions": "last_non_direct_click_conversions",
    "Last Non-Direct Click Conversion Value": "last_non_direct_click_conversion_value",
    "Last Click Prio Conversions": "last_click_prio_conversions",
    "Last Click Prio Conversion Value": "last_click_prio_conversion_value",
    "Data-Driven Conversions": "dda_conversions",
    "Data-Driven Conversion Value": "dda_conversion_value",
    "% Change in Conversions from Last Non-Direct Click to Last Click Prio": "last_click_prio_vs_last_click",
    "% Change in Conversions from Last Non-Direct Click to Data-Driven": "dda_vs_last_click"
}

SPEND_HEADER_MAPPING = {
    "Source/Medium": "source_medium",
    "Campaign": "campaign",
    "Spend": "spend"
}

tables_schema = {
    "google-analytics": [
            bigquery.SchemaField("date", bigquery.enums.SqlTypeNames.DATE, mode='REQUIRED'),
            bigquery.SchemaField("week", bigquery.enums.SqlTypeNames.INT64, mode='REQUIRED'),
            bigquery.SchemaField("goal", bigquery.enums.SqlTypeNames.STRING, mode='REQUIRED'),
            bigquery.SchemaField("source", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("medium", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("campaign", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("last_non_direct_click_conversions", bigquery.enums.SqlTypeNames.INT64, mode='NULLABLE'),
            bigquery.SchemaField("last_non_direct_click_conversion_value", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
            bigquery.SchemaField("last_click_prio_conversions", bigquery.enums.SqlTypeNames.INT64, mode='NULLABLE'),
            bigquery.SchemaField("last_click_prio_conversion_value", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
            bigquery.SchemaField("dda_conversions", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
            bigquery.SchemaField("dda_conversion_value", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
            bigquery.SchemaField("last_click_prio_vs_last_click", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
            bigquery.SchemaField("dda_vs_last_click", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE')
    ],
    "google-analytics-spend": [
            bigquery.SchemaField("date", bigquery.enums.SqlTypeNames.DATE, mode='REQUIRED'),
            bigquery.SchemaField("week", bigquery.enums.SqlTypeNames.INT64, mode='REQUIRED'),
            bigquery.SchemaField("source", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("medium", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("campaign", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("spend", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
    ]
}


def download_from_gcs(file):
    client = storage.Client()
    bucket = client.get_bucket(BUCKET)
    blob = bucket.get_blob(file['name'])
    file_name = os.path.basename(os.path.normpath(file['name']))
    blob.download_to_filename(f"/tmp/{file_name}")
    return file_name


def load_in_bigquery(file_object, dataset: str, table: str):
    client = bigquery.Client()
    table_id = f"{GOOGLE_PROJECT}.{dataset}.{table}"
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,
        schema=tables_schema[table]
    )

    job = client.load_table_from_file(file_object, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.


def __order_columns(df: pd.DataFrame, spend=False) -> pd.DataFrame:
    # We want the source and medium columns at the third position
    # for a spend data frame and at the fourth position for other
    # data frames, because spend data frames don't have a goal column.
    pos = 2 if spend else 3

    cols = df.columns.tolist()
    cols[pos:2] = cols[-2:]
    cols = cols[:-2]
    return df[cols]


def __common_transformation(df: pd.DataFrame, date: str, goal: str) -> pd.DataFrame:
    # for any kind of dataframe, we add date and week columns
    # based on the file name and we split Source/Medium from the csv
    # into two different columns

    week_of_the_year = datetime.strptime(date, '%Y-%m-%d').isocalendar()[1]
    df.insert(0, 'date', date)
    df.insert(1, 'week', week_of_the_year)
    mapping = SPEND_HEADER_MAPPING if goal == "spend" else HEADER_MAPPING
    print(df.columns.tolist())
    df = df.rename(columns=mapping)
    print(df.columns.tolist())
    print(df)
    df["source_medium"] = df["source_medium"].str.replace(' ', '')
    df[["source", "medium"]] = df["source_medium"].str.split('/', expand=True)
    df = df.drop(["source_medium"], axis=1)
    df["week"] = df["week"].astype(int, copy=False)
    return df


def __transform_spend(df: pd.DataFrame) -> pd.DataFrame:
    df["spend"] = df["spend"].astype(float, copy=False)
    df = __order_columns(df, spend=True)
    return df[df.columns[:6]]


def __transform_attribution(df: pd.DataFrame, goal: str) -> pd.DataFrame:
    df.insert(2, 'goal', goal)
    df["last_non_direct_click_conversions"] = df["last_non_direct_click_conversions"].astype(int, copy=False)
    df["last_click_prio_conversions"] = df["last_click_prio_conversions"].astype(int, copy=False)
    df["dda_conversions"] = df["dda_conversions"].astype(float, copy=False)
    return __order_columns(df)


def transform(df: pd.DataFrame, file_name) -> pd.DataFrame:
    goal, date, *_ = file_name.split('_')
    df = __common_transformation(df, date, goal)
    # we only add goal in attribution df (google-analytics table).
    return __transform_spend(df) if "spend" in file_name else __transform_attribution(df, goal)


def main(event, context):
    """Triggered by a change to a Cloud Storage bucket.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    file = event

    file_name = download_from_gcs(file)
    df = pd.read_csv(f"/tmp/{file_name}")

    transformed_df = transform(df, file_name)

    with open(f"/tmp/bq_{file_name}", "w") as file_object:
        file_object.write(transformed_df.to_csv(index=False))

    with open(f"/tmp/bq_{file_name}", "rb") as file_object:
        table = "google-analytics-spend" if "spend" in file_name else "google-analytics"
        load_in_bigquery(file_object, dataset='attribution', table=table)

update

Yes, the cloud function is triggered by the GCS object finalize event. Moreover, the function won't be automatically retried on failure.

I am following your suggestions and I am now checking the logs on my Cloud Function page. Looking at the last 10 log entries, it seems that 3 different instances of the Cloud Function were run. I am not able to get more details when expanding each line.

[screenshot of the Cloud Function logs]

I am also going to check the BigQuery logs now. I guess the easiest solution would be to use BigQueryAuditMetadata and get logs about when the table was updated?
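In the meantime, a quicker check might be to query the BigQuery job metadata for load jobs against the table (only a sketch; I am assuming the dataset lives in the US multi-region, hence the region-us qualifier):

from google.cloud import bigquery

client = bigquery.Client()
query = """
    SELECT creation_time, job_id, user_email, state, error_result.reason AS error_reason
    FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE job_type = 'LOAD'
      AND destination_table.dataset_id = 'attribution'
      AND destination_table.table_id = 'google-analytics'
    ORDER BY creation_time DESC
"""
for row in client.query(query).result():
    print(row.creation_time, row.job_id, row.user_email, row.state, row.error_reason)

That should show how many load jobs actually hit the table each day.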

"I am not able to get more details when I am expanding each lines." I saw in your code some 'print' functions. You can use that (at least for debugging, temporary), and add those 'print' in any place. For example - to get and print metadata information about the object, which is supposed to be handled. For example, you have an 'event' payload. You can use "event['crc32c']", "event['md5Hash']", "event['timeCreated']" and print them (into the log).al-dann
In terms of BigQuery job history - just go to the BigQuery console UI, choose the "Job History" tab and have a look there. Bear in mind that the project - presumably, based on your code - should be the one where the Cloud Function is deployed, as it looks like the job runs in that project, so it is not a personal job of yours. – al-dann
Now I am pretty sure there are multiple Cloud Function invocations. I tried to make a .txt file to keep track of the files that have already been uploaded to BigQuery. I was thinking of adding an if/else to check whether the file to upload was already present in that list: if the file name was in the list, I would skip it; otherwise I would upload the file to BQ and write the file name to the list. But this does not work, as it's not possible to write persistently from a Cloud Function. Do you have any other advice? – Pierre56
I am a bit busy right now, but I will come back and update my answer later today. Very shortly: the function has to be idempotent, and the state of the process (whether the data/file was uploaded into BQ or not) should be kept outside of the cloud function. A text file (in some GCS bucket) is an option, but GCS has plenty of drawbacks in this particular case. Firestore is a much, much better choice. – al-dann
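A minimal sketch of the idea from the last comment - keeping the "already loaded" state in Firestore instead of a text file - assuming the google-cloud-firestore client and a collection name processed_files (both names are hypothetical):

from google.cloud import firestore

db = firestore.Client()

def already_processed(object_name: str) -> bool:
    # One document per GCS object; slashes in object names would need
    # to be encoded before being used as a document id.
    return db.collection("processed_files").document(object_name).get().exists

def mark_processed(object_name: str) -> None:
    db.collection("processed_files").document(object_name).set(
        {"loaded_at": firestore.SERVER_TIMESTAMP}
    )

main() could then return early when already_processed(file['name']) is True and call mark_processed() only after the load job has succeeded.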

1 Answer


From my point of view, this is a very big topic, so it might be very difficult to provide one precise solution that solves all the issues. I won't be able to solve the issue for you, but I can share some personal observations and suggestions.

The cloud function is triggered by the GCS object finalize event - can you check that this is correct, please? In that case, the event 'goes through Pub/Sub' before triggering the cloud function invocation. There are 2 things to keep in mind:

  • Pub/Sub is based on an 'at least once' delivery paradigm, so duplicate message deliveries are possible.
  • Such a cloud function invocation is acknowledged automatically, and the developer has no control over that, so Pub/Sub cannot be used to control the overall process state. The longer the cloud function runs (up to the 540-second timeout or more), the higher the chance that Pub/Sub (internally) decides the message has not been delivered and should be delivered again, resulting in a new invocation of the cloud function.

Some additional details are here: Issue: Cloud Function explicit acknowledgement of a pubsub message

Now, how to see if that happens. Personally, I would start with logging. When the cloud function starts, I would log the object name and some hash code (i.e. CRC32C or MD5, etc., which are available from the event metadata) - just in the cloud function code. That way you would be able to see from the logs whether there are multiple cloud function invocations for one GCS object (if that happens). Another good idea is to log how long each cloud function invocation takes.
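For example, at the top of main() in your code, something along these lines would be enough (only a sketch; the field names come from the GCS event payload and the function context):

def main(event, context):
    # Log enough metadata to correlate every invocation with its GCS object.
    print(
        "object=%s generation=%s crc32c=%s md5=%s created=%s event_id=%s"
        % (
            event.get("name"),
            event.get("generation"),
            event.get("crc32c"),
            event.get("md5Hash"),
            event.get("timeCreated"),
            context.event_id,
        )
    )
    # ... rest of the function unchanged

If the same object name and generation appear in several invocations, the function is being run more than once for the same file.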

That step lets us check whether the cloud function is called more than once for a given GCS object.

The next step is how the data is loaded. The 'load' is a job, which means there is a queue and a scheduler (somewhere in the GCP BigQuery service). A load job stays in the queue until it is picked up, and only then is it performed/executed. All of that is extremely 'asynchronous'. Can you check whether there are any failed load jobs, please? In the simplest case that can be done through the BigQuery UI.
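If you prefer to script it with the client library you already use, a sketch like this lists the recent jobs and prints the error of any failed load (all_users so that jobs run by the service account are included):

from google.cloud import bigquery

client = bigquery.Client()
for job in client.list_jobs(max_results=100, all_users=True):
    if job.job_type == "load" and job.error_result:
        print(job.job_id, job.created, job.error_result["reason"], job.error_result["message"])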

On top of that, loading from inside the cloud function's memory is - from my point of view - not only quite expensive but also risky and unreliable. Even simply saving the csv into a GCS bucket and loading it from the bucket may be much better.
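For example, your load_in_bigquery() could stay almost the same but take a GCS URI instead of an open file (a sketch reusing the names from your code; the new function name is just for the example, and the transformed csv is assumed to have been uploaded back to the bucket first):

def load_in_bigquery_from_gcs(gcs_uri: str, dataset: str, table: str):
    client = bigquery.Client()
    table_id = f"{GOOGLE_PROJECT}.{dataset}.{table}"
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        schema=tables_schema[table],
    )
    # e.g. gcs_uri = f"gs://{BUCKET}/bq/{file_name}"
    job = client.load_table_from_uri(gcs_uri, table_id, job_config=job_config)
    job.result()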

Next, there is a quota of 1,500 load jobs per table per day, as far as I remember. If you have many files to load, you can easily exceed that quota.

An alternative way of 'loading' data is streaming. It does not have such quota limitations, but it is chargeable, so you pay for the streaming inserts.
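A sketch of that streaming alternative, again reusing the names from your code (insert_rows_json streams the rows row by row; remember that streaming inserts are billed):

import json

def stream_into_bigquery(df: pd.DataFrame, dataset: str, table: str):
    client = bigquery.Client()
    table_id = f"{GOOGLE_PROJECT}.{dataset}.{table}"
    # The to_json/loads round-trip turns numpy types into plain JSON values.
    rows = json.loads(df.to_json(orient="records", date_format="iso"))
    errors = client.insert_rows_json(table_id, rows)
    if errors:
        raise RuntimeError(f"Streaming insert failed: {errors}")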

I will stop here for now. Let me know whether the above was useful, please, and in which direction you are going to develop your solution.

=> Update 04 February 2021 10:50 GMT

To avoid copy and paste - see the answer here: Cloud Function running multiple times instead of once