from pkg.dash.func.info_func import *
from pkg.dash.app_init import app
from dash.dependencies import Input, Output
from dash import html
import pandas as pd
from datetime import timedelta

# NOTE(review): `pytz`, `render_data` and `calculate_tweet_probability` are not
# imported explicitly in this file; presumably they are provided by the star
# import above -- confirm, and prefer explicit imports if so.

# Padding shared by every cell of the result table.
_CELL_STYLE = {'paddingRight': '10px'}

_INVALID_INPUT_MESSAGE = (
    "Invalid date or time format. Use YYYY-MM-DD and HH:MM:SS (e.g., 12:00:00)."
)


def _info_row(label, value, **extra_value_style):
    """Build one table row: a header cell (label) followed by a value cell."""
    return html.Tr([
        html.Th(label, colSpan=2, style=dict(_CELL_STYLE)),
        html.Td(value, colSpan=6, style={**_CELL_STYLE, **extra_value_style}),
    ])


@app.callback(
    [Output('test-info-tooltip', 'children')],
    [Input('test-button', 'n_clicks'),
     Input('test-date-input', 'date'),
     Input('test-time-input', 'value')]
)
def update_test_info(n_clicks, test_date, test_time):
    """Render the back-test summary shown in the 'test-info-tooltip' element.

    Args:
        n_clicks: Click count of 'test-button'. ``None`` or ``0`` means the
            button has not been pressed yet.
        test_date: Value of the 'test-date-input' date property.
        test_time: Value of the 'test-time-input' value property.

    Returns:
        A one-element list (the callback declares its outputs as a list)
        holding either an explanatory ``html.Div`` or the result
        ``html.Table``.
    """
    # Dash passes None (not 0) for a button that was never clicked unless the
    # layout sets n_clicks=0 explicitly, so treat both as "not clicked yet".
    if not n_clicks:
        return [html.Div("Click 'Test' to see historical probability results.")]

    # A cleared input arrives as None; pd.to_datetime(None) returns None and
    # the subsequent .date() would raise AttributeError instead of ValueError.
    if test_date is None or test_time is None:
        return [html.Div(_INVALID_INPUT_MESSAGE)]

    est = pytz.timezone('US/Eastern')

    # Parse the requested test date and time as US/Eastern wall-clock time.
    try:
        test_date = pd.to_datetime(test_date).date()
        test_datetime = pd.to_datetime(f"{test_date} {test_time}").tz_localize(est)
    except ValueError:
        return [html.Div(_INVALID_INPUT_MESSAGE)]

    # 1. Tweet count accumulated up to test_datetime (simulates the
    #    tweet_count that was known at that moment).
    data = render_data.global_agg_df.copy()
    historical_data = data[data['datetime_est'] <= test_datetime]
    if historical_data.empty:
        return [html.Div(f"No data available up to {test_datetime}")]
    tweet_count = historical_data['tweet_count'].sum()

    # 2. Actual final tweet count: total for test_date through end of day.
    # NOTE(review): this sums only rows dated test_date, whereas tweet_count
    # above sums every row up to test_datetime -- confirm the two are meant
    # to use different scopes.
    day_end = pd.to_datetime(f"{test_date} 23:59:59").tz_localize(est)
    actual_data = data[(data['date'] == test_date) & (data['datetime_est'] <= day_end)]
    if actual_data.empty:
        return [html.Div(f"No data available for {test_date}")]
    actual_end_count = actual_data['tweet_count'].sum()

    # 3. Time remaining until the upcoming Friday 12:00 (weekday() == 4),
    #    expressed in fractional days.
    whole_days_ahead = (4 - test_date.weekday()) % 7
    friday_noon = test_datetime.replace(hour=12, minute=0, second=0, microsecond=0)
    next_friday = friday_noon + timedelta(days=whole_days_ahead)
    if test_datetime > next_friday:
        # Already past this Friday's noon: roll over to the following week.
        next_friday += timedelta(days=7)
    days_to_next_friday = (next_friday - test_datetime).total_seconds() / (24 * 60 * 60)

    # 4. Prediction range: +/-10% around the actual final tweet count.
    prob_start = actual_end_count * 0.9  # 90% of actual
    prob_end = actual_end_count * 1.1  # 110% of actual

    # 5. Ask the original calculate_tweet_probability() for the probability.
    #    The result is parsed as a "<min> - <max>" string of fractions.
    probability = calculate_tweet_probability(
        tweet_count, days_to_next_friday, prob_start, prob_end
    )
    prob_min, prob_max = map(float, probability.split(" - "))
    formatted_probability = f"{prob_min * 100:.2f}% - {prob_max * 100:.2f}%"

    # NOTE(review): the range is derived from actual_end_count itself, so this
    # holds for every non-negative finite count -- confirm whether a
    # model-derived range was intended here.
    in_range = prob_start <= actual_end_count <= prob_end

    # 6. Build the result table.
    test_table_rows = [
        _info_row("Test Date and Time:", str(test_datetime)),
        _info_row("Tweet Count at Test Time:", str(tweet_count)),
        _info_row("Actual Final Tweet Count:", str(actual_end_count)),
        _info_row(
            f"Predicted Range ({int(prob_start)}-{int(prob_end)}):",
            formatted_probability,
        ),
        _info_row(
            "Does Actual Fall in Range?",
            "Yes" if in_range else "No",
            color='green' if in_range else 'red',
        ),
    ]

    if in_range:
        expected_prob = (prob_max + prob_min) / 2
        test_table_rows.append(
            _info_row(
                "Expected Probability:",
                f"~{expected_prob * 100:.2f}% (should be high if model fits)",
            )
        )
    else:
        test_table_rows.append(
            _info_row(
                "Note:",
                "Model prediction does not match actual outcome.",
                color='red',
            )
        )

    test_table = html.Table(test_table_rows, style={
        'width': '100%',
        'textAlign': 'left',
        'borderCollapse': 'collapse'
    })
    return [test_table]