初始化提交

This commit is contained in:
2025-12-29 22:08:58 +08:00
commit c9259b51c7
9 changed files with 1367 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
__pycache__
test

214
README.md Normal file
View File

@@ -0,0 +1,214 @@
# 多数据库交互脚本
一个支持多种数据库的 Python 交互式命令行工具,可以方便地连接和操作各种数据库。
## 支持的数据库
- ✅ MySQL
- ✅ MongoDB
- ✅ Redis
- ✅ SQLite
- ✅ PostgreSQL
## 系统支持
- ✅ Windows
- ✅ Linux
- ✅ macOS
- ✅ Termux (Android)
## 安装依赖
```bash
# 安装所有依赖
pip install -r requirements.txt
# 或者单独安装需要的数据库驱动
pip install pymysql # MySQL
pip install pymongo # MongoDB
pip install redis # Redis
pip install psycopg2-binary # PostgreSQL
# SQLite 是 Python 内置模块,无需安装
```
## 使用方法
### 1. 启动脚本
```bash
python main.py
```
### 2. 选择数据库类型
启动后会提示选择数据库类型:
```
请选择数据库类型:
1. MySQL
2. MongoDB
3. Redis
4. SQLite
5. PostgreSQL
```
### 3. 输入连接信息
根据提示输入数据库的连接信息(主机、端口、用户名、密码等)。
### 4. 执行命令
连接成功后,就可以输入相应的命令进行数据库操作。
## 命令示例
### MySQL / PostgreSQL / SQLite
```sql
-- 查询
SELECT * FROM users;
SELECT name, age FROM users WHERE age > 18;
-- 插入
INSERT INTO users (name, age) VALUES ('张三', 25);
-- 更新
UPDATE users SET age = 26 WHERE name = '张三';
-- 删除
DELETE FROM users WHERE name = '张三';
-- 创建表
CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER);
```
### MongoDB
MongoDB 使用 JSON 格式的命令:
```json
// 查询所有文档
{"collection": "users", "operation": "find", "query": {}}
// 条件查询
{"collection": "users", "operation": "find", "query": {"age": {"$gt": 18}}}
// 插入文档
{"collection": "users", "operation": "insert", "document": {"name": "张三", "age": 25}}
// 更新文档
{"collection": "users", "operation": "update", "query": {"name": "张三"}, "update": {"$set": {"age": 26}}}
// 删除文档
{"collection": "users", "operation": "delete", "query": {"name": "张三"}}
// 统计数量
{"collection": "users", "operation": "count", "query": {}}
```
### Redis
```
# 字符串操作
SET name 张三
GET name
DEL name
# 哈希操作
HSET user:1 name 张三
HSET user:1 age 25
HGET user:1 name
HGETALL user:1
# 列表操作
LPUSH mylist item1 item2 item3
LRANGE mylist 0 -1
# 集合操作
SADD myset member1 member2
SMEMBERS myset
# 查询所有键
KEYS *
# 测试连接
PING
```
## 内置命令
- `help` - 显示帮助信息
- `connect` - 连接到数据库
- `disconnect` - 断开数据库连接
- `status` - 查看连接状态
- `switch <db>` - 切换数据库类型 (mysql/mongodb/redis/sqlite/postgres)
- `quit``exit` - 退出程序
## 文件说明
- `main.py` - 主程序入口
- `mysql.py` - MySQL 数据库操作模块
- `mongodb.py` - MongoDB 数据库操作模块
- `redis.py` - Redis 数据库操作模块
- `sqlite.py` - SQLite 数据库操作模块
- `postgres.py` - PostgreSQL 数据库操作模块
- `requirements.txt` - 依赖包列表
## 注意事项
1. 首次使用前请确保已安装相应的数据库驱动
2. SQLite 会在当前目录创建数据库文件
3. 确保数据库服务已启动并可以访问
4. 使用完毕后建议执行 `disconnect` 命令断开连接
## 常见问题
### Q: 连接失败怎么办?
A: 请检查:
- 数据库服务是否已启动
- 主机地址和端口是否正确
- 用户名和密码是否正确
- 防火墙是否允许连接
### Q: Redis 连接时提示 "module 'redis' has no attribute 'Redis'" 怎么办?
A: 这个问题已经修复。如果仍然遇到,请:
1. 确保安装了正确的 redis 包:`pip install redis`
2. 检查是否有其他名为 redis.py 的文件冲突
3. 运行测试脚本检查依赖:`python test_import.py`
### Q: 如何检查依赖是否正确安装?
A: 运行测试脚本:
```bash
python test_import.py
```
这将检查所有必需的数据库驱动是否正确安装。
### Q: 如何在 Termux 中使用?
A: 在 Termux 中需要先安装 Python 和必要的依赖:
```bash
pkg install python
pkg install build-essential # 编译某些包需要
pip install -r requirements.txt
```
### Q: 如何切换数据库?
A: 使用 `switch <数据库类型>` 命令,例如:
```
switch mysql
switch mongodb
```
### Q: MongoDB 需要用户认证怎么办?
A: 在配置时会询问是否需要认证,选择 y 后输入用户名和密码即可。
## 许可证
MIT License

