elon_py/pkg/dash/func/info_test.py
2025-03-14 16:22:00 +08:00

118 lines
5.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from pkg.dash.func.info_func import *
from pkg.dash.app_init import app
from dash.dependencies import Input, Output
from dash import html
import pandas as pd
import re
from datetime import timedelta
def _cycle_bounds(test_datetime, tz):
    """Return ``(cycle_start, cycle_end)`` for the weekly cycle containing *test_datetime*.

    A cycle runs from Friday 12:00 to the following Friday 12:00 in local
    (wall-clock) time of *tz*.  The arithmetic is done on naive wall-clock
    timestamps and localized afterwards, so both bounds stay at 12:00 local
    time even when the cycle spans a daylight-saving transition (adding
    ``timedelta(days=7)`` to a tz-aware timestamp would add 168 absolute
    hours and drift to 11:00 / 13:00).
    """
    wall = test_datetime.tz_localize(None)  # drop tz, keep local wall-clock time
    days_since_friday = (wall.weekday() - 4) % 7  # 4 == Friday
    start_wall = (wall - timedelta(days=days_since_friday)).replace(
        hour=12, minute=0, second=0, microsecond=0
    )
    # On a Friday before noon the same-day 12:00 is still in the future;
    # the cycle containing the test time started the previous Friday.
    if start_wall > wall:
        start_wall -= timedelta(days=7)
    end_wall = start_wall + timedelta(days=7)
    # 12:00 local is never ambiguous or nonexistent in a zone whose DST
    # switch happens at night, so a plain localize is sufficient here.
    return start_wall.tz_localize(tz), end_wall.tz_localize(tz)


def _table_row(label, value, style=None):
    """Build one two-cell result row: a header cell (colSpan 2) and a value cell (colSpan 6)."""
    td_kwargs = {'colSpan': 6}
    if style is not None:
        td_kwargs['style'] = style
    return html.Tr([html.Th(label, colSpan=2), html.Td(value, **td_kwargs)])


