elon_py/pkg/dash/func/info_test.py
2025-03-20 09:48:47 +08:00

219 lines
9.9 KiB
Python

from pkg.dash.func.info_func import *
from pkg.dash.app_init import app
from dash.dependencies import Input, Output
from dash import html
import os
import csv
import pandas as pd
import re
from datetime import timedelta
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing as mp
# Module-level cache of the aggregate data (with precomputed columns), so it is
# not rebuilt on every backtest call.
global_data = None


def initialize_global_data():
    """Build the module-level ``global_data`` cache on first use and return it.

    Copies ``render_data.global_agg_df`` and precomputes ``hours``,
    ``minutes`` and a US/Eastern-localized ``datetime_est`` column from the
    ``date`` and ``minute_of_day`` columns. Subsequent calls return the cached
    frame without recomputing anything.

    Returns:
        The cached DataFrame (callers may also read the ``global_data`` global).
    """
    global global_data
    if global_data is None:
        # Build into a local first and publish only when complete: assigning the
        # global up front would leave a half-built frame cached if a later step
        # raised, and the ``is None`` guard would then never retry.
        df = render_data.global_agg_df.copy()
        df['hours'] = df['minute_of_day'] // 60
        df['minutes'] = df['minute_of_day'] % 60
        df['datetime_est'] = pd.to_datetime(
            df['date'].astype(str) + ' ' +
            df['hours'].astype(str) + ':' +
            df['minutes'].astype(str) + ':00',
            errors='coerce'
        ).dt.tz_localize(
            'US/Eastern',
            # Repeated fall-back hour and the hour skipped by spring-forward both
            # become NaT instead of raising during localization.
            ambiguous='NaT',
            nonexistent='NaT',
        )
        global_data = df
    return global_data
