Springboot集成kettle实战
- 介绍
- 项目截图
- java+kettle实战代码
- 1 pom.xml
- 2 kettleUtil
- 3 业务层方法
- a 资源库业务
- b 转换的相关业务
- c 完整代码
介绍
kettle就不介绍了,Web界面极其简单、丑陋,生产环境无法投入使用。所以这篇实战内容是用springboot结合kettle,在web端执行已经在spoon客户端做好的ktr和kjb(即:这里不包括在web页面上制作ktr和kjb)。主要功能如下:
- web页面选择本地文件夹作为资源库
- 从已配置好的资源库里选择ktr和kjb保存到系统
- 配置ktr和kjb的日志级别,可执行的角色(数据权限)
- 立即执行ktr,kjb
- 定时执行ktr,kjb
- 查看执行日志
项目截图





java+kettle实战代码
1 pom.xml
这里只贴了kettle的相关依赖.完整的请看后面的完整代码
<!-- Kettle core dependencies -->
<dependency>
    <groupId>pentaho-kettle</groupId>
    <artifactId>kettle-core</artifactId>
    <version>${kettle-version}</version>
</dependency>
<dependency>
    <groupId>pentaho-kettle</groupId>
    <artifactId>kettle-engine</artifactId>
    <version>${kettle-version}</version>
</dependency>
<dependency>
    <groupId>pentaho-kettle</groupId>
    <artifactId>kettle-dbdialog</artifactId>
    <version>${kettle-version}</version>
</dependency>
<dependency>
    <groupId>org.pentaho.di.plugins</groupId>
    <artifactId>kettle-sap-plugin-core</artifactId>
    <version>${kettle-version}</version>
</dependency>
<dependency>
    <groupId>pentaho-kettle</groupId>
    <artifactId>kettle-ui-swt</artifactId>
    <version>${kettle-version}</version>
</dependency>

<!-- Required when Kettle runs complex scripts, e.g. the JavaScript step -->
<dependency>
    <groupId>org.codehaus.janino</groupId>
    <artifactId>janino</artifactId>
    <version>${janino-version}</version>
</dependency>
<dependency>
    <groupId>org.eclipse.birt.runtime.3_7_1</groupId>
    <artifactId>org.mozilla.javascript</artifactId>
    <version>${javascript-version}</version>
</dependency>

<!-- Kettle Excel plugin -->
<dependency>
    <groupId>net.sourceforge.jexcelapi</groupId>
    <artifactId>jxl</artifactId>
    <version>${jxl-version}</version>
</dependency>

<!-- Lower the MySQL driver version: Kettle uses an older driver when connecting to a MySQL repository -->
<!-- NOTE(review): 8.0.11 is pinned here although the comment above talks about a lower version; confirm the intended version -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.11</version>
</dependency>

<!-- org.gjt.mm.mysql.Driver -->
<dependency>
    <groupId>mm.mysql</groupId>
    <artifactId>mm.mysql</artifactId>
    <version>2.0.7</version>
</dependency>

<!-- Driver for the SQL Server (native) connection type -->
<dependency>
    <groupId>com.microsoft.sqlserver</groupId>
    <artifactId>sqljdbc4</artifactId>
    <version>4.0</version>
</dependency>

<!-- SQL Server connection driver -->
<dependency>
    <groupId>net.sourceforge.jtds</groupId>
    <artifactId>jtds</artifactId>
    <version>1.2.4</version>