447
main.py Normal file
View File

@@ -0,0 +1,447 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
多数据库交互脚本
支持 MySQL, MongoDB, Redis, SQLite, PostgreSQL
跨平台支持: Windows, Linux, Termux 等
"""
import sys
import json
# 导入数据库模块(处理可能的导入错误)
try:
from mysql_db import MySQLDatabase
from mongodb_db import MongoDatabase
from redis_db import RedisDatabase
from sqlite_db import SQLiteDatabase
from postgres_db import PostgreSQLDatabase
except ImportError as e:
print(f"错误: 导入数据库模块失败")
print(f"详细信息: {e}")
print("\n请确保已安装所需的依赖包:")
print(" pip install -r requirements.txt")
sys.exit(1)
def print_banner():
"""打印欢迎信息"""
banner = """
╔══════════════════════════════════════════╗
║ 多数据库交互工具 v1.0 ║
║ 支持: MySQL, MongoDB, Redis, ║
║ SQLite, PostgreSQL ║
╚══════════════════════════════════════════╝
"""
print(banner)
def print_help():
"""打印帮助信息"""
help_text = """
========================================================================
脚本通用命令:
- help : 显示帮助信息
- connect : 连接到数据库
- disconnect : 断开数据库连接
- status : 查看连接状态
- switch <db> : 切换数据库类型 (mysql/mongodb/redis/sqlite/postgres)
- show databases: 列出所有数据库 (仅MySQL/PostgreSQL)
- use <database>: 切换到指定数据库 (仅MySQL/PostgreSQL)
- quit/exit : 退出程序
========================================================================
MySQL/PostgreSQL/SQLite 通用SQL语句:
SELECT * FROM users;
INSERT INTO users (name, age) VALUES ('张三', 25);
UPDATE users SET age = 26 WHERE name = '张三';
DELETE FROM users WHERE name = '张三';
========================================================================
SQLite 点命令:
.tables - 列出所有表
.schema - 显示所有表的结构
.schema <table> - 显示特定表的结构
.databases - 列出数据库信息
.indexes - 列出所有索引
.indexes <table> - 列出特定表的索引
========================================================================
MongoDB Shell 命令:
show dbs - 列出所有数据库
show collections - 列出当前数据库的集合
use <database> - 切换数据库
========================================================================
MongoDB JSON 命令:
{"collection": "users", "operation": "find", "query": {}}
{"collection": "users", "operation": "insert", "document": {"name": "张三", "age": 25}}
{"collection": "users", "operation": "update", "query": {"name": "张三"}, "update": {"$set": {"age": 26}}}
{"collection": "users", "operation": "delete", "query": {"name": "张三"}}
========================================================================
Redis 键值命令:
SET name 张三
GET name
HSET user:1 name 张三 age 25
HGETALL user:1
KEYS *
========================================================================
"""
print(help_text)
def get_database_config(db_type):
"""
获取数据库配置
:param db_type: 数据库类型
:return: 配置字典
"""
config = {}
if db_type == 'mysql':
print("\n请输入 MySQL 配置信息:")
config['host'] = input("主机地址 [localhost]: ").strip() or 'localhost'
config['port'] = int(input("端口 [3306]: ").strip() or '3306')
config['user'] = input("用户名 [root]: ").strip() or 'root'
config['password'] = input("密码: ").strip()
# 默认先列出数据库,然后让用户选择
config['database'] = None # 先不指定数据库
config['_list_databases'] = True # 标记需要列出数据库
elif db_type == 'mongodb':
print("\n请输入 MongoDB 配置信息:")
config['host'] = input("主机地址 [localhost]: ").strip() or 'localhost'
config['port'] = int(input("端口 [27017]: ").strip() or '27017')
config['database'] = input("数据库名 [test]: ").strip() or 'test'
# 询问是否需要认证
need_auth = input("是否需要用户认证? (y/n) [n]: ").strip().lower()
if need_auth == 'y':
config['username'] = input("用户名: ").strip()
config['password'] = input("密码: ").strip()
else:
config['username'] = None
config['password'] = None
elif db_type == 'redis':
print("\n请输入 Redis 配置信息:")
config['host'] = input("主机地址 [localhost]: ").strip() or 'localhost'
config['port'] = int(input("端口 [6379]: ").strip() or '6379')
config['db'] = int(input("数据库编号 [0]: ").strip() or '0')
password = input("密码 (留空表示无密码): ").strip()
config['password'] = password if password else None
elif db_type == 'sqlite':
print("\n请输入 SQLite 配置信息:")
config['database'] = input("数据库文件路径 [database.db]: ").strip() or 'database.db'
elif db_type == 'postgres':
print("\n请输入 PostgreSQL 配置信息:")
config['host'] = input("主机地址 [localhost]: ").strip() or 'localhost'
config['port'] = int(input("端口 [5432]: ").strip() or '5432')
config['user'] = input("用户名 [postgres]: ").strip() or 'postgres'
config['password'] = input("密码: ").strip()
# 默认先列出数据库,然后让用户选择
config['database'] = 'postgres' # 先连接到默认数据库
config['_list_databases'] = True # 标记需要列出数据库
return config
def create_database_instance(db_type, config):
"""
创建数据库实例
:param db_type: 数据库类型
:param config: 配置字典
:return: 数据库实例
"""
if db_type == 'mysql':
return MySQLDatabase(**config)
elif db_type == 'mongodb':
return MongoDatabase(**config)
elif db_type == 'redis':
return RedisDatabase(**config)
elif db_type == 'sqlite':
return SQLiteDatabase(**config)
elif db_type == 'postgres':
return PostgreSQLDatabase(**config)
else:
return None
def format_output(data):
"""
格式化输出结果
:param data: 数据
:return: 格式化后的字符串
"""
if isinstance(data, list):
# 检查是否是字符串列表(如 .schema 的输出)
if data and all(isinstance(item, str) for item in data):
# 直接输出,保留换行符
return '\n\n'.join(data)
else:
# JSON 格式化
return json.dumps(data, ensure_ascii=False, indent=2)
elif isinstance(data, dict):
return json.dumps(data, ensure_ascii=False, indent=2)
else:
return str(data)
def main():
"""主函数"""
print_banner()
# 选择数据库类型
print("请选择数据库类型:")
print("1. MySQL")
print("2. MongoDB")
print("3. Redis")
print("4. SQLite")
print("5. PostgreSQL")
choice = input("\n请输入选项 (1-5): ").strip()
db_types = {
'1': 'mysql',
'2': 'mongodb',
'3': 'redis',
'4': 'sqlite',
'5': 'postgres'
}
if choice not in db_types:
print("无效的选项!")
return
current_db_type = db_types[choice]
print(f"\n已选择: {current_db_type.upper()}")
# 获取配置并创建数据库实例
config = get_database_config(current_db_type)
list_databases = config.pop('_list_databases', False)
db = create_database_instance(current_db_type, config)
if not db:
print("创建数据库实例失败!")
return
# 是否自动连接
#auto_connect = input("\n是否立即连接? (y/n) [y]: ").strip().lower()
#if auto_connect != 'n':
success, message = db.connect()
print(f"\n{message}")
connected = success
# 如果需要列出数据库
if connected and list_databases and hasattr(db, 'list_databases'):
success_list, result = db.list_databases()
if success_list:
print("\n可用的数据库:")
for i, db_name in enumerate(result, 1):
print(f" {i}. {db_name}")
# 让用户选择数据库
if current_db_type in ['mysql', 'postgres']:
while True:
db_choice = input("\n请输入数据库名或编号 (直接回车跳过): ").strip()
if not db_choice:
break
# 判断是数字还是数据库名
selected_db = None
if db_choice.isdigit():
index = int(db_choice) - 1
if 0 <= index < len(result):
selected_db = result[index]
else:
print(f"错误: 编号超出范围,请输入 1-{len(result)} 之间的数字")
retry = input("是否重新输入? (y/n) [y]: ").strip().lower()
if retry == 'n':
break
continue
else:
selected_db = db_choice
# 尝试切换数据库
use_success, use_msg = db.use_database(selected_db)
print(use_msg)
if use_success:
break
else:
retry = input("是否重新输入? (y/n) [y]: ").strip().lower()
if retry == 'n':
break
else:
print(f"\n{result}")
print("\n输入 'help' 查看帮助信息")
print("=" * 50)
# 主循环
while True:
try:
# 显示提示符
prompt = f"\n[{current_db_type.upper()}{'*' if connected else ''}]> "
command = input(prompt).strip()
if not command:
continue
# 处理特殊命令
cmd_lower = command.lower()
if cmd_lower in ['quit', 'exit']:
if connected:
print(db.close())
print("\n再见!")
break
elif cmd_lower == 'help':
print_help()
continue
elif cmd_lower == 'connect':
if connected:
print("已经连接到数据库")
else:
success, message = db.connect()
print(message)
connected = success
continue
elif cmd_lower == 'disconnect':
if not connected:
print("未连接到数据库")
else:
print(db.close())
connected = False
continue
elif cmd_lower == 'status':
status = "已连接" if connected else "未连接"
print(f"数据库类型: {current_db_type.upper()}")
print(f"连接状态: {status}")
if connected and hasattr(db, 'database') and db.database:
print(f"当前数据库: {db.database}")
continue
elif cmd_lower == 'show databases':
if not connected:
print("请先连接到数据库 (使用 'connect' 命令)")
elif not hasattr(db, 'list_databases'):
print("当前数据库类型不支持此命令")
else:
success, result = db.list_databases()
if success:
print("\n可用的数据库:")
for i, db_name in enumerate(result, 1):
print(f" {i}. {db_name}")
else:
print(f"\n{result}")
continue
elif cmd_lower.startswith('use '):
if not connected:
print("请先连接到数据库 (使用 'connect' 命令)")
elif not hasattr(db, 'use_database'):
print("当前数据库类型不支持此命令")
else:
db_input = command.split(maxsplit=1)[1].strip()
# 先获取数据库列表
list_success, db_list = db.list_databases()
# 判断是数字还是数据库名
selected_db = None
if db_input.isdigit() and list_success:
index = int(db_input) - 1
if 0 <= index < len(db_list):
selected_db = db_list[index]
else:
print(f"错误: 编号超出范围,请使用 'show databases' 查看可用数据库")
continue
else:
selected_db = db_input
# 尝试切换数据库
success, message = db.use_database(selected_db)
print(message)
# 如果失败,询问是否重试
if not success:
retry = input("是否重新输入数据库名? (y/n) [y]: ").strip().lower()
if retry != 'n':
while True:
new_db = input("请输入数据库名或编号: ").strip()
if not new_db:
break
# 再次判断数字或名称
if new_db.isdigit() and list_success:
idx = int(new_db) - 1
if 0 <= idx < len(db_list):
new_db = db_list[idx]
else:
print(f"错误: 编号超出范围")
continue
success2, message2 = db.use_database(new_db)
print(message2)
if success2:
break
retry_again = input("是否继续重试? (y/n) [y]: ").strip().lower()
if retry_again == 'n':
break
continue
elif cmd_lower.startswith('switch '):
new_type = cmd_lower.split()[1]
if new_type not in ['mysql', 'mongodb', 'redis', 'sqlite', 'postgres']:
print("无效的数据库类型!")
continue
# 关闭当前连接
if connected:
print(db.close())
connected = False
# 切换数据库
current_db_type = new_type
config = get_database_config(current_db_type)
db = create_database_instance(current_db_type, config)
print(f"已切换到 {current_db_type.upper()}")
continue
# 执行数据库命令
if not connected:
print("请先连接到数据库 (使用 'connect' 命令)")
continue
success, result = db.execute(command)
if success:
print(f"\n执行成功:")
print(format_output(result))
else:
print(f"\n执行失败:")
print(result)
except KeyboardInterrupt:
print("\n\n操作已取消")
continue
except EOFError:
print("\n\n再见!")
break
except Exception as e:
print(f"\n发生错误: {str(e)}")
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print("\n\n程序已退出")
sys.exit(0)

178
mongodb_db.py Normal file
View File

@@ -0,0 +1,178 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MongoDB 数据库操作模块
支持基本的文档操作
"""
import sys
import json
try:
from pymongo import MongoClient
except ImportError as e:
print(f"错误: 无法导入 pymongo 库,请确保已安装: pip install pymongo")
print(f"详细错误: {e}")
sys.exit(1)
class MongoDatabase:
"""MongoDB 数据库连接和操作类"""
def __init__(self, host='localhost', port=27017, database='test', username=None, password=None):
"""
初始化 MongoDB 连接参数
:param host: 数据库主机地址
:param port: 数据库端口
:param database: 数据库名
:param username: 用户名(可选)
:param password: 密码(可选)
"""
self.host = host
self.port = port
self.database_name = database
self.username = username
self.password = password
self.client = None
self.db = None
def connect(self):
"""建立数据库连接"""
try:
# 构建连接参数
if self.username and self.password:
# 使用用户名密码认证
self.client = MongoClient(
host=self.host,
port=self.port,
username=self.username,
password=self.password,
authSource=self.database_name, # 认证数据库
serverSelectionTimeoutMS=5000
)
else:
# 无需认证
self.client = MongoClient(
host=self.host,
port=self.port,
serverSelectionTimeoutMS=5000
)
# 测试连接
self.client.admin.command('ping')
self.db = self.client[self.database_name]
return True, f"MongoDB 连接成功 (数据库: {self.database_name})"
except Exception as e:
return False, f"MongoDB 连接失败: {str(e)}"
def execute(self, command):
"""
执行 MongoDB 命令
:param command: MongoDB 命令字符串JSON 格式或 shell 命令)
示例: {"collection": "users", "operation": "find", "query": {}}
或: show dbs, show collections, use <database>
"""
if self.db is None:
return False, "未连接到数据库"
# 处理 MongoDB shell 命令
cmd_lower = command.strip().lower()
# show dbs - 列出所有数据库
if cmd_lower in ['show dbs', 'show databases']:
try:
db_list = self.client.list_database_names()
result = []
for db_name in db_list:
db_stats = self.client[db_name].command('dbstats')
size_mb = db_stats.get('dataSize', 0) / (1024 * 1024)
result.append(f"{db_name}: {size_mb:.2f} MB")
return True, result
except Exception as e:
return False, f"执行失败: {str(e)}"
# show collections - 列出当前数据库的所有集合
elif cmd_lower in ['show collections', 'show tables']:
try:
collections = self.db.list_collection_names()
return True, collections
except Exception as e:
return False, f"执行失败: {str(e)}"
# use <database> - 切换数据库
elif cmd_lower.startswith('use '):
try:
db_name = command.strip().split(maxsplit=1)[1]
self.db = self.client[db_name]
self.database_name = db_name
return True, f"已切换到数据库: {db_name}"
except Exception as e:
return False, f"切换数据库失败: {str(e)}"
try:
# 解析命令
if isinstance(command, str):
cmd = json.loads(command)
else:
cmd = command
collection_name = cmd.get('collection')
operation = cmd.get('operation')
if not collection_name or not operation:
return False, "命令必须包含 collection 和 operation 字段"
collection = self.db[collection_name]
# 根据操作类型执行不同的命令
if operation == 'find':
query = cmd.get('query', {})
limit = cmd.get('limit', 0)
result = list(collection.find(query).limit(limit))
# 转换 ObjectId 为字符串
for doc in result:
if '_id' in doc:
doc['_id'] = str(doc['_id'])
return True, result
elif operation == 'insert':
document = cmd.get('document')
if not document:
return False, "插入操作需要 document 字段"
result = collection.insert_one(document)
return True, f"插入成功, ID: {result.inserted_id}"
elif operation == 'update':
query = cmd.get('query', {})
update = cmd.get('update')
if not update:
return False, "更新操作需要 update 字段"
result = collection.update_many(query, update)
return True, f"更新了 {result.modified_count} 条记录"
elif operation == 'delete':
query = cmd.get('query', {})
result = collection.delete_many(query)
return True, f"删除了 {result.deleted_count} 条记录"
elif operation == 'count':
query = cmd.get('query', {})
count = collection.count_documents(query)
return True, f"共有 {count} 条记录"
else:
return False, f"不支持的操作: {operation}"
except json.JSONDecodeError:
return False, "命令格式错误,请使用 JSON 格式"
except Exception as e:
return False, f"执行失败: {str(e)}"
def close(self):
"""关闭数据库连接"""
if self.client:
self.client.close()
self.client = None
self.db = None
return "MongoDB 连接已关闭"
return "连接已经关闭"