@app.callback(
    [Output('test-info-tooltip', 'children')],
    [Input('test-button', 'n_clicks'),
     Input('test-date-input', 'date'),
     Input('test-time-input', 'value')]
)
def update_test_info(n_clicks, test_date, test_time, data=None):
    """Backtest the weekly tweet-count prediction at a single point in time.

    A cycle runs from Friday 12:00 to the following Friday 12:00, US/Eastern
    wall-clock time. Tweets counted between the cycle start and the test time
    are extrapolated linearly to 7 days. A +/-10% range around that prediction
    is scored with ``calculate_tweet_probability`` and compared with the count
    actually observed over the whole cycle.

    Args:
        n_clicks: Click count of the 'Test' button; ``None`` or 0 means the
            button has not been clicked yet.
        test_date: Date string in ``YYYY-MM-DD`` format.
        test_time: Time string in ``HH:MM:SS`` (24-hour) format.
        data: Optional DataFrame with ``datetime_est`` and ``tweet_count``
            columns; defaults to a copy of ``render_data.global_agg_df``.
            NOTE(review): that default is assumed to already carry a
            ``datetime_est`` column -- confirm, otherwise this raises KeyError.

    Returns:
        A one-element list (the Output is declared as a list) holding either an
        ``html.Table`` of results or an ``html.Div`` with a message.
    """
    if not n_clicks:
        return [html.Div("Click 'Test' to see historical probability results.")]
    est = pytz.timezone('US/Eastern')
    data = data if data is not None else render_data.global_agg_df.copy()
    if not test_date or not test_time:
        return [html.Div("Date or time input is empty. Please provide both date (YYYY-MM-DD) and time (HH:MM:SS).")]
    time_pattern = r'^(?:[01]\d|2[0-3]):[0-5]\d:[0-5]\d$'
    if not re.match(time_pattern, test_time):
        return [html.Div("Invalid time format. Use HH:MM:SS (e.g., 12:00:00).")]
    try:
        test_naive = pd.to_datetime(f"{test_date} {test_time}", format='%Y-%m-%d %H:%M:%S')
    except ValueError:
        return [html.Div("Invalid date or time format. Use YYYY-MM-DD and HH:MM:SS (e.g., 12:00:00).")]
    # The repeated fall-back hour resolves to the DST offset (ambiguous=True).
    # A wall time skipped by spring-forward becomes NaT rather than raising a
    # pytz NonExistentTimeError, which the ValueError handler would not catch.
    test_datetime = test_naive.tz_localize(est, ambiguous=True, nonexistent='NaT')
    if pd.isna(test_datetime):
        return [html.Div(f"Test time {test_date} {test_time} does not exist in US/Eastern (skipped by the DST change).")]

    # Locate the enclosing Friday-noon cycle using naive wall-clock arithmetic.
    # Subtracting days from a tz-aware Timestamp is absolute-time arithmetic,
    # which across a DST change lands an hour off (possibly on the wrong day).
    # Adding 7 days the same way would end the cycle at 11:00 or 13:00.
    weekday = test_naive.weekday()  # Monday=0 ... Friday=4
    days_back = (weekday - 4) % 7
    if weekday == 4 and test_naive.hour < 12:
        # Friday morning still belongs to the cycle that began last Friday.
        days_back = 7
    cycle_start_naive = test_naive.normalize() - timedelta(days=days_back) + timedelta(hours=12)
    # Noon is never ambiguous or nonexistent in US/Eastern.
    cycle_start = cycle_start_naive.tz_localize(est)
    cycle_end = (cycle_start_naive + timedelta(days=7)).tz_localize(est)

    days_to_next_friday = (cycle_end - test_datetime).total_seconds() / (24 * 60 * 60)
    if days_to_next_friday <= 0:
        return [html.Div(f"Test time {test_datetime} is at or past cycle end {cycle_end}.")]
    timestamps = data['datetime_est']
    cycle_data = data[(timestamps >= cycle_start) & (timestamps <= test_datetime)]
    if cycle_data.empty:
        return [html.Div(f"No data available from {cycle_start} to {test_datetime}")]
    tweet_count = cycle_data['tweet_count'].sum()
    # Half-open window [cycle_start, cycle_end): the 12:00 bucket on the closing
    # Friday opens the next cycle, so it must not be counted in this one too.
    actual_data = data[(timestamps >= cycle_start) & (timestamps < cycle_end)]
    if actual_data.empty:
        return [html.Div(f"No data available for cycle ending {cycle_end}")]
    actual_end_count = actual_data['tweet_count'].sum()
    days_elapsed = (test_datetime - cycle_start).total_seconds() / (24 * 60 * 60)
    if days_elapsed <= 0:
        return [html.Div(f"Test time {test_datetime} is at or before cycle start {cycle_start}.")]

    # Linear extrapolation of the current pace to a full 7-day cycle, +/-10%.
    daily_avg = tweet_count / days_elapsed
    predicted_end_count = daily_avg * 7
    prob_start = predicted_end_count * 0.9
    prob_end = predicted_end_count * 1.1
    try:
        # calculate_tweet_probability returns a "min - max" string of fractions.
        probability = calculate_tweet_probability(tweet_count, days_to_next_friday, prob_start, prob_end)
        prob_min, prob_max = map(float, probability.split(" - "))
    except Exception as e:
        return [html.Div(f"Error calculating probability: {str(e)}")]
    formatted_probability = f"{prob_min * 100:.2f}% - {prob_max * 100:.2f}%"
    in_range = prob_start <= actual_end_count <= prob_end

    # Row order and label text are parsed back by process_test_case.
    test_table_rows = [
        html.Tr([html.Th("Cycle Start:", colSpan=4), html.Td(str(cycle_start), colSpan=6)]),
        html.Tr([html.Th("Test Date and Time:", colSpan=4), html.Td(str(test_datetime), colSpan=6)]),
        html.Tr([html.Th("Cycle End:", colSpan=4), html.Td(str(cycle_end), colSpan=6)]),
        html.Tr([html.Th("Tweet Count at Test Time:", colSpan=4), html.Td(str(tweet_count), colSpan=6)]),
        html.Tr([html.Th("Actual Final Tweet Count:", colSpan=4), html.Td(str(actual_end_count), colSpan=6)]),
        html.Tr([html.Th(f"Predicted Range ({int(prob_start)}-{int(prob_end)}):", colSpan=4),
                 html.Td(formatted_probability, colSpan=6)]),
        html.Tr([html.Th("Does Actual Fall in Range?", colSpan=4),
                 html.Td("Yes" if in_range else "No",
                         colSpan=6, style={'color': 'green' if in_range else 'red'})]),
    ]
    if in_range:
        expected_prob = (prob_max + prob_min) / 2
        test_table_rows.append(html.Tr([html.Th("Expected Probability:", colSpan=4),
                                        html.Td(f"~{expected_prob * 100:.2f}%", colSpan=6)]))
    else:
        test_table_rows.append(html.Tr([html.Th("Note:", colSpan=4),
                                        html.Td("Prediction does not match actual outcome.", colSpan=6,
                                                style={'color': 'red'})]))
    test_table = html.Table(test_table_rows, style={'width': '100%', 'textAlign': 'left', 'borderCollapse': 'collapse'})
    return [test_table]
