分享一个自己写的用python比对数据库表数据的脚本

    xiaoxiao2022-06-22  18

    最近在做一个数据库异构复制的项目,客户表示需要一个数据比对的工具,我就自己写了一个异构数据库的比对python脚本.这个比对脚本只能比对数量,不能比对具体的记录.使用的sql语句也是最基础的select count(*) 这种,没有开并发所以对大表可能比对时间稍长.

    基本原理是将需要比对的数据写到一张表里,先读取那个表里的数据,取出需要比对的表.然后创建多进程,同时在原端和目标端count.然后将count的结果写到一个excel文件中.

    其中最关键的就是那张表.只要将那张表里的数据搞对了,基本就不会有什么问题.

    目前支持的数据库有oracle,mysql,postgresql,sqlserver.程序分为三个部分

    1.数据库配置文件

    首先需要在python代码的相同目录下写一个名为check.ini的配置文件.下面一个配置文件例子:

    [DATA] #配置原端数据库,下面的ORACLE需要与后面的项匹配 source=ORACLE #配置目标端数据库,下面的POSTGRESQL需要与后面的项匹配 target=POSTGRESQL #配置比对表的数据库,需要与下面的配置项匹配 check_node=ORACLE #配置比对表数据库的用户,如果是oracle是用户,如果是mysql,pg,mssql则是数据库名 check_owner=suq #配置比对表的表名,区分大小写 check_table=check_table #配置mysql的连接串.注意MYSQL必须大写而且必须是以MYSQL开头,例如想比对多个mysql可以写MYSQL1,MYSQL2等 #下面的几个配置同样需要以相应例子开头,因为程序就是以项的开头来确认是哪种数据库的 [MYSQL] db_host=192.168.56.25 db_port=3306 db_user=root db_pwd=root db_dbname=major [ORACLE] db_host=192.168.56.30 db_port=1521 db_user=dsg db_pwd=dsg db_sid=bre1 [POSTGRESQL] db_host=192.168.56.50 db_port=5432 db_user=postgres db_pwd=postgres db_dbname=msgdb [MSSQL] db_host=192.168.56.101 db_port=1433 db_user=sa db_pwd=sa db_dbname=master

    2.创建一个比对表.

    例如我上面的例子放在suq用户下的check_table中

    具体的表结构如下:

    SQL> desc check_table Name Null? Type ----------------------------------------- -------- ---------------------------- SOWNER VARCHAR2(30) SNAME VARCHAR2(30) TOWNER VARCHAR2(30) TNAME VARCHAR2(30) 分别表示原端的用户名,表名,目标端用户名表名,如果不是用户的那么就是数据库名.

    看一下表内我的测试数据:

    SQL> select * from check_table; SOWNER SNAME TOWNER TNAME ------------------------------ ------------------------------ ------------------------------ ------------------------------ suq "t1" suq t1 suq "t2" suq t2 suq "t3" suq t3 suq "t4" suq t4 这里的数据要特别注意,必须写对否则可能运行会报错.需要注意的一般原因是不同的数据库对大小写敏感不同.因此建议在写好这些数据后,手动到数据库查一下,例如

    select count(*) from suq."t1"

    看这样的sql对不对.

    3.就是主程序

    需要注意的是我连接各种数据库分别使用的如下python模块,写excel使用XlsxWriter模块:

    C:\Users\think>pip list cx-Oracle (5.2.1) MySQL-python (1.2.4) psycopg2 (2.6.2) pymssql (2.1.3) XlsxWriter (0.8.5)下面是具体的python代码:

    #coding:utf-8 import cx_Oracle as ora import MySQLdb as my import psycopg2 as post import pymssql as ms import ConfigParser as conf import multiprocessing as mul import xlsxwriter import time def connect(cfg,db): if db[0:5] == 'MYSQL': db_host=cfg.get(db,'db_host') db_port=cfg.get(db,'db_port') db_user=cfg.get(db,'db_user') db_pwd=cfg.get(db,'db_pwd') db_dbname=cfg.get(db,'db_dbname') conn = my.connect(host=db_host,port=int(db_port),user=db_user,passwd=db_pwd,db=db_dbname) return conn elif db[0:6] == 'ORACLE': db_host=cfg.get(db,'db_host') db_port=cfg.get(db,'db_port') db_user=cfg.get(db,'db_user') db_pwd=cfg.get(db,'db_pwd') db_sid=cfg.get(db,'db_sid') conn = ora.connect(db_user,db_pwd,db_host+':'+db_port+'/'+db_sid) return conn elif db[0:10] == 'POSTGRESQL': db_host=cfg.get(db,'db_host') db_port=cfg.get(db,'db_port') db_user=cfg.get(db,'db_user') db_pwd=cfg.get(db,'db_pwd') db_dbname=cfg.get(db,'db_dbname') conn = post.connect(host=db_host,port=db_port,user=db_user,password=db_pwd,database=db_dbname) return conn elif db[0:5] == 'MSSQL': db_host=cfg.get(db,'db_host') db_port=cfg.get(db,'db_port') db_user=cfg.get(db,'db_user') db_pwd=cfg.get(db,'db_pwd') db_dbname=cfg.get(db,'db_dbname') conn = ms.connect(host=db_host,port=db_port,user=db_user,password=db_pwd,database=db_dbname) return conn def check(cfg,db,check_owner,check_table): conn=connect(cfg,db) cursor=conn.cursor() sql='select * from '+check_owner+'.'+check_table cursor.execute(sql) table_list=[] alldata=cursor.fetchall() for i in alldata: table_list.append(i) #print table_list return table_list def getcount(cfg,db,sql,q): conn = connect(cfg,db) cursor=conn.cursor() try: cursor.execute(sql) countval = cursor.fetchall()[0][0] q.put(countval) except Exception,e: countval="Error : "+str(e) q.put(countval) def isdigit(num): try: int(num) return True except: return False def comp(cfg,source,target,tablelist): ###excel start xlsxname='check_'+str(time.strftime("%Y%m%d%H%M", time.localtime()))+'.xlsx' workbook=xlsxwriter.Workbook(xlsxname) top=workbook.add_format({'border':6,'align':'center','bg_color':'cccccc','font_size':13,'bold':True}) format_data_normal=workbook.add_format({'align':'center','font_size':13}) format_data_warn=workbook.add_format({'align':'center','font_size':13,'bg_color':'ff0000'}) format_data_err=workbook.add_format({'align':'center','font_size':13,'bg_color':'ffff00'}) worksheet = workbook.add_worksheet('sheet1') worksheet.set_column('A:A',12) worksheet.set_column('B:B',40) worksheet.set_column('C:C',12) worksheet.set_column('D:D',12) worksheet.set_column('E:E',40) worksheet.set_column('F:F',12) worksheet.set_column('G:G',12) title=[u'源端用户',u'源端表名',u'源端数据量',u'目标端用户',u'目标端表名',u'目标端数据量',u'差异条数'] worksheet.write_row('A1',title,top) ###excel stop length=len(tablelist) for i in range(length): check_result=[] sowner=tablelist[i][0] sname=tablelist[i][1] towner=tablelist[i][2] tname=tablelist[i][3] sql_s='select count(*) from '+sowner+'.'+sname sql_t='select count(*) from '+towner+'.'+tname #sql_t='select count(*) from '+towner+'.'+'\"'+tname+'\"' q1=mul.Queue() q2=mul.Queue() p1=mul.Process(target = getcount,args = (cfg,source,sql_s,q1)) p2=mul.Process(target = getcount,args = (cfg,target,sql_t,q2)) p1.start() p2.start() count_s=q1.get() count_t=q2.get() p1.join p2.join check_result.append(sowner) check_result.append(sname) check_result.append(count_s) check_result.append(towner) check_result.append(tname) check_result.append(count_t) print '%s %s %s %s %s %s' %(sowner,sname,count_s,towner,tname,count_t) #print check_result if isdigit(count_s) and isdigit(count_t): check_result.append(count_s-count_t) if count_s == count_t: worksheet.write_row('A'+str(2+i),check_result,format_data_normal) else: worksheet.write_row('A'+str(2+i),check_result,format_data_warn) else: check_result.append("Error") worksheet.write_row('A'+str(2+i),check_result,format_data_err) workbook.close() if __name__ == "__main__": print "AT time {0}".format(time.ctime()) print "Begin compare ..." cfg=conf.ConfigParser() cfg.read('check.ini') source=cfg.get('DATA','source') target=cfg.get('DATA','target') check_node=cfg.get('DATA','check_node') check_owner=cfg.get('DATA','check_owner') check_table=cfg.get('DATA','check_table') tablelist=check(cfg,check_node,check_owner,check_table) comp(cfg,source,target,tablelist) print "AT time {0}".format(time.ctime()) print "compare complete!" raw_input("Press <ENTER>")

    执行这段代码后就会读取check.ini文件,获取需要比对的原端和目标端数据库的信息,以及比对表的信息,首先将比对的表获取写到一个数组中.然后使用for循环对表进行count,再写到excel中.excel名为check_XXXX.xlsx.xxx为时间.如果在执行sql的时候报错,那么excel中以黄色标出,如果比对原端和目标端数据不一致以红色标出.

    下面是我比对oracle和pg中的一个结果:

    转载请注明原文地址: https://ju.6miu.com/read-1123065.html

    最新回复(0)