-line chart

-Probability
This commit is contained in:
NY 2025-03-21 12:22:12 +08:00
parent abf820fe08
commit 681f501c1c
5 changed files with 27 additions and 417 deletions

View File

@ -2,6 +2,7 @@ from datetime import timedelta, datetime
from dash import dcc, html from dash import dcc, html
from pkg.config import interval_options, days_options, render_data from pkg.config import interval_options, days_options, render_data
def layout_config(app): def layout_config(app):
app.layout = html.Div([ app.layout = html.Div([
html.Div( html.Div(
@ -112,27 +113,16 @@ def layout_config(app):
'zIndex': 1000 'zIndex': 1000
} }
), ),
# Main content
html.Div([ html.Div([
html.H1("Elon Musk Tweet Time Analysis (EST)"), html.Div(id='multi-interval-container', children=[
html.Div(id='date-picker-container', children=[
dcc.Dropdown( dcc.Dropdown(
id='multi-date-picker', id='multi-interval-picker',
options=[{'label': str(date), 'value': str(date)} for date in render_data.all_dates], options=interval_options,
value=render_data.default_date, value=10,
multi=True, style={'width': '50%', 'marginTop': '10px'}
searchable=True,
placeholder="Search and select dates (YYYY-MM-DD)",
style={'width': '100%'}
) )
]), ]),
dcc.Dropdown( html.Div(id='days-display-container', children=[
id='multi-interval-picker',
options=interval_options,
value=10,
style={'width': '50%', 'marginTop': '10px'}
),
html.Div(id='days-display-container', style={'display': 'none'}, children=[
dcc.Dropdown( dcc.Dropdown(
id='days-display-picker', id='days-display-picker',
options=days_options, options=days_options,
@ -141,18 +131,8 @@ def layout_config(app):
) )
]), ]),
html.Div(id='multi-day-warning', style={'color': 'red', 'margin': '10px'}), html.Div(id='multi-day-warning', style={'color': 'red', 'margin': '10px'}),
dcc.Checklist(
id='time-zone-checklist',
options=[
{'label': 'California Time (PST)', 'value': 'PST'},
{'label': 'Texas Time (CST)', 'value': 'CST'}
],
value=['PST'],
style={'margin': '10px'}
),
html.Div(id='multi-tweet-summary', style={'fontSize': '20px', 'margin': '10px'}), html.Div(id='multi-tweet-summary', style={'fontSize': '20px', 'margin': '10px'}),
dcc.Tabs(id='tabs', value='line', children=[ dcc.Tabs(id='tabs', value='heatmap', children=[
dcc.Tab(label='Line', value='line'),
dcc.Tab(label='Heatmap', value='heatmap'), dcc.Tab(label='Heatmap', value='heatmap'),
dcc.Tab(label='Heatmap(1-day)', value='one_day_heatmap'), dcc.Tab(label='Heatmap(1-day)', value='one_day_heatmap'),
]), ]),
@ -193,83 +173,7 @@ def layout_config(app):
style={'width': '100%'} style={'width': '100%'}
) )
) )
]), ])
html.Tr([
html.Td("Predict Tweets Start:", style={'paddingRight': '10px'}),
html.Td(
dcc.Input(
id='prob-start-input',
type='number',
placeholder='输入 Probability Start 值',
value=525,
style={'width': '100%'}
)
)
]),
html.Tr([
html.Td("Predict Tweets End:", style={'paddingRight': '10px'}),
html.Td(
dcc.Input(
id='prob-end-input',
type='number',
placeholder='输入 Probability End 值',
value=549,
style={'width': '100%'}
)
)
]),
html.Tr([
html.Td("Calculate Probability:", style={'paddingRight': '10px'}),
html.Td(
html.Button('Calculate', id='update-button', n_clicks=0)
)
]),
html.Tr(id='manual-info-tooltip', style={'margin': '10px'})
], style={
'width': '50%',
'marginTop': '10px',
'borderCollapse': 'collapse'
}),
# 新增测试区域
html.H2("Historical Probability Test", style={'marginTop': '20px'}),
html.Table([
html.Tr([
html.Td("Test Date:", style={'paddingRight': '10px'}),
html.Td(
dcc.DatePickerSingle(
id='test-date-input',
date=(datetime.now().date() - timedelta(days=1)).strftime('%Y-%m-%d'), # 默认昨天
display_format='YYYY-MM-DD',
style={'width': '100%'}
)
)
]),
html.Tr([
html.Td("Test Time:", style={'paddingRight': '10px'}),
html.Td(
html.Div([
dcc.Input(
id='test-time-input',
type='text',
placeholder='HH:MM:SS (e.g., 12:00:00)', # 增强提示
value='12:00:00',
pattern='[0-2][0-9]:[0-5][0-9]:[0-5][0-9]', # 限制格式
style={'width': '100%'}
),
html.Span(
"Enter time in HH:MM:SS format (e.g., 12:00:00)",
style={'fontSize': '12px', 'color': 'gray', 'marginTop': '5px', 'display': 'block'}
)
])
)
]),
html.Tr([
html.Td("Test Probability:", style={'paddingRight': '10px'}),
html.Td(
html.Button('Test', id='test-button', n_clicks=0)
)
]),
html.Tr(id='test-info-tooltip', style={'margin': '10px'})
], style={ ], style={
'width': '50%', 'width': '50%',
'marginTop': '10px', 'marginTop': '10px',
@ -279,4 +183,4 @@ def layout_config(app):
dcc.Interval(id='clock-interval', interval=1000, n_intervals=0) dcc.Interval(id='clock-interval', interval=1000, n_intervals=0)
]) ])
return app return app

