系统间通讯实现数据信息实时同步解决方案

article/2025/8/28 23:23:31

        项目开发阶段遇到一个需求,描述大致就是同一个用户在A系统数据库保存的数据信息与在B系统数据库保存的数据信息要保持同步。当A系统用户修改了个人信息,A系统后台在将用户修改后的信息入库的同时也会向B系统发送消息,让B系统后台进行自动数据信息同步。

        这个可以根据各企业各自的系统间通讯方式来灵活处理。这里我介绍我运用的处理方式,作为经验总结记录和分享。

        深谙spring的实现原理:使用dom4j技术对xml文件进行解析读取扫描注解通过反射机制来进行对象的创建,于是解决上述需求的方案由此得到启发。对于我们实际系统来说,这就是一个小框架,扩展性非常好,后来者只需要专注业务逻辑的实现即可完成数据同步的需求。

        下面先贴目录结构

         


这里运用先缓存业务逻辑处理方法的策略,即在服务器启动的时候就将写好的业务逻辑处理方法缓存到内存中,通过监听器监听到其他系统有发送同步消息时自动调用相应的处理方法来进行数据同步。

要缓存服务,需要用到注解和反射

下面贴上两个自定义注解:分别是类注解和方法注解

package com.zy.isc.common;import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** <p>Title: IscClassMapping</p>* <p>Description: 消息业务处理类注解* 用于标识类为消息处理类,和IscMethodMapping方法注解配合使用* spring容器加载完成后会将具休的业务方法缓存起来,用于处理消息。* </p>* <p>Company: * @author kjkfb_zy  2017-7-31 * <p>Just do it!!!</p>* @version v1.0*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface IscClassMapping {}
package com.zy.isc.common;import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** <p>Title: IscMethodMapping</p>* <p>Description: 消息业务处理方法注解</p>* <p>Company: * @author kjkfb_zy  2017-7-31 * <p>Just do it!!!</p>* @version v1.0*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface IscMethodMapping {public String name();public String desc() default "";
}

然后定义一个通用的业务处理类,通过这个类来保存注解类对象,然后运用反射机制来调用具体的业务逻辑处理方法

package com.zy.isc.common;import java.io.Serializable;
import java.lang.reflect.Method;/*** <p>Title: ServiceBean</p>* <p>Description: 保存到map中的业务bean</p>* <p>Company: * @author kjkfb_zy  2017-7-31 * <p>Just do it!!!</p>* @version v1.0*/
public class ServiceBean implements Serializable{private static final long serialVersionUID = 7453372917648514518L;private Method method;private Object object;private String desc;public Method getMethod() {return method;}public void setMethod(Method method) {this.method = method;}public Object getObject() {return object;}public void setObject(Object object) {this.object = object;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}}

spring容器初始化时还需要做的另一件事——将带有注解的类和方法缓存在map中,key值就是方法上面的注解value值,key对应的value就是带注解的对应的业务处理类对象实例

package com.zy.isc.core_receive;import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;import com.zy.isc.common.IscClassMapping;
import com.zy.isc.common.IscMethodMapping;
import com.zy.isc.common.ServiceBean;/*** <p>Title: InitServiceMethodMapping</p>* <p>Description: spring容器启动时调用这里的初始化方法,将带有自定义注解的类和方法缓存在map中</p>* <p>Company: * @author kjkfb_zy  2017-7-31 * <p>Just do it!!!</p>* @version v1.0*/
public class InitServiceMethodMapping {private static Logger logger = LoggerFactory.getLogger(InitServiceMethodMapping.class);private static Map<String,ServiceBean> map = null;private InitServiceMethodMapping(){}public static Map<String,ServiceBean> getMethodMap(){return map;}public static synchronized void init() throws Exception{if(map == null){logger.info("initialize biz interface object and save into map start");map = new HashMap<String, ServiceBean>();ApplicationContext context = SpringContextReceiveUtil.getApplicationContext();for(String s : context.getBeanDefinitionNames()){Class<?> c = context.getBean(s).getClass();if(c.getAnnotation(IscClassMapping.class)!=null){Method[]method = c.getDeclaredMethods();ServiceBean serviceBean = null;for(Method m : method){IscMethodMapping mksIscMethodMapping = m.getAnnotation(IscMethodMapping.class);if(mksIscMethodMapping!=null){if(!map.containsKey(mksIscMethodMapping.name())){serviceBean = new ServiceBean();serviceBean.setObject(context.getBean(s));serviceBean.setMethod(m);serviceBean.setDesc(mksIscMethodMapping.desc());map.put(mksIscMethodMapping.name(),serviceBean);logger.info("@biz interface name:["+mksIscMethodMapping.name()+"],already saved in cached map@");}else{throw new Exception("initialize biz interface failed, name:["+mksIscMethodMapping.name()+"]repeated,please modify then try,classpath:"+c.getName());}}}}}logger.info("initialize biz interface object and save into map start,total biz interface count:"+map.size());}}
}

