elon_py/api/main.py
2025-03-03 17:42:30 +08:00

415 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
from datetime import datetime, timedelta
from urllib.parse import quote_plus

import dash
import pandas as pd
import plotly.graph_objs as go
import pytz
from dash import clientside_callback
from dash import dcc, html
from dash.dependencies import Input, Output, State
from sqlalchemy import create_engine
# Database connection configuration.
# SECURITY: every setting can be overridden through an environment variable.
# The literal fallbacks are kept only so existing deployments keep working;
# the credentials they expose should be rotated and then removed from source.
DB_CONFIG = {
    'host': os.environ.get('ELON_DB_HOST', '8.155.23.172'),
    'port': int(os.environ.get('ELON_DB_PORT', '3306')),
    'user': os.environ.get('ELON_DB_USER', 'root2'),
    'password': os.environ.get('ELON_DB_PASSWORD', 'tG0f6PVYh18le41BCb'),
    'database': os.environ.get('ELON_DB_NAME', 'elonX'),
}
TABLE_NAME = 'elon_tweets'
# User and password are URL-quoted so characters such as '@' or ':' in a
# credential cannot corrupt the connection URI.
db_uri = (
    f"mysql+pymysql://{quote_plus(DB_CONFIG['user'])}:{quote_plus(DB_CONFIG['password'])}"
    f"@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"
)
engine = create_engine(db_uri)
# Load every tweet timestamp once at import time and derive Eastern-time columns.
df = pd.read_sql(f'SELECT timestamp FROM {TABLE_NAME}', con=engine)
eastern = pytz.timezone('America/New_York')
pacific = pytz.timezone('America/Los_Angeles')
central = pytz.timezone('America/Chicago')
# `timestamp` is parsed as epoch seconds (unit='s'); the naive result is
# labelled UTC and then converted to US Eastern time.
df['datetime'] = pd.to_datetime(df['timestamp'], unit='s')
df['datetime_est'] = df['datetime'].dt.tz_localize('UTC').dt.tz_convert(eastern)
df['date'] = df['datetime_est'].dt.date
df['minute_of_day'] = df['datetime_est'].dt.hour * 60 + df['datetime_est'].dt.minute
# One row per (date, minute_of_day) holding the number of tweets in that minute.
agg_df = df.groupby(['date', 'minute_of_day']).size().reset_index(name='tweet_count')
# Newest date first; this list feeds the date dropdown and the heatmap window.
all_dates = sorted(agg_df['date'].unique(), reverse=True)
# Preselect the most recent date. An empty table yields an empty selection
# instead of an IndexError at import time.
default_date = [str(all_dates[0])] if all_dates else []
# Create the Dash application and attach the stylesheet referenced under /assets.
external_stylesheets = ['/assets/bWLwgP.css']
app = dash.Dash(
    __name__,
    external_stylesheets=external_stylesheets,
)
# Dropdown choices: aggregation bucket sizes (minutes) and heatmap windows (days).
interval_options = [
    {'label': '1 minute' if minutes == 1 else f'{minutes} minutes', 'value': minutes}
    for minutes in (1, 5, 10, 30, 60)
]
days_options = [
    {'label': f'{days} days', 'value': days}
    for days in (7, 30, 90, 120, 240)
]
# Dash app layout, assembled from two named sub-trees:
#   * a fixed sidebar holding the clock button, its tooltip and the play button
#   * the main column with the pickers, warning/summary texts and the tabs
# A one-second dcc.Interval is appended last.
_sidebar = html.Div(
    id='clock-container',
    style={
        'position': 'fixed',
        'left': '10px',
        'top': '50%',
        'transform': 'translateY(-50%)',
        'zIndex': 1000
    },
    children=[
        html.Div(
            id='clock-button',
            children='🕒',
            style={'fontSize': '24px', 'cursor': 'pointer', 'padding': '5px'}
        ),
        # Starts hidden (display: none); holds one child per displayed clock.
        html.Div(
            id='clock-tooltip',
            children=[
                html.Div(id=clock_id)
                for clock_id in ('pst-clock', 'cst-clock', 'est-clock')
            ],
            style={
                'position': 'absolute',
                'left': '35px',
                'top': '0px',
                'backgroundColor': 'rgba(0, 0, 0, 0.8)',
                'color': 'white',
                'padding': '10px',
                'borderRadius': '5px',
                'fontSize': '14px',
                'display': 'none',
                'whiteSpace': 'nowrap'
            }
        ),
        html.Div(
            id='play-button',
            children='▶️',
            n_clicks=0,
            style={
                'fontSize': '24px',
                'cursor': 'pointer',
                'padding': '5px',
                'marginTop': '10px'
            }
        ),
    ],
)