113
mysql_db.py Normal file
View File

@@ -0,0 +1,113 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MySQL 数据库操作模块
支持基本的增删改查操作
"""
import sys
try:
import pymysql
from pymysql.cursors import DictCursor
except ImportError as e:
print(f"错误: 无法导入 pymysql 库,请确保已安装: pip install pymysql")
print(f"详细错误: {e}")
sys.exit(1)
class MySQLDatabase:
"""MySQL 数据库连接和操作类"""
def __init__(self, host='localhost', port=3306, user='root', password='', database=None):
"""
初始化 MySQL 连接参数
:param host: 数据库主机地址
:param port: 数据库端口
:param user: 用户名
:param password: 密码
:param database: 数据库名(可选,不指定则只连接到服务器)
"""
self.host = host
self.port = port
self.user = user
self.password = password
self.database = database
self.connection = None
def connect(self):
"""建立数据库连接"""
try:
conn_params = {
'host': self.host,
'port': self.port,
'user': self.user,
'password': self.password,
'charset': 'utf8mb4',
'cursorclass': DictCursor
}
# 如果指定了数据库,则连接到该数据库
if self.database:
conn_params['database'] = self.database
self.connection = pymysql.connect(**conn_params)
msg = f"MySQL 连接成功" + (f" (数据库: {self.database})" if self.database else " (未选择数据库)")
return True, msg
except Exception as e:
return False, f"MySQL 连接失败: {str(e)}"
def execute(self, sql, params=None):
"""
执行 SQL 语句
:param sql: SQL 语句
:param params: 参数(可选)
:return: 执行结果
"""
if not self.connection:
return False, "未连接到数据库"
try:
with self.connection.cursor() as cursor:
cursor.execute(sql, params)
# 判断是查询还是修改操作
if sql.strip().upper().startswith(('SELECT', 'SHOW', 'DESC', 'DESCRIBE')):
result = cursor.fetchall()
return True, result
else:
self.connection.commit()
return True, f"影响行数: {cursor.rowcount}"
except Exception as e:
self.connection.rollback()
return False, f"执行失败: {str(e)}"
def list_databases(self):
"""列出所有数据库"""
if not self.connection:
return False, "未连接到数据库"
try:
with self.connection.cursor() as cursor:
cursor.execute("SHOW DATABASES")
result = cursor.fetchall()
databases = [db['Database'] for db in result]
return True, databases
except Exception as e:
return False, f"获取数据库列表失败: {str(e)}"
def use_database(self, database):
"""切换到指定数据库"""
if not self.connection:
return False, "未连接到数据库"
try:
self.connection.select_db(database)
self.database = database
return True, f"已切换到数据库: {database}"
except Exception as e:
return False, f"切换数据库失败: {str(e)}"
def close(self):
"""关闭数据库连接"""
if self.connection:
self.connection.close()
self.connection = None
return "MySQL 连接已关闭"
return "连接已经关闭"

110
postgres_db.py Normal file
View File

@@ -0,0 +1,110 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
PostgreSQL 数据库操作模块
支持基本的增删改查操作
"""
import sys
try:
import psycopg2
from psycopg2.extras import RealDictCursor
except ImportError as e:
print(f"错误: 无法导入 psycopg2 库,请确保已安装: pip install psycopg2-binary")
print(f"详细错误: {e}")
sys.exit(1)
class PostgreSQLDatabase:
"""PostgreSQL 数据库连接和操作类"""
def __init__(self, host='localhost', port=5432, user='postgres', password='', database='postgres'):
"""
初始化 PostgreSQL 连接参数
:param host: 数据库主机地址
:param port: 数据库端口
:param user: 用户名
:param password: 密码
:param database: 数据库名默认postgres系统数据库
"""
self.host = host
self.port = port
self.user = user
self.password = password
self.database = database
self.connection = None
def connect(self):
"""建立数据库连接"""
try:
self.connection = psycopg2.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=self.database,
cursor_factory=RealDictCursor,
connect_timeout=5
)
return True, "PostgreSQL 连接成功"
except Exception as e:
return False, f"PostgreSQL 连接失败: {str(e)}"
def execute(self, sql, params=None):
"""
执行 SQL 语句
:param sql: SQL 语句
:param params: 参数(可选)
:return: 执行结果
"""
if not self.connection:
return False, "未连接到数据库"
try:
with self.connection.cursor() as cursor:
cursor.execute(sql, params)
# 判断是查询还是修改操作
if sql.strip().upper().startswith(('SELECT', 'SHOW', 'WITH')):
result = cursor.fetchall()
# 转换为字典列表
return True, [dict(row) for row in result]
else:
self.connection.commit()
return True, f"影响行数: {cursor.rowcount}"
except Exception as e:
self.connection.rollback()
return False, f"执行失败: {str(e)}"
def list_databases(self):
"""列出所有数据库"""
if not self.connection:
return False, "未连接到数据库"
try:
with self.connection.cursor() as cursor:
cursor.execute("SELECT datname FROM pg_database WHERE datistemplate = false ORDER BY datname")
result = cursor.fetchall()
databases = [db['datname'] for db in result]
return True, databases
except Exception as e:
return False, f"获取数据库列表失败: {str(e)}"
def use_database(self, database):
"""切换到指定数据库(需要重新连接)"""
if not self.connection:
return False, "未连接到数据库"
try:
# PostgreSQL 需要重新连接来切换数据库
self.connection.close()
self.database = database
return self.connect()
except Exception as e:
return False, f"切换数据库失败: {str(e)}"
def close(self):
"""关闭数据库连接"""
if self.connection:
self.connection.close()
self.connection = None
return "PostgreSQL 连接已关闭"
return "连接已经关闭"

