读取 Excel 生成 SQL 脚本
python
import openpyxl
import sys
from datetime import datetime, date
def detect_type(value):
    """Convert one cell value into an SQL literal and a guessed column type.

    Args:
        value: A raw cell value (None, bool, int, float, date/datetime, str,
            or any object that ``str()`` accepts).

    Returns:
        A ``(sql_literal, type_name)`` tuple. ``sql_literal`` is ``None`` when
        the cell should be written as NULL; otherwise it is a string that is
        ready to be pasted into a VALUES list. ``type_name`` is one of
        "NULL", "DATETIME", "BOOLEAN", "INTEGER", "FLOAT", "VARCHAR", "TEXT".
    """
    import math  # local import: only needed for the finiteness checks below

    if value is None:
        return None, "NULL"

    # datetime is a subclass of date, so it has to be tested first.
    if isinstance(value, datetime):
        return f"'{value.strftime('%Y-%m-%d %H:%M:%S')}'", "DATETIME"
    if isinstance(value, date):
        return f"'{value.isoformat()}'", "DATETIME"

    # bool is a subclass of int, so it has to be tested before the numbers.
    if isinstance(value, bool):
        return str(int(value)), "BOOLEAN"

    if isinstance(value, float):
        # NaN / +-inf have no SQL numeric literal; emit NULL instead.
        if not math.isfinite(value):
            return None, "NULL"
        if value.is_integer():
            return str(int(value)), "INTEGER"
        return str(value), "FLOAT"
    if isinstance(value, int):
        return str(value), "INTEGER"

    text = str(value).strip()
    if not text:
        return None, "NULL"

    # int()/float() accept digit-group underscores ("1_000"); such text is not
    # a number in the spreadsheet sense, so it is kept as a string.
    # NOTE: numeric-looking text with leading zeros ("007") is still coerced
    # to a number, because int("007") == 7.
    if "_" not in text:
        try:
            return str(int(text)), "INTEGER"
        except ValueError:
            pass
        try:
            number = float(text)
        except ValueError:
            pass
        else:
            # float() also parses "nan" / "inf" / "infinity"; those must stay
            # quoted strings rather than become bare, invalid SQL tokens.
            if math.isfinite(number):
                return str(number), "FLOAT"

    # Escape single quotes by doubling them.
    quoted = "'" + text.replace("'", "''") + "'"
    return quoted, ("TEXT" if len(text) > 255 else "VARCHAR")
def _widen_column_type(current, new):
    """Return a column type wide enough to hold both ``current`` and ``new``."""
    if current is None or current == "NULL":
        return new
    if new == "NULL" or new == current:
        return current
    # Booleans are emitted as 0/1, so they widen along the numeric chain.
    numeric_rank = {"BOOLEAN": 0, "INTEGER": 1, "FLOAT": 2}
    if current in numeric_rank and new in numeric_rank:
        return max(current, new, key=numeric_rank.__getitem__)
    # Any other mix can only be stored as a string.
    return "TEXT" if "TEXT" in (current, new) else "VARCHAR"


