SQLServer 跟踪语句转为Jmeter脚本

更新时间:2020-05-14 16:14:44点击次数:235次
待测系统为CS架构,后台为SQLserver数据库,目前针对CS架构没有有效的压力测试工具,只能通过抓取业务SQL,压数据库的方式进行测试。因为涉及到的SQL语句比较多,所以通过SQL监控工具,获取到SQL语句,然后转为Jmeter脚本(xml)的形式。脚本分为量大部分:解析抓取到的SQL、将解析的SQL转为Jmeter脚本。多个跟踪文件,在同一个文件夹中,通过轮询方式,转为对应的线程组。

解析跟踪文件

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2019-12-24 13:29
# @Author  : 无名氏
# @File    : resolvsqlxml.py
import warnings
warnings.filterwarnings("ignore")  #忽略warning信息
 
from xml.dom.minidom import parse
from datetime import datetime
import xml.dom.minidom
# import string
import os,time,re
import codecs
import chardet
import pandas as pd
 
 
class ResolveXmlfile():
    def __init__(self,xmlfile,directory):
        self.xmlfile =  xmlfile
        self.xml_directory = directory
        self.df = pd.DataFrame(columns=('sys_name','db', 'time', 'sqltext', 'timestamp'))  #创建DF,定义列名
        self.exclude_sql_list = ['SP:Starting', 'Trace Stop', 'Trace Start', 'Trace Pause']  #筛选掉指定类型的语句
 
    def get_xml_file(self):
        '''
        判断是否有传递文件,True 只处理传递的文件,false 处理文件夹下的所有文件
        '''
        if self.xmlfile:
            print(self.xmlfile)
            file_path = os.path.join(self.xml_directory, self.xmlfile)
            (file_name, extension) = os.path.splitext(self.xmlfile)
            print(file_path, file_name)
            self.reslove_xmlfile(file_path, file_name)
        else:
            for dirpath, dirnames, files in os.walk(self.xml_directory):
                for file in files:
                    file_path = os.path.join(dirpath, file)
                    (file_name, extension) = os.path.splitext(file)
                    # print(file_path,file_name)
                    self.reslove_xmlfile(file_path,file_name)
        # sqldf = self.df.sort_values(by="timestamp", axis=0, ascending=True, inplace=False)  # 按照时间戳排序
        return self.df
 
    def get_xmlnode(self,node, name):  # 获取xml节点
        return node.getElementsByTagName(name) if node else []
 
    def get_nodevalue(self,node, index=0):  #获取节点Value---01  <Column id="41" name="LoginSid"></Column>
        return node.childNodes[index].nodeValue if node else ''
 
    def get_attrvalue(self,node, attrname):  #获取节点属性值---LoginSid  <Column id="41" name="LoginSid">01</Column>
        return node.getAttribute(attrname) if node else ''
 
    def reslove_xmlfile(self,file_path,file_name):
        try:
            dom_tree = xml.dom.minidom.parse(file_path)
            dom = dom_tree.documentElement
            event_node_list = self.get_xmlnode(dom, 'Event')
            for event_node in event_node_list:
                # print('==========')
                event_name = self.get_attrvalue(event_node,'name')
                if event_name and event_name not in self.exclude_sql_list:
                    column_node_list = self.get_xmlnode(event_node, 'Column')
                    db_value, app_value, sql_value, startTime_value = '', '', '', '' #给需要抓取的字段赋初始值空值
                    column_dict = {}
                    for column_node in column_node_list:
                        column_name = self.get_attrvalue(column_node, 'name')
                        if column_name in ('DatabaseName','ApplicationName','TextData','StartTime'):
                            column_dict[column_name] = self.get_nodevalue(column_node)
                    if "TextData" in column_dict.keys():
                        # print('column_dict',column_dict.keys())
                        db_value = column_dict.get("DatabaseName", '')
                        sql_value = column_dict.get("TextData", '')
                        app_value = column_dict.get("ApplicationName", '')
                        startTime_value = column_dict.get("StartTime", '')
                        if startTime_value:
                            timestamp_value = self.timestamp_conversion(startTime_value)
                        else:
                            timestamp_value=''
                    # print('sql:',sql_value)
 
                    #筛选异常数据  'sys_name','db', 'time', 'sqltext', 'timestamp'
                    if (startTime_value != '') and (db_value not in ('msdb', 'master')) and ('SQLAgent' not in app_value) and (sql_value != 'exec sp_reset_connection'):
                        self.df = self.df.append(pd.DataFrame({
                                                                'sys_name': [file_name],
                                                                'sqltext': [sql_value],
                                                                 'db': [db_value],
                                                                 'time': [startTime_value],
                                                                 'timestamp': [timestamp_value]
                                                                }))
 
 
        except Exception as e:
            print(e)
 
 
 
 
 
    def timestamp_conversion(self,time_value):
        '''
        time.time() 生成当前的时间戳,格式为10位整数的浮点数。
        time.strftime()根据时间元组生成时间格式化字符串。
        time.strptime()根据时间格式化字符串生成时间元组。time.strptime()与time.strftime()为互操作。
        timearray = time.strptime(time1, "%Y-%m-%d %H:%M:%S.%f")
        time.localtime()根据时间戳生成当前时区的时间元组。
        time.mktime()根据时间元组生成时间戳。
        time.mktime(timearray)
        '''
        #抓取的xml数据 时间格式为2020-03-27T11:21:05.163+08:00 去除+8:00和T
        time_value = time_value.split('+')[0].replace('T',' ')
        if '.' in time_value: #判断时间是否有毫秒数据
            time_array = datetime.strptime(time_value, "%Y-%m-%d %H:%M:%S.%f")   #
            millisecond_stamp = int(time_array.microsecond/1000)
            timestamp= int(time.mktime(time_array.timetuple())*1000)
            timestamp = millisecond_stamp+timestamp
        else:
            time_array = datetime.strptime(time_value, "%Y-%m-%d %H:%M:%S")
            timestamp = int(time.mktime(time_array.timetuple())*1000)
        return timestamp
 
    def PrintSQl(self,sqldf):
        j = 1
        print(sqldf.iterrows())
        for  index,row in sqldf.iterrows():  #需要优化
            print('\n\n>>>>>>>>>>>>>>>', j, '<<<<<<<<<<<<<<<<<<<')
            print('sys_name :', row['sys_name'])
            # print('STARTIME :', row['time'])
            # print('DBNAME :', row['dbname'])
            # print('-------------', j, '-------------')
            # print('SQLTEXT :', row['sqltext'])
            j = j + 1
        print('数据库跟踪xml文件解析完成')
    def group_df(self,sqldf):
        # print(sqldf.groupby('sys_name').groups.keys())
        # for file in sqldf.groupby('sys_name').groups.keys():
        #     print(file)
        #     sqldf.groupby('sys_name')
        for groupname,grouplist in sqldf.groupby('sys_name'):
            print(groupname)
            print(type(grouplist))
            for index, row in grouplist.iterrows():
                print(row['sqltext'])
