应对SaaS系统报表性能问题的临时方案

在为我们的SaaS系统开发基础报表时，随着客户需求的增加，导出报表的字段越来越多，统计报表的导出也因此变得非常占用资源。最终，数据库服务器的CPU占用率时不时达到100%，影响了系统的整体性能。每当客户反馈系统缓慢时，我们都需要手动查杀一些慢查询。为了减轻手动操作的负担，今天编写了一个自动查杀慢查询的脚本。这个脚本会定期检查数据库中的慢查询，并自动终止占用资源过多的查询。以下是实现这一功能的步骤和脚本示例。

import psycopg2
import sched
import time
import threading

def connect_to_db():
    """Open a new psycopg2 connection to the ``db_repair`` database.

    Returns:
        An open connection on success, or ``None`` when the attempt fails.
        Failures are printed instead of raised so the caller can simply skip
        the current run.
    """
    # NOTE(review): the credentials below are hard-coded placeholders; they
    # should presumably come from the environment or a config file instead.
    try:
        return psycopg2.connect(
            database="db_repair",
            user="postgres",
            password="xxxxxx",
            host="xxxxxxxxxx",
            port="5432",
            # Bound the connection attempt (seconds) so an unreachable or
            # overloaded server cannot stall the caller indefinitely.
            connect_timeout=15,
        )
    except Exception as e:
        print(f"Error connecting to database: {e}")
        return None

def kill(max_duration="00:06:00"):
    """Terminate backends that hold a table lock and have run for too long.

    Selects every distinct backend pid that appears in ``pg_locks`` with a
    lock on an ordinary table (``relkind = 'r'``) and whose ``query_start``
    in ``pg_stat_activity`` is more than *max_duration* in the past, then
    calls ``pg_terminate_backend`` on each of them. The equivalent SQL
    statement is printed before every termination.

    NOTE(review): ``query_start`` is documented as the start of the *last*
    query for sessions that are not active, so sessions sitting idle inside
    a transaction while holding table locks presumably match as well.

    Args:
        max_duration: PostgreSQL interval literal; backends whose query
            started longer ago than this are terminated. Defaults to six
            minutes.

    Returns:
        None. All database errors are printed, never raised.
    """
    print('执行'+str(time.time()))
    conn = connect_to_db()
    if conn is None:
        return
    cur = None
    try:
        # Autocommit makes every statement its own transaction, so a failed
        # terminate cannot leave the session in an aborted transaction that
        # would reject all the remaining ones.
        conn.autocommit = True
        cur = conn.cursor()
        cur.execute(
            '''select distinct l.pid
                from pg_locks l
                    inner join pg_stat_activity psa on ( psa.pid = l.pid )
                    inner join pg_class c on ( l.relation = c.oid )
                where c.relkind = 'r'
                    and now() - psa.query_start > %s::interval;''',
            (max_duration,),
        )
        pids = [row[0] for row in cur.fetchall()]
        for pid in pids:
            print(f"select pg_terminate_backend({pid});")
            try:
                cur.execute("select pg_terminate_backend(%s);", (pid,))
            except psycopg2.Error as e:
                # Keep going: one backend that cannot be terminated must not
                # protect every other long-running backend in this batch.
                print(f"Error executing query: {e}")
                if conn.closed:
                    # The connection itself is gone; further attempts in
                    # this run would all fail the same way.
                    break
    except Exception as e:
        print(f"Error executing query: {e}")
    finally:
        if cur is not None:
            cur.close()
        conn.close()

def schedule_task(scheduler, interval, action, action_args=()):
    """Re-arm itself on *scheduler*, then run *action* once.

    The next occurrence is queued *interval* seconds ahead (priority 1)
    before *action* is invoked, so the delay is measured from the start of
    the current run rather than from its end.

    Args:
        scheduler: ``sched.scheduler`` instance that drives the repetition.
        interval: Delay until the next occurrence, in the scheduler's units.
        action: Callable to invoke on every occurrence.
        action_args: Positional arguments passed to *action*.
    """
    rearm_args = (scheduler, interval, action, action_args)
    scheduler.enter(interval, 1, schedule_task, argument=rearm_args)
    action(*action_args)

def run_scheduler(interval=28):
    """Run ``kill`` immediately and then again every *interval* seconds.

    Builds a wall-clock ``sched.scheduler``, queues the first
    ``schedule_task`` occurrence with no delay and blocks in
    ``scheduler.run()``. Because ``schedule_task`` queues its successor
    before each run, this call is not expected to return during normal
    operation.

    Args:
        interval: Seconds between consecutive runs. Defaults to 28, the
            value that used to be hard-coded here.
    """
    scheduler = sched.scheduler(time.time, time.sleep)
    scheduler.enter(0, 1, schedule_task, (scheduler, interval, kill))
    scheduler.run()

if __name__ == "__main__":
    # Script entry point: run the periodic killer on a worker thread and
    # keep the main thread alive for as long as that worker is running.
    print("Starting scheduled task...")
    worker = threading.Thread(target=run_scheduler)
    worker.start()
    worker.join()