优化项目架构

This commit is contained in:
2025-09-15 19:10:37 +08:00
parent 4119ed3445
commit 26b856d74e
1361 changed files with 4 additions and 0 deletions

View File

@@ -0,0 +1,361 @@
import socket
import threading
import json
import time
import sys
import logging
import colorama
from datetime import datetime
# 初始化colorama以支持跨平台彩色终端输出
colorama.init()
# 自定义日志格式化器,带有颜色和分类
class MinecraftStyleFormatter(logging.Formatter):
"""Minecraft风格的日志格式化器带有颜色和分类"""
# ANSI颜色代码
COLORS = {
'RESET': colorama.Fore.RESET,
'BLACK': colorama.Fore.BLACK,
'RED': colorama.Fore.RED,
'GREEN': colorama.Fore.GREEN,
'YELLOW': colorama.Fore.YELLOW,
'BLUE': colorama.Fore.BLUE,
'MAGENTA': colorama.Fore.MAGENTA,
'CYAN': colorama.Fore.CYAN,
'WHITE': colorama.Fore.WHITE,
'BRIGHT_BLACK': colorama.Fore.LIGHTBLACK_EX,
'BRIGHT_RED': colorama.Fore.LIGHTRED_EX,
'BRIGHT_GREEN': colorama.Fore.LIGHTGREEN_EX,
'BRIGHT_YELLOW': colorama.Fore.LIGHTYELLOW_EX,
'BRIGHT_BLUE': colorama.Fore.LIGHTBLUE_EX,
'BRIGHT_MAGENTA': colorama.Fore.LIGHTMAGENTA_EX,
'BRIGHT_CYAN': colorama.Fore.LIGHTCYAN_EX,
'BRIGHT_WHITE': colorama.Fore.LIGHTWHITE_EX,
}
# 日志级别颜色类似于Minecraft
LEVEL_COLORS = {
'DEBUG': COLORS['BRIGHT_BLACK'],
'INFO': COLORS['WHITE'],
'WARNING': COLORS['YELLOW'],
'ERROR': COLORS['RED'],
'CRITICAL': COLORS['BRIGHT_RED'],
}
# 类别及其颜色
CATEGORIES = {
'SERVER': COLORS['BRIGHT_CYAN'],
'NETWORK': COLORS['BRIGHT_GREEN'],
'CLIENT': COLORS['BRIGHT_YELLOW'],
'SYSTEM': COLORS['BRIGHT_MAGENTA'],
}
def format(self, record):
# 获取日志级别颜色
level_color = self.LEVEL_COLORS.get(record.levelname, self.COLORS['WHITE'])
# 从记录名称中确定类别默认为SERVER
category_name = getattr(record, 'category', 'SERVER')
category_color = self.CATEGORIES.get(category_name, self.COLORS['WHITE'])
# 格式化时间戳类似于Minecraft[HH:MM:SS]
timestamp = datetime.now().strftime('%H:%M:%S')
# 格式化消息
formatted_message = f"{self.COLORS['BRIGHT_BLACK']}[{timestamp}] {category_color}[{category_name}] {level_color}{record.levelname}: {record.getMessage()}{self.COLORS['RESET']}"
return formatted_message
class TCPServer:
def __init__(self, host='127.0.0.1', port=9000, buffer_size=4096):
"""初始化TCP服务器"""
self.host = host
self.port = port
self.buffer_size = buffer_size
self.socket = None
self.clients = {} # 存储客户端连接 {client_id: (socket, address)}
self.running = False
self.client_buffers = {} # 每个客户端的消息缓冲区
# 配置日志
self._setup_logging()
def _setup_logging(self):
"""设置Minecraft风格的日志系统"""
# 创建日志器
self.logger = logging.getLogger('TCPServer')
self.logger.setLevel(logging.INFO)
# 清除任何现有的处理器
if self.logger.handlers:
self.logger.handlers.clear()
# 创建控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# 设置格式化器
formatter = MinecraftStyleFormatter()
console_handler.setFormatter(formatter)
# 添加处理器到日志器
self.logger.addHandler(console_handler)
def log(self, level, message, category='SERVER'):
"""使用指定的分类和级别记录日志"""
record = logging.LogRecord(
name=self.logger.name,
level=getattr(logging, level),
pathname='',
lineno=0,
msg=message,
args=(),
exc_info=None
)
record.category = category
# 检查是否存在控制台输入锁,如果存在则使用锁来避免打乱命令输入
if hasattr(self, '_console_input_lock'):
with self._console_input_lock:
self.logger.handle(record)
else:
self.logger.handle(record)
def start(self):
"""启动服务器"""
try:
# 创建TCP套接字
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # 禁用Nagle算法
# 绑定地址和监听
self.socket.bind((self.host, self.port))
self.socket.listen(5)
self.running = True
self.log('INFO', f"服务器启动,监听 {self.host}:{self.port}", 'SERVER')
# 接受客户端连接的主循环
self._accept_clients()
except Exception as e:
self.log('ERROR', f"服务器启动错误: {e}", 'SYSTEM')
self.stop()
def _accept_clients(self):
"""接受客户端连接的循环"""
while self.running:
try:
# 接受新的客户端连接
client_socket, address = self.socket.accept()
client_id = f"{address[0]}:{address[1]}"
self.log('INFO', f"新客户端连接: {client_id}", 'NETWORK')
# 存储客户端信息
self.clients[client_id] = (client_socket, address)
self.client_buffers[client_id] = ""
# 创建处理线程
client_thread = threading.Thread(
target=self._handle_client,
args=(client_id,)
)
client_thread.daemon = True
client_thread.start()
# 通知客户端连接成功
self.send_data(client_id, {"type": "connection_status", "status": "connected"})
except KeyboardInterrupt:
self.log('INFO', "收到中断信号,服务器停止中...", 'SYSTEM')
break
except Exception as e:
self.log('ERROR', f"接受连接时出错: {e}", 'NETWORK')
time.sleep(1) # 避免CPU过度使用
def _handle_client(self, client_id):
"""处理客户端消息的线程"""
client_socket, _ = self.clients.get(client_id, (None, None))
if not client_socket:
return
# 设置超时,用于定期检查连接状态
client_socket.settimeout(30)
while self.running and client_id in self.clients:
try:
# 接收数据
data = client_socket.recv(self.buffer_size)
if not data:
# 客户端断开连接
self.log('INFO', f"客户端 {client_id} 断开连接", 'CLIENT')
self._remove_client(client_id)
break
# 处理接收的数据
self._process_data(client_id, data)
except socket.timeout:
# 发送保活消息
try:
self.send_data(client_id, {"type": "ping"})
except:
self.log('INFO', f"客户端 {client_id} 连接超时", 'CLIENT')
self._remove_client(client_id)
break
except Exception as e:
self.log('ERROR', f"处理客户端 {client_id} 数据时出错: {e}", 'CLIENT')
self._remove_client(client_id)
break
def _process_data(self, client_id, data):
"""处理从客户端接收的数据"""
# 将接收的字节添加到缓冲区
try:
decoded_data = data.decode('utf-8')
self.client_buffers[client_id] += decoded_data
# 处理可能包含多条JSON消息的缓冲区
self._process_buffer(client_id)
except UnicodeDecodeError as e:
self.log('ERROR', f"解码客户端 {client_id} 数据出错: {e}", 'CLIENT')
def _process_buffer(self, client_id):
"""处理客户端消息缓冲区"""
buffer = self.client_buffers.get(client_id, "")
# 按换行符分割消息
while '\n' in buffer:
message_end = buffer.find('\n')
message_text = buffer[:message_end].strip()
buffer = buffer[message_end + 1:]
# 处理非空消息
if message_text:
try:
# 解析JSON消息
message = json.loads(message_text)
self.log('INFO', f"从客户端 {client_id} 接收JSON: {message}", 'CLIENT')
# 处理消息 - 实现自定义逻辑
self._handle_message(client_id, message)
except json.JSONDecodeError:
# 非JSON格式作为原始文本处理
self.log('INFO', f"从客户端 {client_id} 接收文本: {message_text}", 'CLIENT')
self._handle_raw_message(client_id, message_text)
# 更新缓冲区
self.client_buffers[client_id] = buffer
def _handle_message(self, client_id, message):
"""处理JSON消息 - 可被子类覆盖以实现自定义逻辑"""
# 默认实现:简单回显
response = {
"type": "response",
"original": message,
"timestamp": time.time()
}
self.send_data(client_id, response)
def _handle_raw_message(self, client_id, message):
"""处理原始文本消息 - 可被子类覆盖以实现自定义逻辑"""
# 默认实现:简单回显
response = {
"type": "text_response",
"content": f"收到: {message}",
"timestamp": time.time()
}
self.send_data(client_id, response)
def send_data(self, client_id, data):
"""向指定客户端发送JSON数据"""
if client_id not in self.clients:
self.log('WARNING', f"客户端 {client_id} 不存在,无法发送数据", 'NETWORK')
return False
client_socket, _ = self.clients[client_id]
try:
# 转换为JSON字符串添加换行符
if isinstance(data, (dict, list)):
message = json.dumps(data) + '\n'
else:
message = str(data) + '\n'
# 发送数据
client_socket.sendall(message.encode('utf-8'))
return True
except Exception as e:
self.log('ERROR', f"向客户端 {client_id} 发送数据时出错: {e}", 'NETWORK')
self._remove_client(client_id)
return False
def broadcast(self, data, exclude=None):
"""向所有客户端广播消息,可选排除特定客户端"""
exclude = exclude or []
for client_id in list(self.clients.keys()):
if client_id not in exclude:
self.send_data(client_id, data)
def _remove_client(self, client_id):
"""断开并移除客户端连接"""
if client_id in self.clients:
client_socket, _ = self.clients[client_id]
try:
client_socket.close()
except:
pass
del self.clients[client_id]
if client_id in self.client_buffers:
del self.client_buffers[client_id]
self.log('INFO', f"客户端 {client_id} 已移除", 'CLIENT')
def stop(self):
"""停止服务器"""
self.running = False
# 关闭所有客户端连接
for client_id in list(self.clients.keys()):
self._remove_client(client_id)
# 关闭服务器套接字
if self.socket:
try:
self.socket.close()
except:
pass
self.log('INFO', "服务器已停止", 'SERVER')
# 使用示例
if __name__ == "__main__":
try:
# 创建并启动服务器
server = TCPServer()
# 以阻塞方式启动服务器
server_thread = threading.Thread(target=server.start)
server_thread.daemon = True
server_thread.start()
# 运行直到按Ctrl+C
print("服务器运行中按Ctrl+C停止...")
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n程序被用户中断")
if 'server' in locals():
server.stop()
sys.exit(0)