elon_py/pkg/tool.py

81 lines
3.0 KiB
Python
Raw Normal View History

from datetime import datetime, timedelta
2025-03-05 10:24:46 +08:00
import pandas as pd
from pkg.config import render_data
import pytz
2025-03-05 10:24:46 +08:00
def aggregate_data(data, interval):
all_minutes = pd.DataFrame({'interval_group': range(0, 1440, interval)})
result = []
2025-03-06 10:16:59 +08:00
if data.empty or 'date' not in data.columns:
complete_data = all_minutes.copy()
complete_data['tweet_count'] = 0
complete_data['date'] = datetime.now().date()
return complete_data
2025-03-05 10:24:46 +08:00
for date in data['date'].unique():
day_data = data[data['date'] == date].copy()
day_data['interval_group'] = (day_data['minute_of_day'] // interval) * interval
agg = day_data.groupby('interval_group').size().reset_index(name='tweet_count')
complete_data = all_minutes.merge(agg, on='interval_group', how='left').fillna({'tweet_count': 0})
complete_data['date'] = date
result.append(complete_data)
2025-03-06 10:16:59 +08:00
if not result:
complete_data = all_minutes.copy()
complete_data['tweet_count'] = 0
complete_data['date'] = data['date'].iloc[0] if not data.empty else datetime.now().date()
return complete_data
2025-03-05 10:24:46 +08:00
return pd.concat(result, ignore_index=True)
def generate_xticks(interval):
if interval <= 5:
tick_step = 60
elif interval <= 10:
tick_step = 60
elif interval <= 30:
tick_step = 120
else:
tick_step = 240
ticks = list(range(0, 1440, tick_step))
tick_labels = [f"{m // 60:02d}:{m % 60:02d}" for m in ticks]
return ticks, tick_labels
def minutes_to_time(minutes):
hours = minutes // 60
mins = minutes % 60
return f"{hours:02d}:{mins:02d}"
def get_tweets_since_last_friday():
est = pytz.timezone('US/Eastern')
now_est = datetime.now(est)
today = now_est.date()
2025-03-07 13:52:35 +08:00
days_since_friday = (today.weekday() - 4) % 7
this_friday = today - timedelta(days=days_since_friday)
this_friday_datetime = est.localize(datetime.combine(this_friday, datetime.strptime("12:00", "%H:%M").time()))
2025-03-07 13:52:35 +08:00
last_friday = this_friday - timedelta(days=7)
last_friday_datetime = est.localize(datetime.combine(last_friday, datetime.strptime("12:00", "%H:%M").time()))
if now_est < this_friday_datetime:
start_datetime = last_friday_datetime
else:
start_datetime = this_friday_datetime
if hasattr(render_data, 'global_df') and not render_data.global_df.empty:
df = render_data.global_df.copy()
2025-03-07 13:52:35 +08:00
mask = df['datetime_est'] >= start_datetime
filtered_df = df[mask]
tweet_count = len(filtered_df)
return int(tweet_count)
2025-03-07 13:52:35 +08:00
return 0
def format_time_str(days_to_next_friday):
total_seconds = days_to_next_friday * 24 * 60 * 60
days = int(total_seconds // (24 * 60 * 60))
hours = int((total_seconds % (24 * 60 * 60)) // (60 * 60))
minutes = int((total_seconds % (60 * 60)) // 60)
seconds = int(total_seconds % 60)
total_hours = round(days_to_next_friday * 24, 2)
return f"{days}d {hours}h {minutes:02d}m {seconds:02d}s ({total_hours}h)"