def process_test_case(args):
    """Evaluate one backtest point and flatten the rendered result into a CSV row.

    Args:
        args: ``(test_datetime, data)`` tuple -- a tz-aware Timestamp and the
            DataFrame forwarded to ``update_test_info``. Packed into a single
            argument so it can be submitted to a process pool as one object.

    Returns:
        A 13-element list in CSV-header order. When ``update_test_info``
        rendered a results table, its cells are parsed back into values.
        Otherwise only the date, the time and the returned message (as the
        last, "Note", column) are filled in.
    """
    test_datetime, data = args
    date_str = test_datetime.date().strftime('%Y-%m-%d')
    time_str = test_datetime.time().strftime('%H:%M:%S')
    rendered = update_test_info(1, date_str, time_str, data)[0]

    if not isinstance(rendered, html.Table):
        # A message Div: put its text in the Note column, leave the rest blank.
        return [date_str, time_str] + [""] * 10 + [rendered.children]

    rows = rendered.children

    def label(i):
        """Content of the header (first) cell in table row ``i``."""
        return rows[i].children[0].children

    def value(i):
        """Content of the value (second) cell in table row ``i``."""
        return rows[i].children[1].children

    cycle_start = str(value(0))
    cycle_end = str(value(2))
    tweet_count = int(value(3))
    actual_end_count = int(value(4))
    # "12.34% - 56.78%" -> 12.34, 56.78
    prob_min, prob_max = (float(part.strip('%')) for part in value(5).split(" - "))
    # "Predicted Range (123-456):" -> 123, 456
    bounds = label(5).split("(")[1].split(")")[0]
    prob_start, prob_end = map(int, bounds.split("-"))
    verdict = "Yes" if value(6) == "Yes" else "No"

    expected_prob = ""
    note = ""
    if len(rows) > 7:
        trailing_label = label(7)
        if "Expected" in trailing_label:
            # "~45.67%" -> 45.67
            expected_prob = float(value(7).split()[0][1:-1])
        elif "Note" in trailing_label:
            note = value(7)

    return [
        date_str, time_str, cycle_start, cycle_end, tweet_count,
        actual_end_count, prob_start, prob_end, prob_min, prob_max,
        verdict, expected_prob, note,
    ]
# Per-worker copy of the backtest DataFrame. It is installed once per pool
# process by _init_backtest_worker instead of being pickled with every task.
_worker_data = None


def _init_backtest_worker(data):
    """Pool initializer: stash the shared DataFrame in this worker process."""
    global _worker_data
    _worker_data = data


def _process_time_point(test_datetime):
    """Pool task: run ``process_test_case`` against this worker's stashed data."""
    return process_test_case((test_datetime, _worker_data))


