258 lines
7.5 KiB
Python
258 lines
7.5 KiB
Python
import os
|
||
import json
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from typing import List, Literal, Optional, Set
|
||
|
||
from fastapi import FastAPI, HTTPException, Query
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from pydantic import BaseModel
|
||
|
||
|
||
# Directory that contains this backend module.
BASE_DIR = Path(__file__).resolve().parent
# Markdown root: the `mengyanote` folder inside this backend project.
MARKDOWN_ROOT = BASE_DIR.joinpath("mengyanote")
# Location of the ignore.json configuration file.
IGNORE_FILE = MARKDOWN_ROOT.joinpath("ignore.json")
|
||
|
||
|
||
def load_ignore_list() -> Set[str]:
    """Load the folder names to ignore from ignore.json.

    The file is expected to contain a JSON object whose ``ignore`` key holds
    a list of folder names, e.g. ``{"ignore": ["drafts", "private"]}``.

    Returns:
        The set of string entries listed under ``ignore``. An empty set is
        returned when the file is missing or unreadable, is not valid JSON,
        or does not have the expected shape, so a broken configuration file
        never prevents the module from loading.
    """
    try:
        with open(IGNORE_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError, RecursionError):
        # OSError: file missing or unreadable.
        # ValueError: invalid JSON or invalid UTF-8 (both are ValueError subclasses).
        # RecursionError: pathologically nested JSON.
        return set()

    if not isinstance(data, dict):
        return set()

    entries = data.get("ignore", [])
    if not isinstance(entries, (list, tuple)):
        # A bare string would otherwise be split into single characters by set().
        return set()

    # Folder names are compared against str values, so non-string entries can
    # never match; dropping them also avoids failing on unhashable items.
    return {entry for entry in entries if isinstance(entry, str)}
|
||
|
||
|
||
# Load the ignore list once, when this module is imported. Nothing visible in
# this file refreshes it afterwards.
IGNORE_LIST = load_ignore_list()
|
||
|
||
|
||
class NodeType(str):
    """String constants naming the two kinds of directory-tree node."""

    # Value used for nodes that represent a folder.
    FOLDER: Literal["folder"] = "folder"
    # Value used for nodes that represent a file.
    FILE: Literal["file"] = "file"
|
||
|
||
|
||
class DirectoryNode(BaseModel):
    # One entry (folder or file) of the Markdown directory tree.

    # File or folder name (last path component).
    name: str
    # Path relative to MARKDOWN_ROOT, using "/" as the separator.
    path: str
    # Whether this node is a folder or a file.
    type: Literal["folder", "file"]
    # Child nodes of a folder; None for file nodes.
    children: Optional[List["DirectoryNode"]] = None


# Resolve the self-referencing "DirectoryNode" annotation above.
# Pydantic v2 renamed update_forward_refs() to model_rebuild() and deprecated
# the old name, so prefer the new API when it exists and fall back to the
# v1 call otherwise.
if hasattr(DirectoryNode, "model_rebuild"):
    DirectoryNode.model_rebuild()
else:
    DirectoryNode.update_forward_refs()
|
||
|
||
|
||
class FileContent(BaseModel):
    # Response payload describing one Markdown file: its text plus basic metadata.

    # Path of the file relative to the Markdown root, using "/" as the separator.
    path: str
    # Decoded text of the file.
    content: str
    # Number of characters in the content, not counting spaces, tabs and line breaks.
    word_count: int = 0
    file_size: int = 0  # File size in bytes
    # Human-readable timestamps, filled in by the /api/file handler.
    created_time: str = ""
    modified_time: str = ""
|
||
|
||
|
||
# FastAPI application object; the route handlers in this file register on it.
app = FastAPI(title="MengyaNote Backend", version="1.0.0")

