elon_py/pkg/dash/func/info_test.py
2025-03-14 17:55:31 +08:00

207 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from pkg.dash.func.info_func import *
from pkg.dash.app_init import app
from dash.dependencies import Input, Output
from dash import html
import os
import csv
import pandas as pd
import re
from datetime import timedelta
def _friday_noon_cycle(test_datetime, tz):
    """Return ``(cycle_start, cycle_end)`` for the weekly cycle containing ``test_datetime``.

    A cycle runs from Friday 12:00 to the following Friday 12:00, both expressed
    in local wall-clock time of ``tz``.

    Args:
        test_datetime: Timezone-aware ``pandas.Timestamp``.
        tz: Timezone used to localize the two boundaries.

    Returns:
        Tuple of two timezone-aware ``pandas.Timestamp`` objects.
    """
    days_back = (test_datetime.weekday() - 4) % 7  # weekday() == 4 is Friday
    if days_back == 0 and test_datetime.hour < 12:
        # Friday morning still belongs to the cycle opened the previous Friday.
        days_back = 7
    # Do the day arithmetic on naive wall-clock values. Adding/subtracting a
    # timedelta on a tz-aware Timestamp moves by elapsed time, so across a DST
    # change the result can slip onto the neighbouring calendar day (start) or
    # land at 11:00 / 13:00 instead of 12:00 (end).
    wall_midnight = test_datetime.tz_localize(None).normalize()
    start_wall = wall_midnight - timedelta(days=days_back) + timedelta(hours=12)
    end_wall = start_wall + timedelta(days=7)
    return start_wall.tz_localize(tz), end_wall.tz_localize(tz)


