import csv
import re
from datetime import datetime

import mysql.connector
import requests

from pkg.config import DB_CONFIG, INPUT_FILE, OUTPUT_FILE, TABLE_NAME

# Matches one complete logical CSV record: id,"text","Mon D, H:MM:SS AM/PM EST|EDT".
# DOTALL lets the quoted text span what were originally multiple physical lines.
_RECORD_RE = re.compile(
    r'^([^,]+),"(.+?)","([A-Z][a-z]{2} \d{1,2}, \d{1,2}:\d{2}:\d{2} (AM|PM) E[SD]T)"$',
    re.DOTALL,
)

# Detects the trailing quoted timestamp that terminates a (possibly multi-line) record.
_RECORD_END_RE = re.compile(
    r'"[A-Z][a-z]{2} \d{1,2}, \d{1,2}:\d{2}:\d{2} (AM|PM) E[SD]T"$'
)


def download_file(file_path):
    """Download the raw tweet CSV for @elonmusk from xtracker.io.

    Args:
        file_path: Destination path; the response body is written as raw bytes.

    Returns:
        tuple[bool, str]: (success, message) — message describes the failure
        when success is False.
    """
    url = 'https://www.xtracker.io/api/download'
    headers = {
        'Accept': '*/*',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,zh-TW;q=0.5',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Content-Type': 'application/json',
        'Origin': 'https://www.xtracker.io',
        'Pragma': 'no-cache',
        'Referer': 'https://www.xtracker.io/',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-origin',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36 Edg/133.0.0.0',
        'sec-ch-ua': '"Not(A:Brand";v="99", "Microsoft Edge";v="133", "Chromium";v="133"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"'
    }
    data = '{"handle":"elonmusk","platform":"X"}'
    try:
        # timeout added so a stalled connection cannot hang the pipeline forever
        response = requests.post(url, headers=headers, data=data, timeout=30)
        if response.status_code == 200:
            with open(file_path, 'wb') as f:
                f.write(response.content)
            return True, "File downloaded successfully"
        return False, f"Download failed with status code {response.status_code}: {response.text}"
    except Exception as e:
        return False, f"Error downloading file: {str(e)}"


def fix_line(lines, line_number, rank_id):
    """Reassemble one logical CSV record and prefix it with *rank_id*.

    The source CSV contains unescaped quotes and records whose text field
    spans several physical lines; this re-quotes the text field so the
    output is valid CSV.

    Args:
        lines: Physical lines (newlines already stripped) of one record.
        line_number: Physical line count reached so far, used only in the
            error diagnostic.
        rank_id: Sequential id to prepend to the record.

    Returns:
        str: The repaired record, or the raw joined text prefixed with
        rank_id when the record does not match the expected shape.
    """
    full_line = ''.join(lines)
    match = _RECORD_RE.search(full_line)
    if not match:
        print(f"Line {line_number} format error: {repr(full_line)}")
        return f'{rank_id},{full_line}'
    id_part = match.group(1)
    text_content = match.group(2)
    created_at = match.group(3)
    # Double any embedded quotes so the rewritten field is valid CSV quoting.
    fixed_text = text_content.replace('"', '""')
    return f'{rank_id},{id_part},"{fixed_text}","{created_at}"'


def process_file(input_file, output_file):
    """Repair the raw CSV and write it with a sequential rank_id column.

    Physical lines are buffered until one ends with the quoted timestamp
    that terminates a record, then the buffer is repaired via fix_line.

    Args:
        input_file: Path of the raw download.
        output_file: Path for the repaired CSV (header: rank_id,id,text,created_at).

    Returns:
        tuple[bool, str]: (True, summary message).
    """
    with open(input_file, 'r', encoding='utf-8') as f_in, \
            open(output_file, 'w', encoding='utf-8') as f_out:
        f_out.write("rank_id,id,text,created_at\n")
        buffer = []
        line_number = 0
        rank_id = 1
        for line in f_in:
            line = line.rstrip('\n')
            if line.startswith('id,text,created_at'):
                continue  # drop the source header; we write our own
            line_number += 1
            buffer.append(line)
            # A record is complete once a line ends with the quoted timestamp.
            if line.endswith('"') and _RECORD_END_RE.search(line):
                fixed_line = fix_line(buffer, line_number, rank_id)
                f_out.write(fixed_line + '\n')
                buffer = []
                rank_id += 1
        if buffer:
            # Flush a trailing record that never saw a terminating timestamp.
            fixed_line = fix_line(buffer, line_number, rank_id)
            f_out.write(fixed_line + '\n')
    return True, f"CSV 文件已修复并添加 rank_id,保存为 {output_file}"


