#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ MongoDB 数据库操作模块 支持基本的文档操作 """ import sys import json try: from pymongo import MongoClient except ImportError as e: print(f"错误: 无法导入 pymongo 库,请确保已安装: pip install pymongo") print(f"详细错误: {e}") sys.exit(1) class MongoDatabase: """MongoDB 数据库连接和操作类""" def __init__(self, host='localhost', port=27017, database='test', username=None, password=None): """ 初始化 MongoDB 连接参数 :param host: 数据库主机地址 :param port: 数据库端口 :param database: 数据库名 :param username: 用户名(可选) :param password: 密码(可选) """ self.host = host self.port = port self.database_name = database self.username = username self.password = password self.client = None self.db = None def connect(self): """建立数据库连接""" try: # 构建连接参数 if self.username and self.password: # 使用用户名密码认证 self.client = MongoClient( host=self.host, port=self.port, username=self.username, password=self.password, authSource=self.database_name, # 认证数据库 serverSelectionTimeoutMS=5000 ) else: # 无需认证 self.client = MongoClient( host=self.host, port=self.port, serverSelectionTimeoutMS=5000 ) # 测试连接 self.client.admin.command('ping') self.db = self.client[self.database_name] return True, f"MongoDB 连接成功 (数据库: {self.database_name})" except Exception as e: return False, f"MongoDB 连接失败: {str(e)}" def execute(self, command): """ 执行 MongoDB 命令 :param command: MongoDB 命令字符串(JSON 格式或 shell 命令) 示例: {"collection": "users", "operation": "find", "query": {}} 或: show dbs, show collections, use """ if self.db is None: return False, "未连接到数据库" # 处理 MongoDB shell 命令 cmd_lower = command.strip().lower() # show dbs - 列出所有数据库 if cmd_lower in ['show dbs', 'show databases']: try: db_list = self.client.list_database_names() result = [] for db_name in db_list: db_stats = self.client[db_name].command('dbstats') size_mb = db_stats.get('dataSize', 0) / (1024 * 1024) result.append(f"{db_name}: {size_mb:.2f} MB") return True, result except Exception as e: return False, f"执行失败: {str(e)}" # show collections - 列出当前数据库的所有集合 elif cmd_lower in ['show collections', 'show tables']: try: collections = self.db.list_collection_names() return True, collections except Exception as e: return False, f"执行失败: {str(e)}" # use - 切换数据库 elif cmd_lower.startswith('use '): try: db_name = command.strip().split(maxsplit=1)[1] self.db = self.client[db_name] self.database_name = db_name return True, f"已切换到数据库: {db_name}" except Exception as e: return False, f"切换数据库失败: {str(e)}" try: # 解析命令 if isinstance(command, str): cmd = json.loads(command) else: cmd = command collection_name = cmd.get('collection') operation = cmd.get('operation') if not collection_name or not operation: return False, "命令必须包含 collection 和 operation 字段" collection = self.db[collection_name] # 根据操作类型执行不同的命令 if operation == 'find': query = cmd.get('query', {}) limit = cmd.get('limit', 0) result = list(collection.find(query).limit(limit)) # 转换 ObjectId 为字符串 for doc in result: if '_id' in doc: doc['_id'] = str(doc['_id']) return True, result elif operation == 'insert': document = cmd.get('document') if not document: return False, "插入操作需要 document 字段" result = collection.insert_one(document) return True, f"插入成功, ID: {result.inserted_id}" elif operation == 'update': query = cmd.get('query', {}) update = cmd.get('update') if not update: return False, "更新操作需要 update 字段" result = collection.update_many(query, update) return True, f"更新了 {result.modified_count} 条记录" elif operation == 'delete': query = cmd.get('query', {}) result = collection.delete_many(query) return True, f"删除了 {result.deleted_count} 条记录" elif operation == 'count': query = cmd.get('query', {}) count = collection.count_documents(query) return True, f"共有 {count} 条记录" else: return False, f"不支持的操作: {operation}" except json.JSONDecodeError: return False, "命令格式错误,请使用 JSON 格式" except Exception as e: return False, f"执行失败: {str(e)}" def close(self): """关闭数据库连接""" if self.client: self.client.close() self.client = None self.db = None return "MongoDB 连接已关闭" return "连接已经关闭"