import os
import json
from datetime import datetime
from pathlib import Path
from typing import List, Literal, Optional, Set

from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

BASE_DIR = Path(__file__).resolve().parent

# Markdown root: the `mengyanote` folder inside this backend project.
MARKDOWN_ROOT = BASE_DIR / "mengyanote"

# Path of the ignore.json file that lists folders to hide from the tree.
IGNORE_FILE = MARKDOWN_ROOT / "ignore.json"

# Format used for the created/modified timestamps returned by /api/file.
_TIME_FORMAT = "%Y年%m月%d日 %H:%M:%S"

# Deletes spaces, newlines, carriage returns and tabs in one pass; used for
# the character count returned by /api/file.
_WHITESPACE_TABLE = str.maketrans("", "", " \n\r\t")


def load_ignore_list() -> Set[str]:
    """Load the folder names to ignore from ignore.json.

    Returns:
        The set of string entries under the top-level ``"ignore"`` key.
        An empty set is returned when the file is missing or unreadable,
        is not valid JSON, or does not have the shape ``{"ignore": [...]}``.
    """
    try:
        with open(IGNORE_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return set()

    if not isinstance(data, dict):
        return set()
    entries = data.get("ignore", [])
    if not isinstance(entries, list):
        return set()
    # Folder names are compared against Path.name, so only strings can match.
    return {entry for entry in entries if isinstance(entry, str)}


# Ignore list, loaded once at import time.
IGNORE_LIST = load_ignore_list()


class NodeType(str):
    """String constants for the two kinds of tree node."""

    FOLDER: Literal["folder"] = "folder"
    FILE: Literal["file"] = "file"


class DirectoryNode(BaseModel):
    """One entry (folder or Markdown file) of the directory tree."""

    name: str
    path: str  # Path relative to MARKDOWN_ROOT, using "/" as the separator.
    type: Literal["folder", "file"]
    children: Optional[List["DirectoryNode"]] = None


# Resolve the self-referencing "DirectoryNode" annotation. Pydantic v2 renamed
# update_forward_refs() to model_rebuild(); use whichever one is available.
if hasattr(DirectoryNode, "model_rebuild"):
    DirectoryNode.model_rebuild()
else:
    DirectoryNode.update_forward_refs()


class FileContent(BaseModel):
    """Content and metadata of a single Markdown file."""

    path: str
    content: str
    word_count: int = 0
    file_size: int = 0  # File size in bytes.
    created_time: str = ""
    modified_time: str = ""


app = FastAPI(title="MengyaNote Backend", version="1.0.0")

app.add_middleware(
    CORSMiddleware,
    # Any origin is currently allowed so that local and statically hosted
    # front ends (e.g. http://localhost:9090) can reach the API.
    # NOTE(review): wildcard origins combined with allow_credentials=True is
    # very permissive -- confirm this is intended outside local development.
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


def is_markdown_file(path: Path) -> bool:
    """Return True if *path* is an existing regular file with a .md suffix."""
    return path.is_file() and path.suffix.lower() == ".md"


def should_skip(entry: Path) -> bool:
    """Return True if *entry* must be left out of the directory tree.

    Skipped entries are hidden files/folders (name starts with "."), the
    ignore.json file itself, and directories whose name is in IGNORE_LIST.
    """
    name = entry.name
    if name.startswith("."):
        return True
    if name == "ignore.json":
        return True
    if entry.is_dir() and name in IGNORE_LIST:
        return True
    return False


def build_directory_tree(root: Path) -> List[DirectoryNode]:
    """Build the directory tree below *root* from the file system.

    Only Markdown files and directories that (transitively) contain at least
    one Markdown file are included. Within each directory, folders come
    before files and both are ordered case-insensitively by name.

    Args:
        root: Directory to scan. Node paths are relative to this directory.

    Returns:
        The top-level nodes, or an empty list if *root* is not a directory.
    """
    if not root.is_dir():
        return []

    def list_children(directory: Path, rel: Path) -> List[DirectoryNode]:
        """Return the visible nodes of *directory*; *rel* is its path relative to root."""
        nodes: List[DirectoryNode] = []
        entries = sorted(
            directory.iterdir(),
            # False sorts before True, so directories precede files.
            key=lambda p: (p.is_file(), p.name.lower()),
        )
        for entry in entries:
            if should_skip(entry):
                continue
            entry_rel = rel / entry.name
            if entry.is_dir():
                children = list_children(entry, entry_rel)
                # Drop directories with no Markdown content at any depth.
                if children:
                    nodes.append(
                        DirectoryNode(
                            name=entry.name,
                            path=entry_rel.as_posix(),
                            type="folder",
                            children=children,
                        )
                    )
            elif is_markdown_file(entry):
                nodes.append(
                    DirectoryNode(
                        name=entry.name,
                        path=entry_rel.as_posix(),
                        type="file",
                        children=None,
                    )
                )
        return nodes

    # Path() / "name" == Path("name"), so top-level paths carry no prefix.
    return list_children(root, Path())


def resolve_markdown_path(relative_path: str) -> Path:
    """Safely convert a client-supplied relative path to a path on disk.

    Args:
        relative_path: Path relative to MARKDOWN_ROOT; "/" or "\\" separators
            are accepted and leading slashes are ignored.

    Returns:
        The resolved absolute path, guaranteed to lie inside MARKDOWN_ROOT.

    Raises:
        HTTPException: 400 if the path escapes MARKDOWN_ROOT or cannot be
            resolved (e.g. embedded NUL byte, symlink loop).
    """
    # Normalise to "/" separators and make the path relative.
    safe_path = relative_path.replace("\\", "/").lstrip("/")
    root = MARKDOWN_ROOT.resolve()
    try:
        candidate = (root / safe_path).resolve()
        # Component-wise containment check. A plain string prefix test would
        # also accept sibling directories such as "<root>_private".
        candidate.relative_to(root)
    except (OSError, RuntimeError, ValueError):
        raise HTTPException(status_code=400, detail="非法路径") from None
    return candidate


def _format_timestamp(timestamp: float) -> str:
    """Format a POSIX timestamp as local time, or "未知" if it cannot be converted."""
    try:
        return datetime.fromtimestamp(timestamp).strftime(_TIME_FORMAT)
    except (OSError, OverflowError, ValueError):
        return "未知"


@app.get("/api/tree", response_model=List[DirectoryNode])
def get_directory_tree() -> List[DirectoryNode]:
    """
    Return the Markdown directory tree.

    The structure is kept as compatible as possible with the former
    directoryTree.json:
    - name: file or folder name
    - path: relative path (using /)
    - type: 'folder' | 'file'
    - children: list of child nodes
    """
    return build_directory_tree(MARKDOWN_ROOT)


@app.get("/api/file", response_model=FileContent)
def get_markdown_file(path: str = Query(..., description="相对于根目录的 Markdown 路径")) -> FileContent:
    """
    Return the content and metadata of one Markdown file.

    Query parameters:
    - path: e.g. 'AI/大语言模型的API 调用.md'

    Responds with 400 for a path outside the Markdown root and 404 when the
    path is not an existing .md file.
    """
    file_path = resolve_markdown_path(path)
    # is_markdown_file() already implies that the path exists and is a file.
    if not is_markdown_file(file_path):
        raise HTTPException(status_code=404, detail="文件不存在")

    try:
        content = file_path.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        # Fallback: still decode as UTF-8 but drop the undecodable bytes.
        content = file_path.read_text(encoding="utf-8", errors="ignore")

    file_stat = file_path.stat()

    # Character count excluding spaces, newlines, carriage returns and tabs.
    word_count = len(content.translate(_WHITESPACE_TABLE))

    # st_ctime is the creation time on Windows but the last metadata change
    # time on Linux.
    created_time = _format_timestamp(file_stat.st_ctime)
    modified_time = _format_timestamp(file_stat.st_mtime)

    # file_path is fully resolved, so it must be compared with the resolved
    # root; otherwise a symlinked MARKDOWN_ROOT makes relative_to() fail.
    rel = file_path.relative_to(MARKDOWN_ROOT.resolve()).as_posix()

    return FileContent(
        path=rel,
        content=content,
        word_count=word_count,
        file_size=file_stat.st_size,
        created_time=created_time,
        modified_time=modified_time,
    )


@app.get("/api/health")
def health_check():
    """Simple health-check endpoint."""
    return {"status": "ok"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run("main:app", host="0.0.0.0", port=int(os.getenv("PORT", "8000")), reload=True)