泰州市转运信息网

基于Python+PyQt5+Pandas编写的带图形界面的Excel合并工具

2026-03-31 20:12:01 浏览次数:1
详细信息

项目结构

excel_merger/
├── main.py              # 主程序入口
├── excel_merger.py      # 核心合并逻辑
├── ui_main_window.py    # 主窗口UI
├── requirements.txt     # 依赖包
└── README.md           # 说明文档

1. requirements.txt

pandas>=1.3.0
openpyxl>=3.0.0
xlrd>=2.0.0
PyQt5>=5.15.0
PyQt5-sip>=12.9.0

2. 主窗口UI设计 (ui_main_window.py)

from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
from PyQt5.QtGui import *
import sys

class Ui_MainWindow(object):
    def setupUi(self, MainWindow):
        MainWindow.setObjectName("MainWindow")
        MainWindow.resize(800, 600)

        # 中心部件
        self.centralwidget = QWidget(MainWindow)
        self.centralwidget.setObjectName("centralwidget")

        # 主布局
        self.main_layout = QVBoxLayout(self.centralwidget)

        # 文件选择区域
        self.file_frame = QGroupBox("文件选择")
        self.file_layout = QVBoxLayout()

        # 按钮区域
        self.button_layout = QHBoxLayout()
        self.btn_add_files = QPushButton("添加文件")
        self.btn_add_folder = QPushButton("添加文件夹")
        self.btn_clear = QPushButton("清空列表")
        self.button_layout.addWidget(self.btn_add_files)
        self.button_layout.addWidget(self.btn_add_folder)
        self.button_layout.addWidget(self.btn_clear)

        # 文件列表
        self.file_list = QListWidget()
        self.file_list.setSelectionMode(QListWidget.ExtendedSelection)

        self.file_layout.addLayout(self.button_layout)
        self.file_layout.addWidget(self.file_list)
        self.file_frame.setLayout(self.file_layout)

        # 合并设置区域
        self.settings_frame = QGroupBox("合并设置")
        self.settings_layout = QGridLayout()

        # 合并方式
        self.lbl_merge_method = QLabel("合并方式:")
        self.cmb_merge_method = QComboBox()
        self.cmb_merge_method.addItems(["垂直合并(追加行)", "水平合并(追加列)", "按关键列合并"])

        # 处理表头
        self.lbl_header = QLabel("表头处理:")
        self.cmb_header = QComboBox()
        self.cmb_header.addItems(["使用第一个文件的表头", "重新生成表头", "无表头"])

        # 关键列设置
        self.lbl_key_column = QLabel("关键列:")
        self.txt_key_column = QLineEdit()
        self.txt_key_column.setPlaceholderText("请输入列名,多个用逗号分隔")
        self.lbl_key_column.setVisible(False)
        self.txt_key_column.setVisible(False)

        # 合并模式
        self.lbl_merge_mode = QLabel("合并模式:")
        self.cmb_merge_mode = QComboBox()
        self.cmb_merge_mode.addItems(["inner", "outer", "left", "right"])
        self.lbl_merge_mode.setVisible(False)
        self.cmb_merge_mode.setVisible(False)

        self.settings_layout.addWidget(self.lbl_merge_method, 0, 0)
        self.settings_layout.addWidget(self.cmb_merge_method, 0, 1)
        self.settings_layout.addWidget(self.lbl_header, 1, 0)
        self.settings_layout.addWidget(self.cmb_header, 1, 1)
        self.settings_layout.addWidget(self.lbl_key_column, 2, 0)
        self.settings_layout.addWidget(self.txt_key_column, 2, 1)
        self.settings_layout.addWidget(self.lbl_merge_mode, 3, 0)
        self.settings_layout.addWidget(self.cmb_merge_mode, 3, 1)

        self.settings_frame.setLayout(self.settings_layout)

        # 输出设置区域
        self.output_frame = QGroupBox("输出设置")
        self.output_layout = QGridLayout()

        self.lbl_output_path = QLabel("输出路径:")
        self.txt_output_path = QLineEdit()
        self.btn_browse = QPushButton("浏览...")

        self.lbl_output_name = QLabel("输出文件名:")
        self.txt_output_name = QLineEdit()
        self.txt_output_name.setText("merged_result.xlsx")

        self.lbl_sheet_name = QLabel("工作表名称:")
        self.txt_sheet_name = QLineEdit()
        self.txt_sheet_name.setText("Sheet1")

        self.output_layout.addWidget(self.lbl_output_path, 0, 0)
        self.output_layout.addWidget(self.txt_output_path, 0, 1)
        self.output_layout.addWidget(self.btn_browse, 0, 2)
        self.output_layout.addWidget(self.lbl_output_name, 1, 0)
        self.output_layout.addWidget(self.txt_output_name, 1, 1)
        self.output_layout.addWidget(self.lbl_sheet_name, 2, 0)
        self.output_layout.addWidget(self.txt_sheet_name, 2, 1)

        self.output_frame.setLayout(self.output_layout)

        # 进度条
        self.progress_bar = QProgressBar()

        # 合并按钮
        self.btn_merge = QPushButton("开始合并")
        self.btn_merge.setStyleSheet("QPushButton {background-color: #4CAF50; color: white; padding: 10px; font-weight: bold;}")

        # 日志区域
        self.log_frame = QGroupBox("运行日志")
        self.log_layout = QVBoxLayout()
        self.log_text = QTextEdit()
        self.log_text.setReadOnly(True)
        self.log_layout.addWidget(self.log_text)
        self.log_frame.setLayout(self.log_layout)

        # 添加到主布局
        self.main_layout.addWidget(self.file_frame)
        self.main_layout.addWidget(self.settings_frame)
        self.main_layout.addWidget(self.output_frame)
        self.main_layout.addWidget(self.progress_bar)
        self.main_layout.addWidget(self.btn_merge)
        self.main_layout.addWidget(self.log_frame)

        MainWindow.setCentralWidget(self.centralwidget)

        # 状态栏
        self.statusbar = QStatusBar(MainWindow)
        MainWindow.setStatusBar(self.statusbar)

        # 连接信号
        self.cmb_merge_method.currentTextChanged.connect(self.on_merge_method_changed)

        self.retranslateUi(MainWindow)
        QMetaObject.connectSlotsByName(MainWindow)

    def on_merge_method_changed(self, text):
        """合并方式改变时显示/隐藏相关控件"""
        is_key_merge = "关键列" in text
        self.lbl_key_column.setVisible(is_key_merge)
        self.txt_key_column.setVisible(is_key_merge)
        self.lbl_merge_mode.setVisible(is_key_merge)
        self.cmb_merge_mode.setVisible(is_key_merge)

    def retranslateUi(self, MainWindow):
        _translate = QCoreApplication.translate
        MainWindow.setWindowTitle(_translate("MainWindow", "Excel合并工具"))
        self.file_frame.setTitle(_translate("MainWindow", "文件选择"))
        self.settings_frame.setTitle(_translate("MainWindow", "合并设置"))
        self.output_frame.setTitle(_translate("MainWindow", "输出设置"))
        self.log_frame.setTitle(_translate("MainWindow", "运行日志"))

