From 681f501c1cce34da71e340d6e33e079799d5af12 Mon Sep 17 00:00:00 2001 From: NY Date: Fri, 21 Mar 2025 12:22:12 +0800 Subject: [PATCH] -line chart -Probability --- pkg/dash/app_html.py | 116 ++------------------ pkg/dash/func/info_m.py | 37 ------- pkg/dash/func/info_test.py | 219 ------------------------------------- pkg/dash/func/render.py | 63 +++-------- pkg/dash/func/ui.py | 9 +- 5 files changed, 27 insertions(+), 417 deletions(-) delete mode 100644 pkg/dash/func/info_m.py delete mode 100644 pkg/dash/func/info_test.py diff --git a/pkg/dash/app_html.py b/pkg/dash/app_html.py index a9bce85..7b40290 100644 --- a/pkg/dash/app_html.py +++ b/pkg/dash/app_html.py @@ -2,6 +2,7 @@ from datetime import timedelta, datetime from dash import dcc, html from pkg.config import interval_options, days_options, render_data + def layout_config(app): app.layout = html.Div([ html.Div( @@ -112,27 +113,16 @@ def layout_config(app): 'zIndex': 1000 } ), - # Main content html.Div([ - html.H1("Elon Musk Tweet Time Analysis (EST)"), - html.Div(id='date-picker-container', children=[ + html.Div(id='multi-interval-container', children=[ dcc.Dropdown( - id='multi-date-picker', - options=[{'label': str(date), 'value': str(date)} for date in render_data.all_dates], - value=render_data.default_date, - multi=True, - searchable=True, - placeholder="Search and select dates (YYYY-MM-DD)", - style={'width': '100%'} + id='multi-interval-picker', + options=interval_options, + value=10, + style={'width': '50%', 'marginTop': '10px'} ) ]), - dcc.Dropdown( - id='multi-interval-picker', - options=interval_options, - value=10, - style={'width': '50%', 'marginTop': '10px'} - ), - html.Div(id='days-display-container', style={'display': 'none'}, children=[ + html.Div(id='days-display-container', children=[ dcc.Dropdown( id='days-display-picker', options=days_options, @@ -141,18 +131,8 @@ def layout_config(app): ) ]), html.Div(id='multi-day-warning', style={'color': 'red', 'margin': '10px'}), - dcc.Checklist( - id='time-zone-checklist', - options=[ - {'label': 'California Time (PST)', 'value': 'PST'}, - {'label': 'Texas Time (CST)', 'value': 'CST'} - ], - value=['PST'], - style={'margin': '10px'} - ), html.Div(id='multi-tweet-summary', style={'fontSize': '20px', 'margin': '10px'}), - dcc.Tabs(id='tabs', value='line', children=[ - dcc.Tab(label='Line', value='line'), + dcc.Tabs(id='tabs', value='heatmap', children=[ dcc.Tab(label='Heatmap', value='heatmap'), dcc.Tab(label='Heatmap(1-day)', value='one_day_heatmap'), ]), @@ -193,83 +173,7 @@ def layout_config(app): style={'width': '100%'} ) ) - ]), - html.Tr([ - html.Td("Predict Tweets Start:", style={'paddingRight': '10px'}), - html.Td( - dcc.Input( - id='prob-start-input', - type='number', - placeholder='输入 Probability Start 值', - value=525, - style={'width': '100%'} - ) - ) - ]), - html.Tr([ - html.Td("Predict Tweets End:", style={'paddingRight': '10px'}), - html.Td( - dcc.Input( - id='prob-end-input', - type='number', - placeholder='输入 Probability End 值', - value=549, - style={'width': '100%'} - ) - ) - ]), - html.Tr([ - html.Td("Calculate Probability:", style={'paddingRight': '10px'}), - html.Td( - html.Button('Calculate', id='update-button', n_clicks=0) - ) - ]), - html.Tr(id='manual-info-tooltip', style={'margin': '10px'}) - ], style={ - 'width': '50%', - 'marginTop': '10px', - 'borderCollapse': 'collapse' - }), - # 新增测试区域 - html.H2("Historical Probability Test", style={'marginTop': '20px'}), - html.Table([ - html.Tr([ - html.Td("Test Date:", style={'paddingRight': '10px'}), - html.Td( - dcc.DatePickerSingle( - id='test-date-input', - date=(datetime.now().date() - timedelta(days=1)).strftime('%Y-%m-%d'), # 默认昨天 - display_format='YYYY-MM-DD', - style={'width': '100%'} - ) - ) - ]), - html.Tr([ - html.Td("Test Time:", style={'paddingRight': '10px'}), - html.Td( - html.Div([ - dcc.Input( - id='test-time-input', - type='text', - placeholder='HH:MM:SS (e.g., 12:00:00)', # 增强提示 - value='12:00:00', - pattern='[0-2][0-9]:[0-5][0-9]:[0-5][0-9]', # 限制格式 - style={'width': '100%'} - ), - html.Span( - "Enter time in HH:MM:SS format (e.g., 12:00:00)", - style={'fontSize': '12px', 'color': 'gray', 'marginTop': '5px', 'display': 'block'} - ) - ]) - ) - ]), - html.Tr([ - html.Td("Test Probability:", style={'paddingRight': '10px'}), - html.Td( - html.Button('Test', id='test-button', n_clicks=0) - ) - ]), - html.Tr(id='test-info-tooltip', style={'margin': '10px'}) + ]) ], style={ 'width': '50%', 'marginTop': '10px', @@ -279,4 +183,4 @@ def layout_config(app): dcc.Interval(id='clock-interval', interval=1000, n_intervals=0) ]) - return app \ No newline at end of file + return app diff --git a/pkg/dash/func/info_m.py b/pkg/dash/func/info_m.py deleted file mode 100644 index 3c8b446..0000000 --- a/pkg/dash/func/info_m.py +++ /dev/null @@ -1,37 +0,0 @@ -from pkg.dash.func.info_func import * -from pkg.dash.app_init import app -from dash.dependencies import Input, Output -from dash import html - -@app.callback( - [Output('manual-info-tooltip', 'children')], - [Input('update-button', 'n_clicks'), - Input('prob-start-input', 'value'), - Input('prob-end-input', 'value')] -) -def update_info_manual(n_clicks, prob_start, prob_end): - if n_clicks == 0: - return [html.Div("Click 'Manual Update' to see results.")] - - tweet_count, days_to_next_friday = get_pace_params() - prob_start = int(prob_start) if prob_start is not None else 525 - prob_end = int(prob_end) if prob_end is not None else 549 - - probability = calculate_tweet_probability(tweet_count, days_to_next_friday, prob_start, prob_end) - - prob_low, prob_high = map(float, probability.split(" - ")) - formatted_probability = f"{prob_low * 100:.2f}% - {prob_high * 100:.2f}%" - - pace_table_rows = [ - html.Tr([ - html.Th(f"Probability ({prob_start}-{prob_end})", colSpan=2, style={'paddingRight': '10px'}), - html.Td(formatted_probability, colSpan=6, style={'paddingRight': '10px'}) - ]) - ] - pace_table = html.Table(pace_table_rows, style={ - 'width': '100%', - 'textAlign': 'left', - 'borderCollapse': 'collapse' - }) - return [pace_table] - diff --git a/pkg/dash/func/info_test.py b/pkg/dash/func/info_test.py deleted file mode 100644 index 48ed5cf..0000000 --- a/pkg/dash/func/info_test.py +++ /dev/null @@ -1,219 +0,0 @@ -from pkg.dash.func.info_func import * -from pkg.dash.app_init import app -from dash.dependencies import Input, Output -from dash import html -import os -import csv -import pandas as pd -import re -from datetime import timedelta -from tqdm import tqdm -from concurrent.futures import ProcessPoolExecutor, as_completed -import multiprocessing as mp - -# 全局数据,避免重复加载 -global_data = None - -def initialize_global_data(): - global global_data - if global_data is None: - global_data = render_data.global_agg_df.copy() - # 预计算常用列,避免重复操作 - global_data['hours'] = global_data['minute_of_day'] // 60 - global_data['minutes'] = global_data['minute_of_day'] % 60 - global_data['datetime_est'] = pd.to_datetime( - global_data['date'].astype(str) + ' ' + - global_data['hours'].astype(str) + ':' + - global_data['minutes'].astype(str) + ':00', - errors='coerce' - ).dt.tz_localize('US/Eastern', ambiguous='NaT') - -@app.callback( - [Output('test-info-tooltip', 'children')], - [Input('test-button', 'n_clicks'), - Input('test-date-input', 'date'), - Input('test-time-input', 'value')] -) -def update_test_info(n_clicks, test_date, test_time, data=None): - if n_clicks == 0: - return [html.Div("Click 'Test' to see historical probability results.")] - - est = pytz.timezone('US/Eastern') - data = data if data is not None else render_data.global_agg_df.copy() - - if not test_date or not test_time: - return [html.Div("Date or time input is empty. Please provide both date (YYYY-MM-DD) and time (HH:MM:SS).")] - - time_pattern = r'^(?:[01]\d|2[0-3]):[0-5]\d:[0-5]\d$' - if not re.match(time_pattern, test_time): - return [html.Div("Invalid time format. Use HH:MM:SS (e.g., 12:00:00).")] - - try: - test_datetime = pd.to_datetime(f"{test_date} {test_time}", format='%Y-%m-%d %H:%M:%S').tz_localize(est, ambiguous=True) - except ValueError: - return [html.Div("Invalid date or time format. Use YYYY-MM-DD and HH:MM:SS (e.g., 12:00:00).")] - - test_day_of_week = test_datetime.weekday() - test_hour = test_datetime.hour - days_since_last_friday = (test_day_of_week - 4) % 7 - if test_hour < 12 and test_day_of_week == 4: - cycle_start = test_datetime - timedelta(days=days_since_last_friday + 7) - else: - cycle_start = test_datetime - timedelta(days=days_since_last_friday) - cycle_start = cycle_start.replace(hour=12, minute=0, second=0, microsecond=0) - - cycle_end = cycle_start + timedelta(days=7) - first_day = cycle_end.replace(day=1) - second_sunday = first_day + timedelta(days=((6 - first_day.weekday()) % 7) + 7) - if cycle_end.month == 3 and cycle_end >= second_sunday.replace(hour=2): - cycle_end = cycle_end.tz_convert(est) - else: - cycle_end = cycle_end.tz_convert(est) - - days_to_next_friday = (cycle_end - test_datetime).total_seconds() / (24 * 60 * 60) - if days_to_next_friday <= 0: - return [html.Div(f"Test time {test_datetime} is at or past cycle end {cycle_end}.")] - - cycle_data = data[(data['datetime_est'] >= cycle_start) & (data['datetime_est'] <= test_datetime)] - if cycle_data.empty: - return [html.Div(f"No data available from {cycle_start} to {test_datetime}")] - - tweet_count = cycle_data['tweet_count'].sum() - - actual_data = data[(data['datetime_est'] >= cycle_start) & (data['datetime_est'] <= cycle_end)] - if actual_data.empty: - return [html.Div(f"No data available for cycle ending {cycle_end}")] - actual_end_count = actual_data['tweet_count'].sum() - - days_elapsed = (test_datetime - cycle_start).total_seconds() / (24 * 60 * 60) - if days_elapsed <= 0: - return [html.Div(f"Test time {test_datetime} is before cycle start {cycle_start}.")] - - daily_avg = tweet_count / days_elapsed - predicted_end_count = daily_avg * 7 - prob_start = predicted_end_count * 0.9 - prob_end = predicted_end_count * 1.1 - - try: - probability = calculate_tweet_probability(tweet_count, days_to_next_friday, prob_start, prob_end) - prob_min, prob_max = map(float, probability.split(" - ")) - formatted_probability = f"{prob_min * 100:.2f}% - {prob_max * 100:.2f}%" - except Exception as e: - return [html.Div(f"Error calculating probability: {str(e)}")] - - test_table_rows = [ - html.Tr([html.Th("Cycle Start:", colSpan=4), html.Td(str(cycle_start), colSpan=6)]), - html.Tr([html.Th("Test Date and Time:", colSpan=4), html.Td(str(test_datetime), colSpan=6)]), - html.Tr([html.Th("Cycle End:", colSpan=4), html.Td(str(cycle_end), colSpan=6)]), - html.Tr([html.Th("Tweet Count at Test Time:", colSpan=4), html.Td(str(tweet_count), colSpan=6)]), - html.Tr([html.Th("Actual Final Tweet Count:", colSpan=4), html.Td(str(actual_end_count), colSpan=6)]), - html.Tr([html.Th(f"Predicted Range ({int(prob_start)}-{int(prob_end)}):", colSpan=4), html.Td(formatted_probability, colSpan=6)]), - html.Tr([html.Th("Does Actual Fall in Range?", colSpan=4), - html.Td("Yes" if prob_start <= actual_end_count <= prob_end else "No", - colSpan=6, style={'color': 'green' if prob_start <= actual_end_count <= prob_end else 'red'})]) - ] - if prob_start <= actual_end_count <= prob_end: - expected_prob = (prob_max + prob_min) / 2 - test_table_rows.append(html.Tr([html.Th("Expected Probability:", colSpan=4), - html.Td(f"~{expected_prob * 100:.2f}%", colSpan=6)])) - else: - test_table_rows.append(html.Tr([html.Th("Note:", colSpan=4), - html.Td("Prediction does not match actual outcome.", colSpan=6, style={'color': 'red'})])) - - test_table = html.Table(test_table_rows, style={'width': '100%', 'textAlign': 'left', 'borderCollapse': 'collapse'}) - return [test_table] - - -def process_test_case(args): - test_datetime, data = args - test_date = test_datetime.date().strftime('%Y-%m-%d') - test_time = test_datetime.time().strftime('%H:%M:%S') - n_clicks = 1 - - result = update_test_info(n_clicks, test_date, test_time, data) - - if isinstance(result[0], html.Table): - table = result[0] - rows = table.children - - cycle_start = str(rows[0].children[1].children) - test_dt = str(rows[1].children[1].children) - cycle_end = str(rows[2].children[1].children) - tweet_count = int(rows[3].children[1].children) - actual_end_count = int(rows[4].children[1].children) - prob_range = rows[5].children[1].children - prob_min, prob_max = [float(x.strip('%')) for x in prob_range.split(" - ")] - prob_start, prob_end = map(int, rows[5].children[0].children.split("(")[1].split(")")[0].split("-")) - in_range = rows[6].children[1].children == "Yes" - expected_prob = None - note = "" - if len(rows) > 7: - if "Expected" in rows[7].children[0].children: - expected_prob = float(rows[7].children[1].children.split()[0][1:-1]) - elif "Note" in rows[7].children[0].children: - note = rows[7].children[1].children - - return [ - test_date, test_time, cycle_start, cycle_end, tweet_count, - actual_end_count, prob_start, prob_end, prob_min, prob_max, - "Yes" if in_range else "No", expected_prob if expected_prob is not None else "", note - ] - else: - return [test_date, test_time, "", "", "", "", "", "", "", "", "", "", result[0].children] - - -def run_loop_test(start_date="2024-10-01", end_date="2025-03-12", interval_hours=1, output_file="test_results.csv", max_workers=None, chunk_size=1000): - est = pytz.timezone('US/Eastern') - start_dt = pd.to_datetime(start_date).tz_localize(est) - end_dt = pd.to_datetime(end_date).tz_localize(est) - time_points = [] - current_dt = start_dt - while current_dt <= end_dt: - time_points.append(current_dt) - current_dt += timedelta(hours=interval_hours) - - headers = [ - "Test Date", "Test Time", "Cycle Start", "Cycle End", "Tweet Count at Test Time", - "Actual Final Tweet Count", "Predicted Range Start", "Predicted Range End", - "Probability Min (%)", "Probability Max (%)", "Actual in Range", "Expected Probability (%)", "Note" - ] - - if not os.path.exists(output_file): - with open(output_file, 'w', newline='') as f: - writer = csv.writer(f) - writer.writerow(headers) - - # 预加载数据 - initialize_global_data() - data = global_data - - total_steps = len(time_points) - max_workers = max_workers or os.cpu_count() or 4 - chunk_size = min(chunk_size, total_steps) # 确保 chunk_size 不超过总任务数 - - # 分块处理时间点 - chunks = [time_points[i:i + chunk_size] for i in range(0, total_steps, chunk_size)] - - with tqdm(total=total_steps, desc="Processing Test Cases", unit="step") as pbar: - for chunk in chunks: - results = [] - with ProcessPoolExecutor(max_workers=max_workers) as executor: - futures = {executor.submit(process_test_case, (test_datetime, data)): test_datetime for test_datetime in chunk} - for future in as_completed(futures): - try: - result = future.result() - results.append(result) - pbar.update(1) - except Exception as e: - test_datetime = futures[future] - results.append([test_datetime.date().strftime('%Y-%m-%d'), - test_datetime.time().strftime('%H:%M:%S'), - "", "", "", "", "", "", "", "", "", "", f"Error: {str(e)}"]) - pbar.update(1) - - with open(output_file, 'a', newline='') as f: - writer = csv.writer(f) - writer.writerows(results) - -if __name__ == "__main__": - run_loop_test(start_date="2024-10-01", end_date="2025-03-12", interval_hours=1, output_file="test_results.csv", chunk_size=1000) \ No newline at end of file diff --git a/pkg/dash/func/render.py b/pkg/dash/func/render.py index ddeaa5f..f5aeb05 100644 --- a/pkg/dash/func/render.py +++ b/pkg/dash/func/render.py @@ -2,37 +2,25 @@ from datetime import datetime, timedelta from dash.dependencies import Input, Output from pkg.dash.app_init import app from pkg.config import render_data -from pkg.tool import aggregate_data, generate_xticks, minutes_to_time, get_tweets_since_last_friday +from pkg.tool import aggregate_data, minutes_to_time, get_tweets_since_last_friday from dash import dcc import plotly.graph_objs as go import pandas as pd - @app.callback( [Output('tabs-content', 'children'), Output('multi-day-warning', 'children'), Output('multi-tweet-summary', 'children')], [Input('tabs', 'value'), - Input('multi-date-picker', 'value'), Input('multi-interval-picker', 'value'), - Input('time-zone-checklist', 'value'), Input('days-display-picker', 'value')] ) -def render_tab_content(tab, selected_dates, interval, time_zones, days_to_display): +def render_tab_content(tab, interval, days_to_display): warning = "" - if tab == 'line': - if not selected_dates: # Handle None or empty list - selected_dates = [datetime.now().date()] # Default to today - warning = "No dates selected. Showing today’s data." - if len(selected_dates) > 10: - selected_dates = selected_dates[:10] - warning = "Maximum of 10 days can be selected. Showing first 10 selected days." - selected_dates = [datetime.strptime(date, '%Y-%m-%d').date() for date in selected_dates] - else: - available_dates = sorted(render_data.global_agg_df['date'].unique(), reverse=True) - selected_dates = available_dates[:days_to_display] if available_dates else [datetime.now().date()] - if not available_dates: - warning = "No data available. Showing today’s date with zero tweets." + available_dates = sorted(render_data.global_agg_df['date'].unique(), reverse=True) + selected_dates = available_dates[:days_to_display] if available_dates else [datetime.now().date()] + if not available_dates: + warning = "No data available. Showing today’s date with zero tweets." multi_data_agg = render_data.global_agg_df[render_data.global_agg_df['date'].isin(selected_dates)].copy() if multi_data_agg.empty: @@ -47,23 +35,8 @@ def render_tab_content(tab, selected_dates, interval, time_zones, days_to_displa tweet_count_total = 0 agg_data = aggregate_data(multi_data_agg, interval) - xticks, xtick_labels = generate_xticks(interval) - if tab == 'line': - fig = go.Figure() - for date in selected_dates: - day_data = agg_data[agg_data['date'] == date] - hover_times = [f"{date} {minutes_to_time(minute)} EST" for minute in day_data['interval_group']] - fig.add_trace(go.Scatter( - x=day_data['interval_group'], - y=day_data['tweet_count'], - mode='lines', - name=str(date), - customdata=hover_times, - hovertemplate='%{customdata}
Tweets: %{y}' - )) - - elif tab == 'heatmap': + if tab == 'heatmap': pivot_data = agg_data.pivot(index='date', columns='interval_group', values='tweet_count').fillna(0) pivot_data.index = pivot_data.index.astype(str) fig = go.Figure(data=go.Heatmap( @@ -77,7 +50,7 @@ def render_tab_content(tab, selected_dates, interval, time_zones, days_to_displa for i, date_str in enumerate(pivot_data.index): date = datetime.strptime(date_str, '%Y-%m-%d').date() - if date.weekday() == 4: # Friday + if date.weekday() == 4: prev_date = date - timedelta(days=1) if str(prev_date) in pivot_data.index: y_position = i / len(pivot_data.index) @@ -105,8 +78,8 @@ def render_tab_content(tab, selected_dates, interval, time_zones, days_to_displa for _, row in one_day_data.iterrows(): minute = row['interval_group'] - hour = int(minute // 60) # Convert to integer - interval_idx = int((minute % 60) // interval) # Convert to integer + hour = int(minute // 60) + interval_idx = int((minute % 60) // interval) if hour < 24: z_values[hour][interval_idx] = row['tweet_count'] @@ -126,20 +99,12 @@ def render_tab_content(tab, selected_dates, interval, time_zones, days_to_displa hovertemplate='%{y}:%{x} EST
Tweets: %{z}
Rate: %{customdata:.2%}' )) - if tab in ['line', 'one_day_heatmap']: fig.update_layout( - title=f'{"Line" if tab == "line" else "One-Day Heatmap"} Tweet Frequency (Interval: {interval} minutes, EST, {len(selected_dates)} days)', - xaxis_title='Minutes' if tab == 'one_day_heatmap' else 'Eastern Time (HH:MM)', - yaxis_title='Hour of Day' if tab == 'one_day_heatmap' else 'Tweet Count', - xaxis=dict( - range=[0, 1440] if tab == 'line' else None, - tickvals=xticks if tab == 'line' else None, - ticktext=xtick_labels if tab == 'line' else None, - tickangle=45 if tab == 'line' else 0 - ), + title=f'One-Day Heatmap Tweet Frequency (Interval: {interval} minutes, EST, {len(selected_dates)} days)', + xaxis_title='Minutes', + yaxis_title='Hour of Day', height=600, - showlegend=(tab == 'line'), - yaxis=dict(autorange='reversed') if tab == 'one_day_heatmap' else None + yaxis=dict(autorange='reversed') ) summary = f"Total tweets: {get_tweets_since_last_friday()}" diff --git a/pkg/dash/func/ui.py b/pkg/dash/func/ui.py index f447ff0..730c0cb 100644 --- a/pkg/dash/func/ui.py +++ b/pkg/dash/func/ui.py @@ -3,12 +3,9 @@ from dash.dependencies import Input, Output @app.callback( - [Output('date-picker-container', 'style'), - Output('days-display-container', 'style'), - Output('time-zone-checklist', 'style')], + [Output('days-display-container', 'style'), + Output('multi-interval-container', 'style')], [Input('tabs', 'value')] ) def toggle_controls_visibility(tab): - if tab == 'heatmap' or tab == 'one_day_heatmap': - return {'display': 'none'}, {'display': 'block'}, {'display': 'none'} - return {'display': 'block'}, {'display': 'none'}, {'display': 'block'} + return {'display': 'block'},{'display': 'block'}