@app.callback(
    [Output('test-info-tooltip', 'children')],
    [Input('test-button', 'n_clicks'),
     Input('test-date-input', 'date'),
     Input('test-time-input', 'value')]
)
def update_test_info(n_clicks, test_date, test_time):
    """Back-test the tweet-probability model at a historical point in time.

    Args:
        n_clicks: Click count of the 'test-button' component (``None`` or 0
            before the first click).
        test_date: Date string in ``YYYY-MM-DD`` form.
        test_time: Time string in ``HH:MM:SS`` form (24-hour clock).

    Returns:
        A one-element list (the callback declares a list of outputs) holding
        either an ``html.Div`` with a prompt/error message or an
        ``html.Table`` summarising the test.

    NOTE(review): ``calculate_tweet_probability`` is assumed to return a
    string of the form ``"<min> - <max>"`` with fractional probabilities,
    because the result is split on ``" - "`` below -- confirm against its
    definition.
    """
    # Dash passes None until the button is clicked unless the layout sets
    # n_clicks=0 explicitly; treat both as "not clicked yet".
    if not n_clicks:
        return [html.Div("Click 'Test' to see historical probability results.")]

    # Debug: print the raw input values.
    print(f"test_date: {test_date}, test_time: {test_time}")

    # Reject empty inputs.
    if not test_date or not test_time:
        return [html.Div("Date or time input is empty. Please provide both date (YYYY-MM-DD) and time (HH:MM:SS).")]

    # Validate the time format; fullmatch also rejects a trailing newline,
    # which `$` with re.match would let through.
    time_pattern = r'^(?:[01]\d|2[0-3]):[0-5]\d:[0-5]\d$'  # HH:MM:SS (00:00:00 to 23:59:59)
    if not re.fullmatch(time_pattern, test_time):
        return [html.Div("Invalid time format. Use HH:MM:SS (e.g., 12:00:00) with hours 00-23, minutes 00-59, seconds 00-59.")]

    est = pytz.timezone('US/Eastern')

    # Parse the test date and time.  A wall-clock time inside the
    # spring-forward gap is shifted to the first valid instant instead of
    # raising an exception that the ValueError handler would not catch.
    try:
        test_date = pd.to_datetime(test_date, format='%Y-%m-%d').date()
        test_datetime = pd.to_datetime(
            f"{test_date} {test_time}", format='%Y-%m-%d %H:%M:%S'
        ).tz_localize(est, ambiguous=True, nonexistent='shift_forward')
    except ValueError as e:
        print(f"Error parsing date/time: {e}")
        return [html.Div("Invalid date or time format. Use YYYY-MM-DD and HH:MM:SS (e.g., 12:00:00).")]

    # Rebuild a tz-aware timestamp column from date + minute_of_day.
    # Unparseable, ambiguous (fall-back) and nonexistent (spring-forward)
    # local times all become NaT rather than aborting the callback; NaT rows
    # compare False in the range filters below and are therefore ignored.
    data = render_data.global_agg_df.copy()
    data['hours'] = data['minute_of_day'] // 60
    data['minutes'] = data['minute_of_day'] % 60
    data['datetime_est'] = pd.to_datetime(
        data['date'].astype(str) + ' ' +
        data['hours'].astype(str) + ':' +
        data['minutes'].astype(str) + ':00',
        errors='coerce'
    ).dt.tz_localize(est, ambiguous='NaT', nonexistent='NaT')
    if data['datetime_est'].isna().any():
        print("Warning: Some datetime_est values are NaT due to ambiguous time handling")

    # Cycle window: previous Friday 12:00 to next Friday 12:00, local time.
    cycle_start, cycle_end = _cycle_bounds(test_datetime, est)

    # Debug: print the cycle window.
    print(f"Cycle Start: {cycle_start}, Cycle End: {cycle_end}")

    # Tweets observed from cycle start up to the test time.
    timestamps = data['datetime_est']
    cycle_data = data[(timestamps >= cycle_start) & (timestamps <= test_datetime)]
    if cycle_data.empty:
        return [html.Div(f"No data available in cycle from {cycle_start} to {test_datetime}")]
    tweet_count = cycle_data['tweet_count'].sum()

    # Actual final count: all tweets within the whole cycle.
    actual_data = data[(timestamps >= cycle_start) & (timestamps <= cycle_end)]
    if actual_data.empty:
        return [html.Div(f"No data available for cycle ending {cycle_end}")]
    actual_end_count = actual_data['tweet_count'].sum()

    # Remaining time in the cycle, in fractional days.
    days_to_next_friday = (cycle_end - test_datetime).total_seconds() / (24 * 60 * 60)

    # Prediction range: +/-10% around the actual final count.
    prob_start = actual_end_count * 0.9
    prob_end = actual_end_count * 1.1

    # Ask the model for the probability of ending inside that range.
    probability = calculate_tweet_probability(tweet_count, days_to_next_friday, prob_start, prob_end)
    prob_min, prob_max = map(float, probability.split(" - "))
    formatted_probability = f"{prob_min * 100:.2f}% - {prob_max * 100:.2f}%"

    # The range is derived from the actual count, so this holds for any
    # non-negative count; it is still evaluated to keep the report generic.
    in_range = prob_start <= actual_end_count <= prob_end

    # Build the result table (including Cycle End).
    test_table_rows = [
        _table_row("Test Date and Time:", str(test_datetime)),
        _table_row("Cycle Start:", str(cycle_start)),
        _table_row("Cycle End:", str(cycle_end)),
        _table_row("Tweet Count at Test Time:", str(tweet_count)),
        _table_row("Actual Final Tweet Count:", str(actual_end_count)),
        _table_row(f"Predicted Range ({int(prob_start)}-{int(prob_end)}):", formatted_probability),
        _table_row("Does Actual Fall in Range?",
                   "Yes" if in_range else "No",
                   style={'color': 'green' if in_range else 'red'}),
    ]
    if in_range:
        expected_prob = (prob_max + prob_min) / 2
        test_table_rows.append(
            _table_row("Expected Probability:",
                       f"~{expected_prob * 100:.2f}% (should be high if model fits)")
        )
    else:
        test_table_rows.append(
            _table_row("Note:",
                       "Model prediction does not match actual outcome.",
                       style={'color': 'red'})
        )

    test_table = html.Table(test_table_rows, style={'width': '100%', 'textAlign': 'left', 'borderCollapse': 'collapse'})
    return [test_table]