Python使用Flask构建智能的静态文件服务器
2026-03-29 14:27:01
浏览次数:1
import hmac
import json
import mimetypes
import os
from datetime import datetime
from functools import wraps
from pathlib import Path
from urllib.parse import quote

import magic  # used for more accurate file type detection
from flask import Flask, send_file, send_from_directory, request, jsonify, render_template_string, abort
from werkzeug.utils import secure_filename
app = Flask(__name__)
# Server configuration
class Config:
    # Directory that is served (editable); defaults to the user's Downloads folder.
    BASE_DIR = os.path.expanduser("~/Downloads")

    # File extensions listed as allowed.
    # NOTE(review): presumably meant as an upload whitelist -- confirm it is enforced.
    ALLOWED_EXTENSIONS = set("txt pdf png jpg jpeg gif mp4 mp3 zip".split())

    # 100 MB file size limit.
    MAX_CONTENT_LENGTH = 100 * 1024 ** 2

    # Feature switches.
    ENABLE_UPLOAD = True
    ENABLE_SEARCH = True
    ENABLE_PREVIEW = True
    ENABLE_THUMBNAILS = True

    # Password protection is off by default.
    PASSWORD_PROTECTED = False
    # Default password (change it for production use).
    PASSWORD = "admin123"
app.config.from_object(Config)
# 创建HTML模板
HTML_TEMPLATE = '''
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>智能文件服务器 - {{ current_path }}</title>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
min-height: 100vh;
padding: 20px;
}
.container {
max-width: 1200px;
margin: 0 auto;
background: white;
border-radius: 15px;
box-shadow: 0 20px 60px rgba(0,0,0,0.3);
overflow: hidden;
}
.header {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 30px;
text-align: center;
}
.header h1 {
font-size: 2.5em;
margin-bottom: 10px;
}
.controls {
padding: 20px;
background: #f8f9fa;
border-bottom: 1px solid #dee2e6;
display: flex;
gap: 10px;
flex-wrap: wrap;
}
.search-box {
flex: 1;
min-width: 200px;
padding: 10px 15px;
border: 2px solid #667eea;
border-radius: 25px;
font-size: 16px;
outline: none;
transition: all 0.3s;
}
.search-box:focus {
box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.3);
}
.btn {
padding: 10px 20px;
border: none;
border-radius: 25px;
cursor: pointer;
font-weight: 600;
transition: all 0.3s;
}
.btn-primary {
background: #667eea;
color: white;
}
.btn-primary:hover {
background: #5a6fd8;
transform: translateY(-2px);
box-shadow: 0 5px 15px rgba(0,0,0,0.2);
}
.breadcrumb {
padding: 20px;
background: #fff;
border-bottom: 1px solid #dee2e6;
}
.breadcrumb a {
color: #667eea;
text-decoration: none;
padding: 5px 10px;
border-radius: 5px;
transition: all 0.3s;
}
.breadcrumb a:hover {
background: #f0f2ff;
}
.file-list {
padding: 20px;
}
.file-item {
display: flex;
align-items: center;
padding: 15px;
border-bottom: 1px solid #e9ecef;
transition: all 0.3s;
cursor: pointer;
text-decoration: none;
color: inherit;
}
.file-item:hover {
background: #f8f9fa;
transform: translateX(10px);
box-shadow: 0 5px 15px rgba(0,0,0,0.1);
}
.file-icon {
width: 50px;
height: 50px;
background: #667eea;
color: white;
border-radius: 10px;
display: flex;
align-items: center;
justify-content: center;
margin-right: 15px;
font-size: 20px;
}
.file-info {
flex: 1;
}
.file-name {
font-weight: 600;
color: #333;
margin-bottom: 5px;
}
.file-meta {
font-size: 12px;
color: #6c757d;
display: flex;
gap: 15px;
}
.file-actions {
display: flex;
gap: 10px;
opacity: 0;
transition: opacity 0.3s;
}
.file-item:hover .file-actions {
opacity: 1;
}
.action-btn {
padding: 5px 15px;
border: 1px solid #667eea;
border-radius: 15px;
color: #667eea;
text-decoration: none;
font-size: 12px;
transition: all 0.3s;
}
.action-btn:hover {
background: #667eea;
color: white;
}
.upload-area {
padding: 20px;
border: 2px dashed #667eea;
border-radius: 10px;
text-align: center;
margin: 20px;
transition: all 0.3s;
}
.upload-area:hover {
background: #f0f2ff;
border-color: #5a6fd8;
}
.upload-btn {
display: inline-block;
padding: 10px 30px;
background: #667eea;
color: white;
border-radius: 25px;
cursor: pointer;
transition: all 0.3s;
}
.upload-btn:hover {
background: #5a6fd8;
transform: translateY(-2px);
box-shadow: 0 5px 15px rgba(0,0,0,0.2);
}
.preview-container {
padding: 20px;
background: #f8f9fa;
margin: 20px;
border-radius: 10px;
display: none;
}
.preview-container.active {
display: block;
}
@media (max-width: 768px) {
.container {
border-radius: 0;
}
.controls {
flex-direction: column;
}
}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>📁 智能文件服务器</h1>
<p>当前目录: {{ current_path }}</p>
</div>
{% if config.ENABLE_SEARCH %}
<div class="controls">
<input type="text"
class="search-box"
placeholder="搜索文件..."
id="searchInput"
onkeyup="searchFiles()">
<button class="btn btn-primary" onclick="refreshList()">刷新</button>
{% if config.ENABLE_UPLOAD %}
<button class="btn btn-primary" onclick="showUpload()">上传文件</button>
{% endif %}
</div>
{% endif %}
<div class="breadcrumb">
<a href="/">🏠 根目录</a>
{% for part in breadcrumb %}
/ <a href="{{ part.url }}">{{ part.name }}</a>
{% endfor %}
</div>
<div id="fileList" class="file-list">
{% for item in items %}
<a href="{{ item.url }}" class="file-item"
data-name="{{ item.name }}"
data-type="{{ item.type }}">
<div class="file-icon">
{% if item.type == 'directory' %}
📁
{% elif item.type == 'image' %}
🖼️
{% elif item.type == 'video' %}
🎬
{% elif item.type == 'audio' %}
🎵
{% elif item.type == 'pdf' %}
📄
{% elif item.type == 'text' %}
📝
{% else %}
📎
{% endif %}
</div>
<div class="file-info">
<div class="file-name">{{ item.name }}</div>
<div class="file-meta">
<span>📅 {{ item.modified }}</span>
{% if item.type != 'directory' %}
<span>⚖️ {{ item.size }}</span>
<span>📊 {{ item.mime }}</span>
{% endif %}
</div>
</div>
<div class="file-actions">
{% if item.type != 'directory' %}
<a href="{{ item.url }}" class="action-btn" download>下载</a>
{% if config.ENABLE_PREVIEW %}
<a href="#" class="action-btn" onclick="previewFile('{{ item.url }}')">预览</a>
{% endif %}
{% endif %}
</div>
</a>
{% endfor %}
</div>
{% if config.ENABLE_UPLOAD %}
<div id="uploadArea" class="upload-area" style="display: none;">
<h3>📤 上传文件</h3>
<form id="uploadForm" enctype="multipart/form-data">
<input type="file" name="file" id="fileInput" multiple
style="display: none;" onchange="uploadFiles()">
<div class="upload-btn" onclick="document.getElementById('fileInput').click()">
选择文件
</div>
<div id="uploadProgress" style="margin-top: 10px; display: none;">
<progress id="progressBar" value="0" max="100"></progress>
<span id="progressText">0%</span>
</div>
</form>
</div>
{% endif %}
<div id="previewContainer" class="preview-container"></div>
</div>
<script>
function searchFiles() {
const searchTerm = document.getElementById('searchInput').value.toLowerCase();
const fileItems = document.querySelectorAll('.file-item');
fileItems.forEach(item => {
const fileName = item.getAttribute('data-name').toLowerCase();
if (fileName.includes(searchTerm)) {
item.style.display = 'flex';
} else {
item.style.display = 'none';
}
});
}
function refreshList() {
window.location.reload();
}
function showUpload() {
const uploadArea = document.getElementById('uploadArea');
uploadArea.style.display = uploadArea.style.display === 'none' ? 'block' : 'none';
}
async function uploadFiles() {
const fileInput = document.getElementById('fileInput');
const files = fileInput.files;
const progressBar = document.getElementById('progressBar');
const progressText = document.getElementById('progressText');
const uploadProgress = document.getElementById('uploadProgress');
if (files.length === 0) return;
uploadProgress.style.display = 'block';
for (let i = 0; i < files.length; i++) {
const formData = new FormData();
formData.append('file', files[i]);
try {
const response = await fetch('/upload', {
method: 'POST',
body: formData
});
const result = await response.json();
if (result.success) {
progressBar.value = ((i + 1) / files.length) * 100;
progressText.textContent = Math.round(((i + 1) / files.length) * 100) + '%';
} else {
alert('上传失败: ' + result.error);
}
} catch (error) {
alert('上传出错: ' + error);
}
}
setTimeout(() => {
refreshList();
}, 1000);
}
function previewFile(url) {
const previewContainer = document.getElementById('previewContainer');
const ext = url.split('.').pop().toLowerCase();
previewContainer.innerHTML = '';
previewContainer.classList.add('active');
if (['jpg', 'jpeg', 'png', 'gif', 'bmp'].includes(ext)) {
previewContainer.innerHTML = `
<h3>图片预览</h3>
<img src="${url}" style="max-width: 100%; max-height: 500px; border-radius: 10px;">
`;
} else if (['mp4', 'webm', 'ogg'].includes(ext)) {
previewContainer.innerHTML = `
<h3>视频预览</h3>
<video controls style="max-width: 100%; max-height: 500px;">
<source src="${url}" type="video/${ext}">
</video>
`;
} else if (['mp3', 'wav', 'ogg'].includes(ext)) {
previewContainer.innerHTML = `
<h3>音频预览</h3>
<audio controls style="width: 100%;">
<source src="${url}" type="audio/${ext}">
</audio>
`;
} else if (ext === 'pdf') {
previewContainer.innerHTML = `
<h3>PDF预览</h3>
<iframe src="${url}" style="width: 100%; height: 500px; border: none;"></iframe>
`;
} else if (['txt', 'csv', 'json', 'xml', 'html', 'css', 'js'].includes(ext)) {
fetch(url)
.then(response => response.text())
.then(text => {
previewContainer.innerHTML = `
<h3>文本预览</h3>
<pre style="background: #f8f9fa; padding: 20px; border-radius: 5px;
max-height: 500px; overflow: auto; white-space: pre-wrap;">
${escapeHtml(text.substring(0, 10000))}
</pre>
`;
});
} else {
previewContainer.innerHTML = `
<h3>不支持预览的文件类型</h3>
<p>请下载后查看</p>
`;
}
// 点击其他地方关闭预览
setTimeout(() => {
document.addEventListener('click', function closePreview(e) {
if (!previewContainer.contains(e.target)) {
previewContainer.classList.remove('active');
document.removeEventListener('click', closePreview);
}
});
}, 100);
}
function escapeHtml(text) {
const div = document.createElement('div');
div.textContent = text;
return div.innerHTML;
}
// 拖拽上传
document.addEventListener('DOMContentLoaded', function() {
const uploadArea = document.getElementById('uploadArea');
if (uploadArea) {
uploadArea.addEventListener('dragover', (e) => {
e.preventDefault();
uploadArea.style.background = '#e8f0fe';
});
uploadArea.addEventListener('dragleave', (e) => {
e.preventDefault();
uploadArea.style.background = '';
});
uploadArea.addEventListener('drop', (e) => {
e.preventDefault();
uploadArea.style.background = '';
const files = e.dataTransfer.files;
const fileInput = document.getElementById('fileInput');
// 创建一个新的FileList(由于FileList是只读的,需要特殊处理)
const dataTransfer = new DataTransfer();
for (let i = 0; i < files.length; i++) {
dataTransfer.items.add(files[i]);
}
fileInput.files = dataTransfer.files;
uploadFiles();
});
}
});
</script>
</body>
</html>
'''
def format_size(size):
    """Return *size* (a byte count) as a human-readable string.

    The value is divided by 1024 until it drops below 1024 or the unit
    list (B, KB, MB, GB) is exhausted; anything larger is reported in TB.
    The number is always rendered with one decimal place.
    """
    units = ('B', 'KB', 'MB', 'GB')
    position = 0
    while position < len(units):
        if size < 1024.0:
            return f"{size:.1f} {units[position]}"
        size = size / 1024.0
        position += 1
    # Fell off the end of the unit list: whatever remains is in terabytes.
    return f"{size:.1f} TB"
