diff --git a/pkg/dash/func/info_func.py b/pkg/dash/func/info_func.py index d397c7d..149eba0 100644 --- a/pkg/dash/func/info_func.py +++ b/pkg/dash/func/info_func.py @@ -163,8 +163,10 @@ def calculate_avg_tweets_per_day(target, now, remain): def calculate_tweet_probability(tweet_count, days_to_next_friday, prob_start, prob_end, peak_percentile=75): remaining_hours = days_to_next_friday * 24 - hourly_weights = get_dynamic_hourly_weights() + if remaining_hours <= 0: + return "0.0000 - 0.0000" # 测试时间超出周期结束,返回默认值 + hourly_weights = get_dynamic_hourly_weights() data = get_last_7_days_data() if data.empty: recent_tweets = [70] * 7 diff --git a/pkg/dash/func/info_test.py b/pkg/dash/func/info_test.py index 2009a0a..5b9e6d2 100644 --- a/pkg/dash/func/info_test.py +++ b/pkg/dash/func/info_test.py @@ -7,6 +7,26 @@ import csv import pandas as pd import re from datetime import timedelta +from tqdm import tqdm +from concurrent.futures import ProcessPoolExecutor, as_completed +import multiprocessing as mp + +# 全局数据,避免重复加载 +global_data = None + +def initialize_global_data(): + global global_data + if global_data is None: + global_data = render_data.global_agg_df.copy() + # 预计算常用列,避免重复操作 + global_data['hours'] = global_data['minute_of_day'] // 60 + global_data['minutes'] = global_data['minute_of_day'] % 60 + global_data['datetime_est'] = pd.to_datetime( + global_data['date'].astype(str) + ' ' + + global_data['hours'].astype(str) + ':' + + global_data['minutes'].astype(str) + ':00', + errors='coerce' + ).dt.tz_localize('US/Eastern', ambiguous='NaT') @app.callback( [Output('test-info-tooltip', 'children')], @@ -14,94 +34,73 @@ from datetime import timedelta Input('test-date-input', 'date'), Input('test-time-input', 'value')] ) -def update_test_info(n_clicks, test_date, test_time): +def update_test_info(n_clicks, test_date, test_time, data=None): if n_clicks == 0: return [html.Div("Click 'Test' to see historical probability results.")] est = pytz.timezone('US/Eastern') - data = render_data.global_agg_df.copy() + data = data if data is not None else render_data.global_agg_df.copy() - # 调试:打印输入值 - print(f"test_date: {test_date}, test_time: {test_time}") - - # 检查输入是否为空 if not test_date or not test_time: return [html.Div("Date or time input is empty. Please provide both date (YYYY-MM-DD) and time (HH:MM:SS).")] - # 验证时间格式 - time_pattern = r'^(?:[01]\d|2[0-3]):[0-5]\d:[0-5]\d$' # HH:MM:SS (00:00:00 to 23:59:59) + time_pattern = r'^(?:[01]\d|2[0-3]):[0-5]\d:[0-5]\d$' if not re.match(time_pattern, test_time): - return [html.Div("Invalid time format. Use HH:MM:SS (e.g., 12:00:00) with hours 00-23, minutes 00-59, seconds 00-59.")] + return [html.Div("Invalid time format. Use HH:MM:SS (e.g., 12:00:00).")] - # 重构 datetime_est,处理夏令时模糊时间 - data['hours'] = data['minute_of_day'] // 60 - data['minutes'] = data['minute_of_day'] % 60 - data['datetime_est'] = pd.to_datetime( - data['date'].astype(str) + ' ' + - data['hours'].astype(str) + ':' + - data['minutes'].astype(str) + ':00', - errors='coerce' - ).dt.tz_localize(est, ambiguous='NaT') - - if data['datetime_est'].isna().any(): - print("Warning: Some datetime_est values are NaT due to ambiguous time handling") - - # 解析测试日期和时间 try: - test_date = pd.to_datetime(test_date, format='%Y-%m-%d').date() test_datetime = pd.to_datetime(f"{test_date} {test_time}", format='%Y-%m-%d %H:%M:%S').tz_localize(est, ambiguous=True) - except ValueError as e: - print(f"Error parsing date/time: {e}") + except ValueError: return [html.Div("Invalid date or time format. Use YYYY-MM-DD and HH:MM:SS (e.g., 12:00:00).")] - # 计算周期开始时间(上一个或当前周五 12:00 PM) - test_day_of_week = test_datetime.weekday() # 0 = Monday, 4 = Friday + test_day_of_week = test_datetime.weekday() test_hour = test_datetime.hour - days_since_last_friday = (test_day_of_week - 4) % 7 # 4 表示周五 - if test_hour < 12 and test_day_of_week == 4: # Before 12 PM on Friday - cycle_start = test_datetime - timedelta(days=days_since_last_friday + 7) # Previous Friday + days_since_last_friday = (test_day_of_week - 4) % 7 + if test_hour < 12 and test_day_of_week == 4: + cycle_start = test_datetime - timedelta(days=days_since_last_friday + 7) else: - cycle_start = test_datetime - timedelta(days=days_since_last_friday) # Current or next Friday + cycle_start = test_datetime - timedelta(days=days_since_last_friday) cycle_start = cycle_start.replace(hour=12, minute=0, second=0, microsecond=0) - # 确保周期结束时间(下周五 12:00 PM EDT)考虑夏令时 cycle_end = cycle_start + timedelta(days=7) - # 精确计算夏令时开始(2025年3月9日,第二个星期日) first_day = cycle_end.replace(day=1) second_sunday = first_day + timedelta(days=((6 - first_day.weekday()) % 7) + 7) if cycle_end.month == 3 and cycle_end >= second_sunday.replace(hour=2): - cycle_end = cycle_end.tz_convert(est) # EDT + cycle_end = cycle_end.tz_convert(est) else: - cycle_end = cycle_end.tz_convert(est) # EST 或 EDT + cycle_end = cycle_end.tz_convert(est) - # 调试:打印周期信息 - print(f"Cycle Start: {cycle_start}, Cycle End: {cycle_end}") + days_to_next_friday = (cycle_end - test_datetime).total_seconds() / (24 * 60 * 60) + if days_to_next_friday <= 0: + return [html.Div(f"Test time {test_datetime} is at or past cycle end {cycle_end}.")] - # 过滤周期内的数据(从周期开始到测试时间) cycle_data = data[(data['datetime_est'] >= cycle_start) & (data['datetime_est'] <= test_datetime)] if cycle_data.empty: - return [html.Div(f"No data available in cycle from {cycle_start} to {test_datetime}")] + return [html.Div(f"No data available from {cycle_start} to {test_datetime}")] + tweet_count = cycle_data['tweet_count'].sum() - # 计算实际最终推文数(周期结束时的总数) actual_data = data[(data['datetime_est'] >= cycle_start) & (data['datetime_est'] <= cycle_end)] if actual_data.empty: return [html.Div(f"No data available for cycle ending {cycle_end}")] actual_end_count = actual_data['tweet_count'].sum() - # 计算 days_to_next_friday(从 test_datetime 到周期结束) - days_to_next_friday = (cycle_end - test_datetime).total_seconds() / (24 * 60 * 60) + days_elapsed = (test_datetime - cycle_start).total_seconds() / (24 * 60 * 60) + if days_elapsed <= 0: + return [html.Div(f"Test time {test_datetime} is before cycle start {cycle_start}.")] - # 设置预测范围 - prob_start = actual_end_count * 0.9 - prob_end = actual_end_count * 1.1 + daily_avg = tweet_count / days_elapsed + predicted_end_count = daily_avg * 7 + prob_start = predicted_end_count * 0.9 + prob_end = predicted_end_count * 1.1 - # 计算概率 - probability = calculate_tweet_probability(tweet_count, days_to_next_friday, prob_start, prob_end) - prob_min, prob_max = map(float, probability.split(" - ")) - formatted_probability = f"{prob_min * 100:.2f}% - {prob_max * 100:.2f}%" + try: + probability = calculate_tweet_probability(tweet_count, days_to_next_friday, prob_start, prob_end) + prob_min, prob_max = map(float, probability.split(" - ")) + formatted_probability = f"{prob_min * 100:.2f}% - {prob_max * 100:.2f}%" + except Exception as e: + return [html.Div(f"Error calculating probability: {str(e)}")] - # 构建测试结果表格(包含 Cycle End) test_table_rows = [ html.Tr([html.Th("Cycle Start:", colSpan=4), html.Td(str(cycle_start), colSpan=6)]), html.Tr([html.Th("Test Date and Time:", colSpan=4), html.Td(str(test_datetime), colSpan=6)]), @@ -116,19 +115,55 @@ def update_test_info(n_clicks, test_date, test_time): if prob_start <= actual_end_count <= prob_end: expected_prob = (prob_max + prob_min) / 2 test_table_rows.append(html.Tr([html.Th("Expected Probability:", colSpan=4), - html.Td(f"~{expected_prob * 100:.2f}% (should be high if model fits)", colSpan=6)])) + html.Td(f"~{expected_prob * 100:.2f}%", colSpan=6)])) else: test_table_rows.append(html.Tr([html.Th("Note:", colSpan=4), - html.Td("Model prediction does not match actual outcome.", colSpan=6, style={'color': 'red'})])) + html.Td("Prediction does not match actual outcome.", colSpan=6, style={'color': 'red'})])) test_table = html.Table(test_table_rows, style={'width': '100%', 'textAlign': 'left', 'borderCollapse': 'collapse'}) return [test_table] -def run_loop_test(start_date="2024-10-01", end_date="2025-03-12", interval_hours=1, output_file="test_results.csv"): - est = pytz.timezone('US/Eastern') +def process_test_case(args): + test_datetime, data = args + test_date = test_datetime.date().strftime('%Y-%m-%d') + test_time = test_datetime.time().strftime('%H:%M:%S') + n_clicks = 1 - # 生成测试时间序列 + result = update_test_info(n_clicks, test_date, test_time, data) + + if isinstance(result[0], html.Table): + table = result[0] + rows = table.children + + cycle_start = str(rows[0].children[1].children) + test_dt = str(rows[1].children[1].children) + cycle_end = str(rows[2].children[1].children) + tweet_count = int(rows[3].children[1].children) + actual_end_count = int(rows[4].children[1].children) + prob_range = rows[5].children[1].children + prob_min, prob_max = [float(x.strip('%')) for x in prob_range.split(" - ")] + prob_start, prob_end = map(int, rows[5].children[0].children.split("(")[1].split(")")[0].split("-")) + in_range = rows[6].children[1].children == "Yes" + expected_prob = None + note = "" + if len(rows) > 7: + if "Expected" in rows[7].children[0].children: + expected_prob = float(rows[7].children[1].children.split()[0][1:-1]) + elif "Note" in rows[7].children[0].children: + note = rows[7].children[1].children + + return [ + test_date, test_time, cycle_start, cycle_end, tweet_count, + actual_end_count, prob_start, prob_end, prob_min, prob_max, + "Yes" if in_range else "No", expected_prob if expected_prob is not None else "", note + ] + else: + return [test_date, test_time, "", "", "", "", "", "", "", "", "", "", result[0].children] + + +def run_loop_test(start_date="2024-10-01", end_date="2025-03-12", interval_hours=1, output_file="test_results.csv", max_workers=None, chunk_size=1000): + est = pytz.timezone('US/Eastern') start_dt = pd.to_datetime(start_date).tz_localize(est) end_dt = pd.to_datetime(end_date).tz_localize(est) time_points = [] @@ -137,71 +172,49 @@ def run_loop_test(start_date="2024-10-01", end_date="2025-03-12", interval_hours time_points.append(current_dt) current_dt += timedelta(hours=interval_hours) - # 准备 CSV 文件 headers = [ "Test Date", "Test Time", "Cycle Start", "Cycle End", "Tweet Count at Test Time", "Actual Final Tweet Count", "Predicted Range Start", "Predicted Range End", "Probability Min (%)", "Probability Max (%)", "Actual in Range", "Expected Probability (%)", "Note" ] - # 如果文件不存在,写入表头 if not os.path.exists(output_file): with open(output_file, 'w', newline='') as f: writer = csv.writer(f) writer.writerow(headers) - # 循环测试 - for test_datetime in time_points: - test_date = test_datetime.date().strftime('%Y-%m-%d') - test_time = test_datetime.time().strftime('%H:%M:%S') - n_clicks = 1 # 假设已点击 + # 预加载数据 + initialize_global_data() + data = global_data - # 调用原始函数 - result = update_test_info(n_clicks, test_date, test_time) + total_steps = len(time_points) + max_workers = max_workers or os.cpu_count() or 4 + chunk_size = min(chunk_size, total_steps) # 确保 chunk_size 不超过总任务数 - # 解析结果 - if isinstance(result[0], html.Table): - table = result[0] - rows = table.children + # 分块处理时间点 + chunks = [time_points[i:i + chunk_size] for i in range(0, total_steps, chunk_size)] - # 提取数据 - cycle_start = str(rows[0].children[1].children) - test_dt = str(rows[1].children[1].children) - cycle_end = str(rows[2].children[1].children) - tweet_count = int(rows[3].children[1].children) - actual_end_count = int(rows[4].children[1].children) - prob_range = rows[5].children[1].children # 例如 "2.74% - 3.25%" - # 移除 % 符号并转换为浮点数 - prob_min, prob_max = [float(x.strip('%')) for x in prob_range.split(" - ")] - # 从表头提取预测范围 - prob_start, prob_end = map(int, rows[5].children[0].children.split("(")[1].split(")")[0].split("-")) - in_range = rows[6].children[1].children == "Yes" - # 检查是否有 Expected Probability 或 Note - expected_prob = None - note = "" - if len(rows) > 7: - if "Expected" in rows[7].children[0].children: - expected_prob = float(rows[7].children[1].children.split()[0][1:-1]) # 移除 "~" 和 "%" - elif "Note" in rows[7].children[0].children: - note = rows[7].children[1].children + with tqdm(total=total_steps, desc="Processing Test Cases", unit="step") as pbar: + for chunk in chunks: + results = [] + with ProcessPoolExecutor(max_workers=max_workers) as executor: + futures = {executor.submit(process_test_case, (test_datetime, data)): test_datetime for test_datetime in chunk} + for future in as_completed(futures): + try: + result = future.result() + results.append(result) + pbar.update(1) + except Exception as e: + test_datetime = futures[future] + results.append([test_datetime.date().strftime('%Y-%m-%d'), + test_datetime.time().strftime('%H:%M:%S'), + "", "", "", "", "", "", "", "", "", "", f"Error: {str(e)}"]) + pbar.update(1) - # 写入 CSV + # 每处理完一个块写入 CSV with open(output_file, 'a', newline='') as f: writer = csv.writer(f) - writer.writerow([ - test_date, test_time, cycle_start, cycle_end, tweet_count, - actual_end_count, prob_start, prob_end, prob_min, prob_max, - "Yes" if in_range else "No", expected_prob if expected_prob is not None else "", note - ]) - else: - # 如果返回错误信息,也记录 - with open(output_file, 'a', newline='') as f: - writer = csv.writer(f) - writer.writerow([test_date, test_time, "", "", "", "", "", "", "", "", "", "", result[0].children]) + writer.writerows(results) - print(f"Processed: {test_date} {test_time}") - - -# 运行测试 if __name__ == "__main__": - run_loop_test(start_date="2024-10-01", end_date="2025-03-12", interval_hours=1, output_file="test_results.csv") \ No newline at end of file + run_loop_test(start_date="2024-10-01", end_date="2025-03-12", interval_hours=1, output_file="test_results.csv", chunk_size=1000) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index b886d70..ea2bcd3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,5 @@ matplotlib~=3.10.1 numpy~=2.2.3 scipy~=1.15.2 ipython~=8.32.0 -Flask~=3.0.3 \ No newline at end of file +Flask~=3.0.3 +tqdm~=4.67.1 \ No newline at end of file