View File

@ -1,37 +0,0 @@
from pkg.dash.func.info_func import *
from pkg.dash.app_init import app
from dash.dependencies import Input, Output
from dash import html
@app.callback(
    [Output('manual-info-tooltip', 'children')],
    [Input('update-button', 'n_clicks'),
     Input('prob-start-input', 'value'),
     Input('prob-end-input', 'value')]
)
def update_info_manual(n_clicks, prob_start, prob_end):
    """Render the probability that the tweet total lands in a chosen range.

    Args:
        n_clicks: Click count of the 'update-button' button.
        prob_start: Lower bound of the range from 'prob-start-input';
            falls back to 525 when the field is empty.
        prob_end: Upper bound of the range from 'prob-end-input';
            falls back to 549 when the field is empty.

    Returns:
        A one-element list (the callback declares its output as a list)
        holding either a hint ``html.Div`` shown before the first click or
        a one-row ``html.Table`` with the formatted probability range.
    """
    # `not n_clicks` covers 0 as well as a missing (None) click count.
    if not n_clicks:
        # The button this callback listens to is labelled 'Calculate'.
        return [html.Div("Click 'Calculate' to see results.")]

    tweet_count, days_to_next_friday = get_pace_params()
    prob_start = int(prob_start) if prob_start is not None else 525
    prob_end = int(prob_end) if prob_end is not None else 549

    # The helper's result is parsed as a "low - high" string; the two values
    # are presumably fractions, since they are scaled by 100 for display.
    probability = calculate_tweet_probability(tweet_count, days_to_next_friday, prob_start, prob_end)
    prob_low, prob_high = map(float, probability.split(" - "))
    formatted_probability = f"{prob_low * 100:.2f}% - {prob_high * 100:.2f}%"

    pace_table_rows = [
        html.Tr([
            html.Th(f"Probability ({prob_start}-{prob_end})", colSpan=2, style={'paddingRight': '10px'}),
            html.Td(formatted_probability, colSpan=6, style={'paddingRight': '10px'})
        ])
    ]
    pace_table = html.Table(pace_table_rows, style={
        'width': '100%',
        'textAlign': 'left',
        'borderCollapse': 'collapse'
    })
    return [pace_table]

View File

