关于数据同步的几种实现

article/2025/9/11 23:08:27

关于数据同步的几种实现

概述

关于数据同步主要有两个层面的同步,一是通过后台程序编码实现数据同步,二是直接作用于数据库,在数据库层面实现数据的同步。通过程序编码实现数据同步,其主要的实现思路很容易理解,即有就更新,无则新增,其他情况日志记录,就不做过多的介绍,这里主要讲述的是第二个层面的数据同步,即在数据库层面实现数据同步。

数据库层面的数据库同步主要有三种方式:通过发布/订阅的方式实现同步,通过SQL JOB方式实现数据同步,通过Service Broker 消息队列的方式实现数据同步。

下面分别就这三种数据同步方式,一一详解。

1.    通过发布/订阅的方式实现同步

发布/订阅是Sql Server自带的一种数据库备份的机制,通过该机制可以快速的实现数据的备份同步,不用编写任何的代码。

此种数据同步的方式存在的以下的一些问题:

  1. 表结构不能更改,同步双方的表结构必须一致,一旦表结构发生更改需要重新生成数据库快照。
  2. 对于大数据量的同步没有可靠的保证。
  3. 网络不稳定的情况下同步也不能保证。

总的来说,这种数据备份同步的方式,在表结构一致、数据量不是特别大的情况下还是非常高效的一种同步方式。

网上有很多的关于如何使用发布/订阅的方式实现数据同步的操作示例,这里就不再重复的演示了,有兴趣想要了解的朋友可以参考下面这篇文章:

http://kb.cnblogs.com/page/103975/

2.    通过SQL JOB方式实现数据同步

通过Sql Job定时作业的方式实现同步其基本原理就是通过目标服务器和源服务器的连接,然后通过编写Sql语句,从源服务器中读取数据,再更新到目标服务器。

这种数据同步的方式比较灵活。创建过sql定时作业之后,主要需要执行以下关键的两步。

2.1     创建数据库连接(一般作为定时作业执行的第一步)

不同数据库之间的连接可以通过系统的存储过程实现。下面就直接用一个示例来讲一下如何创建数据库连接。

--添加一个连接

--系统存储过程sp_addlinkedserver 参数:

----------------------1:目标服务器的IP或别名,本例中为:'WIN-S1PO3UA6J7I';----------------------2:'' (srvproduct,默认);

----------------------3:'SQLOLEDB'(provider,默认值);

----------------------4:目标服务器的IP或别名(datasrc),本例中为:'WIN-S1PO3UA6J7I'

exec sp_addlinkedserver 'WIN-S1PO3UA6J7I','','SQLOLEDB','WIN-S1PO3UA6J7I'

--添加登录用户连接

--系统存储过程sp_addlinkedsrvlogin 参数:

----------------------1:目标服务器的IP或别名,本例中为:'WIN-S1PO3UA6J7I';

----------------------2:'false',默认值;

----------------------3:null,默认值;

----------------------4:'sa',登录用户名;

----------------------5:'pass@word1',登录密码;

exec sp_addlinkedsrvlogin 'WIN-S1PO3UA6J7I','false',null,'sa','pass@word1'

创建数据库连接主要用到了以上的两个存储过程,但是在实际操作的过程中可能会遇到“仍有对服务器XXX的远程登录或连接登录问题”这样的问题,如果遇到此类问题,在执行上边的添加连接和登录用户连接之前还需要先删除某个已存在的链接,具体如下:

--系统存储过程sp_droplinkedsrvlogin 参数:

----------------------1:目标服务器的IP或别名,本例中为:'WIN-S1PO3UA6J7I';----------------------2:null

exec sp_droplinkedsrvlogin 'WIN-S1PO3UA6J7I',null

--系统存储过程sp_dropserver 参数:

----------------------1:目标服务器的IP或别名,本例中为:'WIN-S1PO3UA6J7I'

exec sp_dropserver 'WIN-S1PO3UA6J7I'

2.2     使用SQL 语句 实现数据同步

主要的同步思路:

1:在目标数据库中先清空要同步的表的数据

2:使用insert into Table (Cloumn....) select Column..... from 服务器别名或IP.目标数据库名.dbo.TableName 的语法将数据从源数据库读取并插入到目标数据库

Truncate  table  Org_DepartmentsExt –删除现有系统中已存在的部门表

insert into Org_DepartmentsExt –从名为WIN-S1PO3UA6J7I的服务器上的DBFrom数据库上获取源数据,并同步到目标数据库中

     (

      [DeptID]

      ,[DeptStatus]

      ,[DeptTel]

      ,[DeptBrief]

      ,[DeptFunctions] 

     )

SELECT [DeptID]

      ,[DeptStatus]

      ,[DeptTel]

      ,[DeptBrief]

      ,[DeptFunctions]

  FROM [WIN-S1PO3UA6J7I].[DBFrom].[dbo].[Org_DepartmentsExt]

以上这两步便是通过SQL Job实现数据同步的关键步骤,在完成以上两步之后,如果没有其他的表要进行同步,则可创建同步计划以完善定时作业。带作业创建完后,便可以执行。

