I'm trying to set up an Azure Data Factory to copy and denormalize my data from a AzureSQL database to another AzureSQL database for reporting/BI purposes with a data flow, but I ran into a problem with inserting dates.
This is the definition of my dataflow.
{
"name": "dataflow1",
"properties": {
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "AzureSqlTable1",
"type": "DatasetReference"
},
"name": "source1"
}
],
"sinks": [
{
"dataset": {
"referenceName": "AzureSqlTable2",
"type": "DatasetReference"
},
"name": "sink1"
}
],
"script": "\n\nsource(output(\n\t\tBucketId as string,\n\t\tStreamId as string,\n\t\tStreamIdOriginal as string,\n\t\tStreamRevision as integer,\n\t\tItems as integer,\n\t\tCommitId as string,\n\t\tCommitSequence as integer,\n\t\tCommitStamp as timestamp,\n\t\tCheckpointNumber as long,\n\t\tDispatched as boolean,\n\t\tHeaders as binary,\n\t\tPayload as binary\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tisolationLevel: 'READ_UNCOMMITTED',\n\tformat: 'table') ~> source1\nsource1 sink(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tformat: 'table',\n\tdeletable:false,\n\tinsertable:true,\n\tupdateable:false,\n\tupsertable:false,\n\tmapColumn(\n\t\tBucketId,\n\t\tCommitStamp\n\t)) ~> sink1"
}
}
}
and these are the definitions of my source
{
"name": "AzureSqlTable1",
"properties": {
"linkedServiceName": {
"referenceName": "Source_Test",
"type": "LinkedServiceReference"
},
"annotations": [],
"type": "AzureSqlTable",
"schema": [
{
"name": "BucketId",
"type": "varchar"
},
{
"name": "StreamId",
"type": "char"
},
{
"name": "StreamIdOriginal",
"type": "nvarchar"
},
{
"name": "StreamRevision",
"type": "int",
"precision": 10
},
{
"name": "Items",
"type": "tinyint",
"precision": 3
},
{
"name": "CommitId",
"type": "uniqueidentifier"
},
{
"name": "CommitSequence",
"type": "int",
"precision": 10
},
{
"name": "CommitStamp",
"type": "datetime2",
"scale": 7
},
{
"name": "CheckpointNumber",
"type": "bigint",
"precision": 19
},
{
"name": "Dispatched",
"type": "bit"
},
{
"name": "Headers",
"type": "varbinary"
},
{
"name": "Payload",
"type": "varbinary"
}
],
"typeProperties": {
"tableName": "[dbo].[Commits]"
}
}
}
and sink data sets
{
"name": "AzureSqlTable2",
"properties": {
"linkedServiceName": {
"referenceName": "Dest_Test",
"type": "LinkedServiceReference"
},
"annotations": [],
"type": "AzureSqlTable",
"schema": [],
"typeProperties": {
"tableName": "dbo.Test2"
}
}
}
When running my pipeline with the data flow I get the following error:
Activity dataflow1 failed: DF-EXEC-1 Conversion failed when converting date and/or time from character string.
com.microsoft.sqlserver.jdbc.SQLServerException: Conversion failed when converting date and/or time from character string.
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:258)
at com.microsoft.sqlserver.jdbc.TDSTokenHandler.onEOF(tdsparser.java:256)
at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:108)
at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:28)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.doInsertBulk(SQLServerBulkCopy.java:1611)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.access$200(SQLServerBulkCopy.java:58)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy$1InsertBulk.doExecute(SQLServerBulkCopy.java:709)
at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7151)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2478)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.sendBulkLoadBCP(SQLServerBulkCopy.java:739)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:1684)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:669)
at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions.com$microsoft$azure$sqldb$spark$connect$DataFrameFunctions$$bulkCopy(DataFrameFunctions.scala:127)
at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:948)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:948)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2226)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2226)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:124)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:459)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1401)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
My Azure SQL audit log shows the following statement that failed (which is not a huge surprise considering that it uses VARCHAR(50) as type for [CommitStamp]:
INSERT BULK dbo.T_301fcb5e4a4148d4a48f2943011b2f04 (
[BucketId] NVARCHAR(MAX),
[CommitStamp] VARCHAR(50),
[StreamId] NVARCHAR(MAX),
[StreamIdOriginal] NVARCHAR(MAX),
[StreamRevision] INT,
[Items] INT,
[CommitId] NVARCHAR(MAX),
[CommitSequence] INT,
[CheckpointNumber] BIGINT,
[Dispatched] BIT,
[Headers] VARBINARY(MAX),
[Payload] VARBINARY(MAX),
[r8e440f7252bb401b9ead107597de6293] INT)
with (ROWS_PER_BATCH = 4096, TABLOCK)
I have absolutely no idea why this occurs. It looks like the schema information is correct but somehow it seems the data factory/data flow wants to insert the CommitStamp as a string type.
As requested, the output from the data flow/code/plan view:
source(output(
BucketId as string,
StreamId as string,
StreamIdOriginal as string,
StreamRevision as integer,
Items as integer,
CommitId as string,
CommitSequence as integer,
CommitStamp as timestamp,
CheckpointNumber as long,
Dispatched as boolean,
Headers as binary,
Payload as binary
),
allowSchemaDrift: true,
validateSchema: false,
isolationLevel: 'READ_UNCOMMITTED',
format: 'table',
schemaName: '[dbo]',
tableName: '[Commits]',
store: 'sqlserver',
server: 'sign2025-sqldata.database.windows.net',
database: 'SignPath.Application',
user: 'Sign2025Admin',
password: '**********') ~> source1
source1 sink(allowSchemaDrift: true,
validateSchema: false,
format: 'table',
deletable:false,
insertable:true,
updateable:false,
upsertable:false,
mapColumn(
BucketId,
CommitStamp
),
schemaName: 'dbo',
tableName: 'Test2',
store: 'sqlserver',
server: 'sign2025-sqldata.database.windows.net',
database: 'SignPath.Reporting',
user: 'Sign2025Admin',
password: '**********') ~> sink1