@ -1,219 +0,0 @@
from pkg.dash.func.info_func import *
from pkg.dash.app_init import app
from dash.dependencies import Input, Output
from dash import html
import os
import csv
import pandas as pd
import re
from datetime import timedelta
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing as mp
# Module-level cache of the prepared data frame, so it is only loaded once.
global_data = None
def initialize_global_data():
    """Populate the module-level ``global_data`` cache on first use.

    Copies ``render_data.global_agg_df`` and adds three derived columns:
    ``hours`` and ``minutes`` (split out of ``minute_of_day``) and
    ``datetime_est`` (``date`` plus that time of day, localized to
    US/Eastern). Later calls return immediately once the cache is set.
    """
    global global_data
    if global_data is not None:
        return

    # Build on a local frame and publish it only when complete, so a failure
    # part-way through cannot leave a half-prepared cache behind.
    frame = render_data.global_agg_df.copy()

    # Precompute the commonly used columns once instead of on every lookup.
    frame['hours'] = frame['minute_of_day'] // 60
    frame['minutes'] = frame['minute_of_day'] % 60
    frame['datetime_est'] = pd.to_datetime(
        frame['date'].astype(str) + ' ' +
        frame['hours'].astype(str) + ':' +
        frame['minutes'].astype(str) + ':00',
        errors='coerce'
    ).dt.tz_localize(
        'US/Eastern',
        ambiguous='NaT',
        # Wall times skipped by the spring-forward transition would raise by
        # default; map them to NaT like the ambiguous fall-back hour.
        nonexistent='NaT',
    )

    global_data = frame
