import csv
import re
import mysql.connector
from datetime import datetime

# File paths
INPUT_FILE = 'elonmusk.csv'
OUTPUT_FILE = 'fixed.csv'

# Database connection settings.
# SECURITY NOTE(review): credentials are hard-coded in source. Move them to
# environment variables or a config file before committing/sharing this script.
DB_CONFIG = {
    'host': '8.155.23.172',
    'port': 3306,
    'user': 'root2',
    'password': 'tG0f6PVYh18le41BCb',
    'database': 'elonX'
}
TABLE_NAME = 'elon_tweets'

# Compiled once at module level instead of on every call/line (these patterns
# are evaluated once per input line in process_file).
# A complete record looks like:  <id>,"<text>","<Mon D, H:MM:SS AM/PM EST|EDT>"
_RECORD_RE = re.compile(
    r'^([^,]+),"(.+?)","([A-Z][a-z]{2} \d{1,2}, \d{1,2}:\d{2}:\d{2} (AM|PM) E[SD]T)"$',
    re.DOTALL,
)
# Matches the quoted timestamp that terminates a (possibly multi-line) record.
_RECORD_END_RE = re.compile(
    r'"[A-Z][a-z]{2} \d{1,2}, \d{1,2}:\d{2}:\d{2} (AM|PM) E[SD]T"$'
)


# Step 1: repair the CSV file and prepend a rank_id column
def fix_line(lines, line_number, rank_id):
    """Reassemble one (possibly multi-line) CSV record and prepend rank_id.

    Args:
        lines: the raw physical lines (newlines already stripped) that make up
            one logical record. They are joined with '' — i.e. line breaks
            inside a tweet's text are NOT preserved (matches original behavior).
        line_number: physical line number of the record's last line, used only
            in the error message.
        rank_id: sequential id to prepend as the new first column.

    Returns:
        The repaired CSV line 'rank_id,id,"text","created_at"', with embedded
        double quotes in the text escaped as "" per the CSV convention. If the
        record does not match the expected shape it is kept verbatim (prefixed
        with rank_id) so no data is silently dropped.
    """
    full_line = ''.join(lines)
    match = _RECORD_RE.search(full_line)
    if match:
        id_part = match.group(1)
        text_content = match.group(2)
        created_at = match.group(3)
        # Escape embedded double quotes so the output is valid CSV.
        fixed_text = text_content.replace('"', '""')
        return f'{rank_id},{id_part},"{fixed_text}","{created_at}"'
    # Malformed record: report it but keep the raw content.
    print(f"Line {line_number} format error: {repr(full_line)}")
    return f'{rank_id},{full_line}'


def process_file(input_file, output_file):
    """Repair *input_file* and write it to *output_file* with a rank_id column.

    Records may span several physical lines (multi-line tweet text); a record
    is considered complete when a line ends with the quoted timestamp.
    Duplicate 'id,text,created_at' header lines in the input are dropped and a
    single 'rank_id,id,text,created_at' header is written instead.
    """
    with open(input_file, 'r', encoding='utf-8') as f_in, \
            open(output_file, 'w', encoding='utf-8') as f_out:
        f_out.write("rank_id,id,text,created_at\n")
        buffer = []       # physical lines of the record being assembled
        line_number = 0   # counts data lines only (headers excluded)
        rank_id = 1
        for line in f_in:
            line = line.rstrip('\n')
            if line.startswith('id,text,created_at'):
                continue  # skip (possibly repeated) header rows
            line_number += 1
            buffer.append(line)
            # Record is complete once the line closes with the timestamp.
            if line.endswith('"') and _RECORD_END_RE.search(line):
                f_out.write(fix_line(buffer, line_number, rank_id) + '\n')
                buffer = []
                rank_id += 1
        if buffer:
            # Flush a trailing record that never saw a closing timestamp.
            f_out.write(fix_line(buffer, line_number, rank_id) + '\n')
    print(f"CSV 文件已修复并添加 rank_id,保存为 {output_file}")


# Step 2: database operations
def get_max_rank_id(cursor):
    """Return the largest rank_id already in the table, or 0 if none/error.

    Errors are reported and mapped to 0 so a fresh/empty table simply imports
    everything (best-effort semantics, kept from the original).
    """
    try:
        cursor.execute(f"SELECT MAX(rank_id) FROM {TABLE_NAME}")
        result = cursor.fetchone()[0]
        return result if result is not None else 0
    except mysql.connector.Error as e:
        print(f"获取最大 rank_id 出错: {e}")
        return 0


