elon_py/pkg/get_tweets.py
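
"""Fetch the elonmusk tweet export from xtracker.io, repair the CSV, and load it into MySQL.

Pipeline (see process_tweets): download_file -> process_file -> import_to_database.
"""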
import csv
import re
from datetime import datetime

import mysql.connector
import requests

from pkg.config import TABLE_NAME, DB_CONFIG, INPUT_FILE, OUTPUT_FILE


def download_file(file_path):
    """Download the raw tweet CSV from xtracker.io and save it to file_path."""
    url = 'https://www.xtracker.io/api/download'
    headers = {
        'Accept': '*/*', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,zh-TW;q=0.5',
        'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'Content-Type': 'application/json',
        'Origin': 'https://www.xtracker.io', 'Pragma': 'no-cache', 'Referer': 'https://www.xtracker.io/',
        'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36 Edg/133.0.0.0',
        'sec-ch-ua': '"Not(A:Brand";v="99", "Microsoft Edge";v="133", "Chromium";v="133"', 'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"'
    }
    data = '{"handle":"elonmusk","platform":"X"}'
    try:
        # timeout keeps a stalled request from hanging the whole pipeline
        response = requests.post(url, headers=headers, data=data, timeout=30)
        if response.status_code == 200:
            with open(file_path, 'wb') as f:
                f.write(response.content)
            return True, "File downloaded successfully"
        else:
            return False, f"Download failed with status code {response.status_code}: {response.text}"
    except Exception as e:
        return False, f"Error downloading file: {str(e)}"


def fix_line(lines, line_number, rank_id):
    """Re-assemble a (possibly multi-line) CSV record, escape embedded quotes, and prepend rank_id."""
    # Join with '\n' so line breaks inside multi-line tweets survive; re.DOTALL below relies on them
    full_line = '\n'.join(lines)
    match = re.search(r'^([^,]+),"(.+?)","([A-Z][a-z]{2} \d{1,2}, \d{1,2}:\d{2}:\d{2} (AM|PM) E[SD]T)"$',
                      full_line, re.DOTALL)
    if match:
        id_part = match.group(1)
        text_content = match.group(2)
        created_at = match.group(3)
        # CSV escaping: a literal double quote inside a quoted field must be doubled
        fixed_text = text_content.replace('"', '""')
        return f'{rank_id},{id_part},"{fixed_text}","{created_at}"'
    else:
        print(f"Line {line_number} format error: {repr(full_line)}")
        return f'{rank_id},{full_line}'


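# Illustrative behaviour of fix_line (hypothetical record, not from the real feed):
# a text field containing unescaped quotes is re-quoted and the running rank_id prepended:
#   fix_line(['123,"He said "hi"","Mar 5, 10:24:46 AM EST"'], 1, 1)
#   -> '1,123,"He said ""hi""","Mar 5, 10:24:46 AM EST"'

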
def process_file(input_file, output_file):
    """Repair the downloaded CSV and prepend a running rank_id to every record."""
    with open(input_file, 'r', encoding='utf-8') as f_in, open(output_file, 'w', encoding='utf-8') as f_out:
        f_out.write("rank_id,id,text,created_at\n")
        buffer = []
        line_number = 0
        rank_id = 1
        for line in f_in:
            line = line.rstrip('\n')
            if line.startswith('id,text,created_at'):
                continue  # skip the original header
            line_number += 1
            buffer.append(line)
            # A record is complete once a line closes with the quoted timestamp field
            if line.endswith('"') and re.search(r'"[A-Z][a-z]{2} \d{1,2}, \d{1,2}:\d{2}:\d{2} (AM|PM) E[SD]T"$', line):
                fixed_line = fix_line(buffer, line_number, rank_id)
                f_out.write(fixed_line + '\n')
                buffer = []
                rank_id += 1
        if buffer:  # flush a trailing partial record, if any
            fixed_line = fix_line(buffer, line_number, rank_id)
            f_out.write(fixed_line + '\n')
    return True, f"CSV file repaired, rank_id added, and saved as {output_file}"


def get_max_rank_id(cursor):
    try:
        cursor.execute(f"SELECT MAX(rank_id) FROM {TABLE_NAME}")
        result = cursor.fetchone()[0]
        return result if result is not None else 0, True, ""
    except mysql.connector.Error as e:
        return 0, False, f"Error getting max rank_id: {str(e)}"


def import_to_database(input_file: str) -> tuple[bool, str]:
    try:
        # Context managers ensure the connection and cursor are closed properly
        with mysql.connector.connect(**DB_CONFIG) as conn, conn.cursor() as cursor:
            current_year = str(datetime.now().year)
            max_rank_id, success, error = get_max_rank_id(cursor)
            if not success:
                return False, error
            with open(input_file, 'r', encoding='utf-8') as f:
                reader = csv.reader(f)
                try:
                    next(reader)  # skip header
                except StopIteration:
                    return False, "File is empty or has no valid header"
                total_rows, inserted = 0, 0
                insert_query = f"""
                    INSERT INTO {TABLE_NAME} (rank_id, id, text, year, created_at, timestamp)
                    VALUES (%s, %s, %s, %s, %s, %s)
                """
                for row in reader:
                    if len(row) != 4:
                        continue
                    try:
                        rank_id = int(row[0])
                        tweet_id = float(row[1])  # note: float() cannot represent full 64-bit tweet ids exactly
                        text, created_at = row[2], row[3]
                    except (ValueError, IndexError) as e:
                        return False, f"Invalid data format in row: {str(e)}"
                    if rank_id <= max_rank_id:
                        continue  # already imported in a previous run
                    total_rows += 1
                    cursor.execute(insert_query, (rank_id, tweet_id, text, current_year, created_at, 0))
                    inserted += 1
                conn.commit()
            # Derive a Unix timestamp from "<Mon> <d>, <h:mm:ss> <AM/PM> <EST/EDT>" plus the
            # assumed current year, convert New York time to UTC, then shift by +8 hours (UTC+8)
            update_query = f"""
                UPDATE {TABLE_NAME}
                SET timestamp = UNIX_TIMESTAMP(
                    CONVERT_TZ(
                        STR_TO_DATE(
                            CONCAT(year, ' ', SUBSTRING_INDEX(created_at, ' ', 4)),
                            '%Y %b %d, %l:%i:%s %p'
                        ),
                        CASE
                            WHEN RIGHT(created_at, 3) = 'EDT' THEN 'America/New_York'
                            WHEN RIGHT(created_at, 3) = 'EST' THEN 'America/New_York'
                            ELSE 'UTC'
                        END,
                        'UTC'
                    )
                ) + 8*60*60
                WHERE rank_id > {max_rank_id}
            """
            cursor.execute(update_query)
            conn.commit()
            return True, f"Database import completed: {inserted} rows inserted"
    except mysql.connector.Error as e:
        return False, f"Database error: {str(e)}"
    except FileNotFoundError as e:
        return False, f"File not found: {str(e)}"
    except csv.Error as e:
        return False, f"CSV parsing error: {str(e)}"
    except Exception as e:
        return False, f"Unexpected error: {str(e)}"


def process_tweets():
    """Run the full pipeline (download, repair, import), stopping at the first failing step."""
    success, msg = download_file(INPUT_FILE)
    if not success:
        return False, msg
    success, msg = process_file(INPUT_FILE, OUTPUT_FILE)
    if not success:
        return False, msg
    success, msg = import_to_database(OUTPUT_FILE)
    return success, msg
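

# Usage sketch (assumption: this module is normally driven by the project's own entry
# point; the __main__ guard below is illustrative only).
if __name__ == "__main__":
    ok, message = process_tweets()
    print(("OK: " if ok else "FAILED: ") + message)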