@app.callback(
    [Output('test-info-tooltip', 'children')],
    [Input('test-button', 'n_clicks'),
     Input('test-date-input', 'date'),
     Input('test-time-input', 'value')]
)
def update_test_info(n_clicks, test_date, test_time, data=None):
    """Back-test the probability estimate at a historical date and time.

    The weekly cycle containing the test moment runs from a Friday 12:00
    US/Eastern to seven days later. Tweets counted up to the test moment are
    extrapolated to a full-cycle total, a +/-10% band around that total is
    scored with ``calculate_tweet_probability``, and the band is compared
    with the tweet count actually recorded for the whole cycle.

    Args:
        n_clicks: Click count of the 'test-button' button.
        test_date: Date string in ``YYYY-MM-DD`` form.
        test_time: Time string in ``HH:MM:SS`` form.
        data: Optional frame to read from; it is filtered on its
            ``datetime_est`` column and summed over ``tweet_count``.
            Defaults to a copy of ``render_data.global_agg_df``.

    Returns:
        A one-element list holding either an ``html.Div`` with a hint or
        error message, or an ``html.Table`` summarizing the back-test.
    """
    # `not n_clicks` covers 0 as well as a missing (None) click count.
    if not n_clicks:
        return [html.Div("Click 'Test' to see historical probability results.")]

    if not test_date or not test_time:
        return [html.Div("Date or time input is empty. Please provide both date (YYYY-MM-DD) and time (HH:MM:SS).")]

    time_pattern = r'^(?:[01]\d|2[0-3]):[0-5]\d:[0-5]\d$'
    if not re.match(time_pattern, test_time):
        return [html.Div("Invalid time format. Use HH:MM:SS (e.g., 12:00:00).")]

    est = pytz.timezone('US/Eastern')
    try:
        test_datetime = pd.to_datetime(f"{test_date} {test_time}", format='%Y-%m-%d %H:%M:%S').tz_localize(est, ambiguous=True)
    except ValueError:
        return [html.Div("Invalid date or time format. Use YYYY-MM-DD and HH:MM:SS (e.g., 12:00:00).")]

    # Fetch the frame only after the inputs validated, so rejected requests
    # do not pay for the copy.
    # NOTE(review): 'datetime_est' is derived by initialize_global_data() on
    # its own copy; confirm the raw render_data.global_agg_df carries it too.
    if data is None:
        data = render_data.global_agg_df.copy()

    # weekday() == 4 is Friday. Before noon on a Friday the current cycle has
    # not started yet, so the test moment belongs to the previous week's.
    test_day_of_week = test_datetime.weekday()
    days_since_last_friday = (test_day_of_week - 4) % 7
    if test_datetime.hour < 12 and test_day_of_week == 4:
        cycle_start = test_datetime - timedelta(days=days_since_last_friday + 7)
    else:
        cycle_start = test_datetime - timedelta(days=days_since_last_friday)
    cycle_start = cycle_start.replace(hour=12, minute=0, second=0, microsecond=0)

    # NOTE(review): the original wrapped this in a March/second-Sunday check
    # whose two arms were identical, so no DST adjustment was ever applied.
    # Presumably cycle_end should stay at 12:00 wall-clock across a DST
    # change - confirm and adjust if so.
    cycle_end = (cycle_start + timedelta(days=7)).tz_convert(est)

    seconds_per_day = 24 * 60 * 60
    days_to_next_friday = (cycle_end - test_datetime).total_seconds() / seconds_per_day
    if days_to_next_friday <= 0:
        return [html.Div(f"Test time {test_datetime} is at or past cycle end {cycle_end}.")]

    cycle_data = data[(data['datetime_est'] >= cycle_start) & (data['datetime_est'] <= test_datetime)]
    if cycle_data.empty:
        return [html.Div(f"No data available from {cycle_start} to {test_datetime}")]
    tweet_count = cycle_data['tweet_count'].sum()

    actual_data = data[(data['datetime_est'] >= cycle_start) & (data['datetime_est'] <= cycle_end)]
    if actual_data.empty:
        return [html.Div(f"No data available for cycle ending {cycle_end}")]
    actual_end_count = actual_data['tweet_count'].sum()

    days_elapsed = (test_datetime - cycle_start).total_seconds() / seconds_per_day
    if days_elapsed <= 0:
        return [html.Div(f"Test time {test_datetime} is before cycle start {cycle_start}.")]

    # Linear extrapolation of the pace so far to a 7-day total, with a
    # +/-10% band around it as the range to score.
    daily_avg = tweet_count / days_elapsed
    predicted_end_count = daily_avg * 7
    prob_start = predicted_end_count * 0.9
    prob_end = predicted_end_count * 1.1

    try:
        probability = calculate_tweet_probability(tweet_count, days_to_next_friday, prob_start, prob_end)
        prob_min, prob_max = map(float, probability.split(" - "))
        formatted_probability = f"{prob_min * 100:.2f}% - {prob_max * 100:.2f}%"
    except Exception as e:
        # Callback boundary: report the failure in the UI instead of raising.
        return [html.Div(f"Error calculating probability: {str(e)}")]

    in_range = prob_start <= actual_end_count <= prob_end

    test_table_rows = [
        html.Tr([html.Th("Cycle Start:", colSpan=4), html.Td(str(cycle_start), colSpan=6)]),
        html.Tr([html.Th("Test Date and Time:", colSpan=4), html.Td(str(test_datetime), colSpan=6)]),
        html.Tr([html.Th("Cycle End:", colSpan=4), html.Td(str(cycle_end), colSpan=6)]),
        html.Tr([html.Th("Tweet Count at Test Time:", colSpan=4), html.Td(str(tweet_count), colSpan=6)]),
        html.Tr([html.Th("Actual Final Tweet Count:", colSpan=4), html.Td(str(actual_end_count), colSpan=6)]),
        html.Tr([html.Th(f"Predicted Range ({int(prob_start)}-{int(prob_end)}):", colSpan=4), html.Td(formatted_probability, colSpan=6)]),
        html.Tr([html.Th("Does Actual Fall in Range?", colSpan=4),
                 html.Td("Yes" if in_range else "No",
                         colSpan=6, style={'color': 'green' if in_range else 'red'})])
    ]
    if in_range:
        expected_prob = (prob_max + prob_min) / 2
        test_table_rows.append(html.Tr([html.Th("Expected Probability:", colSpan=4),
                                        html.Td(f"~{expected_prob * 100:.2f}%", colSpan=6)]))
    else:
        test_table_rows.append(html.Tr([html.Th("Note:", colSpan=4),
                                        html.Td("Prediction does not match actual outcome.", colSpan=6, style={'color': 'red'})]))

    test_table = html.Table(test_table_rows, style={'width': '100%', 'textAlign': 'left', 'borderCollapse': 'collapse'})
    return [test_table]