3. Excel合并逻辑 (excel_merger.py)

import pandas as pd
import os
from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows
import traceback

class ExcelMerger:
    def __init__(self):
        self.log_callback = None
        self.progress_callback = None

    def set_callbacks(self, log_callback, progress_callback):
        """设置回调函数"""
        self.log_callback = log_callback
        self.progress_callback = progress_callback

    def log(self, message):
        """记录日志"""
        if self.log_callback:
            self.log_callback(message)

    def update_progress(self, value):
        """更新进度"""
        if self.progress_callback:
            self.progress_callback(value)

    def merge_excel_files(self, file_paths, settings):
        """
        合并Excel文件

        参数:
            file_paths: 文件路径列表
            settings: 合并设置字典
        """
        try:
            self.log("开始合并Excel文件...")

            if not file_paths:
                raise ValueError("请选择要合并的文件")

            # 读取所有文件
            dataframes = []
            total_files = len(file_paths)

            for i, file_path in enumerate(file_paths):
                self.log(f"正在读取文件: {os.path.basename(file_path)}")

                try:
                    # 根据文件扩展名选择合适的读取方式
                    if file_path.endswith('.xlsx'):
                        df = pd.read_excel(file_path, header=None if settings['header'] == 2 else 0)
                    elif file_path.endswith('.xls'):
                        df = pd.read_excel(file_path, engine='xlrd', 
                                          header=None if settings['header'] == 2 else 0)
                    elif file_path.endswith('.csv'):
                        df = pd.read_csv(file_path, 
                                        header=None if settings['header'] == 2 else 0)
                    else:
                        self.log(f"不支持的文件格式: {file_path}")
                        continue

                    dataframes.append(df)
                    self.log(f"成功读取文件: {os.path.basename(file_path)},共 {len(df)} 行,{len(df.columns)} 列")

                except Exception as e:
                    self.log(f"读取文件 {file_path} 时出错: {str(e)}")
                    continue

                # 更新进度
                progress = int((i + 1) / total_files * 50)
                self.update_progress(progress)

            if not dataframes:
                raise ValueError("没有成功读取任何文件")

            self.log(f"成功读取 {len(dataframes)} 个文件")

            # 根据合并方式进行合并
            merge_method = settings['merge_method']
            result_df = None

            if merge_method == 0:  # 垂直合并
                result_df = self.vertical_merge(dataframes, settings)
            elif merge_method == 1:  # 水平合并
                result_df = self.horizontal_merge(dataframes, settings)
            elif merge_method == 2:  # 按关键列合并
                result_df = self.key_merge(dataframes, settings)

            # 保存结果
            output_path = self.save_result(result_df, settings)

            self.log("合并完成!")
            self.log(f"结果已保存到: {output_path}")
            self.update_progress(100)

            return True, output_path

        except Exception as e:
            error_msg = f"合并过程中出错: {str(e)}\n{traceback.format_exc()}"
            self.log(error_msg)
            return False, str(e)

    def vertical_merge(self, dataframes, settings):
        """垂直合并(追加行)"""
        self.log("开始垂直合并...")

        if settings['header'] == 0:  # 使用第一个文件的表头
            # 保持第一个文件的表头,其他文件忽略表头
            result_df = dataframes[0]
            for df in dataframes[1:]:
                # 如果其他文件有表头,跳过第一行
                if settings['header'] != 2:  # 如果不是无表头模式
                    df = df.iloc[1:] if len(df) > 0 else df
                result_df = pd.concat([result_df, df], ignore_index=True)

        elif settings['header'] == 1:  # 重新生成表头
            # 全部作为数据处理,生成新的表头
            for i, df in enumerate(dataframes):
                # 跳过原有的表头行
                if i > 0 and settings['header'] != 2:
                    df = df.iloc[1:] if len(df) > 0 else df
                # 重命名列
                df.columns = [f'Column_{j+1}' for j in range(len(df.columns))]
                if i == 0:
                    result_df = df
                else:
                    result_df = pd.concat([result_df, df], ignore_index=True)

        else:  # 无表头
            # 直接合并所有数据
            result_df = pd.concat(dataframes, ignore_index=True)

        self.log(f"垂直合并完成,总行数: {len(result_df)},总列数: {len(result_df.columns)}")
        return result_df

    def horizontal_merge(self, dataframes, settings):
        """水平合并(追加列)"""
        self.log("开始水平合并...")

        result_df = dataframes[0]

        for i, df in enumerate(dataframes[1:], 1):
            # 如果列名重复,添加后缀
            duplicate_cols = set(result_df.columns) & set(df.columns)
            if duplicate_cols and i > 0:
                suffix = f"_file{i}"
                df = df.add_suffix(suffix)

            # 重置索引以确保对齐
            result_df = result_df.reset_index(drop=True)
            df = df.reset_index(drop=True)

            # 水平合并
            result_df = pd.concat([result_df, df], axis=1)

        self.log(f"水平合并完成,总行数: {len(result_df)},总列数: {len(result_df.columns)}")
        return result_df

    def key_merge(self, dataframes, settings):
        """按关键列合并"""
        self.log("开始按关键列合并...")

        key_columns = [col.strip() for col in settings['key_columns'].split(',')]
        merge_mode = settings['merge_mode']

        result_df = dataframes[0]

        for i, df in enumerate(dataframes[1:], 1):
            # 查找共同的关键列
            common_keys = set(key_columns) & set(df.columns)
            if not common_keys:
                self.log(f"警告:第 {i+1} 个文件没有找到关键列 {key_columns}")
                continue

            # 进行合并
            result_df = pd.merge(
                result_df, 
                df, 
                how=merge_mode,
                on=list(common_keys),
                suffixes=('', f'_file{i}')
            )

        self.log(f"按关键列合并完成,总行数: {len(result_df)},总列数: {len(result_df.columns)}")
        return result_df

    def save_result(self, df, settings):
        """保存合并结果"""
        output_dir = settings['output_dir']
        output_name = settings['output_name']
        sheet_name = settings['sheet_name']

        # 确保输出目录存在
        os.makedirs(output_dir, exist_ok=True)

        # 构建完整路径
        output_path = os.path.join(output_dir, output_name)

        # 确保文件名以.xlsx结尾
        if not output_path.endswith('.xlsx'):
            output_path += '.xlsx'

        # 处理重名文件
        counter = 1
        original_path = output_path
        while os.path.exists(output_path):
            name, ext = os.path.splitext(original_path)
            output_path = f"{name}_{counter}{ext}"
            counter += 1

        # 保存到Excel
        with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
            df.to_excel(writer, sheet_name=sheet_name, index=False)

        return output_path

