I'm processing log files from S3 using PySpark and filtering them based on the date.
The logs are gzip-compressed JSON files laid out by year/month/day as follows: s3://bucket/logs/YYYY/MM/DD/<hash>.json.gz
As the layout doesn't follow the Hive-style partition syntax (year=YYYY/month=MM/day=DD), I'm reading the whole folder and creating the date column using input_file_name
and a regex:
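from pyspark.sql.functions import col, input_file_name, regexp_extract, regexp_replace
# Read every gzip-compressed log file as plain text, one row per line
df = spark.read.option("compression", "gzip").text("s3a://bucket/logs/*/*/*/*.json.gz")
# Keep the source path of each row so the date can be derived from it
df = df.withColumn("path_file", input_file_name())
# Extract YYYY/MM/DD from the path, then convert it to a date column
df = df.withColumn("logstash_date", regexp_extract(col('path_file'), r"(?:s3a:\/\/bucket\/logs\/)(\d{04}\/\d{02}\/\d{02})", 1))
df = df.withColumn("logstash_date", regexp_replace(col('logstash_date'), '/', '-').cast('date'))
# from_date is a datetime defined elsewhere (not shown)
df = df.filter(col("logstash_date") >= from_date.date())
# later on uses from_json to parse schema, apply more filters and do a join (to deduplicate logs)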
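To make clear what the two regex steps compute, here is a minimal plain-Python sketch with no Spark involved. The sample path and the helper name `extract_logstash_date` are made up for illustration, and the pattern is written as `\d{4}` rather than `\d{04}`. Unlike the Spark version, which yields a null date when the path does not match, this helper returns None.

```python
import re
from datetime import date, datetime

# Same idea as the regexp_extract call, written for Python's re module.
PATH_DATE = re.compile(r"s3a://bucket/logs/(\d{4}/\d{2}/\d{2})")


def extract_logstash_date(path_file):
    """Return the date encoded in the path, or None if the path doesn't match."""
    match = PATH_DATE.search(path_file)
    if match is None:
        return None
    # "2020/05/11" -> date(2020, 5, 11)
    return datetime.strptime(match.group(1), "%Y/%m/%d").date()


sample = "s3a://bucket/logs/2020/05/11/abc123.json.gz"  # hypothetical file name
print(extract_logstash_date(sample))
```

The point is that the date depends only on the file path, never on the file contents.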
Had the logs been partitioned using the Hive-style syntax, Spark would be able to filter them without reading the actual data.
But even though the date filter doesn't use the data itself, Spark seems to read it anyway.
Here is the info from the UI.
The parsed logical plan looks fine: the date filter is applied before the Project [value#0 AS raw_data#15, path_file#2]:
== Parsed Logical Plan ==
'InsertIntoHadoopFsRelationCommand s3a://<REDACTED>, false, ['event_day], Parquet, Map(basePath -> s3a://<REDACTED>, path -> s3a://<REDACTED>), Append, [id, client_id, somos_id, user_id, session_id, user_agent, method, status, path, timestamp, message, controller, action, facility, raw_data, event_day, generated_at]
+- Project [id#22, client_id#23, somos_id#24, user_id#25, session_id#26, user_agent#27, method#28, status#29, path#30, timestamp#65, message#32, controller#33, action#34, facility#35, raw_data#15, event_day#81, 2020-05-12T05:31:32.820718+00:00 AS generated_at#149]
+- Repartition 10, false
+- Project [id#22, client_id#23, somos_id#24, user_id#25, session_id#26, user_agent#27, method#28, status#29, path#30, timestamp#65, message#32, controller#33, action#34, facility#35, raw_data#15, event_day#81]
+- Filter isnull(id#98)
+- Project [id#22, client_id#23, somos_id#24, user_id#25, session_id#26, user_agent#27, method#28, status#29, path#30, timestamp#65, message#32, controller#33, action#34, facility#35, raw_data#15, event_day#81, id#98]
+- Join LeftOuter, (id#22 = id#98)
:- Filter (((((facility#35 = rails-production) && NOT (controller#33 = PingController)) && isnotnull(path#30)) && (isnotnull(user_id#25) && isnotnull(timestamp#65))) && (((isnotnull(method#28) && NOT (method#28 = HEAD)) && NOT client_id#23 LIKE converge_%) && (NOT controller#33 LIKE Hotsite::% && NOT message#32 LIKE somos_id%)))
: +- Project [id#22, client_id#23, somos_id#24, user_id#25, session_id#26, user_agent#27, method#28, status#29, path#30, timestamp#65, message#32, controller#33, action#34, facility#35, raw_data#15, to_date('timestamp, None) AS event_day#81]
: +- Project [id#22, client_id#23, somos_id#24, user_id#25, session_id#26, user_agent#27, method#28, status#29, path#30, to_timestamp('timestamp, None) AS timestamp#65, message#32, controller#33, action#34, facility#35, raw_data#15]
: +- Project [data#18.id AS id#22, data#18.client_id AS client_id#23, data#18.somos_id AS somos_id#24, data#18.user_id AS user_id#25, data#18.session_id AS session_id#26, data#18.user_agent AS user_agent#27, data#18.method AS method#28, data#18.status AS status#29, data#18.path AS path#30, data#18.timestamp AS timestamp#31, data#18.message AS message#32, data#18.controller AS controller#33, data#18.action AS action#34, data#18.facility AS facility#35, raw_data#15]
: +- Project [raw_data#15, path_file#2, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), raw_data#15, Some(GMT)) AS data#18]
: +- Project [value#0 AS raw_data#15, path_file#2]
: +- Project [value#0, path_file#2]
: +- Filter (logstash_date#9 >= 18392)
: +- Project [value#0, path_file#2, cast(regexp_replace(logstash_date#5, /, -) as date) AS logstash_date#9]
: +- Project [value#0, path_file#2, regexp_extract(path_file#2, (?:s3a:\/\/bucket\/logs\/)(\d{04}\/\d{02}\/\d{02}), 1) AS logstash_date#5]
: +- Project [value#0, input_file_name() AS path_file#2]
: +- Relation[value#0] text
+- Project [id#98]
+- Filter (event_day#114 >= 18392)
+- Relation[id#98,client_id#99,somos_id#100,user_id#101,session_id#102,user_agent#103,method#104,status#105,path#106,timestamp#107,message#108,controller#109,action#110,facility#111,raw_data#112,generated_at#113,event_day#114] parquet
But in the optimized logical plan, the date filter and the filters on the parsed JSON are merged into a single Filter:
== Optimized Logical Plan ==
InsertIntoHadoopFsRelationCommand s3a://<REDACTED>, false, [event_day#81], Parquet, Map(basePath -> s3a://<REDACTED>, path -> s3a://<REDACTED>), Append, [id, client_id, somos_id, user_id, session_id, user_agent, method, status, path, timestamp, message, controller, action, facility, raw_data, event_day, generated_at]
+- Project [id#22, client_id#23, somos_id#24, user_id#25, session_id#26, user_agent#27, method#28, status#29, path#30, timestamp#65, message#32, controller#33, action#34, facility#35, raw_data#15, event_day#81, 2020-05-12T05:31:32.820718+00:00 AS generated_at#149]
+- Repartition 10, false
+- Project [id#22, client_id#23, somos_id#24, user_id#25, session_id#26, user_agent#27, method#28, status#29, path#30, timestamp#65, message#32, controller#33, action#34, facility#35, raw_data#15, event_day#81]
+- Filter isnull(id#98)
+- Join LeftOuter, (id#22 = id#98)
:- Project [jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).id AS id#22, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).client_id AS client_id#23, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).somos_id AS somos_id#24, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), 
StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).user_id AS user_id#25, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).session_id AS session_id#26, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).user_agent AS user_agent#27, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), 
StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).method AS method#28, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).status AS status#29, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).path AS path#30, cast(jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).timestamp as timestamp) AS timestamp#65, 
jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).message AS message#32, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).controller AS controller#33, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).action AS action#34, jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), 
StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).facility AS facility#35, value#0 AS raw_data#15, cast(cast(jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).timestamp as timestamp) as date) AS event_day#81]
: +- Filter (((((((((((cast(regexp_replace(regexp_extract(path_file#2, (?:s3a:\/\/bucket\/logs\/)(\d{04}\/\d{02}\/\d{02}), 1), /, -) as date) >= 18392) && (jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).facility = rails-production)) && NOT (jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).controller = PingController)) && isnotnull(jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).path)) && 
isnotnull(jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).user_id)) && isnotnull(cast(jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).timestamp as timestamp))) && isnotnull(jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).method)) && NOT (jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), 
StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).method = HEAD)) && NOT jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).client_id LIKE converge_%) && NOT StartsWith(jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).controller, Hotsite::)) && NOT jsontostructs(StructField(id,StringType,true), StructField(client_id,StringType,true), StructField(somos_id,StringType,true), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(user_agent,StringType,true), StructField(method,StringType,true), StructField(status,StringType,true), StructField(path,StringType,true), 
StructField(timestamp,StringType,true), StructField(message,StringType,true), StructField(controller,StringType,true), StructField(action,StringType,true), StructField(facility,StringType,true), value#0, Some(GMT)).message LIKE somos_id%)
: +- Project [value#0, input_file_name() AS path_file#2]
: +- Relation[value#0] text
+- Project [id#98]
+- Filter ((isnotnull(event_day#114) && (event_day#114 >= 18392)) && isnotnull(id#98))
+- Relation[id#98,client_id#99,somos_id#100,user_id#101,session_id#102,user_agent#103,method#104,status#105,path#106,timestamp#107,message#108,controller#109,action#110,facility#111,raw_data#112,generated_at#113,event_day#114]
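A note for reading both plans: the literal 18392 in the Filter nodes appears to be the date threshold printed as a number of days since 1970-01-01, which is how Spark stores dates internally. A minimal sketch to decode it, with `days_to_date` being a helper name made up here:

```python
from datetime import date, timedelta

EPOCH = date(1970, 1, 1)


def days_to_date(days):
    """Convert a days-since-epoch integer into a calendar date."""
    return EPOCH + timedelta(days=days)


print(days_to_date(18392))  # 2020-05-10
```

That is two days before the generated_at timestamp shown in the plan (2020-05-12).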
The physical plan reads raw_data in the same step as the filter.
Is there any way to avoid this?
I previously tried filtering the file names in Python and passing a huge list/generator of file paths to spark.read, and that was also extremely slow.
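For comparison, a variant that has not been measured here is to build one glob pattern per day instead of one path per file, which keeps the list short. This is a minimal sketch under that assumption. The helper name `daily_globs` and the `base` default are illustrative, and a pattern that matches no files may cause the read to fail, so days without logs might need to be skipped.

```python
from datetime import date, timedelta


def daily_globs(from_date, to_date, base="s3a://bucket/logs"):
    """Build one glob pattern per day in [from_date, to_date], inclusive."""
    days = (to_date - from_date).days
    return [
        f"{base}/{d:%Y/%m/%d}/*.json.gz"
        for d in (from_date + timedelta(days=i) for i in range(days + 1))
    ]


paths = daily_globs(date(2020, 5, 10), date(2020, 5, 12))
print(paths)
# The list could then be passed to the reader in a single call, e.g.:
# df = spark.read.option("compression", "gzip").text(paths)
```

With a daily job the list would hold only a handful of patterns, and the path_file/logstash_date columns could still be derived as before.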
Thanks in advance.
PS: some info that may be relevant:
- I'm running the job daily
- There were 40 GB of log files in the last week
- 410 GB since January 1
- Running with 2 executors with 7 CPUs each