package com.Nxin.BigData;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.filerep.KettleFileRepository;
import org.pentaho.di.repository.filerep.KettleFileRepositoryMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.job.Job;
import org.pentaho.di.trans.Trans;
/**
 * Static helpers for running Pentaho Kettle transformations and jobs, either
 * from a file name or from a {@link KettleFileRepository}.
 *
 * <p>All {@code run*} methods block until the run has finished. A run that
 * reports one or more errors, or that fails with a {@link KettleException}
 * while loading or executing, is reported to the caller as a
 * {@link RuntimeException}; a {@code KettleException} is preserved as the cause.
 *
 * <p>{@link KettleEnvironment#init()} is only called by {@link #main(String[])};
 * other callers are presumably expected to initialise the environment
 * themselves before using these helpers.
 */
public class Kettle {

    /** Message of the exception raised when a finished run reports errors. */
    private static final String RUN_ERROR_MESSAGE = "传输过程中发生异常";

    /**
     * Runs the transformation stored in the given file and waits for it to finish.
     *
     * @param tranname file name of the transformation, passed to {@code new TransMeta(String)}
     * @param Paters   named parameter values to set on the transformation; may be {@code null}
     * @throws RuntimeException if loading or executing fails, or if the run reports errors
     */
    public static void runTran(String tranname, Map<String, String> Paters) {
        try {
            TransMeta transMeta = new TransMeta(tranname);
            // Parameters are set on the metadata before the Trans is created so the
            // values are already present if the engine copies them at construction time.
            applyParameters(transMeta, Paters);
            Trans trans = new Trans(transMeta);
            trans.prepareExecution(null);
            trans.startThreads();
            trans.waitUntilFinished();
            failOnErrors(trans.getErrors());
        } catch (KettleException e) {
            throw new RuntimeException("Failed to run transformation '" + tranname + "'", e);
        }
    }

    /**
     * Runs the job stored in the given file and waits for it to finish.
     *
     * @param jobname file name of the job, passed to {@code new JobMeta(String, Repository)}
     * @param Paters  named parameter values to set on the job; may be {@code null}
     * @throws RuntimeException if loading or executing fails, or if the run reports errors
     */
    public static void runJob(String jobname, Map<String, String> Paters) {
        try {
            JobMeta jobMeta = new JobMeta(jobname, null);
            applyParameters(jobMeta, Paters);
            Job job = new Job(null, jobMeta);
            job.start();
            job.waitUntilFinished();
            failOnErrors(job.getErrors());
        } catch (KettleException e) {
            throw new RuntimeException("Failed to run job '" + jobname + "'", e);
        }
    }

    /**
     * Runs a transformation loaded by name from a file repository and waits for
     * it to finish. Does nothing when {@code tranName} is {@code null} or empty.
     *
     * @param tranName name of the transformation inside the repository
     * @param rep      repository to load the transformation from
     * @param Paters   named parameter values to set on the transformation; may be {@code null}
     * @throws RuntimeException if loading or executing fails, or if the run reports errors
     */
    public static void runTran(String tranName, KettleFileRepository rep, Map<String, String> Paters) {
        if (tranName == null || "".equals(tranName)) {
            return;
        }
        try {
            // Transformation metadata, looked up by name (no directory given).
            TransMeta transMeta = rep.loadTransformation(rep.getTransformationID(tranName, null), null);
            applyParameters(transMeta, Paters);
            Trans trans = new Trans(transMeta);
            trans.execute(null);       // start the transformation
            trans.waitUntilFinished(); // wait for the transformation to finish
            failOnErrors(trans.getErrors());
        } catch (KettleException e) {
            throw new RuntimeException("Failed to run transformation '" + tranName + "'", e);
        }
    }

    /**
     * Runs a job loaded by name from a file repository and waits for it to
     * finish. Does nothing when {@code jobName} is {@code null} or empty.
     *
     * @param jobName name of the job inside the repository
     * @param rep     repository to load the job from
     * @param Paters  named parameter values to set on the job; may be {@code null}
     * @throws RuntimeException if loading or executing fails, or if the run reports errors
     */
    public static void runJob(String jobName, KettleFileRepository rep, Map<String, String> Paters) {
        // The name itself is checked here (the guard used to test the Job local by mistake).
        if (jobName == null || "".equals(jobName)) {
            return;
        }
        try {
            RepositoryDirectoryInterface directory = rep.loadRepositoryDirectoryTree();
            JobMeta jobMeta = rep.loadJob(jobName, directory, null, null);
            jobMeta.activateParameters();
            applyParameters(jobMeta, Paters);
            // Original author's note: the repository must be passed to the Job,
            // otherwise the job cannot run.
            Job job = new Job(rep, jobMeta);
            job.start();             // start the job
            job.waitUntilFinished(); // wait for the job to finish
            failOnErrors(job.getErrors());
        } catch (KettleException e) {
            throw new RuntimeException("Failed to run job '" + jobName + "'", e);
        }
    }