xmlfile = ['']
filepath = 'XMlFile/'
exclude_sql_list = ['SP:Starting', 'Trace Stop', 'Trace Start', 'Trace Pause']
if __name__ == "__main__":
    run = ResolveXmlfile(xmlfile,filepath,exclude_sql_list)
    # run.handlxml()
    sql_pd = run.get_xml_file()
    run.group_df(sql_pd)
 
    # run.test()
    # run.PrintSQl(sql_pd)
    # run.timestamp_conversion()
 
 
 
 
 
 
转Jmeter脚本

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2019-12-20 20:52
# @Author  : 无名氏
# @File    : WriteJmxFile.py
 
import xml.dom.minidom as Dom
from xml.dom import minidom
import random
import time,json
from Fun.GetConf import *
import pandas as pd
from resolvsqlxml import *
import chardet
import re
 
# from Fun.logger import *
 
class SqlToJmeterml():
    def __init__(self,jmxfile,jmxpath,confpath,logger):
        self.dom = minidom.Document()
        self.hashtree_lev1 = self.dom.createElement('hashTree')  # 创建节点hashTree ,父节点jmeternode
        self.hashtree_lev2 = self.dom.createElement('hashTree')
        self.hashTree_lev3 = self.dom.createElement('hashTree')
        self.none_value = self.dom.createTextNode('')
        self.true_value = self.dom.createTextNode('true')
        self.false_value = self.dom.createTextNode('false')
        self.lastnode = ''
        self.jmxfile = jmxfile
        self.path = confpath
        self.logger= logger
        self.jmxpath = jmxpath
 
    def get_node_confinfo(self,node,elementProp):
        attr_list = json.loads(GetConfigure(self.path, node, 'Itemlist'))
        node_list = json.loads(GetConfigure(self.path, node, 'Nodelist'))
        if elementProp:
            elementProp_attr_list = json.loads(GetConfigure(self.path, node, 'elementPropItemlist'))
            elementProp_node_list = json.loads(GetConfigure(self.path, node, 'elementPropNodelist'))
            return attr_list, node_list, elementProp_attr_list, elementProp_node_list
        else:
            return attr_list, node_list
 
 
    def jmeterTestPlan(self):
        # 添加JmeterTestPlan节点--根节点
        jmeterTestPlan_attr_list = [{'version': '1.2'}, {'properties': '5.0'}, {'jmeter': '5.2'}]
        jmeterTestPlan_Node = self.AddNode('jmeterTestPlan', jmeterTestPlan_attr_list, None)
        self.dom.appendChild(jmeterTestPlan_Node)
        jmeterTestPlan_Node.appendChild(self.hashtree_lev1)
 
 
    def Testplan(self):
        try:
            TestPlan_attr,TestPlan_node,TestPlan_elementProp_attr,TestPlan_elementProp_node = self.get_node_confinfo('TestPlan','elementProp')
            # 创建节点,
            TestPlan_elementProp = self.AddNode('elementProp', TestPlan_elementProp_attr, TestPlan_elementProp_node)
            TestPlan = self.AddNode('TestPlan', TestPlan_attr, TestPlan_node)
            # 将节点添加到父节点中
            TestPlan.appendChild(TestPlan_elementProp)
            self.hashtree_lev1.appendChild(TestPlan)
            self.hashtree_lev1.appendChild(self.hashtree_lev2)
            print('TestPlan节点创建完毕')
        except Exception as e:
            print(e)
 
    def JDBCDataSource(self):
        try:
            #获取需要配置的数据库
            db_sections = GetConfigure(self.path, 'database','')
            for db_section in db_sections:
                db = GetConfigure(self.path, 'database',db_section)
                url = GetConfigure(self.path, db, 'url')
                dbname = GetConfigure(self.path, db, 'name')
                username = GetConfigure(self.path, db, 'username')
                password = GetConfigure(self.path, db, 'password')
                dburl = 'jdbc:sqlserver://' + url + ';databaseName=' + dbname + ';'
                jdbc_testname = dict(testname=dbname)  #定义数据库连接名称
                jdbc_attr,jdbc_node = self.get_node_confinfo('JDBC', '')
                jdbc_attr.append(jdbc_testname)
                for node in jdbc_node:
                    (key, value), = node.items()
                    if isinstance(value, list):
                        if value[0] == 'dataSource':
                            value.append(dbname)
                        elif value[0] == 'dbUrl':
                            value.append(dburl)
                        elif value[0] == 'password':
                            value.append(password)
                        elif value[0] == 'username':
                            value.append(username)
                #创建JDBC节点
                jdbcdatasource = self.AddNode('JDBCDataSource', jdbc_attr, jdbc_node)
                self.hashtree_lev2.appendChild(jdbcdatasource)
                #添加每个节点之间的<hashTree/>
                jdbc_hashTree = self.dom.createElement('hashTree')
                self.hashtree_lev2.appendChild(jdbc_hashTree)
 
            self.logger.info('JDBCDataSource节点创建完毕')
            # print('JDBCDataSource节点创建完毕')
        except Exception as e:
            print('JDBCDataSource节点处理异常', e)
 
 
 
    def ThreadGroup(self,file):
        try:
            ThreadGroup_attr, ThreadGroup_node, ThreadGroup_elementProp_attr, ThreadGroup_elementProp_node = self.get_node_confinfo('ThreadGroup', 'elementProp')
            ThreadGroupname = dict(testname=file)  #定义线程组名称
            ThreadGroup_attr.append(ThreadGroupname)
            #创建ThreadGroup_elementProp节点
            threadGroup_elementprop = self.AddNode('elementProp', ThreadGroup_elementProp_attr, ThreadGroup_elementProp_node)
            #创建ThreadGroup节点
            threadgroup = self.AddNode('ThreadGroup', ThreadGroup_attr, ThreadGroup_node)
            #将threadGroup_elementprop作为子节点加入threadgroup节点
            threadgroup.appendChild(threadGroup_elementprop)
            #将threadgroup节点加入hashtree_lev2节点
            self.hashtree_lev2.appendChild(threadgroup)
            #将更新后的hashtree_lev2加入一级节点hashtree_lev1
            self.hashtree_lev1.appendChild(self.hashtree_lev2)
            print(file,'ThreadGroup节点创建完毕')
        except Exception as e:
            print(file,'ThreadGroup节点创建失败', e)
 
 
 
 
 
    def sys_testgroup(self,sqldf):
 
        for sysname, syspd in sqldf.groupby('sys_name'):
            syspd = syspd.sort_values(by="timestamp", axis=0, ascending=True, inplace=False)  # 按照时间戳排序
            hashTree_lev3 = self.dom.createElement('hashTree')
            print('\n开始添加',sysname,'业务请求')
            self.ThreadGroup(sysname)
            self.JDBCSampler(sysname,syspd,hashTree_lev3)
 
 
    def JDBCSampler(self,sysname,syspd,hashTree_lev3):
        j = 1
        # item = {}
        # print(sqldf.iterrows())
        for index, row in syspd.iterrows():  # 需要优化
            # Itemlist = json.loads(GetConfigure(path, 'JDBCSampler', 'Itemlist'))
            # Nodelist = json.loads(GetConfigure(path, 'JDBCSampler', 'Nodelist'))
            jdbcsampler_attr, jdbcsampler_node = self.get_node_confinfo('JDBCSampler', '')
 
            if ('insert' in row['sqltext']) or ('INSERT' in row['sqltext']):
                # item['testname'] = row['sys_name'] + '_INSERT_' + str(j)  # 请求名称
                jdbcsampler_testname = dict(testname = row['sys_name'] + '_INSERT_' + str(j)) #请求名称
 
            elif ('update' in row['sqltext']) or ('UPDATE' in row['sqltext']):
                # item['testname'] = row['sys_name'] + '_UPDATE_' + str(j)  # 请求名称
                jdbcsampler_testname = dict(testname = row['sys_name'] + '_UPDATE_' + str(j))  # 请求名称
            elif (any(name in row['sqltext'] for name in
                      ['select config', 'YY_CONFIG', 'select def_value', 'SELECT CONFIG'])):
                # item['testname'] = row['sys_name'] + '_CONFIG_' + str(j)  # 请求名称
                jdbcsampler_testname = dict(testname = row['sys_name'] + '_CONFIG_' + str(j))  # 请求名称
            else:
                # item['testname'] = row['sys_name'] + '_' + str(j)  # 请求名称
                jdbcsampler_testname = dict(testname = row['sys_name'] + '_' + str(j))
 
            jdbcsampler_attr.append(jdbcsampler_testname)
            for node in jdbcsampler_node:
                (key, value), = node.items()
                if isinstance(value, list):
                    if value[0] == 'dataSource':
                        value.append(row['db'])
                    elif value[0] == 'query':
                        sqltext = row['sqltext']
                        value.append(sqltext)
            jdbcsampler = self.AddNode('JDBCSampler', jdbcsampler_attr, jdbcsampler_node)
            hashTree_lev3.appendChild(jdbcsampler)
            jdbcsampler_hashTree = self.dom.createElement('hashTree')
            hashTree_lev3.appendChild(jdbcsampler_hashTree)  # 未添加执行计划
            self.hashtree_lev2.appendChild(hashTree_lev3)
            j = j + 1
        print(sysname,'SQL业务请求添加完成,共计添加:', j - 1)
 
 
 
 
 
    def add_jdbcsample(self,path,sqldf):
        print(sqldf.iterrows())
        for index, row in sqldf.iterrows():
            print(index,row['sys_name'])
 
    def AddJDBCSampler(self,path,sqldf,sysname):
        # try:
        j = 1
        item={}
        print(sqldf.iterrows())
        print('====')
        for index,row in sqldf.iterrows():  #需要优化
            Itemlist = json.loads(GetConfigure(path, 'JDBCSampler', 'Itemlist'))
            Nodelist = json.loads(GetConfigure(path, 'JDBCSampler', 'Nodelist'))
            if ('insert' in row['sqltext']) or ('INSERT' in row['sqltext']):
                item['testname'] = row['sys_name']+'_INSERT_'+str(j) #请求名称
            elif ('update' in row['sqltext']) or ('UPDATE' in row['sqltext']):
                item['testname'] = row['sys_name'] + '_UPDATE_' + str(j)  # 请求名称
            # elif ('select config' in row['sqltext']) or ('YY_CONFIG' in row['sqltext']) or ('select def_value' in row['sqltext']):
            #     item['testname'] = sysname + '_CONFIG_' + str(j)  # 请求名称
            elif (any(name in row['sqltext'] for name in ['select config','YY_CONFIG','select def_value','SELECT CONFIG'])):
                item['testname'] = row['sys_name'] + '_CONFIG_' + str(j)  # 请求名称
            else:
                item['testname'] = row['sys_name'] + '_' + str(j)  # 请求名称
 
            Itemlist.append(item)
            for node in Nodelist:
                (key,value),= node.items()
                if isinstance(value,list):
                    if value[0] == 'dataSource':
                        value.append(row['dbname'])
                    elif value[0] == 'query':
                        sqltext = row['sqltext']
                        value.append(sqltext)
            JDBCSamplerNode = self.AddNode('JDBCSampler', Itemlist, Nodelist)
            self.hashTreelevel3.appendChild(JDBCSamplerNode)
            self.hashTreeall = self.dom.createElement('hashTree')
            self.hashTreelevel3.appendChild(self.hashTreeall) #未添加执行计划
            self.hashTreelevel2.appendChild(self.hashTreelevel3)
            j = j + 1
        print('SQL业务请求添加完成,共计添加:', j-1)
 
 
 
    def AddTestPlanBeanshell(self,path):
 
        try:
            Itemlist = json.loads(GetConfigure(path,'BeanTestPlan','Itemlist'))
            Nodelist = json.loads(GetConfigure(path, 'BeanTestPlan', 'Nodelist'))
            Testlanscript = GetConfigure(path, 'BeanTestPlan', 'script')
            for node in Nodelist:
                (key, value), = node.items()
                if isinstance(value, list):
                    if value[0] == 'script':
                        value.append(Testlanscript)
            # print(Nodelist)
            TestPlanBeanshellnode = self.AddNode('BeanShellPostProcessor', Itemlist,Nodelist)
 
            # Testplan = hashTreeBeanShell.appendChild(TestPlanBeanshellnode)
            # print('BeanTestPlan节点处理完毕')
            return TestPlanBeanshellnode
        except Exception as f:
            print('BeanTestPlan节点处理异常', f)
 
 
    def AddNode(self,nodename, Attributelist,nodelist):
        # none = self.dom.createTextNode('')
        #添加node元素
        nodename = self.dom.createElement(nodename)
        for item in Attributelist:
            (key, value), = item.items()
            nodename.setAttribute(key, value)
        nodename.appendChild(self.none_value)
        #node添加子节点
        if nodelist:
            for node in nodelist:
                (key, value), = node.items()
                node = self.dom.createElement(key)
                if isinstance (value,list):
                    nodetext = self.dom.createTextNode(value[1])
                    nodevalue = value[0]
                else:
                    nodetext = self.dom.createTextNode('')
                    nodevalue = value
                node.appendChild(nodetext)
                node.setAttribute('name', nodevalue)
                nodename.appendChild(node)
        return nodename
 
    def ReplaceSsqlParam(self,sqltext,patid):
        date00 = re.compile(r'\d+00:00:\d{1,2}')
        date59 = re.compile(r'\d+23:00:\d{1,2}')
        # date = re.compile(r'\d+:(?!00|59)\d{1,2}:\d{1,2}')  #2019120420:02:35
        patidrule = '=.('+str(patid)+')'
        patid = re.compile(patidrule)
 
        #替换sql语句中的时间有关的条件
        if ('00:00:00' and '23:59:59') in sqltext:
            sqltext = date00.sub('${date00}', sqltext)
            sqltext = date59.sub('${date59}', sqltext)
        else:
            sqltext = date.sub('${date}', sqltext)
 
        sqltext = patid.sub('= ${PATID}', sqltext)
 
 
 
 
 
        return sqltext
 
 
        newsqltext = date00.sub('${date}', sqltext)
 
 
 
    def WriteXml(self):
        try:
 
            #判断输出文件夹是否存在,否则则创建
            if not os.path.exists(self.jmxpath):
                os.makedirs(self.jmxpath)
 
            file = open(os.path.join(self.jmxpath,self.jmxfile), 'w', encoding='utf-8')  # 指定文件编码格式
            # file = open(self.jmxfile, 'w',encoding='utf-8')#指定文件编码格式
            self.dom.writexml(file, indent='', addindent='  ', newl='\r',encoding='utf-8')
            #第一个参数是目标文件对象,第二个参数是根节点的缩进格式,第三个参数是其他子节点的缩进格式,
            # 第四个参数制定了换行格式,第五个参数制定了xml内容的编码。
            file.close()
            print('\n',self.jmxfile,'文件写入成功')
        except Exception as e:
            print('文件写入失败')
 
 
 
jmx = '11111.jmx'
confpath = r'Conf/conf.ini'
 
if __name__ == "__main__":
    # db = ['cis','his']
 
    # confpath = 'D:\Code\YongPyCodes\WriteXmlToJmeter\Conf\conf.ini'
    run = SqlToJmeterml(jmx,confpath)
    run.jmeterTestPlan()
    run.Testplan()
    # run.ThreadGroup()
    run.JDBCDataSource()
    run.sys_testgroup()
    # run.AddTestPlan()
    # run.AddTestPlanBeanshell(confpath)
    #
    # run.AddJdbc(confpath,db)
    # run.AddThreadGroup(confpath)
    run.WriteXml()
目录结构


本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责,本站只提供参考并不构成任何投资及应用建议。本站是一个个人学习交流的平台,网站上部分文章为转载,并不用于任何商业目的,我们已经尽可能的对作者和来源进行了通告,但是能力有限或疏忽,造成漏登,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

  • 项目经理 点击这里给我发消息
  • 项目经理 点击这里给我发消息
  • 项目经理 点击这里给我发消息
  • 项目经理 点击这里给我发消息