def excel_to_insert_sql(excel_file, table_name, sheet_name=None):
    """Read a worksheet and build one INSERT statement per data row.

    The first row is used as the column list; every following row that has at
    least one non-blank cell becomes an INSERT statement. A summary (table
    name, column count, row count, inferred column types) is printed.

    Args:
        excel_file: Path of the workbook passed to ``openpyxl.load_workbook``.
        table_name: Table name inserted verbatim into the statements.
        sheet_name: Worksheet name; the active sheet is used when falsy.

    Returns:
        A list of INSERT statement strings (empty when the sheet has no rows).
    """
    wb = openpyxl.load_workbook(excel_file)
    ws = wb[sheet_name] if sheet_name else wb.active
    rows = list(ws.iter_rows(values_only=True))  # type: ignore
    if not rows:
        print("Excel 文件为空")
        return []

    # Header cells that are empty or whitespace-only get a positional name so
    # the generated column list never contains an empty identifier.
    columns = []
    for i, col in enumerate(rows[0]):
        name = str(col).strip() if col else ""
        columns.append(name or f"column_{i}")
    columns_str = ", ".join(columns)  # identical for every row

    sql_statements = []
    type_hints = {}
    for row in rows[1:]:
        # Skip rows in which every cell is None or a blank string.
        if all(
            cell is None or (isinstance(cell, str) and not cell.strip())
            for cell in row
        ):
            continue

        values = []
        for i, cell in enumerate(row):
            sql_val, col_type = detect_type(cell)
            type_hints[i] = _widen_column_type(type_hints.get(i), col_type)
            values.append("NULL" if sql_val is None else sql_val)

        values_str = ", ".join(values)
        sql_statements.append(
            f"INSERT INTO {table_name} ({columns_str}) VALUES ({values_str});"
        )

    print(f"\n表名:{table_name}")
    print(f"列数:{len(columns)}")
    print(f"数据行数:{len(sql_statements)}")
    print("\n列类型推断:")
    for i, col in enumerate(columns):
        print(f" {col}: {type_hints.get(i, 'UNKNOWN')}")
    return sql_statements
if __name__ == "__main__":
    # Usage: python excel_to_sql.py [excel_file] [table_name] [sheet_name]
    # Every argument is optional; missing ones fall back to the defaults below.
    cli_args = sys.argv[1:]
    excel_file = cli_args[0] if len(cli_args) > 0 else "1.xlsx"
    table_name = cli_args[1] if len(cli_args) > 1 else "sys_user"
    sheet_name = cli_args[2] if len(cli_args) > 2 else "Sheet1"
    try:
        sqls = excel_to_insert_sql(excel_file, table_name, sheet_name)
        output_file = f"{table_name}_inserts.sql"
        with open(output_file, "w", encoding="utf-8") as f:
            f.write("\n".join(sqls))
        print(f"\nSQL 文件已生成:{output_file}")
        print("\n前 5 条 SQL 预览:")
        for i, sql in enumerate(sqls[:5]):
            print(f"{i + 1}. {sql}")
        if len(sqls) > 5:
            print(f"... 还有 {len(sqls) - 5} 条")
    except FileNotFoundError:
        print(f"错误:文件 '{excel_file}' 不存在")
    except Exception as e:
        # Top-level boundary: report any other failure instead of a traceback.
        print(f"错误:{e}")

# --- Read Excel and generate an SQL script: pandas version ---
python
import pandas as pd
import numpy as np
from datetime import datetime, date
def detect_value(value):
    """Convert one cell value into an SQL literal string.

    Args:
        value: A scalar cell value (None/NaN/NaT, bool, int, float,
            date/datetime, str, numpy scalar, or anything ``str()`` accepts).

    Returns:
        The literal to paste into a VALUES list: ``"NULL"``, a bare number,
        or a single-quoted string with embedded quotes doubled.
    """
    if pd.isna(value):
        return "NULL"

    # bool is a subclass of int, and numpy booleans are not Python bools, so
    # both are handled here before the numeric branches.
    if isinstance(value, (bool, np.bool_)):
        return str(int(value))

    # datetime is a subclass of date, so it has to be tested first.
    if isinstance(value, datetime):
        return f"'{value.strftime('%Y-%m-%d %H:%M:%S')}'"
    if isinstance(value, date):
        return f"'{value.strftime('%Y-%m-%d')}'"

    if isinstance(value, (int, np.integer)):
        return str(int(value))
    if isinstance(value, float):
        # +-inf has no SQL numeric literal (NaN was already caught above).
        if not np.isfinite(value):
            return "NULL"
        if value.is_integer():
            return str(int(value))
        return f"{value}"

    text = str(value).strip()
    if not text:
        return "NULL"

    # int()/float() accept digit-group underscores ("1_000"); such text is
    # kept as a string instead of being silently rewritten to a number.
    if "_" not in text:
        try:
            return str(int(text))
        except ValueError:
            pass
        try:
            number = float(text)
        except ValueError:
            pass
        else:
            # float() also parses "nan" / "inf"; those must stay quoted
            # strings rather than become bare, invalid SQL tokens.
            if np.isfinite(number):
                return str(number)

    # Escape single quotes by doubling them.
    escaped = text.replace("'", "''")
    return f"'{escaped}'"