这里主要只是演示了通过Sql Job方式实现数据同步的关键步骤。网上有很多具体的实例演示。有兴趣的朋友可以参考以下文章进行练习检验:

http://www.cnblogs.com/tyb1222/archive/2011/05/27/2060075.html

3.    通过SQL Server Service Broker 消息队列的方式实现数据同步

3.1 SQL Server Service Broker概述

SQL Server Service Broker 是数据库引擎的组成部分,为 SQL Server 提供队列和可靠的消息传递。既可用于使用单个 SQL Server 实例的应用程序,也可用于在多个实例间分发工作的应用程序。

在单个 SQL Server 实例内,Service Broker 提供了一个功能强大的异步编程模型。数据库应用程序通常使用异步编程来缩短交互式响应时间,并增加应用程序总吞吐量。

在多个SQL Server实例之间Service Broker 还可以提供可靠的消息传递服务。Service Broker 可帮助开发人员通过称为服务的独立、自包含的组件来编写应用程序。需要使用这些服务中所包含功能的应用程序可以使用消息来与这些服务进行交互。Service Broker 使用 TCP/IP 在实例间交换消息。Service Broker 中所包含的功能有助于防止未经授权的网络访问,并可以对通过网络发送的消息进行加密。

3.2 具体的实现演示

在这一小节里,主要是通过一个完整的数据同步的流程向大家演示,如何实现同一个数据库实例不同数据库的数据同步。关于不同的数据库实例间的数据库的数据同步整体上跟同一个实例的数据库同步是一样的,只不过在不同的数据库实例间同步时还需启用传输安全、对话安全,创建路由、远程服务绑定等额外的操作。

这里边用到了大量的SQL Server XML的东西,如果有不理解的地方可以参考以下链接:http://www.cnblogs.com/Olive116/p/3355840.html

这是我在做技术准备时,自己的一点学习记录。

下面就是具体的实现步骤:

3.2.1为数据库启动Service Broker活动

    这一步主要是用来对要进行数据同步的数据启用Service Broker 活动,并且授信。

USE master
GO
--如果数据库DBFrom、DBTo不存在,则创建相应的数据库
IF NOT EXISTS (SELECT name FROM sys.databases WHERE name ='DBFrom')
CREATE DATABASE DBFrom
GO
IF NOT EXISTS (SELECT name FROM sys.databases WHERE name ='DBTo')
CREATE DATABASE DBTo
GO
--分别为该数据库启用Service Broker活动并且授权信任
ALTER DATABASE DBFrom SET ENABLE_BROKER
GO
ALTER DATABASE DBFrom SET TRUSTWORTHY ON
GO
ALTER AUTHORIZATION ON DATABASE::DBFrom To sa
GO
ALTER DATABASE DBTo SET ENABLE_BROKER
GO
ALTER DATABASE DBTo SET TRUSTWORTHY ON
GO
ALTER AUTHORIZATION ON DATABASE::DBTo TO sa
GO

 

3.2.2 创建数据库主密匙

这一步主要用来创建数据库主密匙,上边有提到Service Broker可以对要发送的消息进行加密。

Use DBFrom
go
create master key
encryption by password='pass@word1'
goUse DBTo
go
create master key
encryption by password='pass@word1'
go

 

3.2.3 创建消息类型、协定

这里主要用来创建消息类型和消息协定,源数据库和目标数据库的消息类型和协定都要一致。