def import_to_database(input_file):
    """Incrementally import the repaired CSV into MySQL.

    Only rows whose rank_id is greater than the table's current maximum are
    inserted; afterwards the Unix timestamps of the newly inserted rows are
    computed server-side from year + created_at.

    Fixes vs. the previous revision:
    - tweet ids are parsed with int() instead of float(): Twitter snowflake
      ids exceed 2**53, so going through a float silently corrupts them. A
      float fallback is kept only for inputs already in scientific notation.
    - the UPDATE really is parameterized now (the old code interpolated
      max_rank_id with an f-string while claiming to use parameters); literal
      '%' in the STR_TO_DATE format are escaped as '%%' as Connector/Python
      requires when parameters are supplied.
    - a malformed 4-field row with a non-numeric rank_id/id is skipped instead
      of aborting the whole import with an uncaught ValueError.
    """
    try:
        conn = mysql.connector.connect(**DB_CONFIG)
        cursor = conn.cursor()
        print("成功连接到数据库")

        # Current year (the CSV timestamps carry no year of their own).
        current_year = str(datetime.now().year)

        # Largest rank_id already imported; resume after it.
        max_rank_id = get_max_rank_id(cursor)
        print(f"数据库中最大 rank_id: {max_rank_id}")

        insert_query = f"""
            INSERT INTO {TABLE_NAME} (rank_id, id, text, year, created_at, timestamp)
            VALUES (%s, %s, %s, %s, %s, %s)
        """

        with open(input_file, 'r', encoding='utf-8') as f:
            reader = csv.reader(f)
            next(reader)  # skip header
            total_rows = 0
            inserted = 0
            for row in reader:
                if len(row) != 4:
                    print(f"跳过无效行: {row}")
                    continue
                rank_id, id_, text, created_at = row
                try:
                    rank_id = int(rank_id)
                    # Exact parse first — float would lose precision on
                    # 64-bit snowflake ids (> 2**53).
                    try:
                        tweet_id = int(id_)
                    except ValueError:
                        # Scientific-notation input: already lossy upstream,
                        # best effort.
                        tweet_id = int(float(id_))
                except ValueError:
                    print(f"跳过无效行: {row}")
                    continue
                # Only import records newer than what is already stored.
                if rank_id <= max_rank_id:
                    continue
                total_rows += 1
                cursor.execute(
                    insert_query,
                    (rank_id, tweet_id, text, current_year, created_at, 0),
                )
                inserted += 1
                print(f"文本: 【{text}】,这是第 {inserted} 行")
            conn.commit()
            print(f"数据库导入完成:总计处理 {total_rows} 行,插入 {inserted} 行")

            # Recompute the Unix timestamp of the newly inserted rows from
            # 'year' + the first four space-separated tokens of created_at
            # (e.g. 'Mar 5, 10:23:45'), interpreting EST/EDT as New York time.
            # '%' is doubled because Connector/Python %-formats the statement
            # when parameters are passed.
            update_query = f"""
                UPDATE {TABLE_NAME}
                SET timestamp = UNIX_TIMESTAMP(
                    CONVERT_TZ(
                        STR_TO_DATE(
                            CONCAT(year, ' ', SUBSTRING_INDEX(created_at, ' ', 4)),
                            '%%Y %%b %%d, %%l:%%i:%%s %%p'
                        ),
                        CASE
                            WHEN RIGHT(created_at, 3) = 'EDT' THEN 'America/New_York'
                            WHEN RIGHT(created_at, 3) = 'EST' THEN 'America/New_York'
                            ELSE 'UTC'
                        END,
                        'UTC'
                    )
                )
                WHERE rank_id > %s
            """
            cursor.execute(update_query, (max_rank_id,))
            conn.commit()
            print(f"已更新 rank_id > {max_rank_id} 的记录的时间戳")

    except mysql.connector.Error as e:
        print(f"数据库错误: {e}")
    except Exception as e:
        print(f"其他错误: {e}")
    finally:
        # cursor/conn may not exist if connect() itself failed.
        if 'cursor' in locals():
            cursor.close()
        if 'conn' in locals() and conn.is_connected():
            conn.close()
            print("数据库连接已关闭")


# Entry point: repair the CSV, then import it.
def main():
    process_file(INPUT_FILE, OUTPUT_FILE)
    import_to_database(OUTPUT_FILE)


if __name__ == "__main__":
    main()