elon_py/pkg/tool.py
2025-03-24 10:29:17 +08:00

150 lines
5.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime, timedelta
import pandas as pd
from pkg.config import render_data
import pytz
def aggregate_data(data, interval):
    """Sum tweet counts into fixed-width minute buckets for every day in *data*.

    Each day is divided into buckets that start at minutes
    ``0, interval, 2 * interval, ...`` (below 1440). Buckets with no rows
    get a ``tweet_count`` of 0, so every day yields the same number of rows.

    Args:
        data: DataFrame read for the columns ``date``, ``minute_of_day`` and
            ``tweet_count``.
        interval: Bucket width in minutes.

    Returns:
        A DataFrame with columns ``interval_group``, ``tweet_count`` and
        ``date``, holding one row per bucket per distinct ``date`` value.
        When *data* is empty or has no ``date`` column, a single all-zero
        day stamped with today's date is returned.
    """
    slots = pd.DataFrame({'interval_group': range(0, 1440, interval)})

    # Nothing to aggregate: hand back one zero-filled day for today.
    if data.empty or 'date' not in data.columns:
        return slots.assign(tweet_count=0, date=datetime.now().date())

    def _bucket_one_day(day):
        rows = data[data['date'] == day].copy()
        # Snap every minute down to the start of its bucket.
        rows['interval_group'] = (rows['minute_of_day'] // interval) * interval
        totals = (
            rows.groupby('interval_group')['tweet_count']
            .sum()
            .reset_index(name='tweet_count')
        )
        # The left merge keeps buckets that had no rows; they become 0.
        filled = slots.merge(totals, on='interval_group', how='left')
        filled = filled.fillna({'tweet_count': 0})
        filled['date'] = day
        return filled

    per_day = [_bucket_one_day(day) for day in data['date'].unique()]
    return pd.concat(per_day, ignore_index=True)
def generate_xticks(interval):
    """Return x-axis tick positions and ``HH:MM`` labels for a 24-hour axis.

    Ticks are spaced more widely as the aggregation interval grows:
    every 60 minutes for intervals up to 10, every 120 minutes for
    intervals up to 30, and every 240 minutes otherwise.

    Args:
        interval: Aggregation interval in minutes.

    Returns:
        A ``(ticks, tick_labels)`` tuple: minute offsets from midnight and
        their zero-padded ``HH:MM`` labels.
    """
    if interval <= 10:
        step = 60
    elif interval <= 30:
        step = 120
    else:
        step = 240

    positions = list(range(0, 1440, step))
    labels = ["{:02d}:{:02d}".format(*divmod(minute, 60)) for minute in positions]
    return positions, labels
def minutes_to_time(minutes):
    """Format an integer minute count as a zero-padded ``HH:MM`` string."""
    hours, mins = divmod(minutes, 60)
    return f"{hours:02d}:{mins:02d}"
def get_tweets_since_last_friday():
    """Count rows of ``render_data.global_df`` since the latest Friday 12:00 Eastern.

    The window starts at the most recent Friday 12:00 (US/Eastern) that is
    not in the future. On a Friday before noon, that is the previous week's
    Friday.

    Returns:
        The number of rows whose ``datetime_est`` is at or after the window
        start. Returns 0 when ``render_data.global_df`` is missing, ``None``,
        empty, or has no ``datetime_est`` column.
    """
    est = pytz.timezone('US/Eastern')
    now_est = datetime.now(est)
    today = now_est.date()

    def _friday_noon(day):
        # Build the naive wall-clock time first and localize afterwards so
        # pytz selects the UTC offset that is valid on that particular date.
        return est.localize(datetime.combine(day, datetime.min.time()) + timedelta(hours=12))

    # weekday() == 4 is Friday; the modulo gives days elapsed since then.
    this_friday = today - timedelta(days=(today.weekday() - 4) % 7)
    start_datetime = _friday_noon(this_friday)
    if now_est < start_datetime:
        # Friday morning: this week's window has not opened yet.
        start_datetime = _friday_noon(this_friday - timedelta(days=7))

    # Same guards as get_time_since_last_tweet: tolerate an unset/None frame
    # and a missing column instead of raising.
    df = getattr(render_data, 'global_df', None)
    if df is None or df.empty or 'datetime_est' not in df.columns:
        return 0

    # presumably datetime_est holds tz-aware Eastern timestamps; verify
    # against the loader that fills render_data.
    return int((df['datetime_est'] >= start_datetime).sum())
def get_time_since_last_tweet():
    """Return the time elapsed since the newest ``datetime_est`` value, in days.

    Returns:
        Fractional days between now (US/Eastern) and the maximum of
        ``render_data.global_df['datetime_est']``. Returns 0.0 when the
        frame is missing, ``None``, empty, or lacks that column.
    """
    eastern = pytz.timezone('US/Eastern')
    current = datetime.now(eastern)

    frame = getattr(render_data, 'global_df', None)
    if frame is None or frame.empty:
        return 0.0
    if 'datetime_est' not in frame.columns:
        return 0.0

    elapsed = current - frame['datetime_est'].max()
    seconds_per_day = 24 * 60 * 60
    return elapsed.total_seconds() / seconds_per_day
def format_time_str(days_to_next_friday):
    """Render a duration given in days as ``"Dd HHh MMm SSs (T.Th)"``.

    Args:
        days_to_next_friday: Duration in (possibly fractional) days.

    Returns:
        A string with the day/hour/minute/second breakdown followed by the
        total number of hours rounded to two decimals in parentheses.
    """
    per_minute = 60
    per_hour = 60 * 60
    per_day = 24 * 60 * 60

    total_seconds = days_to_next_friday * 24 * 60 * 60

    whole_days = int(total_seconds // per_day)
    whole_hours = int((total_seconds % per_day) // per_hour)
    whole_minutes = int((total_seconds % per_hour) // per_minute)
    whole_seconds = int(total_seconds % per_minute)

    total_hours = round(days_to_next_friday * 24, 2)
    breakdown = f"{whole_days}d {whole_hours:02d}h {whole_minutes:02d}m {whole_seconds:02d}s"
    return f"{breakdown} ({total_hours}h)"
def _eastern_midnight(day, tz):
    """Return 00:00 on *day* as an aware datetime with the offset valid that day."""
    # Localizing a naive datetime lets pytz choose EST vs EDT for that date;
    # shifting an aware datetime with timedelta/replace would keep a stale offset.
    return tz.localize(datetime.combine(day, datetime.min.time()))


def get_pace_and_total_tweets(target_time: datetime) -> tuple[float, int]:
    """Project the weekly tweet total from the count observed up to *target_time*.

    The week runs from Friday 00:00 to the following Friday 00:00 in
    US/Eastern. Tweets are read from ``render_data.global_agg_df`` using the
    columns ``date``, ``minute_of_day`` and ``tweet_count``.

    NOTE(review): get_tweets_since_last_friday uses Friday 12:00 (noon) as its
    week boundary, while this function uses midnight — confirm which one is
    intended.

    Args:
        target_time: Moment up to which tweets are counted. A naive value is
            assumed to be US/Eastern wall-clock time; an aware value is
            converted to US/Eastern so the week is derived from the Eastern
            calendar date.

    Returns:
        ``(pace, total_tweets)``: ``total_tweets`` is the sum of
        ``tweet_count`` from the week start through *target_time*; ``pace``
        is that total plus the average daily rate so far multiplied by the
        days left in the week, rounded to two decimals. ``pace`` equals
        ``total_tweets`` when no time has elapsed or nothing was counted.
    """
    est = pytz.timezone('US/Eastern')
    # If target_time carries no timezone information, assume it is Eastern.
    if target_time.tzinfo is None:
        target_time = est.localize(target_time)
    else:
        target_time = target_time.astimezone(est)
    target_date = target_time.date()

    # Most recent Friday 00:00 Eastern at or before target_time.
    days_since_last_friday = (target_date.weekday() + 3) % 7
    week_start_date = target_date - timedelta(days=days_since_last_friday)
    last_friday_midnight = _eastern_midnight(week_start_date, est)

    # Upcoming Friday 00:00 Eastern; on a Friday after midnight that is next week's.
    days_to_next_friday = (4 - target_date.weekday()) % 7
    week_end_date = target_date + timedelta(days=days_to_next_friday)
    next_friday_midnight = _eastern_midnight(week_end_date, est)
    if target_time > next_friday_midnight:
        next_friday_midnight = _eastern_midnight(week_end_date + timedelta(days=7), est)

    # Sum the aggregated rows from the week start up to target_time.
    total_tweets = 0
    agg_df = getattr(render_data, 'global_agg_df', None)
    if agg_df is not None and not agg_df.empty:
        in_week = agg_df[
            (agg_df['date'] >= week_start_date) &
            (agg_df['date'] <= target_date)
        ]
        if not in_week.empty and 'tweet_count' in in_week:
            # Compare Eastern wall-clock values directly. Localizing the row
            # timestamps raises on the ambiguous/nonexistent hour of a
            # DST-change day, and both sides are Eastern wall-clock anyway.
            row_wall_clock = (
                pd.to_datetime(in_week['date'].astype(str))
                + pd.to_timedelta(in_week['minute_of_day'], unit='m')
            )
            cutoff = target_time.replace(tzinfo=None)
            total_tweets = in_week.loc[row_wall_clock <= cutoff, 'tweet_count'].sum()

    # Pace: extrapolate the average daily rate over the rest of the week.
    seconds_per_day = 24 * 60 * 60
    days_elapsed = (target_time - last_friday_midnight).total_seconds() / seconds_per_day
    days_remaining = (next_friday_midnight - target_time).total_seconds() / seconds_per_day
    if days_elapsed > 0 and total_tweets > 0:
        daily_avg = total_tweets / days_elapsed
        pace = daily_avg * days_remaining + total_tweets
    else:
        # No data yet, or the week has not started: pace is the current count.
        pace = float(total_tweets)
    return round(float(pace), 2), int(total_tweets)