from pkg.dash.func.info_func import *
from pkg.dash.app_init import app
from dash.dependencies import Input, Output
from dash import html
import pandas as pd
import re
from datetime import timedelta


def _friday_noon_cycle(test_datetime, tz):
    """Return ``(cycle_start, cycle_end)`` for the cycle containing *test_datetime*.

    A cycle runs from Friday 12:00 to the following Friday 12:00, both in
    wall-clock time of *tz*.  The arithmetic is done on naive wall-clock
    values and localized afterwards, so a DST transition inside the window
    can neither move the boundary to another day nor shift it off 12:00.

    Args:
        test_datetime: tz-aware ``pandas.Timestamp`` expressed in *tz*.
        tz: time zone object accepted by ``Timestamp.tz_localize``.

    Returns:
        Tuple of two tz-aware ``pandas.Timestamp`` objects.
    """
    wall_clock = test_datetime.tz_localize(None)  # drop tz, keep local time
    midnight = wall_clock.normalize()
    days_since_friday = (midnight.weekday() - 4) % 7  # 4 == Friday
    start_wall = midnight - timedelta(days=days_since_friday) + timedelta(hours=12)
    # On a Friday before noon the current cycle began the previous Friday.
    if start_wall > wall_clock:
        start_wall -= timedelta(days=7)
    end_wall = start_wall + timedelta(days=7)
    # 12:00 is never ambiguous or non-existent, so plain localization is safe.
    return start_wall.tz_localize(tz), end_wall.tz_localize(tz)