_main_content = html.Div(
    style={'marginLeft': '50px'},
    children=[
        html.H1("Elon Musk Tweet Time Analysis (EST)"),
        html.Div(
            id='date-picker-container',
            children=[
                dcc.Dropdown(
                    id='multi-date-picker',
                    options=[{'label': day, 'value': day} for day in map(str, all_dates)],
                    value=default_date,
                    multi=True,
                    searchable=True,
                    placeholder="Search and select dates (YYYY-MM-DD)",
                    style={'width': '100%'}
                )
            ],
        ),
        dcc.Dropdown(
            id='multi-interval-picker',
            options=interval_options,
            value=10,
            style={'width': '50%', 'marginTop': '10px'}
        ),
        # Starts hidden (display: none).
        html.Div(
            id='days-display-container',
            style={'display': 'none'},
            children=[
                dcc.Dropdown(
                    id='days-display-picker',
                    options=days_options,
                    value=30,
                    style={'width': '50%', 'marginTop': '10px'}
                )
            ],
        ),
        html.Div(id='multi-day-warning', style={'color': 'red', 'margin': '10px'}),
        dcc.Checklist(
            id='time-zone-checklist',
            options=[
                {'label': 'California Time (PST)', 'value': 'PST'},
                {'label': 'Texas Time (CST)', 'value': 'CST'}
            ],
            value=['PST'],
            style={'margin': '10px'}
        ),
        html.Div(id='multi-tweet-summary', style={'fontSize': '20px', 'margin': '10px'}),
        dcc.Tabs(
            id='tabs',
            value='line',
            children=[
                dcc.Tab(label='Line Chart', value='line'),
                dcc.Tab(label='Heatmap', value='heatmap'),
                dcc.Tab(label='Scatter Plot', value='scatter'),
            ],
        ),
        html.Div(id='tabs-content'),
    ],
)

