项目结构
excel_merger/
├── main.py # 主程序入口
├── excel_merger.py # 核心合并逻辑
├── ui_main_window.py # 主窗口UI
├── requirements.txt # 依赖包
└── README.md # 说明文档
1. requirements.txt
pandas>=1.3.0
openpyxl>=3.0.0
xlrd>=2.0.0
PyQt5>=5.15.0
PyQt5-sip>=12.9.0
2. 主窗口UI设计 (ui_main_window.py)
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
from PyQt5.QtGui import *
import sys
class Ui_MainWindow(object):
def setupUi(self, MainWindow):
MainWindow.setObjectName("MainWindow")
MainWindow.resize(800, 600)
# 中心部件
self.centralwidget = QWidget(MainWindow)
self.centralwidget.setObjectName("centralwidget")
# 主布局
self.main_layout = QVBoxLayout(self.centralwidget)
# 文件选择区域
self.file_frame = QGroupBox("文件选择")
self.file_layout = QVBoxLayout()
# 按钮区域
self.button_layout = QHBoxLayout()
self.btn_add_files = QPushButton("添加文件")
self.btn_add_folder = QPushButton("添加文件夹")
self.btn_clear = QPushButton("清空列表")
self.button_layout.addWidget(self.btn_add_files)
self.button_layout.addWidget(self.btn_add_folder)
self.button_layout.addWidget(self.btn_clear)
# 文件列表
self.file_list = QListWidget()
self.file_list.setSelectionMode(QListWidget.ExtendedSelection)
self.file_layout.addLayout(self.button_layout)
self.file_layout.addWidget(self.file_list)
self.file_frame.setLayout(self.file_layout)
# 合并设置区域
self.settings_frame = QGroupBox("合并设置")
self.settings_layout = QGridLayout()
# 合并方式
self.lbl_merge_method = QLabel("合并方式:")
self.cmb_merge_method = QComboBox()
self.cmb_merge_method.addItems(["垂直合并(追加行)", "水平合并(追加列)", "按关键列合并"])
# 处理表头
self.lbl_header = QLabel("表头处理:")
self.cmb_header = QComboBox()
self.cmb_header.addItems(["使用第一个文件的表头", "重新生成表头", "无表头"])
# 关键列设置
self.lbl_key_column = QLabel("关键列:")
self.txt_key_column = QLineEdit()
self.txt_key_column.setPlaceholderText("请输入列名,多个用逗号分隔")
self.lbl_key_column.setVisible(False)
self.txt_key_column.setVisible(False)
# 合并模式
self.lbl_merge_mode = QLabel("合并模式:")
self.cmb_merge_mode = QComboBox()
self.cmb_merge_mode.addItems(["inner", "outer", "left", "right"])
self.lbl_merge_mode.setVisible(False)
self.cmb_merge_mode.setVisible(False)
self.settings_layout.addWidget(self.lbl_merge_method, 0, 0)
self.settings_layout.addWidget(self.cmb_merge_method, 0, 1)
self.settings_layout.addWidget(self.lbl_header, 1, 0)
self.settings_layout.addWidget(self.cmb_header, 1, 1)
self.settings_layout.addWidget(self.lbl_key_column, 2, 0)
self.settings_layout.addWidget(self.txt_key_column, 2, 1)
self.settings_layout.addWidget(self.lbl_merge_mode, 3, 0)
self.settings_layout.addWidget(self.cmb_merge_mode, 3, 1)
self.settings_frame.setLayout(self.settings_layout)
# 输出设置区域
self.output_frame = QGroupBox("输出设置")
self.output_layout = QGridLayout()
self.lbl_output_path = QLabel("输出路径:")
self.txt_output_path = QLineEdit()
self.btn_browse = QPushButton("浏览...")
self.lbl_output_name = QLabel("输出文件名:")
self.txt_output_name = QLineEdit()
self.txt_output_name.setText("merged_result.xlsx")
self.lbl_sheet_name = QLabel("工作表名称:")
self.txt_sheet_name = QLineEdit()
self.txt_sheet_name.setText("Sheet1")
self.output_layout.addWidget(self.lbl_output_path, 0, 0)
self.output_layout.addWidget(self.txt_output_path, 0, 1)
self.output_layout.addWidget(self.btn_browse, 0, 2)
self.output_layout.addWidget(self.lbl_output_name, 1, 0)
self.output_layout.addWidget(self.txt_output_name, 1, 1)
self.output_layout.addWidget(self.lbl_sheet_name, 2, 0)
self.output_layout.addWidget(self.txt_sheet_name, 2, 1)
self.output_frame.setLayout(self.output_layout)
# 进度条
self.progress_bar = QProgressBar()
# 合并按钮
self.btn_merge = QPushButton("开始合并")
self.btn_merge.setStyleSheet("QPushButton {background-color: #4CAF50; color: white; padding: 10px; font-weight: bold;}")
# 日志区域
self.log_frame = QGroupBox("运行日志")
self.log_layout = QVBoxLayout()
self.log_text = QTextEdit()
self.log_text.setReadOnly(True)
self.log_layout.addWidget(self.log_text)
self.log_frame.setLayout(self.log_layout)
# 添加到主布局
self.main_layout.addWidget(self.file_frame)
self.main_layout.addWidget(self.settings_frame)
self.main_layout.addWidget(self.output_frame)
self.main_layout.addWidget(self.progress_bar)
self.main_layout.addWidget(self.btn_merge)
self.main_layout.addWidget(self.log_frame)
MainWindow.setCentralWidget(self.centralwidget)
# 状态栏
self.statusbar = QStatusBar(MainWindow)
MainWindow.setStatusBar(self.statusbar)
# 连接信号
self.cmb_merge_method.currentTextChanged.connect(self.on_merge_method_changed)
self.retranslateUi(MainWindow)
QMetaObject.connectSlotsByName(MainWindow)
def on_merge_method_changed(self, text):
"""合并方式改变时显示/隐藏相关控件"""
is_key_merge = "关键列" in text
self.lbl_key_column.setVisible(is_key_merge)
self.txt_key_column.setVisible(is_key_merge)
self.lbl_merge_mode.setVisible(is_key_merge)
self.cmb_merge_mode.setVisible(is_key_merge)
def retranslateUi(self, MainWindow):
_translate = QCoreApplication.translate
MainWindow.setWindowTitle(_translate("MainWindow", "Excel合并工具"))
self.file_frame.setTitle(_translate("MainWindow", "文件选择"))
self.settings_frame.setTitle(_translate("MainWindow", "合并设置"))
self.output_frame.setTitle(_translate("MainWindow", "输出设置"))
self.log_frame.setTitle(_translate("MainWindow", "运行日志"))
3. Excel合并逻辑 (excel_merger.py)
import pandas as pd
import os
from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows
import traceback
class ExcelMerger:
def __init__(self):
self.log_callback = None
self.progress_callback = None
def set_callbacks(self, log_callback, progress_callback):
"""设置回调函数"""
self.log_callback = log_callback
self.progress_callback = progress_callback
def log(self, message):
"""记录日志"""
if self.log_callback:
self.log_callback(message)
def update_progress(self, value):
"""更新进度"""
if self.progress_callback:
self.progress_callback(value)
def merge_excel_files(self, file_paths, settings):
"""
合并Excel文件
参数:
file_paths: 文件路径列表
settings: 合并设置字典
"""
try:
self.log("开始合并Excel文件...")
if not file_paths:
raise ValueError("请选择要合并的文件")
# 读取所有文件
dataframes = []
total_files = len(file_paths)
for i, file_path in enumerate(file_paths):
self.log(f"正在读取文件: {os.path.basename(file_path)}")
try:
# 根据文件扩展名选择合适的读取方式
if file_path.endswith('.xlsx'):
df = pd.read_excel(file_path, header=None if settings['header'] == 2 else 0)
elif file_path.endswith('.xls'):
df = pd.read_excel(file_path, engine='xlrd',
header=None if settings['header'] == 2 else 0)
elif file_path.endswith('.csv'):
df = pd.read_csv(file_path,
header=None if settings['header'] == 2 else 0)
else:
self.log(f"不支持的文件格式: {file_path}")
continue
dataframes.append(df)
self.log(f"成功读取文件: {os.path.basename(file_path)},共 {len(df)} 行,{len(df.columns)} 列")
except Exception as e:
self.log(f"读取文件 {file_path} 时出错: {str(e)}")
continue
# 更新进度
progress = int((i + 1) / total_files * 50)
self.update_progress(progress)
if not dataframes:
raise ValueError("没有成功读取任何文件")
self.log(f"成功读取 {len(dataframes)} 个文件")
# 根据合并方式进行合并
merge_method = settings['merge_method']
result_df = None
if merge_method == 0: # 垂直合并
result_df = self.vertical_merge(dataframes, settings)
elif merge_method == 1: # 水平合并
result_df = self.horizontal_merge(dataframes, settings)
elif merge_method == 2: # 按关键列合并
result_df = self.key_merge(dataframes, settings)
# 保存结果
output_path = self.save_result(result_df, settings)
self.log("合并完成!")
self.log(f"结果已保存到: {output_path}")
self.update_progress(100)
return True, output_path
except Exception as e:
error_msg = f"合并过程中出错: {str(e)}\n{traceback.format_exc()}"
self.log(error_msg)
return False, str(e)
def vertical_merge(self, dataframes, settings):
"""垂直合并(追加行)"""
self.log("开始垂直合并...")
if settings['header'] == 0: # 使用第一个文件的表头
# 保持第一个文件的表头,其他文件忽略表头
result_df = dataframes[0]
for df in dataframes[1:]:
# 如果其他文件有表头,跳过第一行
if settings['header'] != 2: # 如果不是无表头模式
df = df.iloc[1:] if len(df) > 0 else df
result_df = pd.concat([result_df, df], ignore_index=True)
elif settings['header'] == 1: # 重新生成表头
# 全部作为数据处理,生成新的表头
for i, df in enumerate(dataframes):
# 跳过原有的表头行
if i > 0 and settings['header'] != 2:
df = df.iloc[1:] if len(df) > 0 else df
# 重命名列
df.columns = [f'Column_{j+1}' for j in range(len(df.columns))]
if i == 0:
result_df = df
else:
result_df = pd.concat([result_df, df], ignore_index=True)
else: # 无表头
# 直接合并所有数据
result_df = pd.concat(dataframes, ignore_index=True)
self.log(f"垂直合并完成,总行数: {len(result_df)},总列数: {len(result_df.columns)}")
return result_df
def horizontal_merge(self, dataframes, settings):
"""水平合并(追加列)"""
self.log("开始水平合并...")
result_df = dataframes[0]
for i, df in enumerate(dataframes[1:], 1):
# 如果列名重复,添加后缀
duplicate_cols = set(result_df.columns) & set(df.columns)
if duplicate_cols and i > 0:
suffix = f"_file{i}"
df = df.add_suffix(suffix)
# 重置索引以确保对齐
result_df = result_df.reset_index(drop=True)
df = df.reset_index(drop=True)
# 水平合并
result_df = pd.concat([result_df, df], axis=1)
self.log(f"水平合并完成,总行数: {len(result_df)},总列数: {len(result_df.columns)}")
return result_df
def key_merge(self, dataframes, settings):
"""按关键列合并"""
self.log("开始按关键列合并...")
key_columns = [col.strip() for col in settings['key_columns'].split(',')]
merge_mode = settings['merge_mode']
result_df = dataframes[0]
for i, df in enumerate(dataframes[1:], 1):
# 查找共同的关键列
common_keys = set(key_columns) & set(df.columns)
if not common_keys:
self.log(f"警告:第 {i+1} 个文件没有找到关键列 {key_columns}")
continue
# 进行合并
result_df = pd.merge(
result_df,
df,
how=merge_mode,
on=list(common_keys),
suffixes=('', f'_file{i}')
)
self.log(f"按关键列合并完成,总行数: {len(result_df)},总列数: {len(result_df.columns)}")
return result_df
def save_result(self, df, settings):
"""保存合并结果"""
output_dir = settings['output_dir']
output_name = settings['output_name']
sheet_name = settings['sheet_name']
# 确保输出目录存在
os.makedirs(output_dir, exist_ok=True)
# 构建完整路径
output_path = os.path.join(output_dir, output_name)
# 确保文件名以.xlsx结尾
if not output_path.endswith('.xlsx'):
output_path += '.xlsx'
# 处理重名文件
counter = 1
original_path = output_path
while os.path.exists(output_path):
name, ext = os.path.splitext(original_path)
output_path = f"{name}_{counter}{ext}"
counter += 1
# 保存到Excel
with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
df.to_excel(writer, sheet_name=sheet_name, index=False)
return output_path
4. 主程序 (main.py)
import sys
import os
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from ui_main_window import Ui_MainWindow
from excel_merger import ExcelMerger
class ExcelMergerApp(QMainWindow):
def __init__(self):
super().__init__()
self.ui = Ui_MainWindow()
self.ui.setupUi(self)
# 初始化合并器
self.merger = ExcelMerger()
self.merger.set_callbacks(self.log_message, self.update_progress)
# 文件列表
self.file_paths = []
# 连接信号槽
self.connect_signals()
# 设置窗口图标
self.setWindowIcon(QIcon.fromTheme("document-merge"))
def connect_signals(self):
"""连接所有信号槽"""
self.ui.btn_add_files.clicked.connect(self.add_files)
self.ui.btn_add_folder.clicked.connect(self.add_folder)
self.ui.btn_clear.clicked.connect(self.clear_files)
self.ui.btn_browse.clicked.connect(self.browse_output)
self.ui.btn_merge.clicked.connect(self.merge_files)
# 拖放支持
self.setAcceptDrops(True)
self.ui.file_list.setAcceptDrops(True)
def dragEnterEvent(self, event):
"""拖拽进入事件"""
if event.mimeData().hasUrls():
event.accept()
else:
event.ignore()
def dropEvent(self, event):
"""拖放事件"""
urls = event.mimeData().urls()
for url in urls:
file_path = url.toLocalFile()
if os.path.isfile(file_path) and self.is_excel_file(file_path):
self.add_file_path(file_path)
def is_excel_file(self, file_path):
"""检查是否为Excel文件"""
ext = os.path.splitext(file_path)[1].lower()
return ext in ['.xlsx', '.xls', '.csv']
def add_files(self):
"""添加文件"""
files, _ = QFileDialog.getOpenFileNames(
self,
"选择Excel文件",
"",
"Excel文件 (*.xlsx *.xls);;CSV文件 (*.csv);;所有文件 (*.*)"
)
for file_path in files:
self.add_file_path(file_path)
def add_folder(self):
"""添加文件夹中的所有Excel文件"""
folder = QFileDialog.getExistingDirectory(self, "选择文件夹")
if folder:
for root, dirs, files in os.walk(folder):
for file in files:
if self.is_excel_file(file):
file_path = os.path.join(root, file)
self.add_file_path(file_path)
def add_file_path(self, file_path):
"""添加文件路径到列表"""
if file_path not in self.file_paths:
self.file_paths.append(file_path)
item = QListWidgetItem(file_path)
self.ui.file_list.addItem(item)
self.log_message(f"添加文件: {os.path.basename(file_path)}")
def clear_files(self):
"""清空文件列表"""
self.file_paths.clear()
self.ui.file_list.clear()
self.log_message("已清空文件列表")
def browse_output(self):
"""浏览输出路径"""
output_dir = QFileDialog.getExistingDirectory(self, "选择输出目录")
if output_dir:
self.ui.txt_output_path.setText(output_dir)
def get_settings(self):
"""获取合并设置"""
settings = {
'merge_method': self.ui.cmb_merge_method.currentIndex(),
'header': self.ui.cmb_header.currentIndex(),
'key_columns': self.ui.txt_key_column.text(),
'merge_mode': self.ui.cmb_merge_mode.currentText(),
'output_dir': self.ui.txt_output_path.text() or os.getcwd(),
'output_name': self.ui.txt_output_name.text(),
'sheet_name': self.ui.txt_sheet_name.text()
}
return settings
def merge_files(self):
"""开始合并文件"""
if not self.file_paths:
QMessageBox.warning(self, "警告", "请先添加要合并的文件")
return
# 验证设置
settings = self.get_settings()
if settings['merge_method'] == 2 and not settings['key_columns'].strip():
QMessageBox.warning(self, "警告", "按关键列合并时,请指定关键列")
return
# 禁用按钮,防止重复点击
self.ui.btn_merge.setEnabled(False)
self.ui.btn_merge.setText("合并中...")
# 使用线程进行合并操作
self.merge_thread = MergeThread(self.file_paths, settings, self.merger)
self.merge_thread.finished.connect(self.on_merge_finished)
self.merge_thread.start()
def on_merge_finished(self, success, message):
"""合并完成后的处理"""
self.ui.btn_merge.setEnabled(True)
self.ui.btn_merge.setText("开始合并")
if success:
QMessageBox.information(self, "成功", f"合并完成!\n文件已保存到:\n{message}")
# 打开输出目录
output_dir = os.path.dirname(message)
QDesktopServices.openUrl(QUrl.fromLocalFile(output_dir))
else:
QMessageBox.critical(self, "错误", f"合并失败:\n{message}")
def log_message(self, message):
"""记录日志信息"""
self.ui.log_text.append(f"[{QDateTime.currentDateTime().toString('HH:mm:ss')}] {message}")
# 自动滚动到底部
self.ui.log_text.verticalScrollBar().setValue(
self.ui.log_text.verticalScrollBar().maximum()
)
def update_progress(self, value):
"""更新进度条"""
self.ui.progress_bar.setValue(value)
QApplication.processEvents() # 更新UI
class MergeThread(QThread):
"""合并线程"""
finished = pyqtSignal(bool, str)
def __init__(self, file_paths, settings, merger):
super().__init__()
self.file_paths = file_paths
self.settings = settings
self.merger = merger
def run(self):
success, message = self.merger.merge_excel_files(self.file_paths, self.settings)
self.finished.emit(success, message)
def main():
app = QApplication(sys.argv)
app.setStyle('Fusion') # 使用Fusion风格
# 设置应用样式
app.setStyleSheet("""
QMainWindow {
background-color: #f0f0f0;
}
QGroupBox {
font-weight: bold;
border: 2px solid #cccccc;
border-radius: 5px;
margin-top: 10px;
padding-top: 10px;
}
QGroupBox::title {
subcontrol-origin: margin;
left: 10px;
padding: 0 5px 0 5px;
}
QPushButton {
padding: 5px 15px;
border-radius: 3px;
border: 1px solid #cccccc;
}
QPushButton:hover {
background-color: #e0e0e0;
}
QListWidget {
border: 1px solid #cccccc;
border-radius: 3px;
background-color: white;
}
QTextEdit {
border: 1px solid #cccccc;
border-radius: 3px;
background-color: white;
font-family: 'Consolas', 'Monaco', monospace;
}
""")
window = ExcelMergerApp()
window.show()
sys.exit(app.exec_())
if __name__ == "__main__":
main()
5. 安装和使用说明
安装步骤:
创建虚拟环境(可选):
python -m venv venv
source venv/bin/activate # Linux/Mac
# 或
venv\Scripts\activate # Windows
安装依赖:
pip install -r requirements.txt
运行程序:
python main.py
功能特性:
多种合并方式:
- 垂直合并(追加行)
- 水平合并(追加列)
- 按关键列合并
表头处理选项:
文件支持:
- 支持.xlsx、.xls、.csv格式
- 支持拖放添加文件
- 支持批量添加文件夹
用户界面:
错误处理:
使用示例:
点击"添加文件"或"添加文件夹"选择要合并的Excel文件
选择合并方式(垂直、水平或按关键列)
设置表头处理方式
选择输出路径和文件名
点击"开始合并"按钮
查看运行日志和进度
这个工具提供了完整的Excel文件合并解决方案,具有友好的图形界面和强大的合并功能。你可以根据需要进一步扩展功能,比如添加更多文件格式支持、更复杂的合并逻辑等。