</dependency>
2 kettleUtil
封装了一些常用的方法,这里只贴了执行转换的方法,执行job的方法与之类似
/*** 执行文件资源库转换* @param transPath 转换路径(相对于资源库)* @param transName 转换名称(不需要后缀)* @param namedParams 命名参数* @param clParams 命令行参数*/public void callTrans(String transPath, String transName, Map<String,String> namedParams, String[] clParams) throws Exception {KettleEnv.init();DatabaseMeta databaseMeta=new DatabaseMeta("kettle_trans_log", "mysql", "Native(JDBC)","xxx.xxx.x.xx","bps?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8", "3306", "root", "password");String msg;KettleFileRepository repo = this.fileRepositoryCon();TransMeta transMeta = this.loadTrans(repo, transPath, transName);transMeta.addDatabase(databaseMeta);VariableSpace space=new Variables();TransLogTable jobLogTable= TransLogTable.getDefault(space,transMeta,null);jobLogTable.setTableName("kettle_trans_log");jobLogTable.setConnectionName("kettle_trans_log");transMeta.setTransLogTable(jobLogTable);//transMeta.getTransLogTable().setTableName(repInitialization.transLog);//转换Trans trans = new Trans(transMeta);//设置命名参数if(null != namedParams) {//namedParams.forEach(trans::setParameterValue);/*for (Map.Entry<String, String> entry : namedParams.entrySet()) {trans.setParameterValue(entry.getKey(), entry.getValue());}*/for(Iterator<Map.Entry<String, String>> it = namedParams.entrySet().iterator(); it.hasNext();){Map.Entry<String, String> entry = it.next();trans.setParameterValue(entry.getKey(), entry.getValue());}}trans.setLogLevel(this.getLogerLevel(KETTLE_LOG_LEVEL));//执行trans.execute(clParams);trans.waitUntilFinished();KettleLogStore.discardLines(trans.getLogChannelId(),true);//记录日志String logChannelId = trans.getLogChannelId();LoggingBuffer appender = KettleLogStore.getAppender();String logText = appender.getBuffer(logChannelId, true).toString();log.info("[logTextlogText:"+logText+":logTextlogText]");//抛出异常if (trans.getErrors() > 0) {msg = "There are errors during transformation exception!(转换过程中发生异常)";log.error(msg);throw new Exception(msg);}}
3 业务层方法
a 资源库业务
/*** 资源库Service业务层处理* * @author kone* @date 2021-07-12*/
@Service
public class XRepositoryServiceImpl implements IXRepositoryService
{@Autowiredprivate XRepositoryMapper xRepositoryMapper;/*** 查询资源库* * @param id 资源库ID* @return 资源库*/@Overridepublic XRepository selectXRepositoryById(Long id){return xRepositoryMapper.selectXRepositoryById(id);}/*** 查询资源库列表* * @param xRepository 资源库* @return 资源库*/@Overridepublic List<XRepository> selectXRepositoryList(XRepository xRepository){return xRepositoryMapper.selectXRepositoryList(xRepository);}/*** 新增资源库* * @param xRepository 资源库* @return 结果*/@Overridepublic int insertXRepository(XRepository xRepository){String userName = (String) PermissionUtils.getPrincipalProperty("userName");xRepository.setCreatedBy(userName);xRepository.setUpdateBy(userName);xRepository.setType("File");return xRepositoryMapper.insertXRepository(xRepository);}/*** 修改资源库* * @param xRepository 资源库* @return 结果*/@Overridepublic int updateXRepository(XRepository xRepository){String userName = (String) PermissionUtils.getPrincipalProperty("userName");xRepository.setUpdateTime(DateUtils.getNowDate());xRepository.setUpdateBy(userName);return xRepositoryMapper.updateXRepository(xRepository);}/*** 删除资源库对象* * @param ids 需要删除的数据ID* @return 结果*/@Overridepublic int deleteXRepositoryByIds(String ids){return xRepositoryMapper.updateIsDelBatch(Convert.toStrArray(ids));// return xRepositoryMapper.deleteXRepositoryByIds(Convert.toStrArray(ids));}/*** 删除资源库信息* * @param id 资源库ID* @return 结果*/@Overridepublic int deleteXRepositoryById(Long id){return xRepositoryMapper.updateIsDel(id);//return xRepositoryMapper.deleteXRepositoryById(id);}@Overridepublic List<RepoTree> selectRepoRoot(XRepository repository) {List<XRepository> repositoryList = xRepositoryMapper.selectXRepositoryList(repository);List<RepoTree> ztrees = initZtree2(repositoryList);return ztrees;}@Overridepublic List<RepoTree> selectRepoTree(Long id) {XRepository xrs = xRepositoryMapper.selectXRepositoryById(id);List<RepositoryTree> repositoryTrees = getRepoTress(xrs);List<RepositoryTree> subTrees = new ArrayList<>();String type=null;String 
pId=String.valueOf(xrs.getId());List<RepoTree> ztrees = initZtree(repositoryTrees,String.valueOf(id));return ztrees;}public List<RepoTree> initZtree(List<RepositoryTree> repositoryList ,String parentId){List<RepoTree> ztrees = new ArrayList<RepoTree>();for (RepositoryTree rt : repositoryList) {if(rt.getId().equals(parentId) || rt.getText().equals("/")){continue;}RepoTree ztree = new RepoTree();ztree.setId(rt.getId());ztree.setpId(rt.getParent());ztree.setName(rt.getText());ztree.setTitle(rt.getPath());ztrees.add(ztree);}return ztrees;}public List<RepoTree> initZtree2(List<XRepository> repositoryList ){List<RepoTree> ztrees = new ArrayList<RepoTree>();for (XRepository rt : repositoryList){RepoTree ztree = new RepoTree();ztree.setId(String.valueOf(rt.getId()));ztree.setpId(" ");ztree.setName(rt.getRepoName());ztree.setTitle(rt.getBaseDir());ztrees.add(ztree);}return ztrees;}private List<RepositoryTree> getRepoTress(XRepository xr) {List<RepositoryTree> repositoryTrees = new ArrayList<>();List<XRepository> xRepositoryList =xRepositoryMapper.selectXRepositoryList(xr);if (!CollectionUtils.isEmpty(xRepositoryList)) {xRepositoryList.forEach(item -> {List<RepositoryTree> tmpRepositoryList = new ArrayList<>();String type = item.getType();if (type.equalsIgnoreCase("File")) {// 文件库String baseDir = item.getBaseDir();try {KettleFileRepository repository = (KettleFileRepository) KettleUtil_2.conFileRep(String.valueOf(item.getId()), item.getRepoName(), baseDir);XRepoManager.getAllDirectoryTreeList(String.valueOf(item.getId()), repository, "/", tmpRepositoryList);if (tmpRepositoryList.size() > 0) {RepositoryDirectoryInterface rDirectory = repository.loadRepositoryDirectoryTree().findDirectory("/");RepositoryTree repositoryTree = new RepositoryTree();repositoryTree.setParent(String.valueOf(item.getId()));repositoryTree.setId(item.getRepoId() + "@" + 
rDirectory.getObjectId().toString());//repositoryTree.setId(String.valueOf(item.getId()));repositoryTree.setText(rDirectory.getName().equals("\\/") ? "基础路径" : rDirectory.getName());repositoryTree.setLasted(false);repositoryTree.setType("tree");repositoryTree.setPath("file");tmpRepositoryList.add(repositoryTree);}} catch (KettleException e) {StringWriter sw = new StringWriter();e.printStackTrace(new PrintWriter(sw));}}repositoryTrees.addAll(tmpRepositoryList);});}return repositoryTrees;}}
b 转换的相关业务
注释应该还算清楚(有点不自信/(ㄒoㄒ)/~~)
/*** 转换Service业务层处理* * @author kone* @date 2021-07-14*/
@Service("kettleTransServiceImpl")
public class KettleTransServiceImpl implements IKettleTransService
{@Autowiredprivate KettleTransMapper kettleTransMapper;@Autowiredprivate XRepositoryMapper repositoryMapper;@Autowiredprivate KettleUtil kettleUtil;/*** 查询转换** @param id 转换ID* @return 转换*/@Overridepublic KettleTrans selectKettleTransById(Long id){return kettleTransMapper.selectKettleTransById(id);}/*** 查询转换列表** @param kettleTrans 转换* @return 转换*/@Overridepublic List<KettleTrans> selectKettleTransList(KettleTrans kettleTrans){Object o=PermissionUtils.getPrincipalProperty("roles");List<SysRole> roleList=new ArrayList<>();// roleList= (List<SysRole>) PermissionUtils.getPrincipalProperty("roles");if(o != null && o instanceof List<?>){for(Object r:(List<?>)o){roleList.add(SysRole.class.cast(r));}}//当前用户的roleKeyList<String> roleKeys=roleList.stream().map(SysRole::getRoleKey).collect(Collectors.toList());return kettleTransMapper.selectKettleTransList(kettleTrans,roleKeys);}/*** 新增转换** @param kettleTrans 转换* @return 结果*/@Overridepublic AjaxResult insertKettleTrans(KettleTrans kettleTrans){String transName=kettleTrans.getTransName();if(kettleTransMapper.selectKettleTransByTransName(transName)>0){return AjaxResult.error("已存在同名转换");}String userName = (String) PermissionUtils.getPrincipalProperty("userName");if(kettleTrans.getRoleKey()==null){kettleTrans.setRoleKey("admin");}else{if(!kettleTrans.getRoleKey().contains("admin")){kettleTrans.setRoleKey(kettleTrans.getRoleKey().concat(",admin"));}}kettleTrans.setCreatedBy(userName);kettleTrans.setUpdateBy(userName);kettleTrans.setTransType("File");return AjaxResult.success(kettleTransMapper.insertKettleTrans(kettleTrans));}/*** 修改转换** @param kettleTrans 转换* @return 结果*/@Overridepublic int updateKettleTrans(KettleTrans kettleTrans){String userName = (String) 
PermissionUtils.getPrincipalProperty("userName");kettleTrans.setUpdateBy(userName);kettleTrans.setUpdateTime(DateUtils.getNowDate());kettleTrans.setTransType("File");if(kettleTrans.getRoleKey()==null){kettleTrans.setRoleKey("admin");}else{if(!kettleTrans.getRoleKey().contains("admin")){kettleTrans.setRoleKey(kettleTrans.getRoleKey().concat(",admin"));}} return kettleTransMapper.updateKettleTrans(kettleTrans);}/*** 删除转换对象** @param ids 需要删除的数据ID* @return 结果*/@Overridepublic int deleteKettleTransByIds(String ids){return kettleTransMapper.deleteKettleTransByIds(Convert.toStrArray(ids));}/*** 删除转换信息** @param id 转换ID* @return 结果*/@Overridepublic int deleteKettleTransById(Long id){return kettleTransMapper.deleteKettleTransById(id);}/*** @Description:立即执行一次转换* @Author: Kone.wang* @Date: 2021/7/15 14:31* @param trans :* @return: void**/@Overridepublic AjaxResult run(KettleTrans trans) {Long id = trans.getId();KettleTrans kettleTrans = kettleTransMapper.selectKettleTransById(id);if(kettleTrans ==null){return AjaxResult.error("转换不存在!");}XRepository repository=repositoryMapper.selectXRepositoryById(kettleTrans.getTransRepositoryId());if(repository==null){return AjaxResult.error("资源库不存在!");}String path = kettleTrans.getTransPath();try {kettleUtil.KETTLE_LOG_LEVEL=kettleTrans.getTransLogLevel();kettleUtil.KETTLE_REPO_ID=String.valueOf(kettleTrans.getTransRepositoryId());kettleUtil.KETTLE_REPO_NAME=repository.getRepoName();kettleUtil.KETTLE_REPO_PATH=repository.getBaseDir();kettleUtil.callTrans(path,kettleTrans.getTransName(),null,null);} catch (Exception e) {e.printStackTrace();}return AjaxResult.success("执行成功!");}/*** @Description:查询抓换执行日志* @Author: Kone.wang* @Date: 2021/7/28 16:24* @param kettleTrans:* @return: java.util.List<java.lang.String>**/@Overridepublic List<String> queryTransLog(KettleTrans kettleTrans) {List<String> transLogs=kettleTransMapper.queryTransLog(kettleTrans.getTransName());return transLogs;}/*** @Description:设置定时执行转换* @Author: Kone.wang* @Date: 2021/7/21 
14:59* @param id:* @param transName:* @return: com.ruoyi.common.core.domain.AjaxResult**/@Overridepublic AjaxResult runTransQuartz(String id, String transName) {KettleTrans kettleTrans = kettleTransMapper.selectKettleTransById(Long.valueOf(id));return run(kettleTrans);}/*** @Description:检查该转换是否设置了定时任务* @Author: Kone.wang* @Date: 2021/7/21 16:37* @param checkStr:* @return: int**/@Overridepublic Long checkQuartzExist(String checkStr) {return kettleTransMapper.checkQuartzExist(checkStr);}
}
job和定时任务的细节代码就不贴了,要不然全屏都是这些鬼东西.
c 完整代码
码云:点这里