@app.callback(
    [Output('test-info-tooltip', 'children')],
    [Input('test-button', 'n_clicks'),
     Input('test-date-input', 'date'),
     Input('test-time-input', 'value')]
)
def update_test_info(n_clicks, test_date, test_time):
    """Back-test the tweet-count probability model at a historical moment.

    Sums ``tweet_count`` from the start of the Friday-noon cycle up to the
    requested moment, sums it again over the whole cycle, asks
    ``calculate_tweet_probability`` for the probability of finishing within
    +/-10% of the actual total, and renders the outcome as an HTML table.

    NOTE(review): ``pytz``, ``render_data`` and ``calculate_tweet_probability``
    are not imported explicitly in this file; presumably they come from the
    star import of ``pkg.dash.func.info_func`` -- confirm.

    Args:
        n_clicks: click counter of the 'test-button' component.
        test_date: date string, expected as ``YYYY-MM-DD``.
        test_time: time string, expected as ``HH:MM:SS``.

    Returns:
        A one-element list (the callback declares its output as a list)
        holding either an ``html.Div`` with a message or the ``html.Table``
        with the results.
    """
    # Dash passes None (not 0) before the first click when the layout does
    # not set n_clicks explicitly, so test for falsiness rather than == 0.
    if not n_clicks:
        return [html.Div("Click 'Test' to see historical probability results.")]

    # Debug: echo the raw inputs.
    print(f"test_date: {test_date}, test_time: {test_time}")

    # Both inputs are required.
    if not test_date or not test_time:
        return [html.Div("Date or time input is empty. Please provide both date (YYYY-MM-DD) and time (HH:MM:SS).")]

    # Validate the time format.  fullmatch() is used because '$' alone would
    # also accept a trailing newline.
    time_pattern = r'^(?:[01]\d|2[0-3]):[0-5]\d:[0-5]\d$'  # HH:MM:SS (00:00:00 to 23:59:59)
    if not re.fullmatch(time_pattern, test_time):
        return [html.Div("Invalid time format. Use HH:MM:SS (e.g., 12:00:00) with hours 00-23, minutes 00-59, seconds 00-59.")]

    est = pytz.timezone('US/Eastern')

    # Parse the requested moment.  A wall time inside the spring-forward gap
    # is shifted to the first valid instant instead of raising an error that
    # the ValueError handler below would not catch.
    try:
        test_date = pd.to_datetime(test_date, format='%Y-%m-%d').date()
        test_datetime = pd.to_datetime(
            f"{test_date} {test_time}", format='%Y-%m-%d %H:%M:%S'
        ).tz_localize(est, ambiguous=True, nonexistent='shift_forward')
    except ValueError as e:
        print(f"Error parsing date/time: {e}")
        return [html.Div("Invalid date or time format. Use YYYY-MM-DD and HH:MM:SS (e.g., 12:00:00).")]

    # Work on a copy so the shared frame is not modified.
    data = render_data.global_agg_df.copy()

    # Rebuild a tz-aware timestamp per row from 'date' + 'minute_of_day'.
    # Ambiguous (fall-back) and non-existent (spring-forward) wall times
    # become NaT instead of aborting the callback.
    data['hours'] = data['minute_of_day'] // 60
    data['minutes'] = data['minute_of_day'] % 60
    data['datetime_est'] = pd.to_datetime(
        data['date'].astype(str) + ' '
        + data['hours'].astype(str) + ':'
        + data['minutes'].astype(str) + ':00',
        errors='coerce'
    ).dt.tz_localize(est, ambiguous='NaT', nonexistent='NaT')
    if data['datetime_est'].isna().any():
        print("Warning: Some datetime_est values are NaT due to ambiguous time handling")

    # Cycle window: most recent Friday 12:00 up to the next Friday 12:00.
    cycle_start, cycle_end = _friday_noon_cycle(test_datetime, est)

    # Debug: echo the cycle window.
    print(f"Cycle Start: {cycle_start}, Cycle End: {cycle_end}")

    after_start = data['datetime_est'] >= cycle_start

    # Tweets accumulated from cycle start up to the test moment.
    cycle_data = data[after_start & (data['datetime_est'] <= test_datetime)]
    if cycle_data.empty:
        return [html.Div(f"No data available in cycle from {cycle_start} to {test_datetime}")]
    tweet_count = cycle_data['tweet_count'].sum()

    # Actual final total for the whole cycle.
    actual_data = data[after_start & (data['datetime_est'] <= cycle_end)]
    if actual_data.empty:
        return [html.Div(f"No data available for cycle ending {cycle_end}")]
    actual_end_count = actual_data['tweet_count'].sum()

    # Fractional days remaining between the test moment and cycle end.
    days_to_next_friday = (cycle_end - test_datetime).total_seconds() / (24 * 60 * 60)

    # Prediction band: +/-10% around the actual final total.
    prob_start = actual_end_count * 0.9
    prob_end = actual_end_count * 1.1

    # The helper's result is parsed as a "<min> - <max>" string of fractions.
    probability = calculate_tweet_probability(tweet_count, days_to_next_friday, prob_start, prob_end)
    prob_min, prob_max = map(float, probability.split(" - "))
    formatted_probability = f"{prob_min * 100:.2f}% - {prob_max * 100:.2f}%"

    in_range = prob_start <= actual_end_count <= prob_end

    # Build the result table.
    test_table_rows = [
        html.Tr([html.Th("Test Date and Time:", colSpan=2),
                 html.Td(str(test_datetime), colSpan=6)]),
        html.Tr([html.Th("Tweet Count at Test Time:", colSpan=2),
                 html.Td(str(tweet_count), colSpan=6)]),
        html.Tr([html.Th("Actual Final Tweet Count:", colSpan=2),
                 html.Td(str(actual_end_count), colSpan=6)]),
        html.Tr([html.Th(f"Predicted Range ({int(prob_start)}-{int(prob_end)}):", colSpan=2),
                 html.Td(formatted_probability, colSpan=6)]),
        html.Tr([html.Th("Does Actual Fall in Range?", colSpan=2),
                 html.Td("Yes" if in_range else "No", colSpan=6,
                         style={'color': 'green' if in_range else 'red'})]),
    ]
    if in_range:
        expected_prob = (prob_max + prob_min) / 2
        test_table_rows.append(html.Tr([
            html.Th("Expected Probability:", colSpan=2),
            html.Td(f"~{expected_prob * 100:.2f}% (should be high if model fits)", colSpan=6),
        ]))
    else:
        test_table_rows.append(html.Tr([
            html.Th("Note:", colSpan=2),
            html.Td("Model prediction does not match actual outcome.", colSpan=6,
                    style={'color': 'red'}),
        ]))

    test_table = html.Table(
        test_table_rows,
        style={'width': '100%', 'textAlign': 'left', 'borderCollapse': 'collapse'},
    )
    return [test_table]