Files
PyDBLink/postgres_db.py
2025-12-29 22:08:58 +08:00

111 lines
3.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
PostgreSQL 数据库操作模块
支持基本的增删改查操作
"""
import sys
try:
import psycopg2
from psycopg2.extras import RealDictCursor
except ImportError as e:
print(f"错误: 无法导入 psycopg2 库,请确保已安装: pip install psycopg2-binary")
print(f"详细错误: {e}")
sys.exit(1)
class PostgreSQLDatabase:
"""PostgreSQL 数据库连接和操作类"""
def __init__(self, host='localhost', port=5432, user='postgres', password='', database='postgres'):
"""
初始化 PostgreSQL 连接参数
:param host: 数据库主机地址
:param port: 数据库端口
:param user: 用户名
:param password: 密码
:param database: 数据库名默认postgres系统数据库
"""
self.host = host
self.port = port
self.user = user
self.password = password
self.database = database
self.connection = None
def connect(self):
"""建立数据库连接"""
try:
self.connection = psycopg2.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=self.database,
cursor_factory=RealDictCursor,
connect_timeout=5
)
return True, "PostgreSQL 连接成功"
except Exception as e:
return False, f"PostgreSQL 连接失败: {str(e)}"
def execute(self, sql, params=None):
"""
执行 SQL 语句
:param sql: SQL 语句
:param params: 参数(可选)
:return: 执行结果
"""
if not self.connection:
return False, "未连接到数据库"
try:
with self.connection.cursor() as cursor:
cursor.execute(sql, params)
# 判断是查询还是修改操作
if sql.strip().upper().startswith(('SELECT', 'SHOW', 'WITH')):
result = cursor.fetchall()
# 转换为字典列表
return True, [dict(row) for row in result]
else:
self.connection.commit()
return True, f"影响行数: {cursor.rowcount}"
except Exception as e:
self.connection.rollback()
return False, f"执行失败: {str(e)}"
def list_databases(self):
"""列出所有数据库"""
if not self.connection:
return False, "未连接到数据库"
try:
with self.connection.cursor() as cursor:
cursor.execute("SELECT datname FROM pg_database WHERE datistemplate = false ORDER BY datname")
result = cursor.fetchall()
databases = [db['datname'] for db in result]
return True, databases
except Exception as e:
return False, f"获取数据库列表失败: {str(e)}"
def use_database(self, database):
"""切换到指定数据库(需要重新连接)"""
if not self.connection:
return False, "未连接到数据库"
try:
# PostgreSQL 需要重新连接来切换数据库
self.connection.close()
self.database = database
return self.connect()
except Exception as e:
return False, f"切换数据库失败: {str(e)}"
def close(self):
"""关闭数据库连接"""
if self.connection:
self.connection.close()
self.connection = None
return "PostgreSQL 连接已关闭"
return "连接已经关闭"