然后在spring容器启动的时候调用上述类中的初始化方法和启动消息监听器

package com.zy.isc.core_receive;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;import com.pingan.isc.ISCMessageBroker;
import com.zy.isc.handler.ServerHandler;public class SpringContextReceiveUtil implements ApplicationContextAware{private Logger logger = LoggerFactory.getLogger(SpringContextReceiveUtil.class);private static ApplicationContext applicationContext;@Overridepublic void setApplicationContext(ApplicationContext applicationContext)throws BeansException {logger.info("initialize spring context start:"+applicationContext);SpringContextReceiveUtil.applicationContext = applicationContext;logger.info("initialize spring context end, bean count:"+applicationContext.getBeanDefinitionCount());try {//这里缓存业务接口时容器还没有完全启动完成,所以使用纯线程来启动消息中心监听程序,以免影响启动超时的情况InitServiceMethodMapping.init();//启动消息监听initMessageHandler();} catch (Exception e) {logger.error("initialize spring context failed",e);}}public static ApplicationContext getApplicationContext(){return applicationContext;}public void initMessageHandler() {try {logger.info("initialize MSG listener start");//启动消息监听int corePoolSize = 10;int maximumPoolSize = 20;int keepAliveTime = 300;int queueSize = 100;ServerHandler handler = new ServerHandler();ISCMessageBroker.MessageExecutor(corePoolSize, maximumPoolSize, keepAliveTime, queueSize, handler);logger.info("initialize MSG listener end");} catch (Exception e) {logger.error("initialize MSG listener exception",e);}}
}

消息监听器启动时需要指定消息处理器,这个处理器实现了MessageHandler接口,一旦有消息从其他系统发过来,监听器监听到消息到来就会调用messageReceived(Object arg0)这个方法,参数即为接收到的消息

package com.zy.isc.handler;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.pingan.isc.core.MessageHandler;
import com.zy.isc.core_receive.ServiceReceiveExecutor;public class ServerHandler implements MessageHandler {private static final Logger logger = LoggerFactory.getLogger(ServerHandler.class);@Overridepublic void messageReceived(Object arg0) throws Exception{try {logger.info("=======invoke biz method start=======");long start = System.currentTimeMillis();ServiceReceiveExecutor.execute(arg0);long end = System.currentTimeMillis();logger.info("=======invoke biz method end=======");logger.info("time cost:"+(end-start)/1000);}catch (Exception e) {logger.error("Message Received Exception"+arg0,e);}}
}

然后在这个类的messageReceived(Object arg0)方法中再调用接收消息执行器将接收到的消息进行处理,解析消息内容得到里面用来标记具体业务逻辑处理方法的值,然后将该值与缓存在map中的key值比对,找到对应的方法用反射来调用。

package com.zy.isc.core_receive;import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.ResourceBundle;import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.dc.eai.data.CompositeData;
import com.dc.eai.data.Field;
import com.dcfs.esb.client.converter.PackUtil;
import com.zy.isc.common.ServiceBean;/*** <p>Title: ServiceReceiveExcutor</p>* <p>Description:接收ISC消息执行具体的业务方法 </p>* <p>Company: * @author kjkfb_zy  2017-7-31 * <p>Just do it!!!</p>* @version v1.0*/
public class ServiceReceiveExecutor {private static final Logger logger = LoggerFactory.getLogger(ServiceReceiveExecutor.class);//读取配置文件private static ResourceBundle resource;static{resource = ResourceBundle.getBundle("IscConfigPlus");}public static String getValue(String key){return resource.getString(key);}/*** <p>Description:根据消息子主题和业务码执行具体的业务方法 </p>* @param message* @return*/public static Object execute(Object message) throws Exception{logger.info("===== unpack message start =====");Map<String, Object> map = unpackMSG(message);String uniqueId = null;CompositeData compositeData = null;CompositeData body = null;if (map != null && map.size() > 0) {uniqueId = (String) map.get("uniqueId");compositeData = (CompositeData) map.get("compositeData");body = compositeData.getStruct("BODY");}else {logger.info("message is null");return null;}logger.info("===== unpack message end =====");try {if(StringUtils.isBlank(uniqueId)){logger.error("uniqueId is null,no method matches , message infomation:\r\n"+message);throw new Exception("uniqueId is null,no method matches , message infomation:\r\n"+message);}boolean isContainsKey = InitServiceMethodMapping.getMethodMap().containsKey(uniqueId);if (isContainsKey) {ServiceBean serviceBean = InitServiceMethodMapping.getMethodMap().get(uniqueId);logger.info("request biz interface's ID:["+uniqueId+"],biz interface description["+serviceBean.getDesc()+"]");return serviceBean.getMethod().invoke(serviceBean.getObject(),body);}else {logger.info("no method maches the request,message information\r\n" + compositeData );}} catch (Exception e) {logger.error("biz method exception,args:\r\n",e);throw e;}return null;}/*** <p>Description: 标准报文体解包,将报文中的消息子主题和交易码拼接后作为业务逻辑方法的唯一标识</p>* @param message* @return*/public static Map<String,Object> unpackMSG(Object message){Map<String, Object> retMap = new HashMap<String, Object>();if (message != null) {//解析出报文体,存到map中CompositeData compositeData  = PackUtil.unpackXmlStr((String)message);//打印此日志方便查看报文,生产环境去掉logger.debug("message content:\r\n" + compositeData);retMap.put("compositeData", compositeData);CompositeData body = compositeData.getStruct("BODY");retMap.put("body", body);Map<String, Object>dataMap = new HashMap<String, Object>();Enumeration<String> keys = resource.getKeys();while (keys.hasMoreElements()) {String key = (String) keys.nextElement();String value = getValue(key);CompositeData struct = compositeData.getStruct(value);if (struct != null && struct.size()>0) {logger.debug("key:value ——> " + key+":"+value);dataMap.put(key, struct);}}logger.debug("dataMap:\r\n"+dataMap);if (dataMap != null && dataMap.size()>0) {CompositeData iscSysHeadCompositeData = (CompositeData) dataMap.get("iscSysHead");CompositeData iscPubHeadCompositeData = (CompositeData) dataMap.get("iscPubHead");Field subTopicField = iscSysHeadCompositeData.getField("SUB_TOPIC");Field serviceCodeField = iscPubHeadCompositeData.getField("SERVICE_CODE");if (subTopicField != null) {String subTopic = subTopicField.getValue().toString();logger.debug("message subtopic: " + subTopic);retMap.put("subTopic", subTopic);}else {retMap.put("subTopic", "");}if (serviceCodeField != null) {String serviceCode = serviceCodeField.getValue().toString();logger.debug("message serviceCode: " + serviceCode);retMap.put("serviceCode", serviceCode);}else {retMap.put("serviceCode", "");}String subTopic = retMap.get("subTopic").toString();String serviceCode = retMap.get("serviceCode").toString();String uniqueId = subTopic + serviceCode;retMap.put("uniqueId", uniqueId);}else {logger.info("dataMap is null,uniqueId is null");}}return retMap;}
}

这个ServiceReceiveExecutor类会将消息中解析出来的报文body通过反射参数传到具体的业务逻辑处理类中,最后就是具体的业务逻辑处理类了,这个类或者方法可以按相同的方式进行任意扩展

       

package com.zy.isc.service;import java.util.HashMap;
import java.util.Map;import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;import com.dc.eai.data.CompositeData;
import com.dc.eai.data.Field;
import com.zy.isc.common.IscClassMapping;
import com.zy.isc.common.IscMethodMapping;/*** <p>Title: UserInfoService</p>* <p>Description: 接收BECIF广播用户信息Service</p>* <p>Company: * @author kjkfb_zy  2017-7-31 * <p>Just do it!!!</p>* @version v1.0*/
@Service
@IscClassMapping
public class UserInfoService {private static final Logger logger = LoggerFactory.getLogger(UserInfoService.class);//服务类//@Resource(name = "userService")//private UserService userService;//name 唯一标识 = 子主题(20005)+ 交易码(000012)@IscMethodMapping(name="20005000012",desc="xxx业务需求描述")public void userInfoCombine(CompositeData compositeData) throws Exception{Field clientNoField = compositeData.getField("CLIENT_NO");Field clientNo1Field = compositeData.getField("CLIENT_NO1");String clientNo = null;String clientNo1 = null;if (clientNoField != null) {clientNo = clientNoField.getValue().toString();logger.info("combine becif:" + clientNo);}if (clientNo1Field != null) {clientNo1 = clientNo1Field.getValue().toString();logger.info("combined becif: " + clientNo1);}  if (StringUtils.isNotBlank(clientNo1) && StringUtils.isNotBlank(clientNo)) {Map<String, Object> paramMap = new HashMap<String, Object>();paramMap.put("clientNo", clientNo);paramMap.put("aClentNo", clientNo1);//boolean flag = scfpUserService.checkBecifExist(clientNo1);boolean flag = false;logger.info("becif exist:" + flag);if (flag) {try {//scfpUserService.combineUserBecif(paramMap);} catch (Exception e) {logger.error("userInfoCombine() method exception:" + e.getMessage());}}}}
}

最后还有一个容器销毁时释放缓存的监听器

package com.zy.isc.listener;import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.pingan.isc.ISCMessageBroker;/*** <p>Title: StartupListener</p>* <p>Description:</p>* <p>Company: * @author kjkfb_zy  2017-7-31 * <p>Just do it!!!</p>* @version v1.0*/
public class StartupListener implements ServletContextListener {private static final Logger LOGGER = LoggerFactory.getLogger(StartupListener.class);@Overridepublic void contextInitialized(ServletContextEvent arg0) {LOGGER.info("===== MSG listener preparation =====");}@Overridepublic void contextDestroyed(ServletContextEvent arg0) {ISCMessageBroker.destroyed();LOGGER.info("===== ISCMessageBroker destroyed,resource release =====");}}
这样整个小框架就完毕了,使用时只需要在spring的配置文件中将SpringContextReceiveUtil这个类的bean配置好,在web.xml中配置StartupListener这个监听器就可以使用了。接收消息按UserInfoService类方法上面注解唯一标识来区分。后续还有其他消息要接收,直接按照这种注解方式在UserInfoService类中扩展或者另外新增类似UserInfoService类都可以。

http://chatgpt.dhexx.cn/article/FnZHAR4r.shtml

相关文章

一文带你玩转实时数据同步方案

1、概述 1.1、目标 实时数据同步主要实现从源数据库到目标数据库的实时数据同步。源数据主要支持mysql数据库&#xff0c;目标数据包括mysql数据库和hbase数据库。 下面是实时数据同步的数据流转图&#xff0c;mysql的增量订阅数据经过canal和kafka&#xff0c;数据最终实时…

两台服务器同时写文件 怎么做同步,两台服务器做实时数据同步

两台服务器做实时数据同步 内容精选 换一换 DCS Memcached即将下线&#xff0c;部分Region已暂停售卖&#xff0c;建议使用Redis4.0/5.0。本章节主要描述Memcached主备实例。Memcached主备实例在单机实例基础上&#xff0c;增强服务高可用以及数据高可靠性。Memcached主备实例具…

实时数据同步方案

一.Flume收集各数据库日志&#xff0c;准实时抽取到HDFS 安装HDP&#xff0c;包含Flume 方案优点&#xff1a; 1.配置简单&#xff0c;不用编程&#xff1a;只要在flume.conf文件中配置source、channel及sink的相关属性 2.采用普通SQL轮询的方式实现&#xff0c;具有通用性&…

如何实现数据自动化的实时同步?

企业在日常业务中&#xff0c;比如总分支机构之间、数据中心之间、不同节点之间、跨国业务之间等&#xff0c;都需要将文件及时的传输&#xff0c;以供协同使用。所以&#xff0c;很多企业会选择一些同步工具或软件。 谈到文件同步备份大家使用较多的可能是Rsync、同步盘等一些…

像素是什么,一个像素有多大,像素和分辨率的关系

图片的像素和分辨率 对于像素和分辨率这两个词&#xff0c;主要见于图片和显示设备上。只要你用到手机里的照相功能&#xff0c;你都要接触到这两个概念。只是大多数人都是一知半解&#xff0c;而更多的人却根本就不知道&#xff0c;白白浪费了手机里500万、800万像素的摄影头&…

屏幕尺寸、分辨率、DPI、PPI

屏幕尺寸 下面这张图是华为荣耀7的尺寸图&#xff0c;图上写的是5.2英寸。我们所说的这个5.2英寸是手机屏幕对角线的长度。 我们平时是不用英寸这个单位的&#xff0c;我们用的是毫米&#xff0c;厘米&#xff0c;米这些单位。那么英寸和毫米&#xff0c;厘米之间怎样的换算关…

像素(Pixel)、DPI与PPI一看就明白

像素&#xff08;Pixel&#xff09;、DPI与PPI 像素&#xff08;Pixel&#xff09;DPI 英文全写是(Dots Per Inch&#xff0c;每英寸点数)PPI 英文全写是(Pixels Per Inch&#xff0c;每英寸像素数)比喻来区分应用&#xff08;这张图熟悉吗&#xff09;1.分辨率啥意思&#xff…

关于像素、分辨率、PPI、DPI等概念的分析

关于影像图的比例尺和分辨率&#xff1a;https://blog.csdn.net/liliiii/article/details/40261953 当我们说到 像素、分辨率、DPI、PPI等专业术语的时候&#xff0c;一般会涉及到图像、屏幕、打印机等等。 像素&#xff08;Pixel&#xff09;为图像显示的基本单位&#xff0c;…

分辨率 PPI DPI概念定义详解

我们在开发中&#xff0c;涉及到UI显示时&#xff0c;经常会遇到的一些概念&#xff0c;比如分辨率&#xff0c;ppi&#xff0c;dpi等&#xff0c;这些概念&#xff0c;在百度百科中&#xff0c;发现都有对它们的定义&#xff0c;一些博客中&#xff0c;也有对这几个概念的对比…

DPI与PPI的区别

开发中不免会遇到分辨率、DPI、PPI和屏幕尺寸等术语&#xff0c;那就弄弄清楚这些概念的真正含义。 分辨率 分辨率这个词在很多地方都有&#xff0c;比如相机、视频、扫描仪。这里说的就是显示器的分辨率。显示器是由一个个像素点(pixel)所组成的&#xff0c;一般所说的显示器…

传感器尺寸、像素、DPI分辨率、英寸、毫米的关系

虽然网上有很多这种资料&#xff0c;但是太过于复杂&#xff0c;每个人的说法都不一样&#xff0c;看的让人云里雾里的&#xff0c;我总结了一下&#xff0c;不知道对不对&#xff01; 1. 1英寸25.4mm 2. 传感器尺寸&#xff1a;传感器的尺寸是指传感器的大小&#xff0c;一般…

dpi 、 dip 、分辨率、屏幕尺寸、px、density 关系以及换算

一、基本概念 dip &#xff1a; Density independent pixels &#xff0c;设备无关像素。dp &#xff1a;就是dippx &#xff1a; 像素dpi &#xff1a;dots per inch &#xff0c; 直接来说就是一英寸多少个像素点。常见取值 120&#xff0c;160&…

什么是 PPI、DPI 及 像素、分辨率的区别?

什么是 PPI、DPI 及 像素、分辨率的区别&#xff1f; AbstractPPIDPI像素 Pixels分辨率 Resolution Abstract 笔者通过本篇文章重点来阐明相对迷惑的PPI、DPI概念&#xff0c;PPI 和 DPI 是处理图像时需要深入理解的概念&#xff0c;两者都指的时分辨率或清晰度&#xff0c;但…

DPI、像素与分辨率的区别和联系

DPI&#xff08;Dot Per Inch&#xff09; 表示打印分辨率&#xff0c;指每英寸长度上的点数。DPI 又可细分为水平分辨率和垂直分辨率&#xff0c;例如一张 “1英寸*1英寸” &#xff08;1英寸2.54cm&#xff09;的图片&#xff0c;如果它的水平分辨率是100 dpi&#xff0c;垂直…

一文详解像素、DPI、分辨率之间的关系

1.像素 像素&#xff1a;是指在由一个数字序列表示的图像中的一个最小单位&#xff0c;称为像素。 像素可以用一个数表示&#xff0c;比如一个“0.3兆像素”数码相机&#xff0c;它有额定30万像素&#xff1b;也可以用一对数字表示&#xff0c;例如“640x480显示器”&#xff0…

分辨率、Dpi 的关系

概念&#xff1a;分辨率&#xff0c;指的是图像或者显示屏在长和宽上各拥有的像素个数。比如一张照片分辨率为1920x1080&#xff0c;意思是这张照片是由横向1920个像素点和纵向1080个像素点构成&#xff0c;一共包含了1920x1080个像素点。dpi是分辨率的表示单位之一。它是英文D…

图片分辨率像素与DPI的关系

来自网络的一张图片&#xff1a;https://www.duitang.com/blog/?id788101874 这张图片大小(通过鼠标右键点击菜单"属性"获得): 这里面图片的分辨率和dpi两种值。 现在讲讲两者关系。 将图片复制粘贴到word,我们可以获得图片的"原始尺寸"。 如下图所示…

SPSS新手教程——进行距离分析的方法

我们在使用IBM SPSS Statistics来进行数据分析的时候&#xff0c;难免会遇上这种情况:变量非常多&#xff0c;多到我们不能对其一一控制的地步&#xff0c;但每个变量都有分析的价值&#xff0c;同时又彼此重叠。这个时候最直接的方法就是把所有变量按照一定的标准来进行分类&a…

SPSS的入门

1.SPSS的起源 SPSS&#xff08;全称&#xff1a;Statistical Product and Service Solutions&#xff09;是世界上最早的统计分析软件&#xff0c;由美国斯坦福大学的三位研究生Norman H. Nie、C.Hadlai (Tex) Hull和Dale H. Bent于1968年研发成功。SPSS采用类似EXCEL表格的方…

SPSS 独立样本t检验方法

检验变量&#xff1a;要做检验的变量 分组变量&#xff1a;选择给数据分组的变量 这里只能检验两个组间的差异&#xff0c;因此只能输入两个组 组1&#xff0c;组2&#xff1a;输入你分组变量中的值 比如&#xff0c;我这里检验得是ADHD与CN之间的差异&#xff0c;所以我输入…