    /** Runs a repository job without parameters; see {@link #runJob(String, KettleFileRepository, Map)}. */
    public static void runJob(String jobName, KettleFileRepository rep) {
        runJob(jobName, rep, new HashMap<String, String>());
    }

    /** Runs a repository transformation without parameters; see {@link #runTran(String, KettleFileRepository, Map)}. */
    public static void runTran(String tranName, KettleFileRepository rep) {
        runTran(tranName, rep, new HashMap<String, String>());
    }

    /** Runs a job file without parameters; see {@link #runJob(String, Map)}. */
    public static void runJob(String jobName) {
        runJob(jobName, new HashMap<String, String>());
    }

    /** Runs a transformation file without parameters; see {@link #runTran(String, Map)}. */
    public static void runTran(String tranName) {
        runTran(tranName, new HashMap<String, String>());
    }

    /**
     * Creates and initialises a file-based repository.
     *
     * @param RepName repository name
     * @param path    base location of the repository
     * @return the initialised (not explicitly connected) repository
     */
    public static KettleFileRepository FileRepository(String RepName, String path) {
        // NOTE(review): RepName is passed as the third constructor argument and the
        // first two are empty strings - confirm this matches the intended
        // id/name/description order of KettleFileRepositoryMeta.
        KettleFileRepositoryMeta repMeta = new KettleFileRepositoryMeta("", "", RepName, path);
        KettleFileRepository rep = new KettleFileRepository();
        rep.init(repMeta);
        return rep;
    }

    /**
     * Example entry point: initialises the Kettle environment, opens a file
     * repository and shows (commented out) how each helper is invoked.
     *
     * @param args unused
     * @throws KettleException if the Kettle environment cannot be initialised
     */
    public static void main(String[] args) throws KettleException {
        String path = new File("D:\\ETL\\Kettle").toURI().toString();
        String repName = "Kettle";
        KettleEnvironment.init(); // initialise the Kettle environment
        KettleFileRepository rep = FileRepository(repName, path);
        try {
            Map<String, String> Paters = new HashMap<String, String>();
            Paters.put("client", "clientA");
            //runTran("D:\\test.ktr");              // run a transformation file
            //runTran("D:\\test.ktr",Paters);       // run a transformation file with parameters
            //runJob("D:\\test.kjb");               // run a job file
            //runJob("D:\\test.kjb",Paters);        // run a job file with parameters
            //runTran("test0103",rep);              // run a transformation from the file repository
            //runTran("test0103",rep,Paters);       // run a repository transformation with parameters
            //runJob("test0103_2",rep);             // run a job from the file repository
            //runJob("test0103_2",rep,Paters);      // run a repository job with parameters
        } finally {
            // Always release the repository, even if a run above throws.
            rep.disconnect();
        }
    }

    /** Prints each parameter and sets it on the transformation metadata; a null map is ignored. */
    private static void applyParameters(TransMeta transMeta, Map<String, String> Paters) throws KettleException {
        if (Paters == null) {
            return;
        }
        for (Map.Entry<String, String> entry : Paters.entrySet()) {
            System.out.println("key = " + entry.getKey() + ", value = " + entry.getValue());
            transMeta.setParameterValue(entry.getKey(), entry.getValue());
        }
    }

    /** Prints each parameter and sets it on the job metadata; a null map is ignored. */
    private static void applyParameters(JobMeta jobMeta, Map<String, String> Paters) throws KettleException {
        if (Paters == null) {
            return;
        }
        for (Map.Entry<String, String> entry : Paters.entrySet()) {
            System.out.println("key = " + entry.getKey() + ", value = " + entry.getValue());
            jobMeta.setParameterValue(entry.getKey(), entry.getValue());
        }
    }

    /** Throws a RuntimeException when a finished run reported at least one error. */
    private static void failOnErrors(int errors) {
        if (errors > 0) {
            throw new RuntimeException(RUN_ERROR_MESSAGE);
        }
    }
}