Skip to content

读取 excel 生成sql 脚本

python
import openpyxl
import sys
from datetime import datetime, date


def detect_type(value):
    """自动判断数据类型"""
    if value is None:
        return None, "NULL"

    if isinstance(value, (datetime, date)):
        return (
            f"'{value.strftime('%Y-%m-%d %H:%M:%S') if isinstance(value, datetime) else value.isoformat()}'",
            "DATETIME",
        )

    if isinstance(value, bool):
        return str(int(value)), "BOOLEAN"

    if isinstance(value, (int, float)):
        if isinstance(value, float) and value.is_integer():
            return str(int(value)), "INTEGER"
        return str(value), "FLOAT"

    str_val = str(value).strip()

    if not str_val:
        return None, "NULL"

    try:
        int_val = int(str_val)
        return str(int_val), "INTEGER"
    except ValueError:
        pass

    try:
        float_val = float(str_val)
        return str(float_val), "FLOAT"
    except ValueError:
        pass

    if len(str_val) > 255:
        return f"'{str_val.replace(chr(39), chr(39) + chr(39))}'", "TEXT"

    return f"'{str_val.replace(chr(39), chr(39) + chr(39))}'", "VARCHAR"


def excel_to_insert_sql(excel_file, table_name, sheet_name=None):
    """读取 Excel 生成 INSERT SQL"""
    wb = openpyxl.load_workbook(excel_file)

    if sheet_name:
        ws = wb[sheet_name]
    else:
        ws = wb.active

    rows = list(ws.iter_rows(values_only=True))  # type: ignore

    if not rows:
        print("Excel 文件为空")
        return []

    columns = rows[0]
    columns = [
        str(col).strip() if col else f"column_{i}" for i, col in enumerate(columns)
    ]

    sql_statements = []
    type_hints = {}

    for row in rows[1:]:
        if all(
            cell is None or (isinstance(cell, str) and not cell.strip()) for cell in row
        ):
            continue

        values = []
        for i, cell in enumerate(row):
            sql_val, col_type = detect_type(cell)
            if i not in type_hints or type_hints[i] is None:
                type_hints[i] = col_type
            elif col_type != "NULL" and type_hints[i] == "NULL":
                type_hints[i] = col_type
            elif col_type in ("INTEGER", "FLOAT") and type_hints[i] == "VARCHAR":
                pass
            elif col_type == "FLOAT" and type_hints[i] == "INTEGER":
                type_hints[i] = "FLOAT"

            values.append(sql_val if sql_val else "NULL")

        columns_str = ", ".join(columns)
        values_str = ", ".join(values)
        sql = f"INSERT INTO {table_name} ({columns_str}) VALUES ({values_str});"
        sql_statements.append(sql)

    print(f"\n表名:{table_name}")
    print(f"列数:{len(columns)}")
    print(f"数据行数:{len(sql_statements)}")
    print("\n列类型推断:")
    for i, col in enumerate(columns):
        print(f"  {col}: {type_hints.get(i, 'UNKNOWN')}")

    return sql_statements


if __name__ == "__main__":
    # if len(sys.argv) < 3:
    #     print("用法:python excel_to_sql.py <excel 文件路径> <表名> [工作表名]")
    #     print("示例:python excel_to_sql.py data.xlsx users Sheet1")
    #     sys.exit(1)
    #
    excel_file = "1.xlsx"
    table_name = "sys_user"
    sheet_name = "Sheet1"

    try:
        sqls = excel_to_insert_sql(excel_file, table_name, sheet_name)

        output_file = f"{table_name}_inserts.sql"
        with open(output_file, "w", encoding="utf-8") as f:
            f.write("\n".join(sqls))

        print(f"\nSQL 文件已生成:{output_file}")
        print(f"\n前 5 条 SQL 预览:")
        for i, sql in enumerate(sqls[:5]):
            print(f"{i + 1}. {sql}")
        if len(sqls) > 5:
            print(f"... 还有 {len(sqls) - 5} 条")

    except FileNotFoundError:
        print(f"错误:文件 '{excel_file}' 不存在")
    except Exception as e:
        print(f"错误:{e}")

读取 excel 生成sql 脚本 pandas

python
import pandas as pd
import numpy as np
from datetime import datetime, date


def detect_value(value):
    """自动判断数据类型并转换为 SQL 值"""
    if pd.isna(value):
        return "NULL"

    if isinstance(value, bool):
        return str(int(value))

    if isinstance(value, (datetime, date)):
        fmt = "%Y-%m-%d %H:%M:%S" if isinstance(value, datetime) else "%Y-%m-%d"
        return f"'{value.strftime(fmt)}'"

    if isinstance(value, (int, float)):
        if isinstance(value, float) and value.is_integer():
            return str(int(value))
        return f"{value}"

    str_val = str(value).strip()
    if not str_val:
        return "NULL"

    try:
        int_val = int(str_val)
        return str(int_val)
    except ValueError:
        pass

    try:
        float_val = float(str_val)
        return str(float_val)
    except ValueError:
        pass

    # 字符串转义单引号
    escaped = str_val.replace("'", "''")
    return f"'{escaped}'"


def excel_to_sql_pandas(excel_file, table_name, sheet_name=0):
    try:
        df = pd.read_excel(excel_file, sheet_name=sheet_name)
    except Exception as e:
        raise e

    # 清理空行
    df = df.replace([np.nan], None, regex=False)
    df = df.dropna(how="all")

    columns = df.columns.tolist()
    sql_statements = []
    type_hints = {}

    for idx, row in df.iterrows():
        values = []
        for i, col in enumerate(columns):
            cell = row[col]
            sql_val = detect_value(cell)
            values.append(sql_val)

            # 推断类型
            if sql_val != "NULL":
                if sql_val.startswith("'"):
                    current_type = "TEXT"
                else:
                    try:
                        float(sql_val)
                        if "." in sql_val:
                            current_type = "FLOAT"
                        else:
                            current_type = "INTEGER"
                    except ValueError:
                        current_type = "TEXT"

                if i not in type_hints:
                    type_hints[i] = current_type
                elif current_type == "FLOAT" and type_hints.get(i) == "INTEGER":
                    type_hints[i] = "FLOAT"

        values_str = ", ".join(values)
        columns_str = ", ".join(columns)
        sql = f"INSERT INTO {table_name} ({columns_str}) VALUES ({values_str});"
        sql_statements.append(sql)

    print(f"表名:{table_name}")
    print(f"列数:{len(columns)}")
    print(f"数据行数:{len(sql_statements)}")
    print("\n列类型推断:")
    for i, col in enumerate(columns):
        print(f"  {col}: {type_hints.get(i, 'UNKNOWN')}")

    return sql_statements


if __name__ == "__main__":
    excel_file = "1.xlsx"
    table_name = "sys_user"
    sheet_name = "Sheet1"

    try:
        sqls = excel_to_sql_pandas(excel_file, table_name, sheet_name)

        output_file = f"{table_name}_inserts.sql"
        with open(output_file, "w", encoding="utf-8") as f:
            f.write("\n".join(sqls))

        print(f"\nSQL 文件已生成:{output_file}")
        print(f"\n前 5 条 SQL 预览:")
        for i, sql in enumerate(sqls[:5]):
            print(f"{i + 1}. {sql}")
        if len(sqls) > 5:
            print(f"... 还有 {len(sqls) - 5} 条")

    except FileNotFoundError:
        print(f"错误:文件 '{excel_file}' 不存在")
    except Exception as e:
        print(f"错误:{e}")