应对SaaS系统报表性能问题的临时方案

在为我们的SaaS系统开发基础报表时,随着客户需求的增加,导出报表的字段越来越多,这导致统计报表的导出非常占用资源。最终,数据库服务器的CPU时不时被占用100%,影响了系统的整体性能。面对客户报告系统缓慢的问题,我们需要经常手动查杀一些慢查询。为了减轻手动操作的负担,今天编写了一个自动查杀慢查询的脚本。这个脚本会定期检查数据库中的慢查询,并自动终止占用资源过多的查询。以下是实现这一功能的步骤和脚本示例。当然这只是临时方案解决不了根本性问题,还是需要可能在减少用户体验的基础上从技术上和产品需求上共同沟通解决。

import psycopg2
import sched
import time
import threading

def connect_to_db():
    try:
        conn = psycopg2.connect(
            database="db_repair",
            user="postgres",
            password="xxxxxx",
            host="xxxxxxxxxx",
            port="5432"
        )
        #print("Opened database successfully")
        return conn
    except Exception as e:
        print(f"Error connecting to database: {e}")
        return None

def kill():
    print('执行'+str(time.time()))
    conn = connect_to_db()
    if conn is None:
        return
    cur = conn.cursor()
    try:
        cur.execute('''select 'select pg_terminate_backend(' || 进程id ||');' as "终止sql执行的语句" from (
                select distinct 进程id from (
                    select
                        c.relname 对象名称,
                        l.locktype 可锁对象的类型,
                        l.pid 进程id,
                        l.mode 持有的锁模式,
                        l.granted 是否已经对锁进行授权,
                        l.fastpath,
                        psa.datname 数据库名称,
                        psa.wait_event 等待事件,
                        psa.state 查询状态,
                        backend_xmin 是否执事务快照,
                        psa.query 执行语句,
                        now() - query_start 持续时间
                    from
                        pg_locks l
                        inner join pg_stat_activity psa on ( psa.pid = l.pid )
                        left outer join pg_class c on ( l.relation = c.oid )
                        -- where l.relation = 'tb_base_apparatus'::regclass
                    where    relkind ='r'
                        order by query_start asc
                ) aa where 持续时间 > '00:6:00'
            ) temp;''')
        rows = cur.fetchall()
        for row in rows:
            for sql in row:
                print(sql)
                cur.execute(sql)
                conn.commit()
    except Exception as e:
        print(f"Error executing query: {e}")
    finally:
        cur.close()
        conn.close()

def schedule_task(scheduler, interval, action, action_args=()):
    scheduler.enter(interval, 1, schedule_task, (scheduler, interval, action, action_args))
    action(*action_args)

def run_scheduler():
    scheduler = sched.scheduler(time.time, time.sleep)
    interval = 28  # interval in seconds
    scheduler.enter(0, 1, schedule_task, (scheduler, interval, kill))
    scheduler.run()

if __name__ == "__main__":
    print("Starting scheduled task...")
    scheduler_thread = threading.Thread(target=run_scheduler)
    scheduler_thread.start()
    scheduler_thread.join()