This commit is contained in:
Pi 2025-03-16 17:08:51 +08:00
parent 70c1da09ef
commit 8f5050ab31
3 changed files with 123 additions and 107 deletions

View File

@ -163,8 +163,10 @@ def calculate_avg_tweets_per_day(target, now, remain):
def calculate_tweet_probability(tweet_count, days_to_next_friday, prob_start, prob_end, peak_percentile=75): def calculate_tweet_probability(tweet_count, days_to_next_friday, prob_start, prob_end, peak_percentile=75):
remaining_hours = days_to_next_friday * 24 remaining_hours = days_to_next_friday * 24
hourly_weights = get_dynamic_hourly_weights() if remaining_hours <= 0:
return "0.0000 - 0.0000" # 测试时间超出周期结束,返回默认值
hourly_weights = get_dynamic_hourly_weights()
data = get_last_7_days_data() data = get_last_7_days_data()
if data.empty: if data.empty:
recent_tweets = [70] * 7 recent_tweets = [70] * 7

View File

@ -7,6 +7,26 @@ import csv
import pandas as pd import pandas as pd
import re import re
from datetime import timedelta from datetime import timedelta
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing as mp
# 全局数据,避免重复加载
global_data = None
def initialize_global_data():
    """Populate the module-level ``global_data`` cache on first use.

    Copies ``render_data.global_agg_df`` once and precomputes the helper
    columns ``hours``, ``minutes`` and ``datetime_est`` so repeated test
    runs do not rebuild them. Later calls are no-ops.

    ``datetime_est`` is built from the ``date`` and ``minute_of_day``
    columns (``minute_of_day`` is split with ``// 60`` and ``% 60``, so it
    is presumably minutes since midnight -- confirm against the loader)
    and localized to US/Eastern. Wall-clock times that are ambiguous
    (fall-back hour) or do not exist (spring-forward gap) become ``NaT``
    instead of raising, so one bad row cannot abort the preload.
    """
    global global_data
    if global_data is not None:
        return

    global_data = render_data.global_agg_df.copy()

    # Precompute commonly used columns once to avoid repeating the work.
    global_data['hours'] = global_data['minute_of_day'] // 60
    global_data['minutes'] = global_data['minute_of_day'] % 60
    naive_timestamps = pd.to_datetime(
        global_data['date'].astype(str) + ' ' +
        global_data['hours'].astype(str) + ':' +
        global_data['minutes'].astype(str) + ':00',
        errors='coerce'
    )
    # nonexistent='NaT' mirrors ambiguous='NaT': without it, any row inside
    # the DST spring-forward gap raises and the whole cache build fails.
    global_data['datetime_est'] = naive_timestamps.dt.tz_localize(
        'US/Eastern', ambiguous='NaT', nonexistent='NaT'
    )