4. 主程序 (main.py)

import sys
import os
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from ui_main_window import Ui_MainWindow
from excel_merger import ExcelMerger

class ExcelMergerApp(QMainWindow):
    def __init__(self):
        super().__init__()
        self.ui = Ui_MainWindow()
        self.ui.setupUi(self)

        # 初始化合并器
        self.merger = ExcelMerger()
        self.merger.set_callbacks(self.log_message, self.update_progress)

        # 文件列表
        self.file_paths = []

        # 连接信号槽
        self.connect_signals()

        # 设置窗口图标
        self.setWindowIcon(QIcon.fromTheme("document-merge"))

    def connect_signals(self):
        """连接所有信号槽"""
        self.ui.btn_add_files.clicked.connect(self.add_files)
        self.ui.btn_add_folder.clicked.connect(self.add_folder)
        self.ui.btn_clear.clicked.connect(self.clear_files)
        self.ui.btn_browse.clicked.connect(self.browse_output)
        self.ui.btn_merge.clicked.connect(self.merge_files)

        # 拖放支持
        self.setAcceptDrops(True)
        self.ui.file_list.setAcceptDrops(True)

    def dragEnterEvent(self, event):
        """拖拽进入事件"""
        if event.mimeData().hasUrls():
            event.accept()
        else:
            event.ignore()

    def dropEvent(self, event):
        """拖放事件"""
        urls = event.mimeData().urls()
        for url in urls:
            file_path = url.toLocalFile()
            if os.path.isfile(file_path) and self.is_excel_file(file_path):
                self.add_file_path(file_path)

    def is_excel_file(self, file_path):
        """检查是否为Excel文件"""
        ext = os.path.splitext(file_path)[1].lower()
        return ext in ['.xlsx', '.xls', '.csv']

    def add_files(self):
        """添加文件"""
        files, _ = QFileDialog.getOpenFileNames(
            self,
            "选择Excel文件",
            "",
            "Excel文件 (*.xlsx *.xls);;CSV文件 (*.csv);;所有文件 (*.*)"
        )

        for file_path in files:
            self.add_file_path(file_path)

    def add_folder(self):
        """添加文件夹中的所有Excel文件"""
        folder = QFileDialog.getExistingDirectory(self, "选择文件夹")

        if folder:
            for root, dirs, files in os.walk(folder):
                for file in files:
                    if self.is_excel_file(file):
                        file_path = os.path.join(root, file)
                        self.add_file_path(file_path)

    def add_file_path(self, file_path):
        """添加文件路径到列表"""
        if file_path not in self.file_paths:
            self.file_paths.append(file_path)
            item = QListWidgetItem(file_path)
            self.ui.file_list.addItem(item)
            self.log_message(f"添加文件: {os.path.basename(file_path)}")

    def clear_files(self):
        """清空文件列表"""
        self.file_paths.clear()
        self.ui.file_list.clear()
        self.log_message("已清空文件列表")

    def browse_output(self):
        """浏览输出路径"""
        output_dir = QFileDialog.getExistingDirectory(self, "选择输出目录")
        if output_dir:
            self.ui.txt_output_path.setText(output_dir)

    def get_settings(self):
        """获取合并设置"""
        settings = {
            'merge_method': self.ui.cmb_merge_method.currentIndex(),
            'header': self.ui.cmb_header.currentIndex(),
            'key_columns': self.ui.txt_key_column.text(),
            'merge_mode': self.ui.cmb_merge_mode.currentText(),
            'output_dir': self.ui.txt_output_path.text() or os.getcwd(),
            'output_name': self.ui.txt_output_name.text(),
            'sheet_name': self.ui.txt_sheet_name.text()
        }
        return settings

    def merge_files(self):
        """开始合并文件"""
        if not self.file_paths:
            QMessageBox.warning(self, "警告", "请先添加要合并的文件")
            return

        # 验证设置
        settings = self.get_settings()
        if settings['merge_method'] == 2 and not settings['key_columns'].strip():
            QMessageBox.warning(self, "警告", "按关键列合并时,请指定关键列")
            return

        # 禁用按钮,防止重复点击
        self.ui.btn_merge.setEnabled(False)
        self.ui.btn_merge.setText("合并中...")

        # 使用线程进行合并操作
        self.merge_thread = MergeThread(self.file_paths, settings, self.merger)
        self.merge_thread.finished.connect(self.on_merge_finished)
        self.merge_thread.start()

    def on_merge_finished(self, success, message):
        """合并完成后的处理"""
        self.ui.btn_merge.setEnabled(True)
        self.ui.btn_merge.setText("开始合并")

        if success:
            QMessageBox.information(self, "成功", f"合并完成!\n文件已保存到:\n{message}")
            # 打开输出目录
            output_dir = os.path.dirname(message)
            QDesktopServices.openUrl(QUrl.fromLocalFile(output_dir))
        else:
            QMessageBox.critical(self, "错误", f"合并失败:\n{message}")

    def log_message(self, message):
        """记录日志信息"""
        self.ui.log_text.append(f"[{QDateTime.currentDateTime().toString('HH:mm:ss')}] {message}")
        # 自动滚动到底部
        self.ui.log_text.verticalScrollBar().setValue(
            self.ui.log_text.verticalScrollBar().maximum()
        )

    def update_progress(self, value):
        """更新进度条"""
        self.ui.progress_bar.setValue(value)
        QApplication.processEvents()  # 更新UI