app.add_middleware(
    CORSMiddleware,
    # Any origin is currently allowed so that local development and static
    # hosting (e.g. http://localhost:9090) can reach the API.
    # NOTE(review): FastAPI's CORS documentation advises against combining
    # wildcard origins/methods/headers with allow_credentials=True. Confirm
    # whether the frontend really sends credentials; if not, credentials can
    # be disabled, otherwise the allowed origins should be listed explicitly.
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
||
|
||
|
||
def is_markdown_file(path: Path) -> bool:
    """Return True when *path* is an existing regular file with a ``.md`` suffix.

    The suffix comparison is case-insensitive, so ``NOTE.MD`` also qualifies.
    """
    if not path.is_file():
        return False
    return path.suffix.lower() == ".md"
|
||
|
||
|
||
def should_skip(entry: Path) -> bool:
    """Decide whether a file or folder must be left out of the directory tree.

    An entry is skipped when its name starts with a dot (hidden), when it is
    named ``ignore.json``, or when it is a directory whose name appears in
    IGNORE_LIST.
    """
    entry_name = entry.name
    is_hidden = entry_name.startswith(".")
    is_ignore_config = entry_name == "ignore.json"
    if is_hidden or is_ignore_config:
        return True

    # Only directories are matched against the configured ignore list.
    return entry.is_dir() and entry_name in IGNORE_LIST
|
||
|
||
|
||
def build_directory_tree(root: Path) -> List[DirectoryNode]:
    """Build the directory tree of Markdown files found under *root*.

    The structure is kept as close as possible to the JSON tree that was used
    previously.

    Args:
        root: Directory to scan. Node paths are expressed relative to it and
            use "/" as the separator.

    Returns:
        The nodes for the entries directly inside *root*, folders first and
        then files, each group ordered case-insensitively by name. Only
        Markdown files are listed, and a folder is included only when it
        (recursively) contains at least one listed entry. Entries rejected
        by ``should_skip`` are omitted. An empty list is returned when
        *root* does not exist or is not a directory.
    """
    # is_dir() is already False for a path that does not exist.
    if not root.is_dir():
        return []

    def list_children(directory: Path, rel: Path) -> List[DirectoryNode]:
        """Return the nodes for the entries of *directory*, whose path relative to root is *rel*."""
        nodes: List[DirectoryNode] = []
        # is_file() is False for folders, so they sort ahead of files.
        for child in sorted(directory.iterdir(), key=lambda p: (p.is_file(), p.name.lower())):
            if should_skip(child):
                continue
            child_rel = rel / child.name
            if child.is_dir():
                grandchildren = list_children(child, child_rel)
                # Drop folders that end up with no Markdown files or sub-folders.
                if grandchildren:
                    nodes.append(
                        DirectoryNode(
                            name=child.name,
                            path=child_rel.as_posix(),
                            type="folder",
                            children=grandchildren,
                        )
                    )
            elif is_markdown_file(child):
                nodes.append(
                    DirectoryNode(
                        name=child.name,
                        path=child_rel.as_posix(),
                        type="file",
                        children=None,
                    )
                )
        return nodes

    # Start from the `root` argument (not the module-level MARKDOWN_ROOT) so
    # the function honours the directory it was asked to scan. Path() is the
    # empty relative path, so first-level entries get just their own name.
    return list_children(root, Path())
|
||
|
||
|
||
def resolve_markdown_path(relative_path: str) -> Path:
    """Safely turn a client-supplied relative path into a path on disk.

    Guards against directory traversal: the result must be located inside
    (or be equal to) the resolved MARKDOWN_ROOT.

    Args:
        relative_path: Path relative to the Markdown root as sent by the
            frontend; both "/" and "\\" separators are accepted.

    Returns:
        The fully resolved path (symlinks and ".." segments collapsed).

    Raises:
        HTTPException: 400 when the path cannot be resolved or points
            outside the Markdown root.
    """
    # Normalise to "/" separators and drop leading slashes so the value is
    # never treated as an absolute path that would replace the root.
    safe_path = relative_path.replace("\\", "/").lstrip("/")

    try:
        root = MARKDOWN_ROOT.resolve()
        candidate = (root / safe_path).resolve()
        # Component-wise containment check. A plain string prefix test would
        # also accept sibling directories such as "<root>_private".
        candidate.relative_to(root)
    except (OSError, RuntimeError, ValueError):
        # ValueError: outside the root, or an embedded NUL byte in the path.
        # OSError / RuntimeError: the path could not be resolved (e.g. a
        # symlink loop). An unresolved path is never returned.
        raise HTTPException(status_code=400, detail="非法路径") from None

    return candidate
|
||
|
||
|
||
@app.get("/api/tree", response_model=List[DirectoryNode])
def get_directory_tree() -> List[DirectoryNode]:
    """
    Return the Markdown directory tree.

    The structure stays as compatible as possible with the former
    directoryTree.json:
    - name: file or folder name
    - path: relative path (using /)
    - type: 'folder' | 'file'
    - children: array of child nodes
    """
    return build_directory_tree(MARKDOWN_ROOT)
|
||
|
||
|
||
def _format_timestamp(timestamp: float) -> str:
    """Format a POSIX timestamp as local time, or return "未知" when it cannot be formatted."""
    try:
        return datetime.fromtimestamp(timestamp).strftime("%Y年%m月%d日 %H:%M:%S")
    except (OSError, OverflowError, ValueError):
        # fromtimestamp() raises these for out-of-range values; ValueError also
        # covers strftime failing to encode the non-ASCII format string.
        return "未知"


@app.get("/api/file", response_model=FileContent)
def get_markdown_file(path: str = Query(..., description="相对于根目录的 Markdown 路径")) -> FileContent:
    """
    Return the content and metadata of one Markdown file.

    Query parameters:
    - path: path relative to the Markdown root, e.g. 'AI/大语言模型的API 调用.md'

    Returns the file text together with its character count (spaces, tabs and
    line breaks excluded), its size in bytes and its formatted creation and
    modification times.

    Raises HTTPException 400 when the path is outside the Markdown root and
    404 when it does not point to an existing Markdown file.
    """
    file_path = resolve_markdown_path(path)

    # Express the path relative to the *resolved* root: file_path is already
    # resolved, so comparing against the unresolved MARKDOWN_ROOT would fail
    # whenever the root is reached through a symlink. Doing this before any
    # read also ensures nothing outside the root is ever opened.
    try:
        rel = file_path.relative_to(MARKDOWN_ROOT.resolve()).as_posix()
    except ValueError:
        raise HTTPException(status_code=400, detail="非法路径") from None

    # is_markdown_file() already requires an existing regular file.
    if not is_markdown_file(file_path):
        raise HTTPException(status_code=404, detail="文件不存在")

    try:
        content = file_path.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        # Fallback: keep whatever decodes as UTF-8 and drop the invalid bytes.
        content = file_path.read_text(encoding="utf-8", errors="ignore")

    file_stat = file_path.stat()

    # Character count, ignoring spaces, tabs and line breaks.
    word_count = sum(1 for ch in content if ch not in " \n\r\t")

    # st_ctime is the creation time only on Windows; on Linux it is the time
    # of the last metadata change. Prefer the real creation time on platforms
    # that expose st_birthtime.
    if hasattr(file_stat, "st_birthtime"):
        created_timestamp = file_stat.st_birthtime
    else:
        created_timestamp = file_stat.st_ctime

    return FileContent(
        path=rel,
        content=content,
        word_count=word_count,
        file_size=file_stat.st_size,
        created_time=_format_timestamp(created_timestamp),
        modified_time=_format_timestamp(file_stat.st_mtime),
    )
|
||
|
||
|
||
@app.get("/api/health")
def health_check():
    """Simple health-check endpoint; always reports an "ok" status."""
    payload = {"status": "ok"}
    return payload
|
||
|
||
|
||
if __name__ == "__main__":
    import uvicorn

    # Development entry point: serve on every interface with auto-reload.
    # The port is taken from the PORT environment variable, defaulting to 8000.
    listen_port = int(os.getenv("PORT", 8000))
    uvicorn.run("main:app", host="0.0.0.0", port=listen_port, reload=True)
|
||
|
||
|