@app.callback(
    [Output('test-info-tooltip', 'children')],
    [Input('test-button', 'n_clicks'),
     Input('test-date-input', 'date'),
     Input('test-time-input', 'value')]
)
def update_test_info(n_clicks, test_date, test_time):
    """Back-test the tweet-count probability model at one historical instant.

    Sums ``tweet_count`` from the start of the Friday-noon cycle up to the test
    instant, sums it again over the whole cycle, and asks
    ``calculate_tweet_probability`` for the probability that the final count
    lands within +/-10% of that whole-cycle total.

    Args:
        n_clicks: Click counter of the test button; ``None`` or ``0`` means the
            button has not been pressed yet.
        test_date: Date string in ``YYYY-MM-DD`` form.
        test_time: Time string in ``HH:MM:SS`` form (24-hour clock).

    Returns:
        A one-element list holding either an ``html.Div`` with a message
        (nothing clicked, invalid input, no data) or an ``html.Table`` with the
        test result rows. The row order and cell text of the table are parsed
        by ``run_loop_test`` and must stay stable.
    """
    # n_clicks can be None when the layout does not give the button an initial
    # value, so test for falsiness rather than equality with 0.
    if not n_clicks:
        return [html.Div("Click 'Test' to see historical probability results.")]

    # Debug: echo the raw inputs.
    print(f"test_date: {test_date}, test_time: {test_time}")

    # Reject empty inputs.
    if not test_date or not test_time:
        return [html.Div("Date or time input is empty. Please provide both date (YYYY-MM-DD) and time (HH:MM:SS).")]

    # Validate the time format; fullmatch also rejects a trailing newline.
    time_pattern = r'^(?:[01]\d|2[0-3]):[0-5]\d:[0-5]\d$'  # HH:MM:SS (00:00:00 to 23:59:59)
    if not re.fullmatch(time_pattern, test_time):
        return [html.Div("Invalid time format. Use HH:MM:SS (e.g., 12:00:00) with hours 00-23, minutes 00-59, seconds 00-59.")]

    est = pytz.timezone('US/Eastern')

    # Parse the test instant. An ambiguous (fall-back) time is read as DST and a
    # non-existent (spring-forward) time is shifted to the next valid instant
    # instead of raising an exception the handler below would not catch.
    try:
        test_date = pd.to_datetime(test_date, format='%Y-%m-%d').date()
        test_datetime = pd.to_datetime(
            f"{test_date} {test_time}", format='%Y-%m-%d %H:%M:%S'
        ).tz_localize(est, ambiguous=True, nonexistent='shift_forward')
    except ValueError as e:
        print(f"Error parsing date/time: {e}")
        return [html.Div("Invalid date or time format. Use YYYY-MM-DD and HH:MM:SS (e.g., 12:00:00).")]

    # Rebuild a tz-aware timestamp for every row from its `date` and
    # `minute_of_day` columns. Unparseable, ambiguous and non-existent local
    # times become NaT and therefore drop out of the range filters below.
    data = render_data.global_agg_df.copy()
    hours = data['minute_of_day'] // 60
    minutes = data['minute_of_day'] % 60
    data['datetime_est'] = pd.to_datetime(
        data['date'].astype(str) + ' ' +
        hours.astype(str) + ':' +
        minutes.astype(str) + ':00',
        errors='coerce'
    ).dt.tz_localize(est, ambiguous='NaT', nonexistent='NaT')
    if data['datetime_est'].isna().any():
        print("Warning: Some datetime_est values are NaT due to ambiguous time handling")

    # Cycle window: previous (or current) Friday 12:00 to the next Friday 12:00.
    cycle_start, cycle_end = _friday_noon_cycle(test_datetime, est)
    # Debug: echo the cycle window.
    print(f"Cycle Start: {cycle_start}, Cycle End: {cycle_end}")

    # Tweets observed from cycle start up to the test instant.
    after_start = data['datetime_est'] >= cycle_start
    cycle_data = data[after_start & (data['datetime_est'] <= test_datetime)]
    if cycle_data.empty:
        return [html.Div(f"No data available in cycle from {cycle_start} to {test_datetime}")]
    tweet_count = cycle_data['tweet_count'].sum()

    # Tweets observed over the whole cycle (the "actual" final count).
    actual_data = data[after_start & (data['datetime_est'] <= cycle_end)]
    if actual_data.empty:
        return [html.Div(f"No data available for cycle ending {cycle_end}")]
    actual_end_count = actual_data['tweet_count'].sum()

    # Remaining time in the cycle, in fractional days.
    days_to_next_friday = (cycle_end - test_datetime).total_seconds() / (24 * 60 * 60)

    # Predicted range is +/-10% around the actual final count.
    prob_start = actual_end_count * 0.9
    prob_end = actual_end_count * 1.1

    # calculate_tweet_probability is expected to return a "min - max" string of
    # fractions; it is split and rescaled to percentages here.
    probability = calculate_tweet_probability(tweet_count, days_to_next_friday, prob_start, prob_end)
    prob_min, prob_max = map(float, probability.split(" - "))
    formatted_probability = f"{prob_min * 100:.2f}% - {prob_max * 100:.2f}%"

    # NOTE(review): because the range is built around actual_end_count itself,
    # this check holds for every non-negative total; the "Note" branch below is
    # only reachable for negative or NaN totals.
    in_range = prob_start <= actual_end_count <= prob_end

    # Result table (row order is relied upon by run_loop_test).
    test_table_rows = [
        html.Tr([html.Th("Cycle Start:", colSpan=4), html.Td(str(cycle_start), colSpan=6)]),
        html.Tr([html.Th("Test Date and Time:", colSpan=4), html.Td(str(test_datetime), colSpan=6)]),
        html.Tr([html.Th("Cycle End:", colSpan=4), html.Td(str(cycle_end), colSpan=6)]),
        html.Tr([html.Th("Tweet Count at Test Time:", colSpan=4), html.Td(str(tweet_count), colSpan=6)]),
        html.Tr([html.Th("Actual Final Tweet Count:", colSpan=4), html.Td(str(actual_end_count), colSpan=6)]),
        html.Tr([html.Th(f"Predicted Range ({int(prob_start)}-{int(prob_end)}):", colSpan=4), html.Td(formatted_probability, colSpan=6)]),
        html.Tr([html.Th("Does Actual Fall in Range?", colSpan=4),
                 html.Td("Yes" if in_range else "No",
                         colSpan=6, style={'color': 'green' if in_range else 'red'})]),
    ]
    if in_range:
        expected_prob = (prob_max + prob_min) / 2
        test_table_rows.append(html.Tr([html.Th("Expected Probability:", colSpan=4),
                                        html.Td(f"~{expected_prob * 100:.2f}% (should be high if model fits)", colSpan=6)]))
    else:
        test_table_rows.append(html.Tr([html.Th("Note:", colSpan=4),
                                        html.Td("Model prediction does not match actual outcome.", colSpan=6, style={'color': 'red'})]))
    test_table = html.Table(test_table_rows, style={'width': '100%', 'textAlign': 'left', 'borderCollapse': 'collapse'})
    return [test_table]
