I am trying to use the ingest sdk to ingest a file in my Java program and getting an error. I have created the table, stage, and pipe in advance. I have also PUT my CSV file into the internal stage to be ready for ingestion. Now my Java program just needs to call ingestFiles to get the file to load. However, when I make the call, I get a 404 error (see the attached log file and Java program below).
I have created my public & private key using these procedures: https://docs.snowflake.net/manuals/user-guide/data-load-snowpipe-rest-gs.html#step-3-configure-security-per-user
Also note that I can successfully load the file by manually issuing the COPY command associated with the pipe. However, I prefer to use the ingest SDK so I can trap the response better.
I suspect this is a permission issue but am not sure. Any help would be appreciated.
03:11:06.275 [main] INFO com.pardi.snowpipetest.IngestTest - successfully loaded private key from file keys/rsa_key.p8
[main] WARN net.snowflake.ingest.connection.RequestBuilder - Could not read version info: java.nio.file.FileSystemNotFoundException
[main] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with subject WX11111.JPARDI
[main] INFO net.snowflake.ingest.connection.SecurityManager - Creating Token with issuer WX11111.JPARDI.SHA256:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
[main] INFO net.snowflake.ingest.connection.SecurityManager - Created new JWT - xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
[main] INFO net.snowflake.ingest.connection.RequestBuilder - Creating a RequestBuilder with arguments : Account : WX11111, User : JPARDI, Scheme : https, Host : .us-east-1.snowflakecomputing.com, Port : 443
[main] INFO net.snowflake.ingest.SimpleIngestManager - Sending Request UUID -
[main] INFO net.snowflake.ingest.connection.RequestBuilder - Created Insert Request : https://.us-east-1.snowflakecomputing.com:443/v1/data/pipes/DEMO_DB.PUBLIC.COVETRUS_SNOWFLAKE_BATCH_SINK_SNOWPIPETEST_PIPE_H_CLIENT_0/insertFiles?requestId=39a2c1e4-c637-4424-bb68-42c4eb71873a
[main] INFO net.snowflake.ingest.SimpleIngestManager - Attempting to unmarshall insert response - HttpResponseProxy{HTTP/1.1 404 Not Found [Content-Type: application/json, Date: Tue, 25 Feb 2020 08:11:10 GMT, Server: nginx, Strict-Transport-Security: max-age=31536000, X-Content-Type-Options: nosniff, X-Frame-Options: deny, Connection: keep-alive] net.snowflake.ingest.internal.apache.http.client.entity.DecompressingEntity@29c80149}
[main] WARN net.snowflake.ingest.connection.ServiceResponseHandler - Exceptional Status Code found in unmarshallInsert Response - 404
[main] ERROR net.snowflake.ingest.connection.ServiceResponseHandler - Status code 404 found in response from service
03:11:11.943 [main] INFO com.pardi.snowpipetest.IngestTest - Service exception:
HTTP Status: 404
{
Message: Specified object does not exist or not authorized. Pipe not found,
Data: null
}
package com.pardi.snowpipetest;
import net.snowflake.ingest.SimpleIngestManager;
import net.snowflake.ingest.connection.HistoryResponse;
import net.snowflake.ingest.connection.IngestResponse;
import net.snowflake.ingest.connection.IngestResponseException;
import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
import org.bouncycastle.jcajce.provider.BouncyCastleFipsProvider;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
import org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8DecryptorProviderBuilder;
import org.bouncycastle.operator.InputDecryptorProvider;
import org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import java.io.*;
import java.security.PrivateKey;
import java.security.Security;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.*;
public class IngestTest {
private static final Logger LOGGER = LoggerFactory.getLogger(IngestTest.class);
private static String host = "<xxxx>.us-east-1.snowflakecomputing.com";
private static String account = "<xxxxx>";
private static String user = "xxxxxx";
private static String passPhrase = "xxxxxxxxx";
private static int port = 443;
private static String database = "DEMO_DB";
private static String schema = "PUBLIC";
private static String pipe = "COVETRUS_SNOWFLAKE_BATCH_SINK_SNOWPIPETEST_PIPE_H_CLIENT_0";
private static String fqPipe = database + "." + schema + "." + pipe;
private static PrivateKey privateKey;
private static SimpleIngestManager manager;
public static void main(String[] args) throws Exception {
String privateKeyStr = loadPrivateKey();
privateKey = parseEncryptedPrivateKey(privateKeyStr, passPhrase);
manager = new SimpleIngestManager(account, user, fqPipe, privateKey, "https", host, port);
Set<String> files = new TreeSet<>();
files.add("covetrus_snowflake_batch_sink_snowpipetest_stage_h_client/h_client.csv");
try {
IngestResponse response = manager.ingestFiles(manager.wrapFilepaths(files), null);
LOGGER.info("response=" + response.toString());
HistoryResponse history = waitForFilesHistory(files);
LOGGER.info("Received history response: " + history.toString());
} catch (IngestResponseException e) {
LOGGER.info("Service exception: " + e.toString());
} catch (Exception e) {
LOGGER.info("Exception: " + e.getMessage());
}
}
private static String loadPrivateKey() throws IOException {
byte[] keyBytes;
String filename = "keys/rsa_key.p8";
File privateKeyFile = null;
try {
privateKeyFile = new ClassPathResource(filename).getFile();
FileInputStream fis = new FileInputStream(privateKeyFile);
DataInputStream dis = new DataInputStream(fis);
keyBytes = new byte[(int) privateKeyFile.length()];
dis.readFully(keyBytes);
dis.close();
} catch (IOException e) {
LOGGER.info("FATAL: error loading private key from file " + filename + ", exception=" + e.getMessage());
e.printStackTrace();
throw e;
}
String privateKeyStr = new String(keyBytes);
LOGGER.info("successfully loaded private key from file " + filename);
return privateKeyStr;
}
public static PrivateKey parseEncryptedPrivateKey(String key, String passphrase) {
Security.addProvider(new BouncyCastleFipsProvider());
try {
PEMParser pemParser = new PEMParser(new StringReader(key));
PKCS8EncryptedPrivateKeyInfo encryptedPrivateKeyInfo = (PKCS8EncryptedPrivateKeyInfo) pemParser.readObject();
pemParser.close();
InputDecryptorProvider pkcs8Prov = new JceOpenSSLPKCS8DecryptorProviderBuilder().build(passphrase.toCharArray());
JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider(BouncyCastleFipsProvider.PROVIDER_NAME);
PrivateKeyInfo decryptedPrivateKeyInfo = encryptedPrivateKeyInfo.decryptPrivateKeyInfo(pkcs8Prov);
return converter.getPrivateKey(decryptedPrivateKeyInfo);
} catch (Exception e) {
throw new RuntimeException("Invalid encrypted private key or passphrase");
}
}
private static HistoryResponse waitForFilesHistory(Set<String> files)
throws Exception {
ExecutorService service = Executors.newSingleThreadExecutor();
class GetHistory implements
Callable<HistoryResponse> {
private Set<String> filesWatchList;
GetHistory(Set<String> files) {
this.filesWatchList = files;
}
String beginMark = null;
public HistoryResponse call()
throws Exception {
HistoryResponse filesHistory = null;
while (true) {
Thread.sleep(500);
HistoryResponse response = manager.getHistory(null, null, beginMark);
if (response.getNextBeginMark() != null) {
beginMark = response.getNextBeginMark();
}
if (response != null && response.files != null) {
for (HistoryResponse.FileEntry entry : response.files) {
//if we have a complete file that we've
// loaded with the same name..
String filename = entry.getPath();
if (entry.getPath() != null && entry.isComplete() &&
filesWatchList.contains(filename)) {
if (filesHistory == null) {
filesHistory = new HistoryResponse();
filesHistory.setPipe(response.getPipe());
}
filesHistory.files.add(entry);
filesWatchList.remove(filename);
//we can return true!
if (filesWatchList.isEmpty()) {
return filesHistory;
}
}
}
}
}
}
}
GetHistory historyCaller = new GetHistory(files);
//fork off waiting for a load to the service
Future<HistoryResponse> result = service.submit(historyCaller);
HistoryResponse response = result.get(2, TimeUnit.MINUTES);
return response;
}
}