diff --git a/pdi/validate_file_sub.ktr b/pdi/validate_file_sub.ktr index 78b7b7e..f4f7eb3 100644 --- a/pdi/validate_file_sub.ktr +++ b/pdi/validate_file_sub.ktr @@ -947,78 +947,78 @@ import org.apache.http.util.EntityUtils; public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { - Object[] r = getRow(); - - if (r == null) { - setOutputDone(); - return false; - } - - r = createOutputRow(r, data.outputRowMeta.size()); - - int queueNr = Integer.parseInt(getVariable("queue_nr")); - Queue queue = Queue.get(queueNr); - - String validatorUrl = getVariable("validator_url"); - String progressPath = getVariable("progress_path", "TestRuns?view=progress"); - - HttpGet httpGet = new HttpGet(validatorUrl + progressPath); - httpGet.setConfig(queue.config); - - int queueMaxSize = Integer.parseInt(getVariable("queue_max_size", "4")); - int queueCheckDelay = Integer.parseInt(getVariable("queue_check_delay", "2")); - int count = queueMaxSize; - int delay = 0; - int errors = 0; - logDetailed("waiting for queue..."); - do { - try { - Thread.sleep(delay * 1000); - delay = queueCheckDelay; - - logDetailed("Requesting " + httpGet.getRequestLine()); - CloseableHttpResponse response = queue.client.execute(httpGet); - try { - logDetailed(response.getStatusLine().toString()); - if (response.getStatusLine().getStatusCode() == 200) { - String resStr = EntityUtils.toString(response.getEntity()); - count = 0; - if (!"[]".equals(resStr)) { - try { - JSONArray jsonArray = (JSONArray) new JSONParser().parse(resStr); - //for (JSONObject obj : jsonArray) { - for (int i = 0; i < jsonArray.size(); i++) { - JSONObject obj = (JSONObject) jsonArray.get(i); - if ((Double) obj.get("percentStepsCompleted") < 100) { - count++; - obj.get("id"); - } - } - } catch (Exception e) { - logBasic("Response : " + EntityUtils.toString(response.getEntity())); - logError("Error", e); - } - } - logBasic("queue " + queueNr + " size: " + count); - } - - } finally { - response.close(); - } - } catch (Exception e) { - logError("Error", e); - delay = 90; // 1.5 minute delay in case of error - errors++; - } - if(errors > 12) - count = -1; - } while (!queue.incIfSmaller(queueMaxSize - count)); - - logDetailed("...proceeding"); - - putRow(data.outputRowMeta, r); - - return true; + Object[] r = getRow(); + + if (r == null) { + setOutputDone(); + return false; + } + + r = createOutputRow(r, data.outputRowMeta.size()); + + int queueNr = Integer.parseInt(getVariable("queue_nr")); + Queue queue = Queue.get(queueNr); + + String validatorUrl = getVariable("validator_url"); + String progressPath = getVariable("progress_path", "TestRuns?view=progress"); + + HttpGet httpGet = new HttpGet(validatorUrl + progressPath); + httpGet.setConfig(queue.config); + + int queueMaxSize = Integer.parseInt(getVariable("queue_max_size", "4")); + int queueCheckDelay = Integer.parseInt(getVariable("queue_check_delay", "2")); + int count = queueMaxSize; + int delay = 0; + int errors = 0; + logDetailed("waiting for queue..."); + do { + try { + Thread.sleep(delay * 1000); + delay = queueCheckDelay; + + logDetailed("Requesting " + httpGet.getRequestLine()); + CloseableHttpResponse response = queue.client.execute(httpGet); + try { + logDetailed(response.getStatusLine().toString()); + if (response.getStatusLine().getStatusCode() == 200) { + String resStr = EntityUtils.toString(response.getEntity()); + count = 0; + if (!"[]".equals(resStr)) { + try { + JSONArray jsonArray = (JSONArray) new JSONParser().parse(resStr); + //for (JSONObject obj : jsonArray) { + for (int i = 0; i < jsonArray.size(); i++) { + JSONObject obj = (JSONObject) jsonArray.get(i); + if ((Double) obj.get("percentStepsCompleted") < 100) { + count++; + obj.get("id"); + } + } + } catch (Exception e) { + logBasic("Response : " + EntityUtils.toString(response.getEntity())); + logError("Error", e); + } + } + logBasic("queue " + queueNr + " size: " + count); + } + + } finally { + response.close(); + } + } catch (Exception e) { + logError("Error", e); + delay = 90; // 1.5 minute delay in case of error + errors++; + } + if(errors > 12) + count = -1; + } while (!queue.incIfSmaller(queueMaxSize - count)); + + logDetailed("...proceeding"); + + putRow(data.outputRowMeta, r); + + return true; } @@ -1066,64 +1066,64 @@ import org.apache.http.util.EntityUtils; public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { - Object[] r = getRow(); - - if (r == null) { - setOutputDone(); - return false; - } - - r = createOutputRow(r, data.outputRowMeta.size()); - - String validatorUrl = getVariable("validator_url", "http://inspire.ec.europa.eu/validator/v2/"); - String testPath = getVariable("test_path", "TestRuns"); - - String runId = get(Fields.In, "run_id").getString(r); - Long startSeconds = get(Fields.In, "start_seconds").getLong(r); - String filename = get(Fields.In, "filename").getString(r); - - Queue queue = Queue.get(Integer.parseInt(getVariable("queue_nr"))); - - // Http get - HttpGet httpGet = new HttpGet(validatorUrl + testPath + "/" + runId + ".html"); - httpGet.setConfig(queue.config); - - int tries = 0; - int maxTries = 2; - boolean success = false; - do { - try { - logBasic("Get result html for " + filename); - if (tries > 0) - Thread.sleep(30 * 1000); - - logDetailed("Requesting " + httpGet.getRequestLine()); - CloseableHttpResponse response = queue.client.execute(httpGet); - try { - logDetailed(response.getStatusLine().toString()); - if (response.getStatusLine().getStatusCode() == 200) { - success = true; - get(Fields.Out, "response_html").setValue(r, EntityUtils.toString(response.getEntity())); - } else { - logBasic("Get result html FAILED:"); - logBasic(EntityUtils.toString(response.getEntity())); - break; - } - } finally { - response.close(); - } - } catch (Exception e) { - logError("Error", e); - } - } while (!success && ++tries < maxTries); - - if (success) - putRow(data.outputRowMeta, r); - else { - putError(data.outputRowMeta, r, tries, "Get result html failed", filename, ""); - } - - return true; + Object[] r = getRow(); + + if (r == null) { + setOutputDone(); + return false; + } + + r = createOutputRow(r, data.outputRowMeta.size()); + + String validatorUrl = getVariable("validator_url", "http://inspire.ec.europa.eu/validator/v2/"); + String testPath = getVariable("test_path", "TestRuns"); + + String runId = get(Fields.In, "run_id").getString(r); + Long startSeconds = get(Fields.In, "start_seconds").getLong(r); + String filename = get(Fields.In, "filename").getString(r); + + Queue queue = Queue.get(Integer.parseInt(getVariable("queue_nr"))); + + // Http get + HttpGet httpGet = new HttpGet(validatorUrl + testPath + "/" + runId + ".html"); + httpGet.setConfig(queue.config); + + int tries = 0; + int maxTries = 2; + boolean success = false; + do { + try { + logBasic("Get result html for " + filename); + if (tries > 0) + Thread.sleep(30 * 1000); + + logDetailed("Requesting " + httpGet.getRequestLine()); + CloseableHttpResponse response = queue.client.execute(httpGet); + try { + logDetailed(response.getStatusLine().toString()); + if (response.getStatusLine().getStatusCode() == 200) { + success = true; + get(Fields.Out, "response_html").setValue(r, EntityUtils.toString(response.getEntity())); + } else { + logBasic("Get result html FAILED:"); + logBasic(EntityUtils.toString(response.getEntity())); + break; + } + } finally { + response.close(); + } + } catch (Exception e) { + logError("Error", e); + } + } while (!success && ++tries < maxTries); + + if (success) + putRow(data.outputRowMeta, r); + else { + putError(data.outputRowMeta, r, tries, "Get result html failed", filename, ""); + } + + return true; } @@ -1432,45 +1432,45 @@ import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.HttpClients; public boolean init(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface) { - int queueNr = Integer.parseInt(getVariable("queue_nr")); + int queueNr = Integer.parseInt(getVariable("queue_nr")); logBasic("Initializing queue " + queueNr); - Queue queue = Queue.init(queueNr); + Queue queue = Queue.init(queueNr); - String proxyHost = getVariable("http.proxyHost", ""); - int proxyPort = Integer.parseInt(getVariable("http.proxyPort", "-1")); - String proxyUser = getVariable("http.proxyUser", ""); - String proxyPassword = getVariable("http.proxyPassword", ""); + String proxyHost = getVariable("http.proxyHost", ""); + int proxyPort = Integer.parseInt(getVariable("http.proxyPort", "-1")); + String proxyUser = getVariable("http.proxyUser", ""); + String proxyPassword = getVariable("http.proxyPassword", ""); - // Setup Proxy and Proxy Credentials - CredentialsProvider credsProvider = new BasicCredentialsProvider(); - HttpHost proxy = null; + // Setup Proxy and Proxy Credentials + CredentialsProvider credsProvider = new BasicCredentialsProvider(); + HttpHost proxy = null; - if (!proxyHost.equals("")) { - proxy = new HttpHost(proxyHost, proxyPort); + if (!proxyHost.equals("")) { + proxy = new HttpHost(proxyHost, proxyPort); - if (!proxyUser.equals("")) { - credsProvider.setCredentials( - new AuthScope(proxyHost, proxyPort), - new UsernamePasswordCredentials(proxyUser, proxyPassword) - ); - } - } + if (!proxyUser.equals("")) { + credsProvider.setCredentials( + new AuthScope(proxyHost, proxyPort), + new UsernamePasswordCredentials(proxyUser, proxyPassword) + ); + } + } - queue.client = HttpClients - .custom() - .setDefaultCredentialsProvider(credsProvider) - .build(); + queue.client = HttpClients + .custom() + .setDefaultCredentialsProvider(credsProvider) + .build(); - RequestConfig.Builder reqConfigBuilder = RequestConfig.custom(); - if (proxy != null) { - reqConfigBuilder.setProxy(proxy); - } + RequestConfig.Builder reqConfigBuilder = RequestConfig.custom(); + if (proxy != null) { + reqConfigBuilder.setProxy(proxy); + } - queue.config = reqConfigBuilder.build(); + queue.config = reqConfigBuilder.build(); - return parent.initImpl(stepMetaInterface, stepDataInterface); + return parent.initImpl(stepMetaInterface, stepDataInterface); } public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { @@ -1677,77 +1677,77 @@ import org.apache.http.util.EntityUtils; public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { - Object[] r = getRow(); - - if (r == null) { - setOutputDone(); - return false; - } - - r = createOutputRow(r, data.outputRowMeta.size()); - - Queue queue = Queue.get(Integer.parseInt(getVariable("queue_nr"))); - - String validatorUrl = getVariable("validator_url", "http://inspire.ec.europa.eu/validator/v2/"); - String testPath = getVariable("test_path", "TestRuns"); - String authToken = getVariable("authorization_token"); - - String testArgs = get(Fields.In, "test_args").getString(r); - String filename = get(Fields.In, "filename").getString(r); - - // Http post - HttpPost httpPost = new HttpPost(validatorUrl + testPath); - httpPost.setConfig(queue.config); - if (authToken != null) - httpPost.setHeader("X-API-Key", authToken); - try { - httpPost.setEntity(new StringEntity(testArgs, ContentType.APPLICATION_JSON)); - } catch (Exception e) { - logError("Error", e); - } - - /* Uploads are deleted after 5 minutes from the server, - so in order not to block test launching for next records - and not to let the launched tests pile up! - retries need to be limited. */ - int tries = 0; - boolean success = false; - do { - try { - // Delay 10 seconds only see comment above - if (tries > 0) - Thread.sleep(10 * 1000); - - logBasic("Launching test for " + filename); - logDetailed("Requesting " + httpPost.getRequestLine()); - CloseableHttpResponse response = queue.client.execute(httpPost); - try { - logDetailed(response.getStatusLine().toString()); - if (response.getStatusLine().getStatusCode() == 201) { - success = true; - get(Fields.Out, "response").setValue(r, EntityUtils.toString(response.getEntity())); - get(Fields.Out, "start_seconds").setValue(r, Instant.now().getEpochSecond()); - } else { - logBasic("Launch FAILED:"); - logBasic(EntityUtils.toString(response.getEntity())); - } - } finally { - response.close(); - } - } catch (Exception e) { - logError("Error", e); - } - } while (!success && tries++ < 1); // 1 retry only see comment above - - queue.decrement(); - - if (success) - putRow(data.outputRowMeta, r); - else { - putError(data.outputRowMeta, r, tries, "Test launch failed", filename, ""); - } - - return true; + Object[] r = getRow(); + + if (r == null) { + setOutputDone(); + return false; + } + + r = createOutputRow(r, data.outputRowMeta.size()); + + Queue queue = Queue.get(Integer.parseInt(getVariable("queue_nr"))); + + String validatorUrl = getVariable("validator_url", "http://inspire.ec.europa.eu/validator/v2/"); + String testPath = getVariable("test_path", "TestRuns"); + String authToken = getVariable("authorization_token"); + + String testArgs = get(Fields.In, "test_args").getString(r); + String filename = get(Fields.In, "filename").getString(r); + + // Http post + HttpPost httpPost = new HttpPost(validatorUrl + testPath); + httpPost.setConfig(queue.config); + if (authToken != null) + httpPost.setHeader("X-API-Key", authToken); + try { + httpPost.setEntity(new StringEntity(testArgs, ContentType.APPLICATION_JSON)); + } catch (Exception e) { + logError("Error", e); + } + + /* Uploads are deleted after 5 minutes from the server, + so in order not to block test launching for next records + and not to let the launched tests pile up! + retries need to be limited. */ + int tries = 0; + boolean success = false; + do { + try { + // Delay 10 seconds only see comment above + if (tries > 0) + Thread.sleep(10 * 1000); + + logBasic("Launching test for " + filename); + logDetailed("Requesting " + httpPost.getRequestLine()); + CloseableHttpResponse response = queue.client.execute(httpPost); + try { + logDetailed(response.getStatusLine().toString()); + if (response.getStatusLine().getStatusCode() == 201) { + success = true; + get(Fields.Out, "response").setValue(r, EntityUtils.toString(response.getEntity())); + get(Fields.Out, "start_seconds").setValue(r, Instant.now().getEpochSecond()); + } else { + logBasic("Launch FAILED:"); + logBasic(EntityUtils.toString(response.getEntity())); + } + } finally { + response.close(); + } + } catch (Exception e) { + logError("Error", e); + } + } while (!success && tries++ < 1); // 1 retry only see comment above + + queue.decrement(); + + if (success) + putRow(data.outputRowMeta, r); + else { + putError(data.outputRowMeta, r, tries, "Test launch failed", filename, ""); + } + + return true; } @@ -2108,72 +2108,75 @@ import org.apache.http.util.EntityUtils; public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { - Object[] r = getRow(); - - if (r == null) { - setOutputDone(); - return false; - } - - r = createOutputRow(r, data.outputRowMeta.size()); - - - String validatorUrl = getVariable("validator_url", "http://inspire.ec.europa.eu/validator/v2/"); - String uploadPath = getVariable("upload_path", "TestObjects?action=upload"); - Queue queue = Queue.get(Integer.parseInt(getVariable("queue_nr"))); - - // Load the file - String longFileNameString = get(Fields.In, "filename").getString(r); - File file = new File(longFileNameString); - - // Multipart entity - MultipartEntityBuilder entity = MultipartEntityBuilder.create(); - //entity.setMode(HttpMultipartMode.STRICT); - entity.setMode(HttpMultipartMode.BROWSER_COMPATIBLE); - entity.addBinaryBody("file", file); - entity.addTextBody("fileName", file.getName()); - - // Http post - HttpPost httpPost = new HttpPost(validatorUrl + uploadPath); - httpPost.setConfig(queue.config); - httpPost.setEntity(entity.build()); - - int tries = 0; - int maxTries = 12; - boolean success = false; - do { - try { - if (tries > 0) - Thread.sleep(90 * 1000); // delay 1.5 minutes - - logBasic("Uploading " + longFileNameString); - logDetailed("Requesting " + httpPost.getRequestLine()); - CloseableHttpResponse response = queue.client.execute(httpPost); - try { - logDetailed(response.getStatusLine().toString()); - if (response.getStatusLine().getStatusCode() == 200) { - success = true; - get(Fields.Out, "response").setValue(r, EntityUtils.toString(response.getEntity())); - } else { - logBasic("Upload FAILED:"); - logBasic(EntityUtils.toString(response.getEntity())); - } - } finally { - response.close(); - } - } catch (Exception e) { - logError("Error", e); - } - } while (!success && tries++ < maxTries); - - if (success) - putRow(data.outputRowMeta, r); - else { - putError(data.outputRowMeta, r, tries, "Upload failed", longFileNameString, ""); - queue.decrement(); - } - - return true; + Object[] r = getRow(); + + if (r == null) { + setOutputDone(); + return false; + } + + r = createOutputRow(r, data.outputRowMeta.size()); + + + String validatorUrl = getVariable("validator_url", "http://inspire.ec.europa.eu/validator/v2/"); + String uploadPath = getVariable("upload_path", "TestObjects?action=upload"); + Queue queue = Queue.get(Integer.parseInt(getVariable("queue_nr"))); + + // Load the file + String longFileNameString = get(Fields.In, "filename").getString(r); + File file = new File(longFileNameString); + + // Multipart entity + MultipartEntityBuilder entity = MultipartEntityBuilder.create(); + //entity.setMode(HttpMultipartMode.STRICT); + entity.setMode(HttpMultipartMode.BROWSER_COMPATIBLE); + entity.addBinaryBody("fileupload", file); + //entity.addTextBody("fileupload", file.getName()); + + // Http post + HttpPost httpPost = new HttpPost(validatorUrl + uploadPath); + httpPost.setConfig(queue.config); + httpPost.setEntity(entity.build()); + // 20210921 PATCH: Include Accept header + httpPost.addHeader("Accept", "*/*"); + + logBasic(httpPost.getEntity().toString()); + int tries = 0; + int maxTries = 12; + boolean success = false; + do { + try { + if (tries > 0) + Thread.sleep(90 * 1000); // delay 1.5 minutes + + logBasic("Uploading " + longFileNameString); + logDetailed("Requesting " + httpPost.getRequestLine()); + CloseableHttpResponse response = queue.client.execute(httpPost); + try { + logDetailed(response.getStatusLine().toString()); + if (response.getStatusLine().getStatusCode() == 200) { + success = true; + get(Fields.Out, "response").setValue(r, EntityUtils.toString(response.getEntity())); + } else { + logBasic("Upload FAILED:"); + logBasic(EntityUtils.toString(response.getEntity())); + } + } finally { + response.close(); + } + } catch (Exception e) { + logError("Error", e); + } + } while (!success && tries++ < maxTries); + + if (success) + putRow(data.outputRowMeta, r); + else { + putError(data.outputRowMeta, r, tries, "Upload failed", longFileNameString, ""); + queue.decrement(); + } + + return true; } @@ -2245,4 +2248,4 @@ public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws K N - + \ No newline at end of file