def _result_rows_to_fields(rows):
    """Turn the table rows produced by ``update_test_info`` into CSV field values.

    Args:
        rows: The ``children`` list of the result ``html.Table``; row order is
            cycle start, test instant, cycle end, tweet count, actual count,
            predicted range / probability, in-range flag, then an optional
            "Expected Probability" or "Note" row.

    Returns:
        List of values for the CSV columns from "Cycle Start" to "Note".
    """
    def cell(row_idx, col_idx):
        return rows[row_idx].children[col_idx].children

    def to_int(text):
        # Counts are rendered with str(); a float-typed sum prints as "123.0",
        # which int() alone rejects.
        try:
            return int(text)
        except ValueError:
            return int(float(text))

    cycle_start = str(cell(0, 1))
    cycle_end = str(cell(2, 1))
    tweet_count = to_int(cell(3, 1))
    actual_end_count = to_int(cell(4, 1))
    # Probability cell looks like "2.74% - 3.25%".
    prob_min, prob_max = [float(part.strip('%')) for part in cell(5, 1).split(" - ")]
    # Header cell looks like "Predicted Range (90-110):".
    prob_start, prob_end = map(int, cell(5, 0).split("(")[1].split(")")[0].split("-"))
    in_range = cell(6, 1) == "Yes"

    expected_prob = ""
    note = ""
    if len(rows) > 7:
        label = cell(7, 0)
        if "Expected" in label:
            # Cell text starts with "~12.34% ..."; drop the "~" and "%".
            expected_prob = float(cell(7, 1).split()[0][1:-1])
        elif "Note" in label:
            note = cell(7, 1)

    return [
        cycle_start, cycle_end, tweet_count, actual_end_count,
        prob_start, prob_end, prob_min, prob_max,
        "Yes" if in_range else "No", expected_prob, note,
    ]


def run_loop_test(start_date="2024-10-01", end_date="2025-03-12", interval_hours=1, output_file="test_results.csv"):
    """Run ``update_test_info`` over a grid of instants and append the results to a CSV.

    Args:
        start_date: First instant of the sweep (parsed by ``pd.to_datetime`` and
            localized to US/Eastern).
        end_date: Last instant of the sweep, inclusive.
        interval_hours: Step between consecutive test instants, in hours. Must
            be positive.
        output_file: CSV path. Rows are appended; the header is written only
            when the file is missing or empty.

    Raises:
        ValueError: If ``interval_hours`` is not positive (the sweep would
            otherwise never terminate).
    """
    if interval_hours <= 0:
        raise ValueError("interval_hours must be positive")

    est = pytz.timezone('US/Eastern')
    start_dt = pd.to_datetime(start_date).tz_localize(est)
    end_dt = pd.to_datetime(end_date).tz_localize(est)
    step = timedelta(hours=interval_hours)

    headers = [
        "Test Date", "Test Time", "Cycle Start", "Cycle End", "Tweet Count at Test Time",
        "Actual Final Tweet Count", "Predicted Range Start", "Predicted Range End",
        "Probability Min (%)", "Probability Max (%)", "Actual in Range", "Expected Probability (%)", "Note"
    ]
    needs_header = not os.path.exists(output_file) or os.path.getsize(output_file) == 0

    # Open the file once for the whole sweep instead of once per row.
    with open(output_file, 'a', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        if needs_header:
            writer.writerow(headers)

        current_dt = start_dt
        while current_dt <= end_dt:
            test_date = current_dt.date().strftime('%Y-%m-%d')
            test_time = current_dt.time().strftime('%H:%M:%S')
            n_clicks = 1  # pretend the button has been clicked

            result = update_test_info(n_clicks, test_date, test_time)

            if isinstance(result[0], html.Table):
                fields = _result_rows_to_fields(result[0].children)
            else:
                # Message Div (invalid input / no data): record it in "Note".
                fields = [""] * 10 + [result[0].children]
            writer.writerow([test_date, test_time] + fields)
            # Flush per row so partial results survive an interrupted sweep.
            f.flush()

            print(f"Processed: {test_date} {test_time}")
            current_dt += step
# Script entry point: run the full back-test sweep when executed directly.
if __name__ == "__main__":
    sweep_options = {
        "start_date": "2024-10-01",
        "end_date": "2025-03-12",
        "interval_hours": 1,
        "output_file": "test_results.csv",
    }
    run_loop_test(**sweep_options)