def process_test_case(args):
    """Run one back-test and flatten its rendered table into a CSV row.

    Args:
        args: A ``(test_datetime, data)`` pair; packed into a single tuple so
            the function can be submitted to an executor with one argument.

    Returns:
        A 13-item list: test date, test time, cycle start, cycle end, tweet
        count at test time, actual final count, range start, range end,
        probability min, probability max, "Yes"/"No", expected probability
        (or ""), and a note. When the back-test produced a message instead
        of a table, the ten middle fields are blank and the note carries the
        message text.
    """
    moment, frame = args
    test_date = moment.date().strftime('%Y-%m-%d')
    test_time = moment.time().strftime('%H:%M:%S')

    # n_clicks=1 makes the callback behave as if the button had been pressed.
    outcome = update_test_info(1, test_date, test_time, frame)[0]

    if not isinstance(outcome, html.Table):
        return [test_date, test_time] + [""] * 10 + [outcome.children]

    rows = outcome.children

    def label(index):
        # Text of the header cell of the given table row.
        return rows[index].children[0].children

    def value(index):
        # Content of the data cell of the given table row.
        return rows[index].children[1].children

    cycle_start = str(value(0))
    cycle_end = str(value(2))
    tweet_count = int(value(3))
    actual_end_count = int(value(4))

    # "12.34% - 56.78%" -> 12.34, 56.78
    prob_min, prob_max = [float(part.strip('%')) for part in value(5).split(" - ")]
    # "Predicted Range (123-456):" -> 123, 456
    bounds = label(5).split("(")[1].split(")")[0]
    prob_start, prob_end = (int(part) for part in bounds.split("-"))

    verdict = "Yes" if value(6) == "Yes" else "No"

    expected_prob = ""
    note = ""
    if len(rows) > 7:
        trailer = label(7)
        if "Expected" in trailer:
            # "~12.34%" -> 12.34
            expected_prob = float(value(7).split()[0][1:-1])
        elif "Note" in trailer:
            note = value(7)

    return [
        test_date, test_time, cycle_start, cycle_end, tweet_count,
        actual_end_count, prob_start, prob_end, prob_min, prob_max,
        verdict, expected_prob, note
    ]
def run_loop_test(start_date="2024-10-01", end_date="2025-03-12", interval_hours=1, output_file="test_results.csv", max_workers=None, chunk_size=1000):
    """Back-test every time step in a date range and append the rows to a CSV.

    Args:
        start_date: First test moment (parsed by pandas, localized to
            US/Eastern).
        end_date: Last test moment, inclusive.
        interval_hours: Spacing between test moments, in hours; must be > 0.
        output_file: CSV path. A header row is written only when the file
            does not exist yet; result rows are always appended.
        max_workers: Worker process count; defaults to ``os.cpu_count()``,
            or 4 when that is unavailable.
        chunk_size: Number of test moments processed (and flushed to disk)
            per batch; must be > 0.

    Raises:
        ValueError: If ``interval_hours`` or ``chunk_size`` is not positive.
    """
    # A non-positive step would never advance past end_date (endless loop),
    # and a non-positive chunk size would yield no batches or a bad range().
    if interval_hours <= 0:
        raise ValueError(f"interval_hours must be positive, got {interval_hours!r}")
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size!r}")

    est = pytz.timezone('US/Eastern')
    start_dt = pd.to_datetime(start_date).tz_localize(est)
    end_dt = pd.to_datetime(end_date).tz_localize(est)

    step = timedelta(hours=interval_hours)
    time_points = []
    current_dt = start_dt
    while current_dt <= end_dt:
        time_points.append(current_dt)
        current_dt += step

    headers = [
        "Test Date", "Test Time", "Cycle Start", "Cycle End", "Tweet Count at Test Time",
        "Actual Final Tweet Count", "Predicted Range Start", "Predicted Range End",
        "Probability Min (%)", "Probability Max (%)", "Actual in Range", "Expected Probability (%)", "Note"
    ]
    if not os.path.exists(output_file):
        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            csv.writer(f).writerow(headers)

    total_steps = len(time_points)
    if total_steps == 0:
        # start_date is after end_date: nothing to test. Returning here also
        # avoids clamping chunk_size to 0, which used to break range() below.
        return

    # Load the shared data frame once up front.
    initialize_global_data()
    data = global_data

    max_workers = max_workers or os.cpu_count() or 4
    chunk_size = min(chunk_size, total_steps)  # never larger than the task count

    # Process the time points in batches so results reach disk incrementally.
    chunks = [time_points[i:i + chunk_size] for i in range(0, total_steps, chunk_size)]
    with tqdm(total=total_steps, desc="Processing Test Cases", unit="step") as pbar:
        for chunk in chunks:
            results = []
            with ProcessPoolExecutor(max_workers=max_workers) as executor:
                # NOTE(review): `data` is part of every task's arguments, so
                # it is shipped to the workers once per task; consider a pool
                # initializer if this dominates the run time.
                futures = {executor.submit(process_test_case, (test_datetime, data)): test_datetime for test_datetime in chunk}
                for future in as_completed(futures):
                    try:
                        row = future.result()
                    except Exception as e:
                        # Record the failure as a row so one bad time point
                        # does not abort the whole run.
                        failed_at = futures[future]
                        row = [failed_at.date().strftime('%Y-%m-%d'),
                               failed_at.time().strftime('%H:%M:%S'),
                               "", "", "", "", "", "", "", "", "", "", f"Error: {str(e)}"]
                    results.append(row)
                    pbar.update(1)
            with open(output_file, 'a', newline='', encoding='utf-8') as f:
                csv.writer(f).writerows(results)
