291 lines
9.5 KiB
Python
291 lines
9.5 KiB
Python
from flask import current_app, jsonify
|
||
from flask_mail import Message
|
||
from flask_jwt_extended import create_access_token, create_refresh_token
|
||
from models import db, User, VerificationCode
|
||
from email_validator import validate_email, EmailNotValidError
|
||
from datetime import datetime, timedelta
|
||
import re
|
||
|
||
class AuthService:
|
||
@staticmethod
|
||
def send_verification_email(email, code, purpose='register'):
|
||
"""发送验证码邮件"""
|
||
try:
|
||
subject = '验证码' if purpose == 'register' else '密码重置验证码'
|
||
body = f"""
|
||
您好!
|
||
|
||
感谢您使用 Nano Banana API 转售平台。
|
||
|
||
您的验证码是:{code}
|
||
|
||
此验证码有效期为10分钟,请勿泄露给他人。
|
||
|
||
如果不是您本人操作,请忽略此邮件。
|
||
|
||
---
|
||
Nano Banana API 平台
|
||
"""
|
||
msg = Message(
|
||
subject=f'[Nano Banana] {subject}',
|
||
recipients=[email],
|
||
body=body
|
||
)
|
||
|
||
# 这里使用 Flask-Mail 发送
|
||
from extensions import mail
|
||
mail.send(msg)
|
||
|
||
return True
|
||
except Exception as e:
|
||
print(f'发送邮件失败: {str(e)}')
|
||
return False
|
||
|
||
@staticmethod
|
||
def send_verification_code(data):
|
||
"""发送验证码逻辑"""
|
||
email = data.get('email', '').strip().lower()
|
||
purpose = data.get('purpose', 'register')
|
||
|
||
if not email:
|
||
return {'error': '邮箱不能为空'}, 400
|
||
|
||
# 验证邮箱格式
|
||
try:
|
||
validate_email(email, check_deliverability=False)
|
||
except EmailNotValidError:
|
||
return {'error': '邮箱格式不正确'}, 400
|
||
|
||
# 如果是注册,检查邮箱是否已存在
|
||
if purpose == 'register':
|
||
user = User.query.filter_by(email=email).first()
|
||
if user:
|
||
# 如果用户已存在且已验证邮箱,则提示已注册
|
||
if user.email_verified:
|
||
return {'error': '该邮箱已被注册'}, 400
|
||
# 如果用户存在但未验证,可以继续发送验证码(重发)
|
||
else:
|
||
# 创建临时用户记录用于验证码关联
|
||
user = User(email=email, username=email.split('@')[0])
|
||
user.set_password('temp_pending_registration')
|
||
db.session.add(user)
|
||
db.session.commit()
|
||
else:
|
||
# 密码重置:检查用户是否存在
|
||
user = User.query.filter_by(email=email).first()
|
||
if not user:
|
||
return {'error': '用户不存在'}, 404
|
||
|
||
# 生成验证码
|
||
code = VerificationCode.generate_code()
|
||
|
||
# 删除旧的未使用验证码
|
||
VerificationCode.query.filter_by(
|
||
email=email,
|
||
purpose=purpose,
|
||
used=False
|
||
).delete()
|
||
|
||
# 创建新的验证码
|
||
verification = VerificationCode(
|
||
user_id=user.id,
|
||
email=email,
|
||
code=code,
|
||
purpose=purpose,
|
||
expired_at=datetime.utcnow() + timedelta(minutes=10)
|
||
)
|
||
db.session.add(verification)
|
||
db.session.commit()
|
||
|
||
# 发送邮件
|
||
send_success = AuthService.send_verification_email(email, code, purpose)
|
||
|
||
if send_success:
|
||
return {
|
||
'message': '验证码已发送到您的邮箱',
|
||
'email': email,
|
||
'purpose': purpose
|
||
}, 200
|
||
else:
|
||
return {'error': '邮件发送失败,请检查邮箱配置'}, 500
|
||
|
||
@staticmethod
|
||
def register(data):
|
||
"""用户注册逻辑"""
|
||
if not data or not data.get('email') or not data.get('password') or not data.get('code'):
|
||
return {'error': '邮箱、密码和验证码不能为空'}, 400
|
||
|
||
email = data.get('email').strip().lower()
|
||
password = data.get('password')
|
||
code = data.get('code')
|
||
username = data.get('username', '').strip()
|
||
|
||
try:
|
||
validate_email(email, check_deliverability=False)
|
||
except EmailNotValidError:
|
||
return {'error': '邮箱格式不正确'}, 400
|
||
|
||
if len(password) < 6:
|
||
return {'error': '密码长度至少为6位'}, 400
|
||
|
||
verification = VerificationCode.query.filter_by(
|
||
email=email,
|
||
code=code,
|
||
purpose='register',
|
||
used=False
|
||
).first()
|
||
|
||
if not verification:
|
||
return {'error': '验证码不存在或已过期'}, 400
|
||
|
||
if not verification.is_valid():
|
||
return {'error': '验证码已过期'}, 400
|
||
|
||
existing_user = User.query.filter_by(email=email).filter(
|
||
User.id != verification.user_id
|
||
).first()
|
||
|
||
if existing_user:
|
||
return {'error': '该邮箱已被注册'}, 400
|
||
|
||
user = User.query.get(verification.user_id)
|
||
user.username = username or email.split('@')[0]
|
||
user.set_password(password)
|
||
user.email_verified = True
|
||
user.email_verified_at = datetime.utcnow()
|
||
user.is_active = True
|
||
|
||
verification.used = True
|
||
|
||
try:
|
||
db.session.commit()
|
||
access_token = create_access_token(identity=user.id)
|
||
refresh_token = create_refresh_token(identity=user.id)
|
||
|
||
return {
|
||
'message': '注册成功',
|
||
'access_token': access_token,
|
||
'refresh_token': refresh_token,
|
||
'user': user.to_dict()
|
||
}, 201
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
print(f'注册错误: {str(e)}')
|
||
return {'error': '注册失败,请稍后重试'}, 500
|
||
|
||
@staticmethod
|
||
def login(data):
|
||
"""用户登录逻辑"""
|
||
if not data or not data.get('email') or not data.get('password'):
|
||
return {'error': '邮箱和密码不能为空'}, 400
|
||
|
||
email = data.get('email').strip().lower()
|
||
password = data.get('password')
|
||
|
||
user = User.query.filter_by(email=email).first()
|
||
|
||
if not user or not user.check_password(password):
|
||
return {'error': '邮箱或密码错误'}, 401
|
||
|
||
if not user.is_active:
|
||
return {'error': '账户已被禁用'}, 403
|
||
|
||
# 如果用户未验证邮箱 (理论上注册流程保证了已验证,但为了兼容旧数据)
|
||
if not user.email_verified:
|
||
return {'error': '邮箱未验证,请先完成注册验证'}, 403
|
||
|
||
access_token = create_access_token(identity=user.id)
|
||
refresh_token = create_refresh_token(identity=user.id)
|
||
|
||
return {
|
||
'message': '登录成功',
|
||
'access_token': access_token,
|
||
'refresh_token': refresh_token,
|
||
'user': user.to_dict()
|
||
}, 200
|
||
|
||
@staticmethod
|
||
def change_password(user_id, data):
|
||
"""修改密码"""
|
||
user = User.query.get(user_id)
|
||
|
||
if not user:
|
||
return {'error': '用户不存在'}, 404
|
||
|
||
old_password = data.get('old_password')
|
||
new_password = data.get('new_password')
|
||
|
||
if not old_password or not new_password:
|
||
return {'error': '参数不能为空'}, 400
|
||
|
||
if not user.check_password(old_password):
|
||
return {'error': '原密码错误'}, 401
|
||
|
||
if len(new_password) < 6:
|
||
return {'error': '新密码长度至少为6位'}, 400
|
||
|
||
user.set_password(new_password)
|
||
db.session.commit()
|
||
|
||
return {'message': '密码修改成功'}, 200
|
||
|
||
@staticmethod
|
||
def reset_password(data):
|
||
"""重置密码"""
|
||
email = data.get('email', '').strip().lower()
|
||
code = data.get('code')
|
||
new_password = data.get('new_password')
|
||
|
||
if not email or not code or not new_password:
|
||
return {'error': '参数不能为空'}, 400
|
||
|
||
# 验证验证码
|
||
verification = VerificationCode.query.filter_by(
|
||
email=email,
|
||
code=code,
|
||
purpose='password_reset',
|
||
used=False
|
||
).first()
|
||
|
||
if not verification or not verification.is_valid():
|
||
return {'error': '验证码不存在或已过期'}, 400
|
||
|
||
user = User.query.get(verification.user_id)
|
||
if not user:
|
||
return {'error': '用户不存在'}, 404
|
||
|
||
if len(new_password) < 6:
|
||
return {'error': '密码长度至少为6位'}, 400
|
||
|
||
# 更新密码
|
||
user.set_password(new_password)
|
||
verification.used = True
|
||
db.session.commit()
|
||
|
||
return {'message': '密码重置成功'}, 200
|
||
|
||
@staticmethod
|
||
def verify_code(data):
|
||
"""验证验证码是否有效"""
|
||
email = data.get('email', '').strip().lower()
|
||
code = data.get('code')
|
||
purpose = data.get('purpose', 'register')
|
||
|
||
if not email or not code:
|
||
return {'error': '参数不能为空'}, 400
|
||
|
||
verification = VerificationCode.query.filter_by(
|
||
email=email,
|
||
code=code,
|
||
purpose=purpose,
|
||
used=False
|
||
).first()
|
||
|
||
if not verification:
|
||
return {'error': '验证码错误'}, 400
|
||
|
||
if not verification.is_valid():
|
||
return {'error': '验证码已过期'}, 400
|
||
|
||
return {'message': '验证码有效'}, 200
|