class MergeThread(QThread):
    """合并线程"""
    finished = pyqtSignal(bool, str)

    def __init__(self, file_paths, settings, merger):
        super().__init__()
        self.file_paths = file_paths
        self.settings = settings
        self.merger = merger

    def run(self):
        success, message = self.merger.merge_excel_files(self.file_paths, self.settings)
        self.finished.emit(success, message)

def main():
    app = QApplication(sys.argv)
    app.setStyle('Fusion')  # 使用Fusion风格

    # 设置应用样式
    app.setStyleSheet("""
        QMainWindow {
            background-color: #f0f0f0;
        }
        QGroupBox {
            font-weight: bold;
            border: 2px solid #cccccc;
            border-radius: 5px;
            margin-top: 10px;
            padding-top: 10px;
        }
        QGroupBox::title {
            subcontrol-origin: margin;
            left: 10px;
            padding: 0 5px 0 5px;
        }
        QPushButton {
            padding: 5px 15px;
            border-radius: 3px;
            border: 1px solid #cccccc;
        }
        QPushButton:hover {
            background-color: #e0e0e0;
        }
        QListWidget {
            border: 1px solid #cccccc;
            border-radius: 3px;
            background-color: white;
        }
        QTextEdit {
            border: 1px solid #cccccc;
            border-radius: 3px;
            background-color: white;
            font-family: 'Consolas', 'Monaco', monospace;
        }
    """)

    window = ExcelMergerApp()
    window.show()
    sys.exit(app.exec_())

if __name__ == "__main__":
    main()

5. 安装和使用说明

安装步骤:

创建虚拟环境(可选):

python -m venv venv
source venv/bin/activate  # Linux/Mac
# 或
venv\Scripts\activate  # Windows

安装依赖:

pip install -r requirements.txt

运行程序:

python main.py

功能特性:

多种合并方式

表头处理选项

文件支持

用户界面

错误处理

使用示例:

点击"添加文件"或"添加文件夹"选择要合并的Excel文件 选择合并方式(垂直、水平或按关键列) 设置表头处理方式 选择输出路径和文件名 点击"开始合并"按钮 查看运行日志和进度

这个工具提供了完整的Excel文件合并解决方案,具有友好的图形界面和强大的合并功能。你可以根据需要进一步扩展功能,比如添加更多文件格式支持、更复杂的合并逻辑等。

相关推荐