package main import ( "database/sql" "encoding/json" "fmt" "log" "os" "time" _ "github.com/go-sql-driver/mysql" ) // Customer 客户信息结构 type Customer struct { ID string `json:"id"` CreatedAt string `json:"createdAt"` CustomerName string `json:"customerName"` IntendedProduct string `json:"intendedProduct"` Version string `json:"version"` Description string `json:"description"` Solution string `json:"solution"` Type string `json:"type"` Module string `json:"module"` StatusProgress string `json:"statusProgress"` Reporter string `json:"reporter"` } // Followup 客户跟进结构 type Followup struct { ID string `json:"id"` CreatedAt string `json:"createdAt"` CustomerName string `json:"customerName"` DealStatus string `json:"dealStatus"` CustomerLevel string `json:"customerLevel"` Industry string `json:"industry"` FollowUpTime string `json:"followUpTime"` NotificationSent bool `json:"notificationSent"` } // TrialPeriod 试用期结构 type TrialPeriod struct { ID string `json:"id"` CustomerName string `json:"customerName"` StartTime string `json:"startTime"` EndTime string `json:"endTime"` IsTrial bool `json:"isTrial"` CreatedAt string `json:"createdAt"` } // 数据库配置 type DBConfig struct { Host string Port int User string Password string Database string } func main() { // 默认数据库配置 config := DBConfig{ Host: "localhost", Port: 3306, User: "root", Password: "", // 请修改为实际密码 Database: "crm_db", } // 从环境变量读取配置 if host := os.Getenv("DB_HOST"); host != "" { config.Host = host } if user := os.Getenv("DB_USER"); user != "" { config.User = user } if pwd := os.Getenv("DB_PASSWORD"); pwd != "" { config.Password = pwd } if db := os.Getenv("DB_NAME"); db != "" { config.Database = db } // JSON 文件路径 dataDir := "./data" if len(os.Args) > 1 { dataDir = os.Args[1] } // 连接数据库 dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local", config.User, config.Password, config.Host, config.Port, config.Database) db, err := sql.Open("mysql", dsn) if err != nil { log.Fatalf("连接数据库失败: %v", err) } defer db.Close() // 测试连接 if err := db.Ping(); err != nil { log.Fatalf("数据库连接测试失败: %v", err) } log.Println("✅ 数据库连接成功") // 创建表 if err := createTables(db); err != nil { log.Fatalf("创建表失败: %v", err) } log.Println("✅ 数据表创建成功") // 迁移客户数据 customersFile := fmt.Sprintf("%s/customers.json", dataDir) if err := migrateCustomers(db, customersFile); err != nil { log.Printf("⚠️ 迁移客户数据失败: %v", err) } else { log.Println("✅ 客户数据迁移成功") } // 迁移跟进数据 followupsFile := fmt.Sprintf("%s/followups.json", dataDir) if err := migrateFollowups(db, followupsFile); err != nil { log.Printf("⚠️ 迁移跟进数据失败: %v", err) } else { log.Println("✅ 跟进数据迁移成功") } // 迁移试用期数据 trialsFile := fmt.Sprintf("%s/trial_periods.json", dataDir) if err := migrateTrialPeriods(db, trialsFile); err != nil { log.Printf("⚠️ 迁移试用期数据失败: %v", err) } else { log.Println("✅ 试用期数据迁移成功") } log.Println("🎉 数据迁移完成!") } func createTables(db *sql.DB) error { queries := []string{ `CREATE TABLE IF NOT EXISTS customers ( id VARCHAR(64) NOT NULL PRIMARY KEY, created_at DATETIME NOT NULL, customer_name VARCHAR(255) NOT NULL, intended_product VARCHAR(100), version VARCHAR(50), description TEXT, solution TEXT, type VARCHAR(50), module VARCHAR(100), status_progress VARCHAR(50) DEFAULT '进行中', reporter VARCHAR(100), INDEX idx_customer_name (customer_name), INDEX idx_created_at (created_at), INDEX idx_status_progress (status_progress) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci`, `CREATE TABLE IF NOT EXISTS followups ( id VARCHAR(64) NOT NULL PRIMARY KEY, created_at DATETIME NOT NULL, customer_name VARCHAR(255) NOT NULL, deal_status VARCHAR(50) DEFAULT '未成交', customer_level VARCHAR(10) DEFAULT 'C', industry VARCHAR(100), follow_up_time DATETIME, notification_sent TINYINT(1) DEFAULT 0, INDEX idx_customer_name (customer_name), INDEX idx_created_at (created_at), INDEX idx_follow_up_time (follow_up_time) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci`, `CREATE TABLE IF NOT EXISTS trial_periods ( id VARCHAR(64) NOT NULL PRIMARY KEY, customer_name VARCHAR(255) NOT NULL, start_time DATETIME NOT NULL, end_time DATETIME NOT NULL, is_trial TINYINT(1) DEFAULT 1, created_at DATETIME NOT NULL, INDEX idx_customer_name (customer_name), INDEX idx_end_time (end_time) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci`, } for _, query := range queries { if _, err := db.Exec(query); err != nil { return fmt.Errorf("执行SQL失败: %v\nSQL: %s", err, query) } } return nil } func parseTime(timeStr string) (time.Time, error) { // 尝试多种时间格式 formats := []string{ time.RFC3339, time.RFC3339Nano, "2006-01-02T15:04:05.999999999Z07:00", "2006-01-02T15:04:05Z", "2006-01-02 15:04:05", "2006-01-02", } for _, format := range formats { if t, err := time.Parse(format, timeStr); err == nil { return t, nil } } return time.Time{}, fmt.Errorf("无法解析时间: %s", timeStr) } func migrateCustomers(db *sql.DB, filepath string) error { data, err := os.ReadFile(filepath) if err != nil { return fmt.Errorf("读取文件失败: %v", err) } var customers []Customer if err := json.Unmarshal(data, &customers); err != nil { return fmt.Errorf("解析JSON失败: %v", err) } stmt, err := db.Prepare(` INSERT INTO customers (id, created_at, customer_name, intended_product, version, description, solution, type, module, status_progress, reporter) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE customer_name = VALUES(customer_name), intended_product = VALUES(intended_product), version = VALUES(version), description = VALUES(description), solution = VALUES(solution), type = VALUES(type), module = VALUES(module), status_progress = VALUES(status_progress), reporter = VALUES(reporter) `) if err != nil { return fmt.Errorf("准备SQL语句失败: %v", err) } defer stmt.Close() successCount := 0 for _, c := range customers { createdAt, _ := parseTime(c.CreatedAt) _, err := stmt.Exec( c.ID, createdAt, c.CustomerName, c.IntendedProduct, c.Version, c.Description, c.Solution, c.Type, c.Module, c.StatusProgress, c.Reporter, ) if err != nil { log.Printf("插入客户记录失败 [%s]: %v", c.CustomerName, err) continue } successCount++ } log.Printf(" 已迁移 %d/%d 条客户记录", successCount, len(customers)) return nil } func migrateFollowups(db *sql.DB, filepath string) error { data, err := os.ReadFile(filepath) if err != nil { return fmt.Errorf("读取文件失败: %v", err) } var followups []Followup if err := json.Unmarshal(data, &followups); err != nil { return fmt.Errorf("解析JSON失败: %v", err) } stmt, err := db.Prepare(` INSERT INTO followups (id, created_at, customer_name, deal_status, customer_level, industry, follow_up_time, notification_sent) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE customer_name = VALUES(customer_name), deal_status = VALUES(deal_status), customer_level = VALUES(customer_level), industry = VALUES(industry), follow_up_time = VALUES(follow_up_time), notification_sent = VALUES(notification_sent) `) if err != nil { return fmt.Errorf("准备SQL语句失败: %v", err) } defer stmt.Close() successCount := 0 for _, f := range followups { createdAt, _ := parseTime(f.CreatedAt) followUpTime, _ := parseTime(f.FollowUpTime) notificationSent := 0 if f.NotificationSent { notificationSent = 1 } _, err := stmt.Exec( f.ID, createdAt, f.CustomerName, f.DealStatus, f.CustomerLevel, f.Industry, followUpTime, notificationSent, ) if err != nil { log.Printf("插入跟进记录失败 [%s]: %v", f.CustomerName, err) continue } successCount++ } log.Printf(" 已迁移 %d/%d 条跟进记录", successCount, len(followups)) return nil } func migrateTrialPeriods(db *sql.DB, filepath string) error { data, err := os.ReadFile(filepath) if err != nil { return fmt.Errorf("读取文件失败: %v", err) } var trials []TrialPeriod if err := json.Unmarshal(data, &trials); err != nil { return fmt.Errorf("解析JSON失败: %v", err) } stmt, err := db.Prepare(` INSERT INTO trial_periods (id, customer_name, start_time, end_time, is_trial, created_at) VALUES (?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE customer_name = VALUES(customer_name), start_time = VALUES(start_time), end_time = VALUES(end_time), is_trial = VALUES(is_trial) `) if err != nil { return fmt.Errorf("准备SQL语句失败: %v", err) } defer stmt.Close() successCount := 0 for _, t := range trials { startTime, _ := parseTime(t.StartTime) endTime, _ := parseTime(t.EndTime) createdAt, _ := parseTime(t.CreatedAt) isTrial := 0 if t.IsTrial { isTrial = 1 } _, err := stmt.Exec( t.ID, t.CustomerName, startTime, endTime, isTrial, createdAt, ) if err != nil { log.Printf("插入试用期记录失败 [%s]: %v", t.CustomerName, err) continue } successCount++ } log.Printf(" 已迁移 %d/%d 条试用期记录", successCount, len(trials)) return nil } func init() { // 设置日志格式 log.SetFlags(log.Ltime) // 打印使用说明 if len(os.Args) > 1 && (os.Args[1] == "-h" || os.Args[1] == "--help") { fmt.Println(`CRM JSON 到 MySQL 数据迁移工具 用法: go run migrate_to_mysql.go [data目录路径] 环境变量: DB_HOST - 数据库主机 (默认: localhost) DB_USER - 数据库用户 (默认: root) DB_PASSWORD - 数据库密码 (默认: 空) DB_NAME - 数据库名称 (默认: crm_db) 示例: # 使用默认配置 go run migrate_to_mysql.go ./data # 使用环境变量配置 DB_HOST=127.0.0.1 DB_USER=root DB_PASSWORD=123456 go run migrate_to_mysql.go ./data `) os.Exit(0) } }