167
redis_db.py Normal file
View File

@@ -0,0 +1,167 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Redis 数据库操作模块
支持基本的键值操作
"""
import sys
import os
import json
# 避免文件名冲突确保导入正确的redis库
try:
from redis import Redis as RedisClient
from redis import ConnectionError as RedisConnectionError
except ImportError as e:
print(f"错误: 无法导入 redis 库,请确保已安装: pip install redis")
print(f"详细错误: {e}")
sys.exit(1)
class RedisDatabase:
"""Redis 数据库连接和操作类"""
def __init__(self, host='localhost', port=6379, db=0, password=None):
"""
初始化 Redis 连接参数
:param host: Redis 主机地址
:param port: Redis 端口
:param db: 数据库编号0-15
:param password: 密码(可选)
"""
self.host = host
self.port = port
self.db = db
self.password = password
self.client = None
def connect(self):
"""建立数据库连接"""
try:
self.client = RedisClient(
host=self.host,
port=self.port,
db=self.db,
password=self.password,
decode_responses=True,
socket_connect_timeout=5
)
# 测试连接
self.client.ping()
return True, "Redis 连接成功"
except RedisConnectionError as e:
return False, f"Redis 连接失败: 无法连接到 Redis 服务器 - {str(e)}"
except Exception as e:
return False, f"Redis 连接失败: {str(e)}"
def execute(self, command):
"""
执行 Redis 命令
:param command: Redis 命令字符串
示例: "SET key value""GET key"
"""
if not self.client:
return False, "未连接到数据库"
try:
# 解析命令
parts = command.strip().split()
if not parts:
return False, "命令不能为空"
cmd = parts[0].upper()
args = parts[1:]
# 执行常用命令
if cmd == 'SET' and len(args) >= 2:
key = args[0]
value = ' '.join(args[1:])
result = self.client.set(key, value)
return True, "OK" if result else "FAIL"
elif cmd == 'GET' and len(args) >= 1:
key = args[0]
result = self.client.get(key)
return True, result if result is not None else "(nil)"
elif cmd == 'DEL' and len(args) >= 1:
result = self.client.delete(*args)
return True, f"删除了 {result} 个键"
elif cmd == 'EXISTS' and len(args) >= 1:
result = self.client.exists(*args)
return True, f"存在 {result} 个键"
elif cmd == 'KEYS' and len(args) >= 1:
pattern = args[0]
result = self.client.keys(pattern)
return True, result
elif cmd == 'HSET' and len(args) >= 3:
key = args[0]
field = args[1]
value = ' '.join(args[2:])
result = self.client.hset(key, field, value)
return True, f"设置字段成功: {result}"
elif cmd == 'HGET' and len(args) >= 2:
key = args[0]
field = args[1]
result = self.client.hget(key, field)
return True, result if result is not None else "(nil)"
elif cmd == 'HGETALL' and len(args) >= 1:
key = args[0]
result = self.client.hgetall(key)
return True, result
elif cmd == 'LPUSH' and len(args) >= 2:
key = args[0]
result = self.client.lpush(key, *args[1:])
return True, f"列表长度: {result}"
elif cmd == 'LRANGE' and len(args) >= 3:
key = args[0]
start = int(args[1])
stop = int(args[2])
result = self.client.lrange(key, start, stop)
return True, result
elif cmd == 'SADD' and len(args) >= 2:
key = args[0]
result = self.client.sadd(key, *args[1:])
return True, f"添加了 {result} 个元素"
elif cmd == 'SMEMBERS' and len(args) >= 1:
key = args[0]
result = self.client.smembers(key)
return True, list(result)
elif cmd == 'PING':
result = self.client.ping()
return True, "PONG" if result else "FAIL"
elif cmd == 'FLUSHDB':
result = self.client.flushdb()
return True, "OK" if result else "FAIL"
elif cmd == 'DBSIZE':
result = self.client.dbsize()
return True, f"数据库键数量: {result}"
else:
# 尝试执行原始命令
result = self.client.execute_command(*parts)
return True, result
except Exception as e:
return False, f"执行失败: {str(e)}"
def close(self):
"""关闭数据库连接"""
if self.client:
self.client.close()
self.client = None
return "Redis 连接已关闭"
return "连接已经关闭"

16
requirements.txt Normal file
View File

@@ -0,0 +1,16 @@
# 数据库交互脚本依赖包
# 安装命令: pip install -r requirements.txt
# MySQL 数据库
pymysql>=1.0.2
# MongoDB 数据库
pymongo>=4.0.0
# Redis 数据库
redis>=4.0.0
# PostgreSQL 数据库
psycopg2-binary>=2.9.0
# SQLite 是 Python 内置模块,无需额外安装

120
sqlite_db.py Normal file
View File

@@ -0,0 +1,120 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SQLite 数据库操作模块
支持基本的增删改查操作
"""
import sqlite3
class SQLiteDatabase:
"""SQLite 数据库连接和操作类"""
def __init__(self, database='database.db'):
"""
初始化 SQLite 连接参数
:param database: 数据库文件路径
"""
self.database = database
self.connection = None
def connect(self):
"""建立数据库连接"""
try:
self.connection = sqlite3.connect(self.database)
# 设置行工厂,使查询结果以字典形式返回
self.connection.row_factory = sqlite3.Row
return True, f"SQLite 连接成功 (数据库: {self.database})"
except Exception as e:
return False, f"SQLite 连接失败: {str(e)}"
def execute(self, sql, params=None):
"""
执行 SQL 语句或 SQLite 点命令
:param sql: SQL 语句或点命令(如 .tables, .schema
:param params: 参数(可选)
:return: 执行结果
"""
if not self.connection:
return False, "未连接到数据库"
# 处理 SQLite 点命令
cmd = sql.strip()
if cmd.startswith('.'):
try:
# .tables - 列出所有表
if cmd in ['.tables', '.table']:
cursor = self.connection.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
tables = [row[0] for row in cursor.fetchall()]
return True, tables
# .schema - 显示所有表的结构
elif cmd == '.schema':
cursor = self.connection.cursor()
cursor.execute("SELECT sql FROM sqlite_master WHERE type='table' AND sql IS NOT NULL ORDER BY name")
schemas = [row[0] for row in cursor.fetchall()]
return True, schemas
# .schema <table> - 显示特定表的结构
elif cmd.startswith('.schema '):
table_name = cmd.split(maxsplit=1)[1]
cursor = self.connection.cursor()
cursor.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name=?", (table_name,))
result = cursor.fetchone()
if result:
return True, result[0]
else:
return False, f"'{table_name}' 不存在"
# .databases - 列出数据库信息
elif cmd in ['.databases', '.database']:
return True, [f"main: {self.database}"]
# .indexes - 列出所有索引
elif cmd in ['.indexes', '.index']:
cursor = self.connection.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='index' ORDER BY name")
indexes = [row[0] for row in cursor.fetchall()]
return True, indexes
# .indexes <table> - 列出特定表的索引
elif cmd.startswith('.indexes ') or cmd.startswith('.index '):
table_name = cmd.split(maxsplit=1)[1]
cursor = self.connection.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='index' AND tbl_name=? ORDER BY name", (table_name,))
indexes = [row[0] for row in cursor.fetchall()]
return True, indexes
else:
return False, f"不支持的点命令: {cmd.split()[0]}"
except Exception as e:
return False, f"执行失败: {str(e)}"
# 执行标准 SQL 语句
try:
cursor = self.connection.cursor()
cursor.execute(sql, params or ())
# 判断是查询还是修改操作
if sql.strip().upper().startswith(('SELECT', 'PRAGMA')):
rows = cursor.fetchall()
# 转换为字典列表
result = [dict(row) for row in rows]
return True, result
else:
self.connection.commit()
return True, f"影响行数: {cursor.rowcount}"
except Exception as e:
self.connection.rollback()
return False, f"执行失败: {str(e)}"
def close(self):
"""关闭数据库连接"""
if self.connection:
self.connection.close()
self.connection = None
return "SQLite 连接已关闭"
return "连接已经关闭"