def get_file_info(filepath):
    """Collect display metadata for the file at *filepath*.

    Returns a dict with the keys ``name``, ``path``, ``size`` (formatted
    string), ``modified`` (``YYYY-MM-DD HH:MM:SS``), ``mime`` and ``type``
    (one of ``image``, ``video``, ``audio``, ``pdf``, ``text``, ``other``).
    Errors raised by ``os.stat`` or the MIME lookup propagate to the caller.
    """
    file_stat = os.stat(filepath)

    # Use magic.Magic when the imported module exposes it; otherwise fall
    # back to the extension-based guess from the mimetypes module.
    if hasattr(magic, 'Magic'):
        mime_type = magic.Magic(mime=True).from_file(filepath)
    else:
        mime_type = mimetypes.guess_type(filepath)[0]

    # Map the MIME type onto the coarse category used for the icons.
    file_type = 'other'
    if mime_type:
        if mime_type == 'application/pdf':
            file_type = 'pdf'
        else:
            prefix_table = (
                ('image/', 'image'),
                ('video/', 'video'),
                ('audio/', 'audio'),
                ('text/', 'text'),
            )
            for prefix, category in prefix_table:
                if mime_type.startswith(prefix):
                    file_type = category
                    break

    readable_size = format_size(file_stat.st_size)
    modified_at = datetime.fromtimestamp(file_stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')
    return {
        'name': os.path.basename(filepath),
        'path': filepath,
        'size': readable_size,
        'modified': modified_at,
        'mime': mime_type or '未知',
        'type': file_type
    }
def check_auth():
    """Check HTTP Basic credentials when password protection is enabled.

    Returns:
        True when ``PASSWORD_PROTECTED`` is off, or when the request carries
        the user name ``admin`` together with the configured ``PASSWORD``;
        False otherwise.
    """
    if not app.config['PASSWORD_PROTECTED']:
        return True

    auth = request.authorization
    if not auth:
        return False

    # compare_digest only accepts bytes or ASCII-only str, so encode first;
    # missing fields are treated as empty strings.
    supplied_user = (auth.username or '').encode('utf-8')
    supplied_password = (auth.password or '').encode('utf-8')
    expected_password = str(app.config['PASSWORD']).encode('utf-8')

    # Evaluate both comparisons unconditionally so the response time does not
    # reveal which of the two fields was wrong.
    user_ok = hmac.compare_digest(supplied_user, b'admin')
    password_ok = hmac.compare_digest(supplied_password, expected_password)
    return user_ok and password_ok
def requires_auth(f):
    """Decorator: run *f* only when ``check_auth()`` succeeds.

    On failure the wrapped view is not called and a JSON error body with
    HTTP status 401 is returned instead.
    """
    @wraps(f)
    def guarded(*args, **kwargs):
        if check_auth():
            return f(*args, **kwargs)
        return jsonify({'error': '需要身份验证'}), 401
    return guarded
@app.route('/')
@app.route('/<path:subpath>')
def index(subpath=''):
    """Main page: render the listing of a directory below BASE_DIR.

    Args:
        subpath: Path relative to ``BASE_DIR`` taken from the URL.

    Returns:
        The rendered HTML listing. If *subpath* names a regular file, the
        file itself is sent instead of failing on the directory scan.

    Aborts with 403 when the resolved path lies outside ``BASE_DIR`` or the
    directory cannot be read, and with 404 when the path does not exist.
    """
    if app.config['PASSWORD_PROTECTED'] and not check_auth():
        return ('需要身份验证', 401,
                {'WWW-Authenticate': 'Basic realm="Login Required"'})

    # Containment check. commonpath compares whole path components, so a
    # sibling such as "<BASE_DIR>_backup" is no longer accepted the way a
    # plain string-prefix test would accept it.
    base_dir = os.path.abspath(app.config['BASE_DIR'])
    full_path = os.path.abspath(os.path.join(base_dir, subpath))
    try:
        inside_base = os.path.commonpath([base_dir, full_path]) == base_dir
    except ValueError:
        # Raised e.g. for paths on different drives on Windows.
        inside_base = False
    # abort() is deliberately called outside any try block: it works by
    # raising, and a surrounding catch-all would turn the 403 into a 404.
    if not inside_base:
        abort(403)

    if not os.path.exists(full_path):
        abort(404)
    if not os.path.isdir(full_path):
        # A file was addressed through the listing route; serve it directly.
        return send_file(full_path, as_attachment=False)

    # Collect files and directories.
    items = []
    try:
        for entry in os.scandir(full_path):
            try:
                # Percent-encode the relative path so names containing
                # '#', '?', '%', quotes or spaces still yield working links,
                # and use '/' regardless of the platform separator.
                rel_url = quote(
                    os.path.relpath(entry.path, base_dir).replace(os.sep, '/'))
                if entry.is_dir():
                    items.append({
                        'name': entry.name + '/',
                        'url': f'/{rel_url}',
                        'type': 'directory',
                        'modified': datetime.fromtimestamp(
                            entry.stat().st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
                        'size': '-'
                    })
                else:
                    file_info = get_file_info(entry.path)
                    file_info['url'] = f'/file/{rel_url}'
                    items.append(file_info)
            except OSError:
                # Skip entries that cannot be inspected (dangling symlink,
                # unreadable file) instead of failing the whole listing.
                continue
    except PermissionError:
        abort(403)

    # Sort by kind, then name: directories first, files after.
    items.sort(key=lambda x: (0 if x['type'] == 'directory' else 1, x['name'].lower()))

    # Build the breadcrumb navigation, one cumulative URL per path part.
    breadcrumb = []
    current_path = ''
    for part in (subpath.split('/') if subpath else []):
        if part:
            current_path += '/' + quote(part)
            breadcrumb.append({
                'name': part,
                'url': current_path
            })

    return render_template_string(HTML_TEMPLATE,
                                  items=items,
                                  current_path=subpath or '/',
                                  breadcrumb=breadcrumb,
                                  config=app.config)
@app.route('/file/<path:filename>')
def serve_file(filename):
    """Send a single file from below BASE_DIR (inline, not as attachment).

    Args:
        filename: Path relative to ``BASE_DIR`` taken from the URL.

    Aborts with 401 when authentication fails, 403 when the resolved path
    lies outside ``BASE_DIR``, and 404 when it is not an existing regular
    file (this includes directories).
    """
    if app.config['PASSWORD_PROTECTED'] and not check_auth():
        abort(401)

    # Containment check on whole path components; a string-prefix test would
    # also accept siblings such as "<BASE_DIR>_backup".
    base_dir = os.path.abspath(app.config['BASE_DIR'])
    file_path = os.path.abspath(os.path.join(base_dir, filename))
    try:
        inside_base = os.path.commonpath([base_dir, file_path]) == base_dir
    except ValueError:
        # Raised e.g. for paths on different drives on Windows.
        inside_base = False
    # abort() raises, so it must not sit inside a catch-all try block or the
    # 403 is swallowed and replaced by a 404.
    if not inside_base:
        abort(403)

    # isfile() rejects missing paths and directories alike.
    if not os.path.isfile(file_path):
        abort(404)
    return send_file(file_path, as_attachment=False)
@app.route('/upload', methods=['POST'])
def upload_file():
    """Handle file uploads posted as multipart field ``file``.

    The optional ``path`` query argument selects a target directory relative
    to ``BASE_DIR``; it is created when missing. Name clashes are resolved by
    appending ``_<counter>`` to the file stem.

    Returns:
        A JSON object with ``success`` plus either ``message``/``files`` or
        ``error``. A target outside ``BASE_DIR`` is answered with status 403.

    NOTE(review): ``ALLOWED_EXTENSIONS`` from the configuration is not
    enforced here -- confirm whether uploads should be filtered by it.
    """
    if app.config['PASSWORD_PROTECTED'] and not check_auth():
        abort(401)
    if not app.config['ENABLE_UPLOAD']:
        return jsonify({'success': False, 'error': '文件上传功能已禁用'})
    if 'file' not in request.files:
        return jsonify({'success': False, 'error': '没有选择文件'})

    files = request.files.getlist('file')
    upload_path = request.args.get('path', '')

    # The target comes from the client, so make sure it stays below BASE_DIR
    # before anything is created on disk.
    base_dir = os.path.abspath(app.config['BASE_DIR'])
    target_dir = os.path.abspath(os.path.join(base_dir, upload_path))
    try:
        inside_base = os.path.commonpath([base_dir, target_dir]) == base_dir
    except ValueError:
        inside_base = False
    if not inside_base:
        return jsonify({'success': False, 'error': '无效的上传路径'}), 403

    # Create the target directory if it does not exist yet.
    try:
        os.makedirs(target_dir, exist_ok=True)
    except OSError as e:
        return jsonify({'success': False, 'error': str(e)})

    uploaded_files = []
    for file in files:
        if not file.filename:
            continue

        # Sanitise the client-supplied name. secure_filename can strip the
        # name down to an empty string (e.g. names without ASCII characters);
        # saving under an empty name would target the directory itself.
        filename = secure_filename(file.filename)
        if not filename:
            return jsonify({'success': False, 'error': '无效的文件名'})
        file_path = os.path.join(target_dir, filename)

        # Avoid overwriting an existing file.
        counter = 1
        name, ext = os.path.splitext(filename)
        while os.path.exists(file_path):
            filename = f"{name}_{counter}{ext}"
            file_path = os.path.join(target_dir, filename)
            counter += 1

        try:
            file.save(file_path)
        except OSError as e:
            return jsonify({'success': False, 'error': str(e)})
        uploaded_files.append(filename)

    return jsonify({
        'success': True,
        'message': f'成功上传 {len(uploaded_files)} 个文件',
        'files': uploaded_files
    })
@app.route('/search')
def search_files():
    """Search BASE_DIR recursively for file names containing ``q``.

    The match is a case-insensitive substring test on the file name.

    Returns:
        JSON ``{'results': [...]}`` where each entry is the dict produced by
        ``get_file_info`` extended with ``url`` and ``directory``; an empty
        list when ``q`` is missing, or ``{'error': ...}`` when search is
        disabled.
    """
    if app.config['PASSWORD_PROTECTED'] and not check_auth():
        abort(401)
    if not app.config['ENABLE_SEARCH']:
        return jsonify({'error': '搜索功能已禁用'})

    query = request.args.get('q', '').lower()
    if not query:
        return jsonify({'results': []})

    base_dir = app.config['BASE_DIR']
    results = []
    for root, _dirs, files in os.walk(base_dir):
        directory = os.path.relpath(root, base_dir)
        for file_name in files:
            if query not in file_name.lower():
                continue
            file_path = os.path.join(root, file_name)
            try:
                file_info = get_file_info(file_path)
            except OSError:
                # An unreadable or vanished file must not abort the search.
                continue
            rel_path = os.path.relpath(file_path, base_dir)
            # Percent-encode and normalise separators so the link works for
            # names with special characters and on every platform.
            file_info['url'] = '/file/' + quote(rel_path.replace(os.sep, '/'))
            file_info['directory'] = directory
            results.append(file_info)
    return jsonify({'results': results})
@app.route('/api/info')
def api_info():
    """Server information API.

    Walks ``BASE_DIR`` and returns JSON with the base directory, the number
    of files and directories, the formatted total size, and the feature
    switches taken from the configuration.
    """
    if app.config['PASSWORD_PROTECTED'] and not check_auth():
        abort(401)

    total_size = 0
    file_count = 0
    dir_count = 0
    for root, dirs, files in os.walk(app.config['BASE_DIR']):
        dir_count += len(dirs)
        file_count += len(files)
        for file_name in files:
            try:
                total_size += os.path.getsize(os.path.join(root, file_name))
            except OSError:
                # Best effort: files that vanish or cannot be stat'ed are
                # counted but contribute nothing to the total size.
                continue

    return jsonify({
        'base_directory': app.config['BASE_DIR'],
        'total_files': file_count,
        'total_directories': dir_count,
        'total_size': format_size(total_size),
        'features': {
            'upload_enabled': app.config['ENABLE_UPLOAD'],
            'search_enabled': app.config['ENABLE_SEARCH'],
            'preview_enabled': app.config['ENABLE_PREVIEW'],
            'password_protected': app.config['PASSWORD_PROTECTED']
        }
    })
@app.route('/api/create-folder', methods=['POST'])
def create_folder():
    """Create a new folder below BASE_DIR.

    Expects a JSON object with ``name`` (folder name) and optional ``path``
    (parent directory relative to ``BASE_DIR``).

    Returns:
        JSON with ``success`` and either ``message`` or ``error``. A body
        that is not a JSON object is answered with status 400, a target
        outside ``BASE_DIR`` with status 403.
    """
    if app.config['PASSWORD_PROTECTED'] and not check_auth():
        abort(401)

    # silent=True yields None instead of raising on a missing or malformed
    # body; valid JSON such as null or a list is rejected here as well.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'success': False, 'error': '请求体必须是JSON对象'}), 400

    # Coerce to str so non-string JSON values cannot break .strip().
    folder_name = str(data.get('name') or '').strip()
    parent_path = str(data.get('path') or '').strip()
    if not folder_name:
        return jsonify({'success': False, 'error': '文件夹名不能为空'})

    # Sanitise the folder name.
    folder_name = secure_filename(folder_name)
    if not folder_name:
        return jsonify({'success': False, 'error': '无效的文件夹名'})

    # The parent path is client-controlled: refuse anything that resolves
    # outside BASE_DIR before touching the file system.
    base_dir = os.path.abspath(app.config['BASE_DIR'])
    target_dir = os.path.abspath(os.path.join(base_dir, parent_path, folder_name))
    try:
        inside_base = os.path.commonpath([base_dir, target_dir]) == base_dir
    except ValueError:
        inside_base = False
    if not inside_base:
        return jsonify({'success': False, 'error': '无效的路径'}), 403

    try:
        os.makedirs(target_dir, exist_ok=False)
    except FileExistsError:
        return jsonify({'success': False, 'error': '文件夹已存在'})
    except OSError as e:
        return jsonify({'success': False, 'error': str(e)})
    return jsonify({'success': True, 'message': f'文件夹 {folder_name} 创建成功'})
@app.errorhandler(404)
def not_found(error):
    """404 handler: render the static "not found" page with status 404."""
    page_markup = '''
<!DOCTYPE html>
<html>
<head>
<title>404 - 文件未找到</title>
<style>
body {
font-family: Arial, sans-serif;
text-align: center;
padding: 50px;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
}
h1 { font-size: 4em; margin: 0; }
p { font-size: 1.2em; }
a {
color: white;
text-decoration: none;
padding: 10px 30px;
background: rgba(255,255,255,0.2);
border-radius: 25px;
display: inline-block;
margin-top: 20px;
}
</style>
</head>
<body>
<h1>404</h1>
<p>请求的文件或目录不存在</p>
<a href="/">返回首页</a>
</body>
</html>
'''
    rendered = render_template_string(page_markup)
    return rendered, 404
@app.errorhandler(403)
def forbidden(error):
    """403 handler: render the static "access denied" page with status 403."""
    page_markup = '''
<!DOCTYPE html>
<html>
<head>
<title>403 - 禁止访问</title>
<style>
body {
font-family: Arial, sans-serif;
text-align: center;
padding: 50px;
background: linear-gradient(135deg, #ff6b6b 0%, #c92a2a 100%);
color: white;
}
h1 { font-size: 4em; margin: 0; }
p { font-size: 1.2em; }
a {
color: white;
text-decoration: none;
padding: 10px 30px;
background: rgba(255,255,255,0.2);
border-radius: 25px;
display: inline-block;
margin-top: 20px;
}
</style>
</head>
<body>
<h1>403</h1>
<p>您没有权限访问此内容</p>
<a href="/">返回首页</a>
</body>
</html>
'''
    rendered = render_template_string(page_markup)
    return rendered, 403
def main():
    """Entry point: parse CLI options, update the config, start the server.

    ``--dir`` replaces ``BASE_DIR`` (made absolute), ``--password`` switches
    password protection on and sets the password. The base directory is
    created if missing, a start-up summary is printed, and ``app.run`` is
    called with the chosen host, port and debug flag.
    """
    import argparse

    cli = argparse.ArgumentParser(description='智能静态文件服务器')
    cli.add_argument('--host', default='0.0.0.0', help='监听地址 (默认: 0.0.0.0)')
    cli.add_argument('--port', type=int, default=5000, help='监听端口 (默认: 5000)')
    cli.add_argument('--dir', default='', help='服务目录 (默认: 用户下载目录)')
    cli.add_argument('--password', default='', help='设置访问密码 (默认: 无密码)')
    cli.add_argument('--debug', action='store_true', help='启用调试模式')
    args = cli.parse_args()

    # Apply command-line overrides to the configuration.
    settings = app.config
    if args.dir:
        settings['BASE_DIR'] = os.path.abspath(args.dir)
    if args.password:
        settings['PASSWORD_PROTECTED'] = True
        settings['PASSWORD'] = args.password

    # Make sure the served directory exists.
    os.makedirs(settings['BASE_DIR'], exist_ok=True)

    protection_state = '已启用' if settings['PASSWORD_PROTECTED'] else '未启用'
    print("🚀 启动智能文件服务器...")
    print(f"📁 服务目录: {settings['BASE_DIR']}")
    print(f"🌐 访问地址: http://{args.host}:{args.port}")
    print(f"🔐 密码保护: {protection_state}")
    print("⚙️ 功能状态:")
    feature_rows = (
        ('文件上传', 'ENABLE_UPLOAD'),
        ('文件搜索', 'ENABLE_SEARCH'),
        ('文件预览', 'ENABLE_PREVIEW'),
    )
    for label, config_key in feature_rows:
        state = '✅ 已启用' if settings[config_key] else '❌ 已禁用'
        print(f" • {label}: {state}")

    endpoint_lines = (
        "\n📋 支持的API端点:",
        " • GET / - 文件浏览器界面",
        " • GET /file/<path> - 下载文件",
        " • POST /upload - 上传文件",
        " • GET /search?q=... - 搜索文件",
        " • GET /api/info - 服务器信息",
        " • POST /api/create-folder - 创建文件夹",
        "\n按 Ctrl+C 停止服务器",
    )
    for line in endpoint_lines:
        print(line)

    # Warn when python-magic cannot be imported.
    try:
        import magic
    except ImportError:
        for line in (
            "\n⚠️ 警告: 未找到python-magic库",
            " 安装命令: pip install python-magic (Linux/Mac)",
            " 或: pip install python-magic-bin (Windows)",
            "\n 文件类型检测功能将受限",
        ):
            print(line)

    app.run(host=args.host, port=args.port, debug=args.debug)
if __name__ == '__main__':
main()
# 基础依赖
pip install flask
# 文件类型检测(推荐安装)
# Linux/Mac:
pip install python-magic
# Windows:
pip install python-magic-bin
# 注意:在 Linux/macOS 上 python-magic 依赖系统库 libmagic,还需通过系统包管理器安装
# Ubuntu/Debian: sudo apt-get install libmagic1
# macOS: brew install libmagic
基本启动:
python server.py
自定义目录和端口:
python server.py --dir /path/to/your/files --port 8080
启用密码保护:
python server.py --password your_password
启用调试模式:
python server.py --debug