def _classify_sql_literal(sql_val):
    """Classify a non-NULL literal from ``detect_value`` as TEXT/INTEGER/FLOAT."""
    if sql_val.startswith("'"):
        return "TEXT"
    try:
        int(sql_val)
    except ValueError:
        # Bare literals are either ints or floats ("1.5", "1e+22").
        return "FLOAT"
    return "INTEGER"


def _merge_type_hint(current, new):
    """Return a column type wide enough to hold both ``current`` and ``new``."""
    if current is None or current == new:
        return new
    if "TEXT" in (current, new):
        return "TEXT"
    return "FLOAT"  # the only remaining mix is INTEGER with FLOAT


def excel_to_sql_pandas(excel_file, table_name, sheet_name=0):
    """Read a worksheet with pandas and build one INSERT statement per row.

    Rows whose cells are all missing are dropped. A summary (table name,
    column count, row count, inferred column types) is printed.

    Args:
        excel_file: Path (or file-like object) passed to ``pd.read_excel``.
        table_name: Table name inserted verbatim into the statements.
        sheet_name: Sheet name or index passed to ``pd.read_excel``.

    Returns:
        A list of INSERT statement strings.
    """
    df = pd.read_excel(excel_file, sheet_name=sheet_name)
    # Missing cells are turned into NULL by detect_value, so no NaN -> None
    # replacement pass is needed before dropping the fully empty rows.
    df = df.dropna(how="all")

    # Header labels may be non-strings (e.g. numbers); str.join needs strings.
    columns = [str(col) for col in df.columns]
    columns_str = ", ".join(columns)  # identical for every row

    sql_statements = []
    type_hints = {}
    # itertuples walks the frame column-wise, so each value keeps its own
    # column dtype, and duplicate column labels are not a problem.
    for row in df.itertuples(index=False, name=None):
        values = [detect_value(cell) for cell in row]
        for i, sql_val in enumerate(values):
            if sql_val == "NULL":
                continue
            type_hints[i] = _merge_type_hint(
                type_hints.get(i), _classify_sql_literal(sql_val)
            )
        values_str = ", ".join(values)
        sql_statements.append(
            f"INSERT INTO {table_name} ({columns_str}) VALUES ({values_str});"
        )

    print(f"表名:{table_name}")
    print(f"列数:{len(columns)}")
    print(f"数据行数:{len(sql_statements)}")
    print("\n列类型推断:")
    for i, col in enumerate(columns):
        print(f" {col}: {type_hints.get(i, 'UNKNOWN')}")
    return sql_statements
if __name__ == "__main__":
    import sys  # local import: this script's import block does not include sys

    # Usage: python excel_to_sql.py [excel_file] [table_name] [sheet_name]
    # Every argument is optional; missing ones fall back to the defaults below.
    cli_args = sys.argv[1:]
    excel_file = cli_args[0] if len(cli_args) > 0 else "1.xlsx"
    table_name = cli_args[1] if len(cli_args) > 1 else "sys_user"
    sheet_name = cli_args[2] if len(cli_args) > 2 else "Sheet1"
    try:
        sqls = excel_to_sql_pandas(excel_file, table_name, sheet_name)
        output_file = f"{table_name}_inserts.sql"
        with open(output_file, "w", encoding="utf-8") as f:
            f.write("\n".join(sqls))
        print(f"\nSQL 文件已生成:{output_file}")
        print("\n前 5 条 SQL 预览:")
        for i, sql in enumerate(sqls[:5]):
            print(f"{i + 1}. {sql}")
        if len(sqls) > 5:
            print(f"... 还有 {len(sqls) - 5} 条")
    except FileNotFoundError:
        print(f"错误:文件 '{excel_file}' 不存在")
    except Exception as e:
        # Top-level boundary: report any other failure instead of a traceback.
        print(f"错误:{e}")