Use DBFrom
go
--数据同步—消息类型
create message type [http://oa.founder.com/Data/Sync]
validation=well_formed_xml
go
--数据同步--错误反馈消息类型
create message type [http://oa.founder.com/Data/Sync/Error]
validation=well_formed_xml
go
--数据同步协议
create contract[http://oa.founder.com/Data/SyncContract]
(
[http://oa.founder.com/Data/Sync]
sent by initiator,
[http://oa.founder.com/Data/Sync/Error]
sent by target
)
goUse DBTo
go
--数据同步—消息类型
create message type [http://oa.founder.com/Data/Sync]
validation=well_formed_xml
go
--数据同步--错误反馈消息类型
create message type [http://oa.founder.com/Data/Sync/Error]
validation=well_formed_xml
go
--数据同步协议create contract[http://oa.founder.com/Data/SyncContract]
(
[http://oa.founder.com/Data/Sync]
sent by initiator,
[http://oa.founder.com/Data/Sync/Error]
sent by target
)
Go

 

创建过之后效果如下图:

                       

3.2.4 创建消息队列

    这里主要用来创建消息队列,源数据库和目标数据库都要创建,队列名字可以自主命名。

use DBFrom
go
create queue [DBFrom_DataSyncQueue]
with status=on
gouse DBTo
go
create queue [DBFrom_DataSyncQueue]
with status=on
go

 

创建之后效果如下图:

 

3.2.5 创建数据同步服务

这里我们通过利用上边创建的消息协定和消息队列来创建数据同步的服务。

use DBFrom
go
create service [http://oa.founder.com/DBFrom/Data/SyncService]
on queue dbo.[DBFrom_DataSyncQueue]([http://oa.founder.com/Data/SyncContract])
go--数据同步服务
use DBTo
go
create service [http://oa.founder.com/DBTo/Data/SyncService]
on queue dbo.[DBFrom_DataSyncQueue]([http://oa.founder.com/Data/SyncContract])
go

 

    创建后效果如下图:

 

3.2.6 在源数据库上创建服务配置列表

这里需要在源数据库上创建一个服务配置列表,主要用来保存之前创建过的服务名称,本例只是用来演示,所以只创建了一个服务,只能是同步一个数据表,如果有多个数据表需要同步,则需创建多个服务,所以这里创建一个服务配置列表,用来存储多个服务的服务名称。

需要注意的是,下面的脚本在执行完创建表的操作之后又插入了一条数据,也就是上边我们创建的服务名,如果有多个服务的话,依次插入该表即可。

use DBFrom
go
--同步数据--目标服务配置
create table SyncDataFarServices
(
ServiceID uniqueidentifier,
ServiceName nvarchar(256)
)
go
--将上边创建的服务名,插入此表中
insert into SyncDataFarServices (ServiceID,ServiceName)
values
(NEWID(),'http://oa.founder.com/DBTo/Data/SyncService')
go

 

效果如下图:

 

 

3.2.7 发送数据同步消息

    这里创建了一个存储过程主要用来发送同步消息,该消息内容主要包括操作类型、主键、表名、正文内容,分别对应@DMLType,@PrimaryKeyField,@TableName,@XMLData。然后通过创建一个游标来条的读取上边创建的服务列表中的列表信息,向不同的服务发送消息。

   

 Use DBFrom
go
--发送同步数据消息
Create procedure UP_SyncDataSendMsg
(
@PrimaryKeyField nvarchar(128),
@TableName nvarchar(128),
@DMLType char(1),
@XMLData xml
)
as
beginSET @XMLData.modify('insert <DMLType>{sql:variable("@DMLType")}</DMLType>  as first into /');SET @XMLData.modify('insert <PrimaryKeyField>{sql:variable("@PrimaryKeyField")}</PrimaryKeyField>  as first into /');SET @XMLData.modify('insert <Table>{sql:variable("@TableName")}</Table> as first into /');DECLARE FarServices CURSOR FOR SELECT ServiceName FROM SyncDataFarServices;  open FarServicesdeclare @FarServiceName nvarchar(256);fetch FarServices into @FarServiceName;while @@FETCH_STATUS=0beginbegin Transactiondeclare @Conv_Handler uniqueidentifierbegin DIALOG conversation @Conv_Handler --开始一个会话from service [http://oa.founder.com/DBFrom/Data/SyncService]to service @FarServiceNameon contract [http://oa.founder.com/Data/SyncContract];          send on conversation @Conv_HandlerMessage type [http://oa.founder.com/Data/Sync](@XMLData);fetch FarServices into @FarServiceName;commit;endclose FarServices;deallocate FarServices;
end
go

 

 

3.2.8 创建数据同步异常信息记录表

这里创建该表主要用来记录在数据同步过程中出现的异常信息。

use DBFrom
go
create Table dbo.SyncException
(
ErrorID uniqueidentifier,
ConversationHandleID uniqueidentifier,
ErrorNumber int,
ErrorSeverity int,
ErrorState int,
ErrorProcedure nvarchar(126),
ErrorLine int,
ErrorMessage nvarchar(2048),
MessageContent nvarchar(max),
CreateDate DateTime
)
go--修改异常信息记录表
alter table dbo.SyncException
add
PrimaryKeyField nvarchar(128),
TableName nvarchar(128),
DMLType char(1),
DBName nvarchar(128)
Go

 

 

效果如下图:

 

3.2.9 数据同步反馈

这里主要用来在源数据库中接收队列中的消息,将同时出错的信息,解析一下,然后插入到异常信息记录表里边。

--数据同步回馈

use DBFrom
go
create procedure UP_SyncDataFeedback
as
begin
set nocount on
--会话变量声明
declare @ConversationHandle uniqueidentifier;--会话句柄
declare @Msg_Body nvarchar(max);
declare @Msg_Type_Name sysname;
--变量赋值
while(1=1)
beginbegin transaction--从队列中接收消息waitfor(receive top(1)@Msg_Type_Name=message_type_name,@ConversationHandle=[conversation_handle],@Msg_Body=message_bodyfrom dbo.[DBFrom_DataSyncQueue]),timeout 1000--如果接收到消息处理,否则跳过if(@@ROWCOUNT<=0)break;if @Msg_Type_Name='http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'end conversation @ConversationHandle;else if @Msg_Type_Name='http://oa.founder.com/Data/Sync/Error'begindeclare @DataSource xml;set @DataSource=Convert(xml,@Msg_Body);insert into dbo.SyncException(ErrorID,ConversationHandleID,ErrorNumber,ErrorSeverity,ErrorState,ErrorProcedure,ErrorLine,ErrorMessage,
PrimaryKeyField,TableName,DMLType,MessageContent,DBName,CreateDate)selectNEWID(),@ConversationHandle,T.c.value('./@ErrNumber','INT'),T.c.value('./@ErrSeverity','INT'),T.c.value('./@ErrState','INT'),T.c.value('./@ErrProcedure','Nvarchar(126)'),T.c.value('./@ErrLine','INT'),T.c.value('./@ErrMessage','nvarchar(2048)'),T.c.value('./@PrimaryKeyField','nvarchar(128)'),T.c.value('./@TableName','nvarchar(128)'),T.c.value('./@DMLType','char(1)'),T.c.value('./@MessageContent','nvarchar(max)'),T.c.value('./@DBName','nvarchar(128)'),GETDATE()from @DataSource.nodes('/row') as T(c);endelse if @Msg_Type_Name='http://schemas.microsoft.com/SQL/ServiceBroker/Error'end conversation @ConversationHandle;commit Transaction;
end
end
commit;
go

 

3.2.10对Service Broker队列使用内部激活,并指定将调用的存储过程

    这里主要用来激活源数据库的消息队列,并为其指定调用的存储过程,即上边3.2.9 中创建的存储过程。

--对Service Broker队列使用内部激活,并指定将调用的存储过程
use DBFrom
go
alter queue dbo.DBFrom_DataSyncQueue with activation
(status=on,max_queue_Readers=1,procedure_name=UP_SyncDataFeedback,execute as owner
);
Go

 

3.2.11 在源数据库中为需要同步的数据表创建触发器

这里就以用户表为例,具体操作如下,这里通过查询系统的Inserted和Deleted临时表来判断执行同步的操作类型是更新(U)、新增(A)还是删除(D),最后调用3.2.7 中创建的存储过程来对数据进行处理并发送。

use DBFrom
Go
--用户信息同步
Create Trigger UT_DataSync_Users
on dbo.Org_Users
after insert,update,delete
as
set nocount on ;
--变量声明
declare @PrimaryKeyField nvarchar(128),@TableName nvarchar(128),@DMLType char(1);
declare @InsertCount int ,@DeleteCount int ;
declare @XMLData xml;
--变量赋值
set @PrimaryKeyField='ID' --组合主键,多个主键使用","隔开
set @TableName='Org_Users'
set @InsertCount=(select COUNT(*) from inserted)
set @DeleteCount=(select COUNT(*) from deleted)
if @InsertCount=@DeleteCount and @InsertCount<>0  ----Updatebeginselect @XMLData=(select * from inserted For xml raw,binary base64,ELEMENTS XSINIL);set @DMLType='U';end
else if(@InsertCount<>0 and @DeleteCount=0) ----Insertbeginselect @XMLData=(select * from inserted for xml raw ,Binary base64,ELEMENTS XSINIL)set @DMLType='A';end
else----Deletebeginselect @XMLData=(select *from deleted for xml raw,binary base64,ELEMENTS XSINIL)set @DMLType='D';endif(@XMLData is not null)beginexec UP_SyncDataSendMsg @PrimaryKeyField,@TableName,@DMLType,@XMLData;endgo

 

3.2.12 目标数据库中创建,字符分割函数

该函数主要是用来进行字符分割,用来处理主键有多个字段的情况。

--目标数据库

use DBTo
go
--转换用‘,'分割的字符串@str
create Function dbo.uf_SplitString
(
@str nvarchar(max),
@Separator nchar(1)=','
)
returns nvarchar(2000)
as
begindeclare @Fields xml;--结果字段列表declare @Num int;-----记录循环次数declare @Pos int;-----记录开始搜索位置declare @NextPos int;--搜索位置临时变量declare @FieldValue nvarchar(256);--搜索结果set @Num=0;set @Pos=1;set @Fields=CONVERT(xml,'<Fields></Fields>');while (@Pos<=LEN(@Str))  beginselect @NextPos=CHARINDEX(@Separator,@Str,@Pos)if(@NextPos=0 OR @NextPos is null)select @NextPos=LEN(@Str)+1;select @FieldValue=RTRIM(ltrim(substring(@Str,@Pos,@NextPos-@Pos)))select @Pos=@NextPos+1set @Num=@Num+1;if @FieldValue<> ''beginset @Fields.modify('insert <Field>{sql:variable("@FieldValue")}</Field> as last into /Fields[1]');           end   endreturn Convert(nvarchar(2000),@Fields);
end
go

 

3.2.13 将解析过的消息信息,根据操作类型的不同同步到数据表中

    这是所有的数据同步中最关键也是最复杂的一步了,在整个开发的过程中,大部分时间都花在这上边了,具体的操作都在下面解释的很清楚了。

--将XML数据源中的数据同步到数据表中(包括增删改)Use DBTo
go
create function dbo.UF_XMLDataSourceToSQL
(@DataSource XML,--数据源@TableName varchar(128),--同步数据表名称@PrimaryKeyField varchar(128),--需要同步的表的主键,主键为多个时用‘,'隔开@DMLType char(1) --A:新建;U:编辑;D:删除
)
returns nvarchar(4000)
as
begin--变量声明及数据初始化--声明数据表@TableName列Column相关信息变量declare @ColumnName nvarchar(128),@DataType nvarchar(128),@MaxLength int;--声明用于拼接SQL的变量declare @FieldsList nvarchar(4000),@QueryStatement nvarchar(4000);declare @Sql nvarchar(4000);declare @StrLength int;--变量初始化set @FieldsList='  ';--初始化变量不为null,否则对变量使用'+='操作符无效set @QueryStatement='  ';--主键信息,根据参数求解如:<Fields><Field>ID1</Field><Field>ID2</Field></Fields>declare @PKs xml;--当前字段是否主键-在‘更新’,‘删除’同步数据时使用declare @IsPK nvarchar(128);--初始化游标--游标内容包括目标数据表TableName列信息DECLARE ColumnNameList CURSOR FOR SELECT COLUMN_NAME,DATA_TYPE,CHARACTER_MAXIMUM_LENGTH FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME=@TableName AND 
DATA_TYPE<>'xml';--数据处理if @DMLType='A'--插入数据beginopen ColumnNameListfetch ColumnNameList into @ColumnName,@DataType,@MaxLength;while @@FETCH_STATUS=0begin--判断数据源列中是否存在属性:@ColumnName--判断数据源列中是否存在--元素:@ColumnNameIf @DataSource.exist('/row/*[local-name()=sql:variable("@ColumnName")]')=1begin--拼接SQLset @FieldsList+=(@ColumnName+',');set @QueryStatement+=('T.c.value(''(./'+@ColumnName+'[not(@xsi:nil)])[1]'','''+@DataType);--元素读取(包含空值情况)if @MaxLength is not null and @MaxLength<>-1beginset @QueryStatement+='('+CONVERT(nvarchar,@MaxLength)+')';endelse if @MaxLength=-1 and @DataType<>'xml'--已调整beginset @QueryStatement+='(MAX)';endset @QueryStatement+=(''') as '+@ColumnName+',');endfetch ColumnNameList into @ColumnName,@DataType,@MaxLengthendclose ColumnNameList;deallocate ColumnNameList;set @StrLength=LEN(@FieldsList);--去掉@FieldsList结尾的’,'set @FieldsList=SUBSTRING(@FieldsList,1,@StrLength-1);       set @StrLength=LEN(@QueryStatement);--去掉@QueryStatement结尾的’,'set @QueryStatement=SUBSTRING(@QueryStatement,1,@StrLength-1);set @Sql=N'insert into '+@TableName+'('+@FieldsList+') select '+@QueryStatement+' from @DataSource.nodes(''/row/'') as T(c)';endelse if @DMLType='U'--更新数据begin--更新语句where 后的条件表达式declare @Condition nvarchar(1000);set @Condition='  ';set @PKs=CONVERT(xml,dbo.uf_SplitString(@PrimaryKeyField,','));Open ColumnNameListfetch ColumnNameList into @ColumnName,@DataType,@MaxLength;while @@FETCH_STATUS=0begin--判断数据源列中是否存在元素:@ColumnNameif @DataSource.exist('/row/*[local-name()=sql:variable("@ColumnName")]')=1beginset @IsPK=null;SELECT @IsPk=Fs.F FROM (SELECT T.c.value('.[text()]','Nvarchar(128)') AS F FROM @PKs.nodes('/Fields/Field') AS T(c))Fs Where Fs.F=@ColumnNameif @IsPK is null or @IsPK=''begin--非主键,更新字段值set @FieldsList+=(@ColumnName+'=Source.'+@ColumnName+',');endelsebegin--主键,作为要更新条件set @Condition+=@TableName+'.'+@ColumnName+'=Source.'+@ColumnName+' And ';end--XML查询set @QueryStatement+=('T.c.value(''(./'+@ColumnName+'[not(@xsi:nil)])[1]'','''+@DataType);--元素读取(包含空值情况)if @MaxLength is not null and @MaxLength<>-1beginset @QueryStatement+='('+CONVERT(nvarchar,@MaxLength)+')';endelse if @MaxLength=-1 and @DataType<>'xml'beginset @QueryStatement+='(max)';endset @QueryStatement+=(''') as '+@ColumnName+',');endfetch ColumnNameList Into @ColumnName,@DataType,@MaxLengthendclose ColumnNameList;Deallocate ColumnNameList;          --去掉@FieldsList结尾的','set @StrLength=LEN(@FieldsList);set @FieldsList=SUBSTRING(@FieldsList,1,@StrLength-1);        --去掉@QueryStatement结尾的','set @StrLength=LEN(@QueryStatement);set @QueryStatement=SUBSTRING(@QueryStatement,1,@StrLength-1);--去掉@Condition结尾的‘and'set @StrLength=LEN(rtrim(@Condition));set @Condition=SUBSTRING(rtrim(@Condition),1,@StrLength-3);           set @Sql=N'USE DBTo ; update '+@TableName+' set '+@FieldsList+' from (select '+@QueryStatement+' from @DataSource.nodes(''/row'') as T(c)) Source where '+@Condition;end  else if @DMLType='D' --删除数据begin--更新语句where后的条件表达式declare @LinkField nvarchar(1000);set @LinkField='  ';set @PKs=CONVERT(xml,dbo.uf_SplistString(@PrimaryKeyField,','));open ColumnNameListfetch ColumnNameList into @ColumnName,@DataType,@MaxLength;while @@FETCH_STATUS=0beginif @DataSource.exist('row/*[local-name()=sql:variable("@ColumnName")]')=1beginset @IsPK=null;--初始化--当前字段是否为主键select @IsPK=Fs.F from (select T.c.value('.[text()]','nvarchar(128)') as F from @PKs.nodes('/Fields/Field') as T(c))Fs where Fs.F=@ColumnName--主键if @IsPK is not null and @IsPK<>''begin--主键删除条件set @LinkField+='Target.'+@ColumnName+'=Source.'+@ColumnName+' And ';--XML 查询set @QueryStatement+=('T.c.value(''(./'+@ColumnName+'[not(@xsi:nil)])[1]'','''+@DataType);--元素读取(包含空值情况)if(@MaxLength is not null and @MaxLength<>-1)beginset @QueryStatement+='('+CONVERT(nvarchar,@MaxLength)+')';endelse if @MaxLength=-1 and @DataType<>'xml'beginset @QueryStatement+='(max)';endset @QueryStatement+=(''') as '+@ColumnName+',');end endfetch ColumnNameList into @ColumnName,@DataType,@MaxLengthendclose ColumnNameList;deallocate ColumnNameList;         --去除@QueryStateMent结尾的','set @StrLength=LEN(@QueryStatement);set @QueryStatement=SUBSTRING(@QueryStatement,1,@StrLength-1);          --去除@LinkField 结尾的’Add‘set @StrLength=LEN(rtrim(@LinkField));set @LinkField=SUBSTRING(rtrim(@LinkField),1,@StrLength-3);         set @Sql=N'Delete from '+@TableName+' from '+@TableName+' as Target inner join (select '+@QueryStatement+ ' from @DataSource.nodes(''/row'') as T(c))Source on '+@LinkField;end    Return @Sql--'hello'
end
go

 

3.2.14 解析并处理从队列中读取的消息

这里主要用来读取队列中的消息,并将消息进行处理,最终处理成一定的格式,并调用3.2.13中的存储过程,将数据同步到数据库中。

--将数据同步到数据表中
create procedure UP_SyncDataToTable
as
begin
set nocount on
--会话变量声明
declare @ConversationHandle uniqueidentifier;--会话句柄
declare @Msg_Body nvarchar(max);
declare @Msg_Type_Name sysname;
declare @ErrorNumber int ;
--变量赋值
while(1=1)beginbegin transaction--从队列中接收消息waitfor(receive top(1)@Msg_Type_Name=message_type_name,@ConversationHandle=[conversation_handle],@Msg_Body=message_body
--         from dbo.[DBTo_DataSyncQueue]from dbo.[DBFrom_DataSyncQueue]),timeout 500--如果接收到消息-处理,否则跳过if @@ROWCOUNT<=0beginbreak;end           if @Msg_Type_Name='http://oa.founder.com/Data/Sync'begin--声明变量declare @DMLType char(1);declare @PrimaryKeyField nvarchar(128),@TableName nvarchar(128),@Sql nvarchar(4000);declare @DataSource xml--受影响的行数declare @EffectRowCount int;declare @ErrMsg xml;begin try--变量赋值set @DataSource=convert(xml,@Msg_Body);--数据源set @PrimaryKeyField=@DataSource.value('(/PrimaryKeyField)[1][text()]','nvarchar(128)');--主键列表set @TableName=@DataSource.value('(/Table)[1][text()]','nvarchar(128)');--操作数据表set @DMLType=@DataSource.value('/DMLType[1][text()]','char(1)');--操作类型set @Sql=dbo.UF_XMLDataSourceToSQL(@DataSource,@TableName,@PrimaryKeyField,@DMLType);exec sp_executesql @Sql,N'@DataSource XML',@DataSource;end trybegin catchdeclare @DBName nvarchar(128)select @DBName=Name from master..SysDataBases where dbid=(select dbid from master..sysprocesses where spid=@@SPID)set @ErrorNumber=ERROR_NUMBER();set @ErrMsg=(select ERROR_NUMBER() as ErrNumber,ERROR_SEVERITY() as ErrSeverity,ERROR_STATE() as ErrState,ERROR_PROCEDURE() as ErrProcedure,ERROR_LINE() as ErrLine,ERROR_MESSAGE() as ErrMessage,@PrimaryKeyField as PrimaryKeyField,@TableName as TableName,@DMLType as DMLType,@Msg_Body as MessageContent,@DBName as DBNamefor XML raw);--GOTO 错误处理标签goto Err_Handle;   end catch--结束会话End Conversation @ConversationHandleif @ErrorNumber is not nullbegin        --错误处理区域Err_Handle:if @ErrMsg is not nullbegindeclare @test nvarchar(128);--发送失败消息send on conversation @ConversationHandlemessage type [http://oa.founder.com/Data/Sync/Error](@ErrMsg)end--结束会话end conversation @ConversationHandle--break;--回滚--不可回滚,否则将无法发送失败消息--GoTO  Err_Lab;end end commit transactionend
end
go

 

3.2.15 对目标数据库的消息队列进行内部激活

这里主要是用来激活目标数据库的消息队列,主要用来实现数据的同步以及同步出错的错误信息的反馈。

--对Service Broker队列使用内部激活,并指定将要调用的存储过程
use DBTo
go
--alter Queue dbo.[DBTo_DataSyncQueue] with activation
alter Queue dbo.[DBFrom_DataSyncQueue] with activation
(status=on,max_queue_readers=1,Procedure_name=UP_SyncDataToTable,Execute as self
)
Go

 

    完成以上这些步骤以后,就可以实现同一数据库实例上两个不同的数据库之间的数据同步。即如果DBFrom数据库中的Org_Users中的某一条信息发生变化,会马上的自动同步到DBTo数据库中的Org_Users 表。如果是想要实现不同的数据库实例间的数据库的表的同步,则可以参考以下链接:

http://www.cnblogs.com/downmoon/archive/2011/05/05/2037830.html

在创建启用传输安全、对话安全,创建路由、远程服务绑定等额外的操作之后,剩下的操作跟在同一数据库实例中的操作是一样的。

       此外,本文还参考了如下的链接:

http://www.cnblogs.com/downmoon/archive/2011/04/05/2005900.html

       希望可以给大家一些启发和帮助。具体的源码有兴趣的朋友可以留下邮箱。

 


http://chatgpt.dhexx.cn/article/k4vc0I5W.shtml

相关文章

数据同步技术

本次旨在分享数据同步技术的相关知识点&#xff0c;包括数据同步概述、数据同步工具、数据库、数据同步到大数据平台。 首先来介绍一下数据同步的概念&#xff1a; 数据同步是为保持数据源与目的地数据一致性而进行的数据传输、处理的过程。 数据同步的场景&#xff1a; 1、主…

几种常见的数据同步方式

数据仓库的特性之一是集成&#xff0c;即首先把未经过加工处理的、不同来源的、不同形式的数据同步到ODS层&#xff0c;一般情况下&#xff0c;这些ODS层数据包括日志数据和业务DB数据。对于业务DB数据而言(比如存储在MySQL中)&#xff0c;将数据采集并导入到数仓中(通常是Hive…

内网穿透frpc ,frps的使用

情况是这样的&#xff0c;公司内网中一个设备接了路由器下发的地址&#xff0c;内网地址是192.168.1.100&#xff0c;可以访问我的台式机&#xff0c;但我的台式机访问192.168.1.100是无法连通的 这种情况下&#xff0c;在我机器上运行frps.exe&#xff0c;frps.ini如下 [com…

Frp内网穿透——frps服务端部署

由于现在IPv4地址的短缺&#xff0c;在国内不可能每个设备都会分配到一个公网IP&#xff0c;因此从公网中访问自己的私有设备向来是一件难事儿。本次带大家了解一下frp内网穿透的服务端教学&#xff0c;让你也能够部署一个内网穿透服务。 frp简介 通俗的说&#xff0c;frp是一…

记一次使用frpc/frps进行内网穿透

1. 前提条件&#xff1a; 有一个公网ip&#xff0c;这里用x.x.x.x代替 2. 配置 【服务器端】 S_NUMBER是一个端口号 #服务端口 bind_port S_NUMBER #监听地址 bind_addr 0.0.0.0 #认证token token xxxx【客户端】(也就是需要被内网穿透的服务器) C_NUMBER是一个端口号 …

内网穿透神器Frps一键安装脚本及设置教程

frps 是一个高性能的反向代理应用&#xff0c;可以帮助您轻松地进行内网穿透&#xff0c;对外网提供服务&#xff0c;支持 tcp, http, https 等协议类型&#xff0c;并且 web 服务支持根据域名进行路由转发。 *因为frps是go语言写的&#xff0c;所以在路由器上使用的时候&#…

frpc和frps 内网穿透越狱插件

内网穿透、frp、frpc、frps https://zhaoboy9692.github.io/repo 越狱源 https://zhaoboy9692.github.io/repo 苦于在ios越狱下没有frp穿透使用 特地开发了的越狱插件 基于最新frp0.48编译 ios14.6测试没问题 有问题及时反馈

使用frps和frpc实现内网穿透

内网穿透的作用包括跨网段访问一个局域网中的一台主机。 如上图&#xff0c;假设我们想要通过主机A访问主机C&#xff0c;但是主机A和主机C绑定的都是私有ip地址&#xff0c;所以它们之间是无法直接进行通信的。要想使得A和C能够进行通信&#xff0c;就需要用到内网穿透的技术。…

frp服务端(frps) 安装及使用

FRP官方文档 https://gofrp.org/docs/ 服务端安装 环境 ubuntu 22.04 下载 Github 的 Release 中下载到最新版本的客户端和服务端二进制文件 可以指定你的目录&#xff0c;这里用 /usr/local/frp cd /usr/local/frp wget https://github.com/fatedier/frp/releases/dow…

CentOS Frp内网穿透:Frps+Nginx反向代理

目录 服务器使用配置 一、Nginx安装 二、Frps安装 三、frpc安装 服务器使用配置 CentOS 7.6 CPU: 2核 内存: 4GB 一、Nginx安装 参考《Centos配置Nginxtomcat》&#xff0c;这里就不做过多阐述 二、Frps安装 这里使用的是阿里源 #下载脚本 wget https://code.aliyun.com…

nginx反向代理frps frpc穿透

frps 和 nginx 在同一台机器&#xff0c;假设ip192.168.166.17 1. frps服务器端配置 测试时&#xff0c;frps服务器跟nginx在同一台机器(192.168.166.17)&#xff0c;理论上可以不在同一台机器&#xff0c;nginx可以代理http请求&#xff0c;发给frps服务端。 frps.ini # fr…

利用空闲服务器搭建frps服务端-实现穿透代理

利用frps代理Tcp或者udp或其它类型的连接 1、什么是frps/frpc frps是代理的服务端、frpc是代理的客户端&#xff0c;使用方数据传输到服务端&#xff0c;服务端再将数据传输到提供方&#xff0c;从而达到相互访问的目的。 2、什么是穿透 穿透就是客户端A和客户端B都没有公网…

利用frps进行内网穿透

这里使用的是传统穿透方法&#xff0c;需要一个有公网ip的中转节点去告知 看最下面&#xff0c;用最新版的frps 1、注意 服务器和客户机之间的数据传输全部经过中转服务器&#xff0c;传输速度将受制于中转服务器的上下行带宽。 2、穿透原理 其实就是客户端A绑定端口发送数…

使用frps建立内网穿透从而实现外界连接内网电脑的全教程

1. 说明 我有台服务器&#xff0c;但它在内网里&#xff0c;我需要通过ssh方式访问它&#xff0c;目前可以采用&#xff1a;向日葵等商业软件&#xff0c;RustDesk等开源软件。或者&#xff0c;《自建内网穿透服务器》。 本教程把实现上述功能的所有步骤罗列出来&#xff0c;以…

frp 内网穿透服务器搭建frps服务端和frpc客户端

1 工具 一台具有公网ip的服务器 2 下载frp frp下载地址 打开上面的frp下载地址 公网服务器上 打开下载文件 frps是服务端&#xff0c;在公网服务器上部署 frpc是客户端&#xff0c;在需要内网穿透的电脑上部署 1. frps配置 首先我这用的是win公网服务器 &#xff08;linux…

内网穿透配置(FRP)

目录 0、内网穿透的一般场景 1、内网穿透配置 a、frp软件下载 b、frp 的配置 3、通过 frp 实现远程连接 4、设置 frpc / frps 开机启动的方法 5、设置frp安全连接的方法 0、内网穿透的一般场景 放假回家怎么远程连接学校实验室的服务器&#xff1f; 先分析一波&#x…

FRP入门篇

目录 一、前言 1、概述 2、原理 3、支持功能 4、适用场景 二、环境准备 三、使用 1、安装包下载 2、服务端部署 2.1、上传安装包 2.3、启动服务端 3、客户端部署 3.1、代理服务准备 3.2、上传安装包 3.3、客户端配置 3.4、启动客户端 4、功能验证 一、前言 1、…

frps内网穿透

1 原理讲解 frp工作原理 服务端运行&#xff0c;监听一个主端口&#xff0c;等待客户端的连接&#xff1b; 客户端连接到服务端的主端口&#xff0c;同时告诉服务端要监听的端口和转发类型&#xff1b;服务端fork新的进程监听客户端指定的端口&#xff1b; 外网用户连接到客户…

2021大数据架构、高性能、数据治理面试题

2021大数据架构、高性能、数据治理面试题 需要自取&#xff1a;https://url80.ctfile.com/f/32319880-516640957-aba608 &#xff08;访问密码&#xff1a;1000&#xff09;

Java架构师和大数据架构师的区别是什么?哪个更有发展前景?

[Java]是我们耳熟能详的编程语言&#xff0c;[大数据]更是当今科技的明星技术&#xff0c;那Java和Java大数据架构学习的内容是一样的吗&#xff1f;两者有什么区别呢&#xff1f;今天千锋广州Java的老师就从Java和大数据架构的以下方面谈谈两者的区别。 [ 01 [架构师](需要考…