def get_max_rank_id(cursor):
    """Return the current MAX(rank_id) in the target table.

    Args:
        cursor: An open mysql.connector cursor.

    Returns:
        tuple[int, bool, str]: (max_rank_id or 0 when the table is empty,
        success flag, error message on failure).
    """
    try:
        cursor.execute(f"SELECT MAX(rank_id) FROM {TABLE_NAME}")
        result = cursor.fetchone()[0]
        return result if result is not None else 0, True, ""
    except mysql.connector.Error as e:
        return 0, False, f"Error getting max rank_id: {str(e)}"


def import_to_database(input_file: str) -> tuple[bool, str]:
    """Load the repaired CSV into MySQL and backfill unix timestamps.

    Only rows whose rank_id exceeds the table's current MAX(rank_id) are
    inserted, making repeated runs incremental.

    Args:
        input_file: Path of the repaired CSV produced by process_file.

    Returns:
        tuple[bool, str]: (success, message).
    """
    try:
        # Use context managers to ensure resources are closed properly
        with mysql.connector.connect(**DB_CONFIG) as conn, conn.cursor() as cursor:
            # NOTE(review): the source timestamps carry no year, so rows
            # imported just after New Year for December tweets would get the
            # wrong year — confirm whether this window matters operationally.
            current_year = str(datetime.now().year)
            max_rank_id, success, error = get_max_rank_id(cursor)
            if not success:
                return False, error

            with open(input_file, 'r', encoding='utf-8') as f:
                reader = csv.reader(f)
                try:
                    next(reader)  # Skip header
                except StopIteration:
                    return False, "File is empty or has no valid header"

                # Loop-invariant: build the INSERT statement once, not per row.
                insert_query = f"""
                INSERT INTO {TABLE_NAME} (rank_id, id, text, year, created_at, timestamp)
                VALUES (%s, %s, %s, %s, %s, %s)
                """
                inserted = 0
                for row in reader:
                    if len(row) != 4:
                        continue  # skip records fix_line could not fully repair
                    try:
                        rank_id = int(row[0])
                        # Tweet IDs are 64-bit snowflakes; float() silently loses
                        # precision above 2**53. Keep the exact int when the field
                        # is all digits; fall back to float for legacy values in
                        # scientific notation.
                        tweet_id = int(row[1]) if row[1].isdigit() else float(row[1])
                        text, created_at = row[2], row[3]
                    except (ValueError, IndexError) as e:
                        return False, f"Invalid data format in row: {str(e)}"
                    if rank_id <= max_rank_id:
                        continue  # already present from a previous run
                    cursor.execute(
                        insert_query,
                        (rank_id, tweet_id, text, current_year, created_at, 0),
                    )
                    inserted += 1
                conn.commit()

                # max_rank_id is an int read from MAX() of our own table, so
                # inlining it is injection-safe; parameterizing this statement
                # would force escaping every literal % in the STR_TO_DATE format.
                # The +8*60*60 shifts the UTC epoch — presumably to UTC+8
                # (Beijing time); confirm against downstream consumers.
                update_query = f"""
                UPDATE {TABLE_NAME}
                SET timestamp = UNIX_TIMESTAMP(
                    CONVERT_TZ(
                        STR_TO_DATE(
                            CONCAT(year, ' ', SUBSTRING_INDEX(created_at, ' ', 4)),
                            '%Y %b %d, %l:%i:%s %p'
                        ),
                        CASE
                            WHEN RIGHT(created_at, 3) = 'EDT' THEN 'America/New_York'
                            WHEN RIGHT(created_at, 3) = 'EST' THEN 'America/New_York'
                            ELSE 'UTC'
                        END,
                        'UTC'
                    )
                ) + 8*60*60
                WHERE rank_id > {int(max_rank_id)}
                """
                cursor.execute(update_query)
                conn.commit()
                return True, f"Database import completed: {inserted} rows inserted"
    except mysql.connector.Error as e:
        return False, f"Database error: {str(e)}"
    except FileNotFoundError as e:
        return False, f"File not found: {str(e)}"
    except csv.Error as e:
        return False, f"CSV parsing error: {str(e)}"
    except Exception as e:
        return False, f"Unexpected error: {str(e)}"


def process_tweets():
    """Run the full pipeline: download -> repair CSV -> import into MySQL.

    Returns:
        tuple[bool, str]: result of the first failing stage, or of the import.
    """
    success, msg = download_file(INPUT_FILE)
    if not success:
        return False, msg
    success, msg = process_file(INPUT_FILE, OUTPUT_FILE)
    if not success:
        return False, msg
    success, msg = import_to_database(OUTPUT_FILE)
    return success, msg