@app.callback( @app.callback(
[Output('test-info-tooltip', 'children')], [Output('test-info-tooltip', 'children')],
@ -14,94 +34,73 @@ from datetime import timedelta
Input('test-date-input', 'date'), Input('test-date-input', 'date'),
Input('test-time-input', 'value')] Input('test-time-input', 'value')]
) )
def update_test_info(n_clicks, test_date, test_time): def update_test_info(n_clicks, test_date, test_time, data=None):
if n_clicks == 0: if n_clicks == 0:
return [html.Div("Click 'Test' to see historical probability results.")] return [html.Div("Click 'Test' to see historical probability results.")]
est = pytz.timezone('US/Eastern') est = pytz.timezone('US/Eastern')
data = render_data.global_agg_df.copy() data = data if data is not None else render_data.global_agg_df.copy()
# 调试:打印输入值
print(f"test_date: {test_date}, test_time: {test_time}")
# 检查输入是否为空
if not test_date or not test_time: if not test_date or not test_time:
return [html.Div("Date or time input is empty. Please provide both date (YYYY-MM-DD) and time (HH:MM:SS).")] return [html.Div("Date or time input is empty. Please provide both date (YYYY-MM-DD) and time (HH:MM:SS).")]
# 验证时间格式 time_pattern = r'^(?:[01]\d|2[0-3]):[0-5]\d:[0-5]\d$'
time_pattern = r'^(?:[01]\d|2[0-3]):[0-5]\d:[0-5]\d$' # HH:MM:SS (00:00:00 to 23:59:59)
if not re.match(time_pattern, test_time): if not re.match(time_pattern, test_time):
return [html.Div("Invalid time format. Use HH:MM:SS (e.g., 12:00:00) with hours 00-23, minutes 00-59, seconds 00-59.")] return [html.Div("Invalid time format. Use HH:MM:SS (e.g., 12:00:00).")]
# 重构 datetime_est处理夏令时模糊时间
data['hours'] = data['minute_of_day'] // 60
data['minutes'] = data['minute_of_day'] % 60
data['datetime_est'] = pd.to_datetime(
data['date'].astype(str) + ' ' +
data['hours'].astype(str) + ':' +
data['minutes'].astype(str) + ':00',
errors='coerce'
).dt.tz_localize(est, ambiguous='NaT')
if data['datetime_est'].isna().any():
print("Warning: Some datetime_est values are NaT due to ambiguous time handling")
# 解析测试日期和时间
try: try:
test_date = pd.to_datetime(test_date, format='%Y-%m-%d').date()
test_datetime = pd.to_datetime(f"{test_date} {test_time}", format='%Y-%m-%d %H:%M:%S').tz_localize(est, ambiguous=True) test_datetime = pd.to_datetime(f"{test_date} {test_time}", format='%Y-%m-%d %H:%M:%S').tz_localize(est, ambiguous=True)
except ValueError as e: except ValueError:
print(f"Error parsing date/time: {e}")
return [html.Div("Invalid date or time format. Use YYYY-MM-DD and HH:MM:SS (e.g., 12:00:00).")] return [html.Div("Invalid date or time format. Use YYYY-MM-DD and HH:MM:SS (e.g., 12:00:00).")]
# 计算周期开始时间(上一个或当前周五 12:00 PM test_day_of_week = test_datetime.weekday()
test_day_of_week = test_datetime.weekday() # 0 = Monday, 4 = Friday
test_hour = test_datetime.hour test_hour = test_datetime.hour
days_since_last_friday = (test_day_of_week - 4) % 7 # 4 表示周五 days_since_last_friday = (test_day_of_week - 4) % 7
if test_hour < 12 and test_day_of_week == 4: # Before 12 PM on Friday if test_hour < 12 and test_day_of_week == 4:
cycle_start = test_datetime - timedelta(days=days_since_last_friday + 7) # Previous Friday cycle_start = test_datetime - timedelta(days=days_since_last_friday + 7)
else: else:
cycle_start = test_datetime - timedelta(days=days_since_last_friday) # Current or next Friday cycle_start = test_datetime - timedelta(days=days_since_last_friday)
cycle_start = cycle_start.replace(hour=12, minute=0, second=0, microsecond=0) cycle_start = cycle_start.replace(hour=12, minute=0, second=0, microsecond=0)
# 确保周期结束时间(下周五 12:00 PM EDT考虑夏令时
cycle_end = cycle_start + timedelta(days=7) cycle_end = cycle_start + timedelta(days=7)
# 精确计算夏令时开始2025年3月9日第二个星期日
first_day = cycle_end.replace(day=1) first_day = cycle_end.replace(day=1)
second_sunday = first_day + timedelta(days=((6 - first_day.weekday()) % 7) + 7) second_sunday = first_day + timedelta(days=((6 - first_day.weekday()) % 7) + 7)
if cycle_end.month == 3 and cycle_end >= second_sunday.replace(hour=2): if cycle_end.month == 3 and cycle_end >= second_sunday.replace(hour=2):
cycle_end = cycle_end.tz_convert(est) # EDT cycle_end = cycle_end.tz_convert(est)
else: else:
cycle_end = cycle_end.tz_convert(est) # EST 或 EDT cycle_end = cycle_end.tz_convert(est)
# 调试:打印周期信息 days_to_next_friday = (cycle_end - test_datetime).total_seconds() / (24 * 60 * 60)
print(f"Cycle Start: {cycle_start}, Cycle End: {cycle_end}") if days_to_next_friday <= 0:
return [html.Div(f"Test time {test_datetime} is at or past cycle end {cycle_end}.")]
# 过滤周期内的数据(从周期开始到测试时间)
cycle_data = data[(data['datetime_est'] >= cycle_start) & (data['datetime_est'] <= test_datetime)] cycle_data = data[(data['datetime_est'] >= cycle_start) & (data['datetime_est'] <= test_datetime)]
if cycle_data.empty: if cycle_data.empty:
return [html.Div(f"No data available in cycle from {cycle_start} to {test_datetime}")] return [html.Div(f"No data available from {cycle_start} to {test_datetime}")]
tweet_count = cycle_data['tweet_count'].sum() tweet_count = cycle_data['tweet_count'].sum()
# 计算实际最终推文数(周期结束时的总数)
actual_data = data[(data['datetime_est'] >= cycle_start) & (data['datetime_est'] <= cycle_end)] actual_data = data[(data['datetime_est'] >= cycle_start) & (data['datetime_est'] <= cycle_end)]
if actual_data.empty: if actual_data.empty:
return [html.Div(f"No data available for cycle ending {cycle_end}")] return [html.Div(f"No data available for cycle ending {cycle_end}")]
actual_end_count = actual_data['tweet_count'].sum() actual_end_count = actual_data['tweet_count'].sum()
# 计算 days_to_next_friday从 test_datetime 到周期结束) days_elapsed = (test_datetime - cycle_start).total_seconds() / (24 * 60 * 60)
days_to_next_friday = (cycle_end - test_datetime).total_seconds() / (24 * 60 * 60) if days_elapsed <= 0:
return [html.Div(f"Test time {test_datetime} is before cycle start {cycle_start}.")]
# 设置预测范围 daily_avg = tweet_count / days_elapsed
prob_start = actual_end_count * 0.9 predicted_end_count = daily_avg * 7
prob_end = actual_end_count * 1.1 prob_start = predicted_end_count * 0.9
prob_end = predicted_end_count * 1.1
# 计算概率 try:
probability = calculate_tweet_probability(tweet_count, days_to_next_friday, prob_start, prob_end) probability = calculate_tweet_probability(tweet_count, days_to_next_friday, prob_start, prob_end)
prob_min, prob_max = map(float, probability.split(" - ")) prob_min, prob_max = map(float, probability.split(" - "))
formatted_probability = f"{prob_min * 100:.2f}% - {prob_max * 100:.2f}%" formatted_probability = f"{prob_min * 100:.2f}% - {prob_max * 100:.2f}%"
except Exception as e:
return [html.Div(f"Error calculating probability: {str(e)}")]
# 构建测试结果表格(包含 Cycle End
test_table_rows = [ test_table_rows = [
html.Tr([html.Th("Cycle Start:", colSpan=4), html.Td(str(cycle_start), colSpan=6)]), html.Tr([html.Th("Cycle Start:", colSpan=4), html.Td(str(cycle_start), colSpan=6)]),
html.Tr([html.Th("Test Date and Time:", colSpan=4), html.Td(str(test_datetime), colSpan=6)]), html.Tr([html.Th("Test Date and Time:", colSpan=4), html.Td(str(test_datetime), colSpan=6)]),
@ -116,19 +115,55 @@ def update_test_info(n_clicks, test_date, test_time):
if prob_start <= actual_end_count <= prob_end: if prob_start <= actual_end_count <= prob_end:
expected_prob = (prob_max + prob_min) / 2 expected_prob = (prob_max + prob_min) / 2
test_table_rows.append(html.Tr([html.Th("Expected Probability:", colSpan=4), test_table_rows.append(html.Tr([html.Th("Expected Probability:", colSpan=4),
html.Td(f"~{expected_prob * 100:.2f}% (should be high if model fits)", colSpan=6)])) html.Td(f"~{expected_prob * 100:.2f}%", colSpan=6)]))
else: else:
test_table_rows.append(html.Tr([html.Th("Note:", colSpan=4), test_table_rows.append(html.Tr([html.Th("Note:", colSpan=4),
html.Td("Model prediction does not match actual outcome.", colSpan=6, style={'color': 'red'})])) html.Td("Prediction does not match actual outcome.", colSpan=6, style={'color': 'red'})]))
test_table = html.Table(test_table_rows, style={'width': '100%', 'textAlign': 'left', 'borderCollapse': 'collapse'}) test_table = html.Table(test_table_rows, style={'width': '100%', 'textAlign': 'left', 'borderCollapse': 'collapse'})
return [test_table] return [test_table]
def process_test_case(args):
    """Run one historical test point and flatten the result into a CSV row.

    Args:
        args: A ``(test_datetime, data)`` tuple. ``test_datetime`` is the
            timestamp to test; ``data`` is the preloaded frame forwarded
            to ``update_test_info``. A single tuple is used so the
            function can be submitted to an executor with one argument.

    Returns:
        A 13-element list: test date, test time, cycle start, cycle end,
        tweet count at test time, actual final count, predicted range
        start/end, probability min/max (percent), "Yes"/"No" for whether
        the actual count fell in range, expected probability (or ""), and
        a note. When ``update_test_info`` returns a message instead of a
        table, the middle ten fields are empty and the note holds that
        message.
    """
    test_datetime, data = args
    test_date = test_datetime.date().strftime('%Y-%m-%d')
    test_time = test_datetime.time().strftime('%H:%M:%S')

    # n_clicks=1 simulates the "Test" button having been pressed.
    result = update_test_info(1, test_date, test_time, data)
    outcome = result[0]

    if not isinstance(outcome, html.Table):
        # Error/notice path: record the message text in the Note column.
        return [test_date, test_time, "", "", "", "", "", "", "", "", "", "", outcome.children]

    # NOTE(review): row indices follow the table built in update_test_info;
    # rows 2-6 are not visible in this view -- confirm the layout there.
    rows = outcome.children
    cycle_start = str(rows[0].children[1].children)
    cycle_end = str(rows[2].children[1].children)
    tweet_count = int(rows[3].children[1].children)
    actual_end_count = int(rows[4].children[1].children)

    # Probability cell looks like "2.74% - 3.25%"; strip the percent signs.
    prob_range = rows[5].children[1].children
    prob_min, prob_max = [float(x.strip('%')) for x in prob_range.split(" - ")]

    # The predicted range is embedded in the header cell as "(start-end)".
    range_header = rows[5].children[0].children
    prob_start, prob_end = map(int, range_header.split("(")[1].split(")")[0].split("-"))

    in_range = rows[6].children[1].children == "Yes"

    expected_prob = None
    note = ""
    if len(rows) > 7:
        extra_label = rows[7].children[0].children
        extra_value = rows[7].children[1].children
        if "Expected" in extra_label:
            # Value looks like "~12.34%": drop the leading "~" and trailing "%".
            expected_prob = float(extra_value.split()[0][1:-1])
        elif "Note" in extra_label:
            note = extra_value

    return [
        test_date, test_time, cycle_start, cycle_end, tweet_count,
        actual_end_count, prob_start, prob_end, prob_min, prob_max,
        "Yes" if in_range else "No",
        expected_prob if expected_prob is not None else "",
        note,
    ]
def run_loop_test(start_date="2024-10-01", end_date="2025-03-12", interval_hours=1, output_file="test_results.csv", max_workers=None, chunk_size=1000):
est = pytz.timezone('US/Eastern')
start_dt = pd.to_datetime(start_date).tz_localize(est) start_dt = pd.to_datetime(start_date).tz_localize(est)
end_dt = pd.to_datetime(end_date).tz_localize(est) end_dt = pd.to_datetime(end_date).tz_localize(est)
time_points = [] time_points = []
@ -137,71 +172,49 @@ def run_loop_test(start_date="2024-10-01", end_date="2025-03-12", interval_hours
time_points.append(current_dt) time_points.append(current_dt)
current_dt += timedelta(hours=interval_hours) current_dt += timedelta(hours=interval_hours)
# 准备 CSV 文件
headers = [ headers = [
"Test Date", "Test Time", "Cycle Start", "Cycle End", "Tweet Count at Test Time", "Test Date", "Test Time", "Cycle Start", "Cycle End", "Tweet Count at Test Time",
"Actual Final Tweet Count", "Predicted Range Start", "Predicted Range End", "Actual Final Tweet Count", "Predicted Range Start", "Predicted Range End",
"Probability Min (%)", "Probability Max (%)", "Actual in Range", "Expected Probability (%)", "Note" "Probability Min (%)", "Probability Max (%)", "Actual in Range", "Expected Probability (%)", "Note"
] ]
# 如果文件不存在,写入表头
if not os.path.exists(output_file): if not os.path.exists(output_file):
with open(output_file, 'w', newline='') as f: with open(output_file, 'w', newline='') as f:
writer = csv.writer(f) writer = csv.writer(f)
writer.writerow(headers) writer.writerow(headers)
# 循环测试 # 预加载数据
for test_datetime in time_points: initialize_global_data()
test_date = test_datetime.date().strftime('%Y-%m-%d') data = global_data
test_time = test_datetime.time().strftime('%H:%M:%S')
n_clicks = 1 # 假设已点击
# 调用原始函数 total_steps = len(time_points)
result = update_test_info(n_clicks, test_date, test_time) max_workers = max_workers or os.cpu_count() or 4
chunk_size = min(chunk_size, total_steps) # 确保 chunk_size 不超过总任务数
# 解析结果 # 分块处理时间点
if isinstance(result[0], html.Table): chunks = [time_points[i:i + chunk_size] for i in range(0, total_steps, chunk_size)]
table = result[0]
rows = table.children
# 提取数据 with tqdm(total=total_steps, desc="Processing Test Cases", unit="step") as pbar:
cycle_start = str(rows[0].children[1].children) for chunk in chunks:
test_dt = str(rows[1].children[1].children) results = []
cycle_end = str(rows[2].children[1].children) with ProcessPoolExecutor(max_workers=max_workers) as executor:
tweet_count = int(rows[3].children[1].children) futures = {executor.submit(process_test_case, (test_datetime, data)): test_datetime for test_datetime in chunk}
actual_end_count = int(rows[4].children[1].children) for future in as_completed(futures):
prob_range = rows[5].children[1].children # 例如 "2.74% - 3.25%" try:
# 移除 % 符号并转换为浮点数 result = future.result()
prob_min, prob_max = [float(x.strip('%')) for x in prob_range.split(" - ")] results.append(result)
# 从表头提取预测范围 pbar.update(1)
prob_start, prob_end = map(int, rows[5].children[0].children.split("(")[1].split(")")[0].split("-")) except Exception as e:
in_range = rows[6].children[1].children == "Yes" test_datetime = futures[future]
# 检查是否有 Expected Probability 或 Note results.append([test_datetime.date().strftime('%Y-%m-%d'),
expected_prob = None test_datetime.time().strftime('%H:%M:%S'),
note = "" "", "", "", "", "", "", "", "", "", "", f"Error: {str(e)}"])
if len(rows) > 7: pbar.update(1)
if "Expected" in rows[7].children[0].children:
expected_prob = float(rows[7].children[1].children.split()[0][1:-1]) # 移除 "~" 和 "%"
elif "Note" in rows[7].children[0].children:
note = rows[7].children[1].children
# 写入 CSV # 每处理完一个块写入 CSV
with open(output_file, 'a', newline='') as f: with open(output_file, 'a', newline='') as f:
writer = csv.writer(f) writer = csv.writer(f)
writer.writerow([ writer.writerows(results)
test_date, test_time, cycle_start, cycle_end, tweet_count,
actual_end_count, prob_start, prob_end, prob_min, prob_max,
"Yes" if in_range else "No", expected_prob if expected_prob is not None else "", note
])
else:
# 如果返回错误信息,也记录
with open(output_file, 'a', newline='') as f:
writer = csv.writer(f)
writer.writerow([test_date, test_time, "", "", "", "", "", "", "", "", "", "", result[0].children])
print(f"Processed: {test_date} {test_time}")
# 运行测试
if __name__ == "__main__":
    # Keep the call under the entry-point guard: run_loop_test starts
    # worker processes, which may re-import this module.
    run_loop_test(
        start_date="2024-10-01",
        end_date="2025-03-12",
        interval_hours=1,
        output_file="test_results.csv",
        chunk_size=1000,
    )

View File

@ -11,3 +11,4 @@ numpy~=2.2.3
scipy~=1.15.2 scipy~=1.15.2
ipython~=8.32.0 ipython~=8.32.0
Flask~=3.0.3 Flask~=3.0.3
tqdm~=4.67.1