# Script entry point: run the hourly back-test over the stock date range.
if __name__ == "__main__":
    run_loop_test(
        start_date="2024-10-01",
        end_date="2025-03-12",
        interval_hours=1,
        output_file="test_results.csv",
        chunk_size=1000,
    )

View File

@ -2,37 +2,25 @@ from datetime import datetime, timedelta
from dash.dependencies import Input, Output from dash.dependencies import Input, Output
from pkg.dash.app_init import app from pkg.dash.app_init import app
from pkg.config import render_data from pkg.config import render_data
from pkg.tool import aggregate_data, generate_xticks, minutes_to_time, get_tweets_since_last_friday from pkg.tool import aggregate_data, minutes_to_time, get_tweets_since_last_friday
from dash import dcc from dash import dcc
import plotly.graph_objs as go import plotly.graph_objs as go
import pandas as pd import pandas as pd
@app.callback( @app.callback(
[Output('tabs-content', 'children'), [Output('tabs-content', 'children'),
Output('multi-day-warning', 'children'), Output('multi-day-warning', 'children'),
Output('multi-tweet-summary', 'children')], Output('multi-tweet-summary', 'children')],
[Input('tabs', 'value'), [Input('tabs', 'value'),
Input('multi-date-picker', 'value'),
Input('multi-interval-picker', 'value'), Input('multi-interval-picker', 'value'),
Input('time-zone-checklist', 'value'),
Input('days-display-picker', 'value')] Input('days-display-picker', 'value')]
) )
def render_tab_content(tab, selected_dates, interval, time_zones, days_to_display): def render_tab_content(tab, interval, days_to_display):
warning = "" warning = ""
if tab == 'line': available_dates = sorted(render_data.global_agg_df['date'].unique(), reverse=True)
if not selected_dates: # Handle None or empty list selected_dates = available_dates[:days_to_display] if available_dates else [datetime.now().date()]
selected_dates = [datetime.now().date()] # Default to today if not available_dates:
warning = "No dates selected. Showing todays data." warning = "No data available. Showing todays date with zero tweets."
if len(selected_dates) > 10:
selected_dates = selected_dates[:10]
warning = "Maximum of 10 days can be selected. Showing first 10 selected days."
selected_dates = [datetime.strptime(date, '%Y-%m-%d').date() for date in selected_dates]
else:
available_dates = sorted(render_data.global_agg_df['date'].unique(), reverse=True)
selected_dates = available_dates[:days_to_display] if available_dates else [datetime.now().date()]
if not available_dates:
warning = "No data available. Showing todays date with zero tweets."
multi_data_agg = render_data.global_agg_df[render_data.global_agg_df['date'].isin(selected_dates)].copy() multi_data_agg = render_data.global_agg_df[render_data.global_agg_df['date'].isin(selected_dates)].copy()
if multi_data_agg.empty: if multi_data_agg.empty:
@ -47,23 +35,8 @@ def render_tab_content(tab, selected_dates, interval, time_zones, days_to_displa
tweet_count_total = 0 tweet_count_total = 0
agg_data = aggregate_data(multi_data_agg, interval) agg_data = aggregate_data(multi_data_agg, interval)
xticks, xtick_labels = generate_xticks(interval)
if tab == 'line': if tab == 'heatmap':
fig = go.Figure()
for date in selected_dates:
day_data = agg_data[agg_data['date'] == date]
hover_times = [f"{date} {minutes_to_time(minute)} EST" for minute in day_data['interval_group']]
fig.add_trace(go.Scatter(
x=day_data['interval_group'],
y=day_data['tweet_count'],
mode='lines',
name=str(date),
customdata=hover_times,
hovertemplate='%{customdata}<br>Tweets: %{y}<extra></extra>'
))
elif tab == 'heatmap':
pivot_data = agg_data.pivot(index='date', columns='interval_group', values='tweet_count').fillna(0) pivot_data = agg_data.pivot(index='date', columns='interval_group', values='tweet_count').fillna(0)
pivot_data.index = pivot_data.index.astype(str) pivot_data.index = pivot_data.index.astype(str)
fig = go.Figure(data=go.Heatmap( fig = go.Figure(data=go.Heatmap(
@ -77,7 +50,7 @@ def render_tab_content(tab, selected_dates, interval, time_zones, days_to_displa
for i, date_str in enumerate(pivot_data.index): for i, date_str in enumerate(pivot_data.index):
date = datetime.strptime(date_str, '%Y-%m-%d').date() date = datetime.strptime(date_str, '%Y-%m-%d').date()
if date.weekday() == 4: # Friday if date.weekday() == 4:
prev_date = date - timedelta(days=1) prev_date = date - timedelta(days=1)
if str(prev_date) in pivot_data.index: if str(prev_date) in pivot_data.index:
y_position = i / len(pivot_data.index) y_position = i / len(pivot_data.index)
@ -105,8 +78,8 @@ def render_tab_content(tab, selected_dates, interval, time_zones, days_to_displa
for _, row in one_day_data.iterrows(): for _, row in one_day_data.iterrows():
minute = row['interval_group'] minute = row['interval_group']
hour = int(minute // 60) # Convert to integer hour = int(minute // 60)
interval_idx = int((minute % 60) // interval) # Convert to integer interval_idx = int((minute % 60) // interval)
if hour < 24: if hour < 24:
z_values[hour][interval_idx] = row['tweet_count'] z_values[hour][interval_idx] = row['tweet_count']
@ -126,20 +99,12 @@ def render_tab_content(tab, selected_dates, interval, time_zones, days_to_displa
hovertemplate='%{y}:%{x} EST<br>Tweets: %{z}<br>Rate: %{customdata:.2%}<extra></extra>' hovertemplate='%{y}:%{x} EST<br>Tweets: %{z}<br>Rate: %{customdata:.2%}<extra></extra>'
)) ))
if tab in ['line', 'one_day_heatmap']:
fig.update_layout( fig.update_layout(
title=f'{"Line" if tab == "line" else "One-Day Heatmap"} Tweet Frequency (Interval: {interval} minutes, EST, {len(selected_dates)} days)', title=f'One-Day Heatmap Tweet Frequency (Interval: {interval} minutes, EST, {len(selected_dates)} days)',
xaxis_title='Minutes' if tab == 'one_day_heatmap' else 'Eastern Time (HH:MM)', xaxis_title='Minutes',
yaxis_title='Hour of Day' if tab == 'one_day_heatmap' else 'Tweet Count', yaxis_title='Hour of Day',
xaxis=dict(
range=[0, 1440] if tab == 'line' else None,
tickvals=xticks if tab == 'line' else None,
ticktext=xtick_labels if tab == 'line' else None,
tickangle=45 if tab == 'line' else 0
),
height=600, height=600,
showlegend=(tab == 'line'), yaxis=dict(autorange='reversed')
yaxis=dict(autorange='reversed') if tab == 'one_day_heatmap' else None
) )
summary = f"Total tweets: {get_tweets_since_last_friday()}" summary = f"Total tweets: {get_tweets_since_last_friday()}"

View File

@ -3,12 +3,9 @@ from dash.dependencies import Input, Output
@app.callback( @app.callback(
[Output('date-picker-container', 'style'), [Output('days-display-container', 'style'),
Output('days-display-container', 'style'), Output('multi-interval-container', 'style')],
Output('time-zone-checklist', 'style')],
[Input('tabs', 'value')] [Input('tabs', 'value')]
) )
def toggle_controls_visibility(tab): def toggle_controls_visibility(tab):
if tab == 'heatmap' or tab == 'one_day_heatmap': return {'display': 'block'},{'display': 'block'}
return {'display': 'none'}, {'display': 'block'}, {'display': 'none'}
return {'display': 'block'}, {'display': 'none'}, {'display': 'block'}