java kettle v2

    xiaoxiao2021-03-26  4

    package com.Nxin.BigData; import java.io.File; import java.util.HashMap; import java.util.Map; import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.repository.RepositoryDirectoryInterface; import org.pentaho.di.repository.filerep.KettleFileRepository; import org.pentaho.di.repository.filerep.KettleFileRepositoryMeta; import org.pentaho.di.trans.TransMeta; import org.pentaho.di.job.JobMeta; import org.pentaho.di.job.Job; import org.pentaho.di.trans.Trans; public class Kettle { public static void runTran(String tranname,Map<String,String> Paters) { try { TransMeta transMeta = new TransMeta(tranname); Trans trans = new Trans(transMeta);   if (Paters!=null){ //设置参数   for(Map.Entry<String, String> entry : Paters.entrySet()){   System.out.println("key = " + entry.getKey() + ", value = " + entry.getValue());   transMeta.setParameterValue(entry.getKey(),entry.getValue()); }   } trans.prepareExecution(null);          trans.startThreads();   trans.waitUntilFinished(); if (trans.getErrors() > 0) { throw new RuntimeException("传输过程中发生异常"); } } catch (KettleException e) {e.printStackTrace();} } public static void runJob(String jobname,Map<String,String> Paters) { try { JobMeta jobMeta = new JobMeta(jobname, null); Job job = new Job(null, jobMeta);   if (Paters!=null){//设置参数   for(Map.Entry<String, String> entry : Paters.entrySet()){   System.out.println("key = " + entry.getKey() + ", value = " + entry.getValue());   jobMeta.setParameterValue(entry.getKey(),entry.getValue());   }   } job.start();   job.waitUntilFinished(); if (job.getErrors() > 0) { throw new RuntimeException("传输过程中发生异常"); } } catch (KettleException e) { e.printStackTrace(); } }          public static void runTran(String tranName,KettleFileRepository rep,Map<String,String> Paters) {         try {      Trans trans = null;      if (tranName != null && !"".equals(tranName)) {      TransMeta transMeta = rep.loadTransformation(rep.getTransformationID(tranName, null), null);  // 转换对象      trans = new Trans(transMeta); // 转换    if (Paters!=null){ //设置参数   for(Map.Entry<String, String> entry : Paters.entrySet()){   System.out.println("key = " + entry.getKey() + ", value = " + entry.getValue());   transMeta.setParameterValue(entry.getKey(),entry.getValue()); }   }     trans.execute(null);   // 执行转换      trans.waitUntilFinished();  // 等待转换执行结束      if (trans.getErrors() > 0) {      throw new RuntimeException("传输过程中发生异常");          }      } catch (Exception e) { e.printStackTrace(); }      }     public static void runJob(String jobName,KettleFileRepository rep,Map<String,String> Paters) {        try {     Job job = null;     RepositoryDirectoryInterface directory = rep.loadRepositoryDirectoryTree();      if (jobName != null && !"".equals(job)) {     JobMeta jobMeta = rep.loadJob(jobName, directory, null, null);     jobMeta.activateParameters();     job = new Job(rep,jobMeta); //必须加rep否则无法    if (Paters!=null){//设置参数    for(Map.Entry<String, String> entry : Paters.entrySet()){     System.out.println("key = " + entry.getKey() + ", value = " + entry.getValue());     jobMeta.setParameterValue(entry.getKey(),entry.getValue());    }    }    job.start(); // 执行作业     job.waitUntilFinished();  // 等待作业执行结束       if (job.getErrors() > 0) {       throw new RuntimeException("传输过程中发生异常");     }     }     } catch (Exception e) { e.printStackTrace(); }   }    public static void runJob(String jobName,KettleFileRepository rep) { runJob(jobName,rep,new HashMap<String,String>());  } public static void runTran(String tranName,KettleFileRepository rep) { runTran(tranName,rep,new HashMap<String,String>());  } public static void runJob(String jobName) { runJob(jobName,new HashMap<String,String>());  } public static void runTran(String tranName) { runTran(tranName,new HashMap<String,String>());  }     public static KettleFileRepository FileRepository(String RepName,String path) {         KettleFileRepositoryMeta repMeta = new KettleFileRepositoryMeta("", "", RepName, path); // 资源库元对象          KettleFileRepository rep = new KettleFileRepository(); // 文件形式的资源库      rep.init(repMeta);      return rep;     }     public static void main(String[] args) throws KettleException { String path=new File("D:\\ETL\\Kettle").toURI().toString(),repName="Kettle";    KettleEnvironment.init(); // 初始化     KettleFileRepository rep = FileRepository(repName, path);         Map<String,String> Paters = new HashMap<String,String>();    Paters.put("client", "clientA"); //runTran("D:\\test.ktr"); //执行转换 //runTran("D:\\test.ktr",Paters); //执行转换  带参数     //runJob("D:\\test.kjb"); //执行作业 //runJob("D:\\test.kjb",Paters); //执行作业 带参数        //runTran("test0103",rep); //执行转换 基于文件资源库    //runTran("test0103",rep,Paters); //执行转换 基于文件资源库 带参数        //runJob("test0103_2",rep); //执行作业 基于文件资源库    //runJob("test0103_2",rep,Paters); //执行作业 基于文件资源库 带参数        rep.disconnect(); }      }
    转载请注明原文地址: https://ju.6miu.com/read-500327.html

    最新回复(0)