app.layout = html.Div([
    _sidebar,
    _main_content,
    # Fires once per second (interval=1000 ms).
    dcc.Interval(id='clock-interval', interval=1000, n_intervals=0)
])
# Show the clock tooltip while the pointer hovers over the clock button.
# The callback is triggered by every tick of 'clock-interval'; it returns
# no_update, so the declared Output is never actually written.
# Fix: the hover handlers are now attached only once. Previously each tick
# added a fresh pair of listeners, so they accumulated on the button.
clientside_callback(
    """
    function(n_intervals) {
        const button = document.getElementById('clock-button');
        if (button && !button.dataset.tooltipBound) {
            // Look the tooltip up at event time so a re-created node is still found.
            const setTooltipDisplay = (value) => {
                const tooltip = document.getElementById('clock-tooltip');
                if (tooltip) {
                    tooltip.style.display = value;
                }
            };
            button.addEventListener('mouseover', () => setTooltipDisplay('block'));
            button.addEventListener('mouseout', () => setTooltipDisplay('none'));
            // Marker read on later ticks to skip re-binding.
            button.dataset.tooltipBound = 'true';
        }
        return window.dash_clientside.no_update;
    }
    """,
    Output('clock-container', 'id'),
    Input('clock-interval', 'n_intervals'),
    prevent_initial_call=False
)
# Animate the play button on click:
#   immediately  -> loading glyph and 'wait' cursor
#   after 1 s    -> completed glyph, cursor back to 'pointer'
#   2 s later    -> back to the initial play glyph
# With n_clicks == 0 (or a missing button) the play glyph is returned.
# Fix: the "completed" step assigned an empty string, leaving the button blank
# although its comment says a completed state is shown. A check mark is used.
# NOTE(review): the original glyph appears to have been lost; confirm the
# intended symbol.
# The JavaScript source below (including its inline comments) is runtime text
# and is otherwise kept exactly as it was.
clientside_callback(
    """
function(n_clicks, existing_children) {
const button = document.getElementById('play-button');
if (!button) return '▶️';
if (n_clicks > 0) {
button.style.cursor = 'wait';
button.innerHTML = '🔄'; // 立即显示加载状态
setTimeout(() => {
button.innerHTML = '✅'; // 1秒后显示完成状态
button.style.cursor = 'pointer';
setTimeout(() => {
button.innerHTML = '▶️'; // 2秒后恢复初始状态
}, 2000);
}, 1000);
return '🔄'; // 返回当前状态
}
return '▶️'; // 默认状态
}
""",
    Output('play-button', 'children'),
    Input('play-button', 'n_clicks'),
    State('play-button', 'children')
)
# Auxiliary functions (unchanged)
def aggregate_data(data, interval):
    """Bucket tweets per date into fixed-width intervals covering a whole day.

    Args:
        data: DataFrame with ``date`` and ``minute_of_day`` columns. If a
            ``tweet_count`` column is present its values are summed per
            bucket; otherwise every row counts as one tweet.
        interval: Bucket width in minutes.

    Returns:
        DataFrame with columns ``interval_group`` (bucket start minute),
        ``tweet_count`` and ``date``. Each date gets one row per bucket in
        ``range(0, 1440, interval)``; buckets without tweets hold 0. An empty
        input yields an empty frame with the same columns.
    """
    if data.empty:
        # pd.concat([]) would raise ValueError; return a well-formed empty frame.
        return pd.DataFrame(columns=['interval_group', 'tweet_count', 'date'])

    all_buckets = pd.DataFrame({'interval_group': range(0, 1440, interval)})
    # Pre-aggregated input already carries per-minute counts. Counting rows
    # would collapse several tweets in the same minute into a single one.
    has_counts = 'tweet_count' in data.columns

    frames = []
    for date in data['date'].unique():
        day_data = data[data['date'] == date].copy()
        day_data['interval_group'] = (day_data['minute_of_day'] // interval) * interval
        grouped = day_data.groupby('interval_group')
        if has_counts:
            agg = grouped['tweet_count'].sum().reset_index()
        else:
            agg = grouped.size().reset_index(name='tweet_count')
        # Left-merge onto the full bucket list so empty buckets appear with 0.
        complete = all_buckets.merge(agg, on='interval_group', how='left').fillna({'tweet_count': 0})
        complete['date'] = date
        frames.append(complete)
    return pd.concat(frames, ignore_index=True)
def generate_xticks(interval):
    """Return x-axis tick positions (minutes of day) and their HH:MM labels.

    The tick spacing depends on the bucket size: 60 minutes for intervals up
    to 10, 120 minutes for intervals up to 30, and 240 minutes otherwise.
    """
    if interval <= 10:
        spacing = 60
    elif interval <= 30:
        spacing = 120
    else:
        spacing = 240
    positions = list(range(0, 1440, spacing))
    labels = ["{:02d}:{:02d}".format(*divmod(minute, 60)) for minute in positions]
    return positions, labels
def minutes_to_time(minutes):
    """Format a minute-of-day offset as a zero-padded ``HH:MM`` string."""
    hour_part, minute_part = divmod(minutes, 60)
    return f"{hour_part:02d}:{minute_part:02d}"
# Callback for updating clocks
@app.callback(
    [Output('pst-clock', 'children'),
     Output('cst-clock', 'children'),
     Output('est-clock', 'children')],
    [Input('clock-interval', 'n_intervals')]
)
def update_clocks(n):
    """Return the current Pacific, Central and Eastern wall-clock strings.

    Triggered by each tick of 'clock-interval'; ``n`` (the tick count) is
    not used.

    Returns:
        Three strings, one per tooltip row, in the order Pacific, Central,
        Eastern.
    """
    now_utc = datetime.now(pytz.UTC)
    # %Z prints the abbreviation in effect for the converted time (e.g. PST
    # or PDT). The previous hard-coded 'PST'/'CST'/'EST' suffixes were wrong
    # during daylight saving time.
    clock_format = '%Y-%m-%d %H:%M:%S %Z'
    pst_time = now_utc.astimezone(pacific).strftime(clock_format)
    cst_time = now_utc.astimezone(central).strftime(clock_format)
    est_time = now_utc.astimezone(eastern).strftime(clock_format)
    return f"𝕏: {pst_time}", f"🚀: {cst_time}", f"🏛️/🌴: {est_time}"
# Callback for toggling which picker is visible for the active tab
@app.callback(
    [Output('date-picker-container', 'style'),
     Output('days-display-container', 'style')],
    [Input('tabs', 'value')]
)
def toggle_controls_visibility(tab):
    """Return the styles for the date picker and the days picker containers.

    On the heatmap tab the date picker is hidden and the days picker shown;
    on every other tab it is the other way round.
    """
    shown = {'display': 'block'}
    hidden = {'display': 'none'}
    return (hidden, shown) if tab == 'heatmap' else (shown, hidden)
# Callback for updating tabs content (heatmap draws Thursday/Friday separator lines)
@app.callback(
    [Output('tabs-content', 'children'),
     Output('multi-day-warning', 'children'),
     Output('multi-tweet-summary', 'children')],
    [Input('tabs', 'value'),
     Input('multi-date-picker', 'value'),
     Input('multi-interval-picker', 'value'),
     Input('time-zone-checklist', 'value'),
     Input('days-display-picker', 'value')]
)
def render_tab_content(tab, selected_dates, interval, time_zones, days_to_display):
    """Build the chart for the active tab plus the warning and summary texts.

    Args:
        tab: Active tab value: 'line', 'heatmap' or 'scatter'.
        selected_dates: 'YYYY-MM-DD' strings from the date dropdown. Only the
            first 10 are used; ignored on the heatmap tab.
        interval: Aggregation bucket width in minutes.
        time_zones: Checked values of the time-zone checklist ('PST', 'CST').
        days_to_display: On the heatmap tab, how many of the most recent
            dates to show.

    Returns:
        Tuple ``(dcc.Graph, warning text, summary text)``.

    Raises:
        dash.exceptions.PreventUpdate: When the interval dropdown has been
            cleared or the tab value is unknown, so the current output stays.
    """
    if not interval:
        # A cleared interval dropdown sends None; range(0, 1440, None) would crash.
        raise dash.exceptions.PreventUpdate
    time_zones = time_zones or []

    warning = ""
    if tab != 'heatmap':
        # A cleared multi-select may arrive as None or [].
        selected_dates = list(selected_dates or [])
        if len(selected_dates) > 10:
            selected_dates = selected_dates[:10]
            warning = "Maximum of 10 days can be selected. Showing first 10 selected days."
        selected_dates = [datetime.strptime(date, '%Y-%m-%d').date() for date in selected_dates]
    else:
        selected_dates = sorted(all_dates, reverse=True)[:days_to_display]

    if not selected_dates:
        # Nothing to plot: return an empty figure instead of failing further down.
        empty_fig = go.Figure()
        empty_fig.update_layout(title='No dates selected', height=600)
        return (
            dcc.Graph(figure=empty_fig),
            warning or "Select at least one date to display.",
            "Total tweets for selected dates: 0",
        )

    multi_data_agg = agg_df[agg_df['date'].isin(selected_dates)].copy()
    has_data = not multi_data_agg.empty
    if has_data:
        tweet_count_total = multi_data_agg['tweet_count'].sum()
    else:
        # Placeholder rows keep each selected date present in the aggregated
        # frame; they carry a zero count.
        multi_data_agg = pd.DataFrame({
            'date': selected_dates,
            'minute_of_day': [0] * len(selected_dates),
            'tweet_count': [0] * len(selected_dates),
        })
        tweet_count_total = 0
    multi_data_raw = df[df['date'].isin(selected_dates)].copy()

    agg_data = aggregate_data(multi_data_agg, interval)
    if not has_data:
        # Placeholder rows must never show up as tweets in the chart.
        agg_data['tweet_count'] = 0
    xticks, xtick_labels = generate_xticks(interval)

    if tab == 'line':
        fig = go.Figure()
        for date in selected_dates:
            day_data = agg_data[agg_data['date'] == date]
            hover_times = [f"{date} {minutes_to_time(minute)} EST" for minute in day_data['interval_group']]
            fig.add_trace(go.Scatter(
                x=day_data['interval_group'],
                y=day_data['tweet_count'],
                mode='lines',
                name=str(date),
                customdata=hover_times,
                hovertemplate='%{customdata}<br>Tweets: %{y}<extra></extra>'
            ))
    elif tab == 'heatmap':
        pivot_data = agg_data.pivot(index='date', columns='interval_group', values='tweet_count').fillna(0)
        pivot_data.index = pivot_data.index.astype(str)
        fig = go.Figure(data=go.Heatmap(
            z=pivot_data.values,
            x=[minutes_to_time(m) for m in pivot_data.columns],
            y=pivot_data.index,
            colorscale='Viridis',
            hoverongaps=False,
            hovertemplate='%{y} %{x} EST<br>Tweets: %{z}<extra></extra>'
        ))
        # Draw a dashed line for every Friday row whose preceding Thursday is
        # also shown. The y position is a paper-relative fraction derived from
        # the row index.
        row_count = len(pivot_data.index)
        for i, date_str in enumerate(pivot_data.index):
            date = datetime.strptime(date_str, '%Y-%m-%d').date()
            if date.weekday() != 4:  # 4 == Friday
                continue
            prev_date = date - timedelta(days=1)
            if str(prev_date) in pivot_data.index:
                y_position = i / row_count
                fig.add_hline(
                    y=1 - y_position,
                    line_dash="dash",
                    line_color="white",
                    xref="x",
                    yref="paper"
                )
        fig.update_layout(
            title=f'Tweet Heatmap (Interval: {interval} minutes, EST, {len(selected_dates)} days)',
            xaxis_title='Time of Day (HH:MM EST)',
            yaxis_title='Date',
            height=max(400, len(selected_dates) * 20),
            yaxis=dict(autorange='reversed')
        )
    elif tab == 'scatter':
        fig = go.Figure()
        for date in selected_dates:
            day_data = multi_data_raw[multi_data_raw['date'] == date]
            if not day_data.empty:
                hover_times = [t.strftime('%Y-%m-%d %H:%M:%S EST') for t in day_data['datetime_est']]
                fig.add_trace(go.Scatter(
                    x=day_data['minute_of_day'],
                    y=[str(date)] * len(day_data),
                    mode='markers',
                    name=str(date),
                    customdata=hover_times,
                    hovertemplate='%{customdata}<extra></extra>',
                    marker=dict(size=8)
                ))
    else:
        # Unknown tab value: there is no figure to build.
        raise dash.exceptions.PreventUpdate

    if tab in ['line', 'scatter']:
        # Mark 2 AM and 7 AM local time on the Eastern-time x axis
        # (Pacific is offset by 3 hours, Central by 1 hour).
        if 'PST' in time_zones:
            pacific_2am_est = (2 + 3) * 60
            pacific_7am_est = (7 + 3) * 60
            fig.add_vline(x=pacific_2am_est, line_dash="dash", line_color="blue", annotation_text="CA 2AM PST")
            fig.add_vline(x=pacific_7am_est, line_dash="dash", line_color="blue", annotation_text="CA 7AM PST")
        if 'CST' in time_zones:
            central_2am_est = (2 + 1) * 60
            central_7am_est = (7 + 1) * 60
            fig.add_vline(x=central_2am_est, line_dash="dash", line_color="green", annotation_text="TX 2AM CST")
            fig.add_vline(x=central_7am_est, line_dash="dash", line_color="green", annotation_text="TX 7AM CST")

        layout_args = dict(
            title=f'{"Line" if tab == "line" else "Scatter"} Tweet Frequency (Interval: {interval} minutes, EST)',
            xaxis_title='Eastern Time (HH:MM)',
            yaxis_title='Tweet Count' if tab == 'line' else 'Date',
            xaxis=dict(range=[0, 1440], tickvals=xticks, ticktext=xtick_labels, tickangle=45),
            height=600,
            showlegend=True,
        )
        if tab == 'scatter':
            # Pass `yaxis` only when there is something to set, rather than
            # yaxis=None, so the y-axis title above cannot be reset by it.
            layout_args['yaxis'] = dict(autorange='reversed')
        fig.update_layout(**layout_args)

    summary = f"Total tweets for selected dates: {int(tweet_count_total)}"
    return dcc.Graph(figure=fig), warning, summary
@app.callback(
    Output('play-button', 'n_clicks'),
    Input('play-button', 'n_clicks'),
    prevent_initial_call=True
)
def execute_function(n_clicks):
    """Run the play-button action, then reset the click counter.

    Returns 0 after a positive click count so the button can be clicked
    again; returns None when ``n_clicks`` is not positive.
    """
    if not n_clicks > 0:
        return None
    # Placeholder for the real action triggered by the button.
    print("Function executed!")
    # Stand-in for real work.
    import time
    time.sleep(1)
    # Reset n_clicks to 0 to allow repeated clicks.
    return 0
# Run the app
if __name__ == '__main__':
    # `run_server` is deprecated in favour of `run` and is no longer available
    # in newer Dash releases. Prefer `run` and fall back for older installs.
    run_app = getattr(app, 'run', None) or app.run_server
    # NOTE(review): debug=True enables the Dash dev tools; confirm this entry
    # point is only used for local development.
    run_app(debug=True)