def run_loop_test(start_date="2024-10-01", end_date="2025-03-12", interval_hours=1, output_file="test_results.csv", max_workers=None, chunk_size=1000):
    """Backtest ``update_test_info`` at regular intervals and append rows to a CSV.

    Test points run from ``start_date`` to ``end_date`` inclusive. Both bounds
    are taken as US/Eastern midnight, and the points are spaced
    ``interval_hours`` apart in absolute time. Each point is evaluated in a
    process pool via ``process_test_case``. Results are appended one chunk at
    a time, in chronological order within each chunk.

    Args:
        start_date: Start of the range; anything ``pd.to_datetime`` accepts.
        end_date: End of the range (inclusive); same accepted formats.
        interval_hours: Positive spacing between test points, in hours.
        output_file: CSV path. The header row is written only when the file
            does not exist yet. Rows are always appended, so re-running against
            the same file adds duplicate rows.
        max_workers: Pool size; defaults to ``os.cpu_count()`` (or 4).
        chunk_size: Number of test points per pool and per CSV write.

    Raises:
        ValueError: If ``interval_hours`` is not positive (the point-generation
            loop would otherwise never terminate).
    """
    if interval_hours <= 0:
        raise ValueError("interval_hours must be positive")
    est = pytz.timezone('US/Eastern')
    start_dt = pd.to_datetime(start_date).tz_localize(est)
    end_dt = pd.to_datetime(end_date).tz_localize(est)
    step = timedelta(hours=interval_hours)
    time_points = []
    current_dt = start_dt
    while current_dt <= end_dt:
        time_points.append(current_dt)
        current_dt += step
    headers = [
        "Test Date", "Test Time", "Cycle Start", "Cycle End", "Tweet Count at Test Time",
        "Actual Final Tweet Count", "Predicted Range Start", "Predicted Range End",
        "Probability Min (%)", "Probability Max (%)", "Actual in Range", "Expected Probability (%)", "Note"
    ]
    if not os.path.exists(output_file):
        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            csv.writer(f).writerow(headers)
    if not time_points:
        # Empty range (start after end): nothing to evaluate.
        return

    # Load the data once, up front.
    initialize_global_data()
    data = global_data
    total_steps = len(time_points)
    max_workers = max_workers or os.cpu_count() or 4
    # Never exceed the number of tasks, and never drop to 0, which range() rejects.
    chunk_size = max(1, min(chunk_size, total_steps))

    with tqdm(total=total_steps, desc="Processing Test Cases", unit="step") as pbar:
        for offset in range(0, total_steps, chunk_size):
            chunk = time_points[offset:offset + chunk_size]
            results = [None] * len(chunk)
            # A fresh pool per chunk confines a crashed worker (BrokenProcessPool)
            # to one chunk. The DataFrame is shipped once per worker via the
            # initializer rather than once per submitted task.
            with ProcessPoolExecutor(max_workers=max_workers,
                                     initializer=_init_backtest_worker,
                                     initargs=(data,)) as executor:
                futures = {executor.submit(_process_time_point, test_datetime): index
                           for index, test_datetime in enumerate(chunk)}
                for future in as_completed(futures):
                    index = futures[future]
                    try:
                        results[index] = future.result()
                    except Exception as e:
                        # Record the failure in the Note column instead of aborting the run.
                        test_datetime = chunk[index]
                        results[index] = [test_datetime.date().strftime('%Y-%m-%d'),
                                          test_datetime.time().strftime('%H:%M:%S'),
                                          "", "", "", "", "", "", "", "", "", "", f"Error: {str(e)}"]
                    pbar.update(1)
            with open(output_file, 'a', newline='', encoding='utf-8') as f:
                csv.writer(f).writerows(results)
if __name__ == "__main__":
    # Hourly backtest over Oct 2024 - Mar 2025, appended to test_results.csv.
    backtest_options = dict(
        start_date="2024-10-01",
        end_date="2025-03-12",
        interval_hours=1,
        output_file="test_results.csv",
        chunk_size=1000,
    )
    run_loop_test(**backtest_options)