Compare commits

...

5 Commits

Author SHA1 Message Date
NY
c40a1cb0dc fix 2025-05-09 08:40:07 +08:00
NY
1abc7e0b7c fix 2025-03-24 17:50:46 +08:00
NY
d7976d598e fix 2025-03-24 10:29:17 +08:00
NY
681f501c1c -line chart
-Probability
2025-03-21 12:22:12 +08:00
NY
abf820fe08 +selection 2025-03-21 11:57:34 +08:00
13 changed files with 598 additions and 420 deletions

View File

@ -26,12 +26,15 @@ interval_options = [
{'label': '5 minutes', 'value': 5},
{'label': '10 minutes', 'value': 10},
{'label': '30 minutes', 'value': 30},
{'label': '60 minutes', 'value': 60},
{'label': '⅓ day', 'value': 480},
{'label': '1 hour', 'value': 60},
{'label': '6 hours(1/4 day)', 'value': 360},
{'label': '8 hours(1/3 day)', 'value': 480},
{'label': '12 hours(1/2 day)', 'value': 720},
{'label': '1 day', 'value': 1440}
]
days_options = [
{'label': '7 days', 'value': 7},
{'label': '14 days', 'value': 14},
{'label': '30 days', 'value': 30},
{'label': '90 days', 'value': 90},
{'label': '120 days', 'value': 120},

View File

@ -2,6 +2,7 @@ from datetime import timedelta, datetime
from dash import dcc, html
from pkg.config import interval_options, days_options, render_data
def layout_config(app):
app.layout = html.Div([
html.Div(
@ -112,27 +113,16 @@ def layout_config(app):
'zIndex': 1000
}
),
# Main content
html.Div([
html.H1("Elon Musk Tweet Time Analysis (EST)"),
html.Div(id='date-picker-container', children=[
html.Div(id='multi-interval-container', children=[
dcc.Dropdown(
id='multi-date-picker',
options=[{'label': str(date), 'value': str(date)} for date in render_data.all_dates],
value=render_data.default_date,
multi=True,
searchable=True,
placeholder="Search and select dates (YYYY-MM-DD)",
style={'width': '100%'}
id='multi-interval-picker',
options=interval_options,
value=10,
style={'width': '50%', 'marginTop': '10px'}
)
]),
dcc.Dropdown(
id='multi-interval-picker',
options=interval_options,
value=10,
style={'width': '50%', 'marginTop': '10px'}
),
html.Div(id='days-display-container', style={'display': 'none'}, children=[
html.Div(id='days-display-container', children=[
dcc.Dropdown(
id='days-display-picker',
options=days_options,
@ -141,18 +131,8 @@ def layout_config(app):
)
]),
html.Div(id='multi-day-warning', style={'color': 'red', 'margin': '10px'}),
dcc.Checklist(
id='time-zone-checklist',
options=[
{'label': 'California Time (PST)', 'value': 'PST'},
{'label': 'Texas Time (CST)', 'value': 'CST'}
],
value=['PST'],
style={'margin': '10px'}
),
html.Div(id='multi-tweet-summary', style={'fontSize': '20px', 'margin': '10px'}),
dcc.Tabs(id='tabs', value='line', children=[
dcc.Tab(label='Line', value='line'),
dcc.Tabs(id='tabs', value='heatmap', children=[
dcc.Tab(label='Heatmap', value='heatmap'),
dcc.Tab(label='Heatmap(1-day)', value='one_day_heatmap'),
]),
@ -193,83 +173,7 @@ def layout_config(app):
style={'width': '100%'}
)
)
]),
html.Tr([
html.Td("Predict Tweets Start:", style={'paddingRight': '10px'}),
html.Td(
dcc.Input(
id='prob-start-input',
type='number',
placeholder='输入 Probability Start 值',
value=525,
style={'width': '100%'}
)
)
]),
html.Tr([
html.Td("Predict Tweets End:", style={'paddingRight': '10px'}),
html.Td(
dcc.Input(
id='prob-end-input',
type='number',
placeholder='输入 Probability End 值',
value=549,
style={'width': '100%'}
)
)
]),
html.Tr([
html.Td("Calculate Probability:", style={'paddingRight': '10px'}),
html.Td(
html.Button('Calculate', id='update-button', n_clicks=0)
)
]),
html.Tr(id='manual-info-tooltip', style={'margin': '10px'})
], style={
'width': '50%',
'marginTop': '10px',
'borderCollapse': 'collapse'
}),
# 新增测试区域
html.H2("Historical Probability Test", style={'marginTop': '20px'}),
html.Table([
html.Tr([
html.Td("Test Date:", style={'paddingRight': '10px'}),
html.Td(
dcc.DatePickerSingle(
id='test-date-input',
date=(datetime.now().date() - timedelta(days=1)).strftime('%Y-%m-%d'), # 默认昨天
display_format='YYYY-MM-DD',
style={'width': '100%'}
)
)
]),
html.Tr([
html.Td("Test Time:", style={'paddingRight': '10px'}),
html.Td(
html.Div([
dcc.Input(
id='test-time-input',
type='text',
placeholder='HH:MM:SS (e.g., 12:00:00)', # 增强提示
value='12:00:00',
pattern='[0-2][0-9]:[0-5][0-9]:[0-5][0-9]', # 限制格式
style={'width': '100%'}
),
html.Span(
"Enter time in HH:MM:SS format (e.g., 12:00:00)",
style={'fontSize': '12px', 'color': 'gray', 'marginTop': '5px', 'display': 'block'}
)
])
)
]),
html.Tr([
html.Td("Test Probability:", style={'paddingRight': '10px'}),
html.Td(
html.Button('Test', id='test-button', n_clicks=0)
)
]),
html.Tr(id='test-info-tooltip', style={'margin': '10px'})
])
], style={
'width': '50%',
'marginTop': '10px',
@ -279,4 +183,4 @@ def layout_config(app):
dcc.Interval(id='clock-interval', interval=1000, n_intervals=0)
])
return app
return app

View File

@ -1,37 +0,0 @@
from pkg.dash.func.info_func import *
from pkg.dash.app_init import app
from dash.dependencies import Input, Output
from dash import html
@app.callback(
    [Output('manual-info-tooltip', 'children')],
    [Input('update-button', 'n_clicks'),
     Input('prob-start-input', 'value'),
     Input('prob-end-input', 'value')]
)
def update_info_manual(n_clicks, prob_start, prob_end):
    """Render the manual probability table for the selected tweet range.

    Triggered by the 'Calculate' button (id='update-button') and the two
    range number inputs. Returns a one-element list because the callback
    declares a single Output.
    """
    if n_clicks == 0:
        # Fixed: the button is labelled 'Calculate' (see layout), not
        # 'Manual Update' — the old prompt pointed at a non-existent button.
        return [html.Div("Click 'Calculate' to see results.")]
    tweet_count, days_to_next_friday = get_pace_params()
    # Fall back to the layout defaults when an input has been cleared.
    prob_start = int(prob_start) if prob_start is not None else 525
    prob_end = int(prob_end) if prob_end is not None else 549
    probability = calculate_tweet_probability(tweet_count, days_to_next_friday, prob_start, prob_end)
    # calculate_tweet_probability returns a "<low> - <high>" string of
    # fractions — TODO confirm against its definition.
    prob_low, prob_high = map(float, probability.split(" - "))
    formatted_probability = f"{prob_low * 100:.2f}% - {prob_high * 100:.2f}%"
    pace_table_rows = [
        html.Tr([
            html.Th(f"Probability ({prob_start}-{prob_end})", colSpan=2, style={'paddingRight': '10px'}),
            html.Td(formatted_probability, colSpan=6, style={'paddingRight': '10px'})
        ])
    ]
    pace_table = html.Table(pace_table_rows, style={
        'width': '100%',
        'textAlign': 'left',
        'borderCollapse': 'collapse'
    })
    return [pace_table]

View File

@ -1,219 +0,0 @@
from pkg.dash.func.info_func import *
from pkg.dash.app_init import app
from dash.dependencies import Input, Output
from dash import html
import os
import csv
import pandas as pd
import re
from datetime import timedelta
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing as mp
# Module-level cache of the aggregated tweet data, so repeated calls
# avoid re-deriving the helper columns.
global_data = None


def initialize_global_data():
    """Populate the module-level cache on first call; no-op afterwards."""
    global global_data
    if global_data is not None:
        return
    df = render_data.global_agg_df.copy()
    # Pre-compute the frequently used columns once.
    df['hours'] = df['minute_of_day'] // 60
    df['minutes'] = df['minute_of_day'] % 60
    df['datetime_est'] = pd.to_datetime(
        df['date'].astype(str) + ' '
        + df['hours'].astype(str) + ':'
        + df['minutes'].astype(str) + ':00',
        errors='coerce'
    ).dt.tz_localize('US/Eastern', ambiguous='NaT')
    global_data = df
@app.callback(
    [Output('test-info-tooltip', 'children')],
    [Input('test-button', 'n_clicks'),
     Input('test-date-input', 'date'),
     Input('test-time-input', 'value')]
)
def update_test_info(n_clicks, test_date, test_time, data=None):
    """Back-test the tweet-probability model at a historical instant.

    Reconstructs the weekly cycle (anchored at Friday 12:00 noon EST)
    containing the requested date/time, counts tweets observed up to that
    instant, linearly projects a final count with a +/-10% band, and reports
    whether the cycle's actual final count fell inside that band.

    The extra ``data`` parameter is not wired to the Dash callback; it lets
    batch drivers (see process_test_case) inject a pre-built DataFrame.
    Returns a one-element list (single Output): an html.Table on success or
    an html.Div describing the problem.
    """
    if n_clicks == 0:
        return [html.Div("Click 'Test' to see historical probability results.")]
    est = pytz.timezone('US/Eastern')
    # Fall back to the app-wide aggregated DataFrame when no override given.
    data = data if data is not None else render_data.global_agg_df.copy()
    if not test_date or not test_time:
        return [html.Div("Date or time input is empty. Please provide both date (YYYY-MM-DD) and time (HH:MM:SS).")]
    # Strict HH:MM:SS validation before handing the string to pandas.
    time_pattern = r'^(?:[01]\d|2[0-3]):[0-5]\d:[0-5]\d$'
    if not re.match(time_pattern, test_time):
        return [html.Div("Invalid time format. Use HH:MM:SS (e.g., 12:00:00).")]
    try:
        # ambiguous=True picks the DST interpretation for ambiguous wall times.
        test_datetime = pd.to_datetime(f"{test_date} {test_time}", format='%Y-%m-%d %H:%M:%S').tz_localize(est, ambiguous=True)
    except ValueError:
        return [html.Div("Invalid date or time format. Use YYYY-MM-DD and HH:MM:SS (e.g., 12:00:00).")]
    test_day_of_week = test_datetime.weekday()  # Monday=0 ... Friday=4
    test_hour = test_datetime.hour
    days_since_last_friday = (test_day_of_week - 4) % 7
    # Cycles roll over at Friday noon: a Friday morning still belongs to the
    # previous week's cycle.
    if test_hour < 12 and test_day_of_week == 4:
        cycle_start = test_datetime - timedelta(days=days_since_last_friday + 7)
    else:
        cycle_start = test_datetime - timedelta(days=days_since_last_friday)
    cycle_start = cycle_start.replace(hour=12, minute=0, second=0, microsecond=0)
    cycle_end = cycle_start + timedelta(days=7)
    # DST check around the second Sunday of the month (US spring-forward).
    first_day = cycle_end.replace(day=1)
    second_sunday = first_day + timedelta(days=((6 - first_day.weekday()) % 7) + 7)
    # NOTE(review): both branches below are identical, so this March/DST
    # check currently has no effect — confirm what adjustment was intended.
    if cycle_end.month == 3 and cycle_end >= second_sunday.replace(hour=2):
        cycle_end = cycle_end.tz_convert(est)
    else:
        cycle_end = cycle_end.tz_convert(est)
    days_to_next_friday = (cycle_end - test_datetime).total_seconds() / (24 * 60 * 60)
    if days_to_next_friday <= 0:
        return [html.Div(f"Test time {test_datetime} is at or past cycle end {cycle_end}.")]
    # Tweets observed from cycle start up to the test instant.
    cycle_data = data[(data['datetime_est'] >= cycle_start) & (data['datetime_est'] <= test_datetime)]
    if cycle_data.empty:
        return [html.Div(f"No data available from {cycle_start} to {test_datetime}")]
    tweet_count = cycle_data['tweet_count'].sum()
    # Ground truth: everything recorded over the whole cycle.
    actual_data = data[(data['datetime_est'] >= cycle_start) & (data['datetime_est'] <= cycle_end)]
    if actual_data.empty:
        return [html.Div(f"No data available for cycle ending {cycle_end}")]
    actual_end_count = actual_data['tweet_count'].sum()
    days_elapsed = (test_datetime - cycle_start).total_seconds() / (24 * 60 * 60)
    if days_elapsed <= 0:
        return [html.Div(f"Test time {test_datetime} is before cycle start {cycle_start}.")]
    # Linear extrapolation of the observed pace to a 7-day total, with a
    # +/-10% prediction band.
    daily_avg = tweet_count / days_elapsed
    predicted_end_count = daily_avg * 7
    prob_start = predicted_end_count * 0.9
    prob_end = predicted_end_count * 1.1
    try:
        # calculate_tweet_probability is expected to return a "low - high"
        # string of fractions — TODO confirm against its definition.
        probability = calculate_tweet_probability(tweet_count, days_to_next_friday, prob_start, prob_end)
        prob_min, prob_max = map(float, probability.split(" - "))
        formatted_probability = f"{prob_min * 100:.2f}% - {prob_max * 100:.2f}%"
    except Exception as e:
        return [html.Div(f"Error calculating probability: {str(e)}")]
    # Render the result table. process_test_case parses these rows by
    # position, so keep the row order stable.
    test_table_rows = [
        html.Tr([html.Th("Cycle Start:", colSpan=4), html.Td(str(cycle_start), colSpan=6)]),
        html.Tr([html.Th("Test Date and Time:", colSpan=4), html.Td(str(test_datetime), colSpan=6)]),
        html.Tr([html.Th("Cycle End:", colSpan=4), html.Td(str(cycle_end), colSpan=6)]),
        html.Tr([html.Th("Tweet Count at Test Time:", colSpan=4), html.Td(str(tweet_count), colSpan=6)]),
        html.Tr([html.Th("Actual Final Tweet Count:", colSpan=4), html.Td(str(actual_end_count), colSpan=6)]),
        html.Tr([html.Th(f"Predicted Range ({int(prob_start)}-{int(prob_end)}):", colSpan=4), html.Td(formatted_probability, colSpan=6)]),
        html.Tr([html.Th("Does Actual Fall in Range?", colSpan=4),
                 html.Td("Yes" if prob_start <= actual_end_count <= prob_end else "No",
                         colSpan=6, style={'color': 'green' if prob_start <= actual_end_count <= prob_end else 'red'})])
    ]
    if prob_start <= actual_end_count <= prob_end:
        expected_prob = (prob_max + prob_min) / 2
        test_table_rows.append(html.Tr([html.Th("Expected Probability:", colSpan=4),
                                        html.Td(f"~{expected_prob * 100:.2f}%", colSpan=6)]))
    else:
        test_table_rows.append(html.Tr([html.Th("Note:", colSpan=4),
                                        html.Td("Prediction does not match actual outcome.", colSpan=6, style={'color': 'red'})]))
    test_table = html.Table(test_table_rows, style={'width': '100%', 'textAlign': 'left', 'borderCollapse': 'collapse'})
    return [test_table]
def process_test_case(args):
    """Run one back-test and flatten the result into a CSV row.

    ``args`` is a (test_datetime, data) tuple so the function can be
    submitted to a ProcessPoolExecutor with a single picklable argument.

    Parses the html.Table returned by update_test_info positionally — the
    row/column layout there is a de-facto contract for this function.
    Returns a 13-element list matching the headers in run_loop_test.
    """
    test_datetime, data = args
    test_date = test_datetime.date().strftime('%Y-%m-%d')
    test_time = test_datetime.time().strftime('%H:%M:%S')
    n_clicks = 1  # simulate a single press of the Test button
    result = update_test_info(n_clicks, test_date, test_time, data)
    if isinstance(result[0], html.Table):
        table = result[0]
        rows = table.children
        # Positional extraction; see update_test_info's row order.
        cycle_start = str(rows[0].children[1].children)
        test_dt = str(rows[1].children[1].children)  # extracted but unused below
        cycle_end = str(rows[2].children[1].children)
        tweet_count = int(rows[3].children[1].children)
        actual_end_count = int(rows[4].children[1].children)
        prob_range = rows[5].children[1].children
        prob_min, prob_max = [float(x.strip('%')) for x in prob_range.split(" - ")]
        # Recover the integer range from the header text "(start-end)".
        prob_start, prob_end = map(int, rows[5].children[0].children.split("(")[1].split(")")[0].split("-"))
        in_range = rows[6].children[1].children == "Yes"
        expected_prob = None
        note = ""
        if len(rows) > 7:
            if "Expected" in rows[7].children[0].children:
                # e.g. "~12.34%" -> 12.34
                expected_prob = float(rows[7].children[1].children.split()[0][1:-1])
            elif "Note" in rows[7].children[0].children:
                note = rows[7].children[1].children
        return [
            test_date, test_time, cycle_start, cycle_end, tweet_count,
            actual_end_count, prob_start, prob_end, prob_min, prob_max,
            "Yes" if in_range else "No", expected_prob if expected_prob is not None else "", note
        ]
    else:
        # Error path: update_test_info returned an html.Div message instead.
        return [test_date, test_time, "", "", "", "", "", "", "", "", "", "", result[0].children]
def run_loop_test(start_date="2024-10-01", end_date="2025-03-12", interval_hours=1, output_file="test_results.csv", max_workers=None, chunk_size=1000):
    """Sweep update_test_info over a date range and append rows to a CSV.

    Walks from start_date to end_date in interval_hours steps, back-tests
    each instant in a process pool, and appends results chunk by chunk so
    partial progress survives interruption.
    """
    est = pytz.timezone('US/Eastern')
    start_dt = pd.to_datetime(start_date).tz_localize(est)
    end_dt = pd.to_datetime(end_date).tz_localize(est)
    # Materialise every test instant up front (inclusive of end_dt).
    time_points = []
    current_dt = start_dt
    while current_dt <= end_dt:
        time_points.append(current_dt)
        current_dt += timedelta(hours=interval_hours)
    headers = [
        "Test Date", "Test Time", "Cycle Start", "Cycle End", "Tweet Count at Test Time",
        "Actual Final Tweet Count", "Predicted Range Start", "Predicted Range End",
        "Probability Min (%)", "Probability Max (%)", "Actual in Range", "Expected Probability (%)", "Note"
    ]
    # Write the header row only when the CSV does not exist yet; rows are
    # appended below so re-runs accumulate.
    if not os.path.exists(output_file):
        with open(output_file, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(headers)
    # Preload the shared DataFrame once in the parent process.
    initialize_global_data()
    data = global_data
    total_steps = len(time_points)
    max_workers = max_workers or os.cpu_count() or 4
    chunk_size = min(chunk_size, total_steps)  # chunk_size must not exceed the task count
    # Process the time points chunk by chunk so results flush periodically.
    chunks = [time_points[i:i + chunk_size] for i in range(0, total_steps, chunk_size)]
    with tqdm(total=total_steps, desc="Processing Test Cases", unit="step") as pbar:
        for chunk in chunks:
            results = []
            # NOTE(review): a fresh pool per chunk (with `data` pickled per
            # task) is expensive — presumably deliberate to bound memory;
            # confirm before restructuring.
            with ProcessPoolExecutor(max_workers=max_workers) as executor:
                futures = {executor.submit(process_test_case, (test_datetime, data)): test_datetime for test_datetime in chunk}
                for future in as_completed(futures):
                    try:
                        result = future.result()
                        results.append(result)
                        pbar.update(1)
                    except Exception as e:
                        # Record the failure as a row instead of aborting the sweep.
                        test_datetime = futures[future]
                        results.append([test_datetime.date().strftime('%Y-%m-%d'),
                                        test_datetime.time().strftime('%H:%M:%S'),
                                        "", "", "", "", "", "", "", "", "", "", f"Error: {str(e)}"])
                        pbar.update(1)
            # Append this chunk's rows (completion order, not submission order).
            with open(output_file, 'a', newline='') as f:
                writer = csv.writer(f)
                writer.writerows(results)


if __name__ == "__main__":
    run_loop_test(start_date="2024-10-01", end_date="2025-03-12", interval_hours=1, output_file="test_results.csv", chunk_size=1000)

View File

@ -2,37 +2,27 @@ from datetime import datetime, timedelta
from dash.dependencies import Input, Output
from pkg.dash.app_init import app
from pkg.config import render_data
from pkg.tool import aggregate_data, generate_xticks, minutes_to_time, get_tweets_since_last_friday
from pkg.tool import aggregate_data, minutes_to_time, get_tweets_since_last_friday,get_pace_and_total_tweets
from dash import dcc
import plotly.graph_objs as go
import pandas as pd
import pytz
import numpy as np
@app.callback(
[Output('tabs-content', 'children'),
Output('multi-day-warning', 'children'),
Output('multi-tweet-summary', 'children')],
[Input('tabs', 'value'),
Input('multi-date-picker', 'value'),
Input('multi-interval-picker', 'value'),
Input('time-zone-checklist', 'value'),
Input('days-display-picker', 'value')]
)
def render_tab_content(tab, selected_dates, interval, time_zones, days_to_display):
def render_tab_content(tab, interval, days_to_display):
warning = ""
if tab == 'line':
if not selected_dates: # Handle None or empty list
selected_dates = [datetime.now().date()] # Default to today
warning = "No dates selected. Showing todays data."
if len(selected_dates) > 10:
selected_dates = selected_dates[:10]
warning = "Maximum of 10 days can be selected. Showing first 10 selected days."
selected_dates = [datetime.strptime(date, '%Y-%m-%d').date() for date in selected_dates]
else:
available_dates = sorted(render_data.global_agg_df['date'].unique(), reverse=True)
selected_dates = available_dates[:days_to_display] if available_dates else [datetime.now().date()]
if not available_dates:
warning = "No data available. Showing todays date with zero tweets."
available_dates = sorted(render_data.global_agg_df['date'].unique(), reverse=True)
selected_dates = available_dates[:days_to_display] if available_dates else [datetime.now().date()]
if not available_dates:
warning = "No data available. Showing todays date with zero tweets."
multi_data_agg = render_data.global_agg_df[render_data.global_agg_df['date'].isin(selected_dates)].copy()
if multi_data_agg.empty:
@ -47,23 +37,8 @@ def render_tab_content(tab, selected_dates, interval, time_zones, days_to_displa
tweet_count_total = 0
agg_data = aggregate_data(multi_data_agg, interval)
xticks, xtick_labels = generate_xticks(interval)
if tab == 'line':
fig = go.Figure()
for date in selected_dates:
day_data = agg_data[agg_data['date'] == date]
hover_times = [f"{date} {minutes_to_time(minute)} EST" for minute in day_data['interval_group']]
fig.add_trace(go.Scatter(
x=day_data['interval_group'],
y=day_data['tweet_count'],
mode='lines',
name=str(date),
customdata=hover_times,
hovertemplate='%{customdata}<br>Tweets: %{y}<extra></extra>'
))
elif tab == 'heatmap':
if tab == 'heatmap':
pivot_data = agg_data.pivot(index='date', columns='interval_group', values='tweet_count').fillna(0)
pivot_data.index = pivot_data.index.astype(str)
fig = go.Figure(data=go.Heatmap(
@ -77,7 +52,7 @@ def render_tab_content(tab, selected_dates, interval, time_zones, days_to_displa
for i, date_str in enumerate(pivot_data.index):
date = datetime.strptime(date_str, '%Y-%m-%d').date()
if date.weekday() == 4: # Friday
if date.weekday() == 4:
prev_date = date - timedelta(days=1)
if str(prev_date) in pivot_data.index:
y_position = i / len(pivot_data.index)
@ -105,8 +80,8 @@ def render_tab_content(tab, selected_dates, interval, time_zones, days_to_displa
for _, row in one_day_data.iterrows():
minute = row['interval_group']
hour = int(minute // 60) # Convert to integer
interval_idx = int((minute % 60) // interval) # Convert to integer
hour = int(minute // 60)
interval_idx = int((minute % 60) // interval)
if hour < 24:
z_values[hour][interval_idx] = row['tweet_count']
@ -126,21 +101,13 @@ def render_tab_content(tab, selected_dates, interval, time_zones, days_to_displa
hovertemplate='%{y}:%{x} EST<br>Tweets: %{z}<br>Rate: %{customdata:.2%}<extra></extra>'
))
if tab in ['line', 'one_day_heatmap']:
fig.update_layout(
title=f'{"Line" if tab == "line" else "One-Day Heatmap"} Tweet Frequency (Interval: {interval} minutes, EST, {len(selected_dates)} days)',
xaxis_title='Minutes' if tab == 'one_day_heatmap' else 'Eastern Time (HH:MM)',
yaxis_title='Hour of Day' if tab == 'one_day_heatmap' else 'Tweet Count',
xaxis=dict(
range=[0, 1440] if tab == 'line' else None,
tickvals=xticks if tab == 'line' else None,
ticktext=xtick_labels if tab == 'line' else None,
tickangle=45 if tab == 'line' else 0
),
title=f'One-Day Heatmap Tweet Frequency (Interval: {interval} minutes, EST, {len(selected_dates)} days)',
xaxis_title='Minutes',
yaxis_title='Hour of Day',
height=600,
showlegend=(tab == 'line'),
yaxis=dict(autorange='reversed') if tab == 'one_day_heatmap' else None
yaxis=dict(autorange='reversed')
)
summary = f"Total tweets for selected dates: {int(tweet_count_total)}Total tweets: {get_tweets_since_last_friday()}"
summary = f"Total tweets: {get_tweets_since_last_friday()}"
return dcc.Graph(figure=fig), warning, summary

View File

@ -3,12 +3,9 @@ from dash.dependencies import Input, Output
@app.callback(
[Output('date-picker-container', 'style'),
Output('days-display-container', 'style'),
Output('time-zone-checklist', 'style')],
[Output('days-display-container', 'style'),
Output('multi-interval-container', 'style')],
[Input('tabs', 'value')]
)
def toggle_controls_visibility(tab):
if tab == 'heatmap' or tab == 'one_day_heatmap':
return {'display': 'none'}, {'display': 'block'}, {'display': 'none'}
return {'display': 'block'}, {'display': 'none'}, {'display': 'block'}
return {'display': 'block'},{'display': 'block'}

View File

@ -96,3 +96,54 @@ def format_time_str(days_to_next_friday):
return f"{days}d {hours:02d}h {minutes:02d}m {seconds:02d}s ({total_hours}h)"
def get_pace_and_total_tweets(target_time: datetime) -> tuple[float, int]:
    """Return (pace, total_tweets) for the weekly window containing target_time.

    ``total_tweets`` is the tweet count from last Friday 12:00 AM EST up to
    target_time; ``pace`` linearly projects that count to the coming Friday
    12:00 AM EST.

    NOTE(review): this helper anchors the week at Friday *midnight*, while
    update_test_info anchors cycles at Friday *noon* — confirm which anchor
    is intended before relying on both together.
    """
    est = pytz.timezone('US/Eastern')
    # Treat naive datetimes as EST.
    if target_time.tzinfo is None:
        target_time = est.localize(target_time)
    # Last Friday 12:00 AM EST.
    target_date = target_time.date()
    days_since_last_friday = (target_date.weekday() + 3) % 7  # days since last Friday (Friday -> 0)
    last_friday = target_time - timedelta(days=days_since_last_friday)
    last_friday_midnight = last_friday.replace(hour=0, minute=0, second=0, microsecond=0)
    # Next Friday 12:00 AM EST; bump a week if we are already past it.
    days_to_next_friday = (4 - target_date.weekday()) % 7
    next_friday = target_time + timedelta(days=days_to_next_friday)
    next_friday_midnight = next_friday.replace(hour=0, minute=0, second=0, microsecond=0)
    if target_time > next_friday_midnight:
        next_friday_midnight += timedelta(days=7)
    # Select rows from last Friday midnight through target_time's date.
    if hasattr(render_data, 'global_agg_df') and not render_data.global_agg_df.empty:
        multi_data_agg = render_data.global_agg_df[
            (render_data.global_agg_df['date'] >= last_friday_midnight.date()) &
            (render_data.global_agg_df['date'] <= target_date)
        ].copy()
    else:
        multi_data_agg = pd.DataFrame()
    if multi_data_agg.empty:
        total_tweets = 0
    else:
        # Rebuild full timestamps from date + minute_of_day, then trim to
        # everything at or before target_time.
        multi_data_agg['timestamp'] = pd.to_datetime(multi_data_agg['date'].astype(str)) + \
                                      pd.to_timedelta(multi_data_agg['minute_of_day'], unit='m')
        multi_data_agg['timestamp'] = multi_data_agg['timestamp'].dt.tz_localize(est)
        multi_data_agg = multi_data_agg[multi_data_agg['timestamp'] <= target_time]
        total_tweets = multi_data_agg['tweet_count'].sum() if 'tweet_count' in multi_data_agg else 0
    # Pace: project the observed daily average over the remaining days.
    days_elapsed = (target_time - last_friday_midnight).total_seconds() / (24 * 60 * 60)
    days_remaining = (next_friday_midnight - target_time).total_seconds() / (24 * 60 * 60)
    if days_elapsed > 0 and total_tweets > 0:
        daily_avg = total_tweets / days_elapsed
        pace = daily_avg * days_remaining + total_tweets
    else:
        pace = float(total_tweets)  # no data or window not started: pace equals current count
    return round(pace, 2), int(total_tweets)

240
test/aola.py Normal file
View File

@ -0,0 +1,240 @@
import sqlite3
import tkinter as tk
from tkinter import ttk
import os
import json
class TeamSwitchPacketGenerator:
    """Tk GUI tool that builds team-switch packet scripts from an SQLite DB.

    Reads team definitions from ``aola.sqlite`` and emits ``|#send={...}|``
    packet lines for the selected team: the team switch itself, then
    sub-pet, soul-art and soul-card packets, in that order.
    """

    def __init__(self):
        # Verify the database file exists before connecting.
        db_file = 'aola.sqlite'
        if not os.path.exists(db_file):
            raise FileNotFoundError(f"数据库文件 {db_file} 未找到,请确保文件在 {os.getcwd()} 目录下!")
        print(f"数据库文件路径: {os.path.abspath(db_file)}")
        self.conn = sqlite3.connect(db_file)
        self.cursor = self.conn.cursor()
        self.root = tk.Tk()
        self.root.title("阵容切换封包生成器")
        # Team-selection combobox.
        self.team_var = tk.StringVar()
        self.team_dropdown = ttk.Combobox(self.root, textvariable=self.team_var)
        self.team_dropdown['values'] = self.get_team_names()
        self.team_dropdown.pack(pady=10)
        self.team_dropdown.set("请选择阵容")
        # Button row holding the refresh and generate buttons.
        button_frame = tk.Frame(self.root)
        button_frame.pack(pady=5)
        tk.Button(button_frame, text="刷新", command=self.refresh_dropdown).pack(side=tk.LEFT, padx=5)
        tk.Button(button_frame, text="生成封包(第一次生成请先解锁二级密码)", command=self.generate_packets).pack(side=tk.LEFT, padx=5)
        # Output area with a copy button on the right.
        output_frame = tk.Frame(self.root)
        output_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
        tk.Label(output_frame, text="封包输出:").pack(anchor=tk.W)
        self.output_text = tk.Text(output_frame, height=15, width=60)
        self.output_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        tk.Button(output_frame, text="复制脚本", command=self.copy_output).pack(side=tk.RIGHT, padx=5)
        # Status / warning message area.
        tk.Label(self.root, text="提示信息:").pack(anchor=tk.W)
        self.info_text = tk.Text(self.root, height=5, width=80, fg="red")
        self.info_text.pack(pady=5)

    def get_team_names(self):
        """Return all team names from the database, sorted by name."""
        try:
            self.cursor.execute("SELECT team_name FROM aolaer_team order by team_name")
            return [row[0] for row in self.cursor.fetchall()]
        except sqlite3.Error as e:
            # Fixed: this is also called from __init__ *before* info_text is
            # created, which would have raised AttributeError on a DB error.
            if hasattr(self, 'info_text'):
                self.info_text.insert(tk.END, f"获取阵容名称失败: {e}\n")
            else:
                print(f"获取阵容名称失败: {e}")
            return []

    def refresh_dropdown(self):
        """Reload the combobox values from the database."""
        team_names = self.get_team_names()
        self.team_dropdown['values'] = team_names
        # Reset the selection and confirm in the status box.
        self.team_dropdown.set("请选择阵容")
        self.info_text.insert(tk.END, "下拉框数据已刷新!\n")

    def copy_output(self):
        """Copy the generated packet script to the system clipboard."""
        content = self.output_text.get(1.0, tk.END).strip()
        if content:
            self.root.clipboard_clear()
            self.root.clipboard_append(content)
            self.info_text.insert(tk.END, "脚本内容已复制到剪贴板!\n")
        else:
            self.info_text.insert(tk.END, "脚本输出框为空,无法复制!\n")

    def validate_ids(self, ids_str):
        """Validate a soul-card ids value.

        Returns (True, parsed_list) when ids_str is a JSON array of exactly
        five non-negative integers, otherwise (False, reason_string).
        """
        try:
            ids = json.loads(ids_str)
            if not isinstance(ids, list) or len(ids) != 5:
                return False, "ids 必须是长度为 5 的数组"
            for x in ids:
                if not isinstance(x, int) or x < 0:
                    return False, "ids 中的值必须是非负整数"
            return True, ids
        except json.JSONDecodeError:
            return False, "ids 格式不正确"

    def generate_packets(self):
        """Generate the packet script for the currently selected team.

        Emits, in order: the team-switch packet, soul-card resets for stale
        entries (is_set = 1), then per-pet sub-pet, soul-art and soul-card
        packets. Warnings and progress notes go to the info box.
        """
        self.output_text.delete(1.0, tk.END)
        self.info_text.delete(1.0, tk.END)
        packets = []
        selected_team = self.team_var.get()
        if not selected_team or selected_team == "请选择阵容":
            self.info_text.insert(tk.END, "请先选择一个阵容!\n")
            return
        try:
            # Look up team_code and team_set for the selection.
            self.cursor.execute("SELECT team_code, team_set FROM aolaer_team WHERE team_name = ?", (selected_team,))
            result = self.cursor.fetchone()
            if not result:
                self.info_text.insert(tk.END, f"未找到阵容 {selected_team} 的 team_code\n")
                return
            team_code, team_set = result
            packets.append(f'|#send={{"id":13,"param":{{"pms":"{team_code}"}},"cmd":"1222"}}|')
            # Reset soul cards for every record flagged is_set = 1.
            self.cursor.execute("SELECT id FROM aolaer_id WHERE is_set = 1")
            set_ids = [row[0] for row in self.cursor.fetchall()]
            for set_id in set_ids:
                packets.append(
                    f'|#send={{"id":42,"param":{{"bk":1,"petId":{set_id},"ids":[0,0,0,0,0]}},"cmd":"ASC221104_2"}}|')
                self.cursor.execute("UPDATE aolaer_id SET is_set = 0 WHERE id = ?", (set_id,))
                self.conn.commit()
            # Split the '#'-separated IDs out of team_code.
            team_ids = team_code.split('#')
            team_ids = [id for id in team_ids if id]
            # team_set (optional) overrides typecodes per position.
            team_set_codes = None
            if team_set:
                team_set_codes = team_set.split('#')
                if len(team_set_codes) != len(team_ids):
                    self.info_text.insert(tk.END,
                                          f"警告team_set 的长度 ({len(team_set_codes)}) 与 team_code 的 ID 数量 ({len(team_ids)}) 不匹配!\n")
                    return
            # Fetch all pet rows at once (parameterized IN clause).
            self.cursor.execute(
                "SELECT id, div_weapon, aolaer_typecode FROM aolaer_id WHERE id IN ({})".format(
                    ','.join('?' for _ in team_ids)
                ),
                team_ids
            )
            id_to_typecode = {row[0]: row for row in self.cursor.fetchall()}
            # Grouped so the final order is: sub-pet -> soul-art -> soul-card.
            sub_pet_packets = []
            soul_art_packets = []
            soul_card_packets = []
            for idx, id in enumerate(team_ids):
                id = int(id)
                if id not in id_to_typecode:
                    self.info_text.insert(tk.END, f"警告ID {id} 在 aolaer_id 表中不存在!\n")
                    continue
                div_weapon = id_to_typecode[id][1]
                typecode = id_to_typecode[id][2]
                if team_set_codes and team_set_codes[idx] != '0':
                    typecode = team_set_codes[idx]
                # Sub-pet packet; a placeholder (petId 0) is emitted even when
                # div_weapon is empty so the packet sequence stays aligned.
                if div_weapon is not None and str(div_weapon).strip():
                    sub_pet_packets.append(
                        f'|#send={{"id":42,"param":{{"subPetId":{id},"petId":{div_weapon}}},"cmd":"ASBS230623_con"}}|')
                else:
                    sub_pet_packets.append(
                        f'|#send={{"id":42,"param":{{"subPetId":{id},"petId":0}},"cmd":"ASBS230623_con"}}|')
                    self.info_text.insert(tk.END, f"提示ID {id} 的 div_weapon 为空,使用默认值 0 生成子宠物封包!\n")
                # Fetch soul_art and soul_card for the (possibly overridden) typecode.
                self.cursor.execute("SELECT soul_art, soul_card FROM aolaer_type WHERE aolaer_typecode = ?",
                                    (typecode,))
                result = self.cursor.fetchone()
                if not result:
                    self.info_text.insert(tk.END, f"警告typecode {typecode} 在 aolaer_type 表中不存在!\n")
                    continue
                soul_art, soul_card = result
                # Soul-art packets (two per pet); placeholders when empty.
                if soul_art and soul_art.strip():
                    try:
                        sid1, sid2 = soul_art.split(',')
                        soul_art_packets.append(
                            f'|#send={{"id":42,"param":{{"ui":{sid1},"petId":{id}}},"cmd":"ATT231229_1"}}|')
                        soul_art_packets.append(
                            f'|#send={{"id":42,"param":{{"ui":{sid2},"petId":{id}}},"cmd":"ATT231229_1"}}|')
                    except ValueError:
                        self.info_text.insert(tk.END, f"错误soul_art 格式不正确 for typecode {typecode}\n")
                        continue
                else:
                    soul_art_packets.append(f'|#send={{"id":42,"param":{{"ui":0,"petId":{id}}},"cmd":"ATT231229_1"}}|')
                    soul_art_packets.append(f'|#send={{"id":42,"param":{{"ui":0,"petId":{id}}},"cmd":"ATT231229_1"}}|')
                    self.info_text.insert(tk.END,
                                          f"提示typecode {typecode} 的 soul_art 为空,使用默认值 0 生成魂艺封包!\n")
                # Soul-card packet; marks the row is_set = 1 for a later reset.
                if soul_card and soul_card.strip():
                    is_valid, ids_or_error = self.validate_ids(soul_card)
                    if is_valid:
                        soul_card_packets.append(
                            f'|#send={{"id":42,"param":{{"bk":1,"petId":{id},"ids":{soul_card}}},"cmd":"ASC221104_2"}}|')
                        self.cursor.execute("UPDATE aolaer_id SET is_set = 1 WHERE id = ?", (id,))
                        self.conn.commit()
                    else:
                        self.info_text.insert(tk.END,
                                              f"警告typecode {typecode} 的 soul_card {soul_card} 无效({ids_or_error}),已跳过处理!\n")
                else:
                    self.info_text.insert(tk.END, f"提示typecode {typecode} 的 soul_card 为空,已跳过处理!\n")
            # Assemble in the fixed order: sub-pet -> soul-art -> soul-card.
            packets.extend(sub_pet_packets)
            packets.extend(soul_art_packets)
            packets.extend(soul_card_packets)
            if packets:
                self.output_text.insert(tk.END, '\n'.join(packets))
            else:
                self.info_text.insert(tk.END, "未生成任何封包,请检查数据!\n")
        except sqlite3.Error as e:
            self.info_text.insert(tk.END, f"数据库操作失败: {e}\n")

    def run(self):
        """Start the Tk main loop."""
        self.root.mainloop()

    def __del__(self):
        """Close the database connection if it was ever opened.

        Fixed: when the database file is missing, __init__ raises before
        self.conn is assigned, and the old unconditional close raised a
        spurious AttributeError during interpreter teardown.
        """
        conn = getattr(self, 'conn', None)
        if conn is not None:
            conn.close()


if __name__ == "__main__":
    app = TeamSwitchPacketGenerator()
    app.run()

BIN
test/aola.sqlite Normal file

Binary file not shown.

38
test/compaare.py Normal file
View File

@ -0,0 +1,38 @@
def compare_strings(str1, str2):
    """Compare two newline/comma separated strings and report the elements
    that appear in one but not the other, in original order.

    Tokens are obtained by splitting on newlines and then commas, with
    surrounding whitespace stripped and blanks dropped. Duplicates are
    preserved: an element occurring twice in str1 and absent from str2 is
    reported twice. Returns a human-readable report ("两个字符串相同" when
    no differences exist).
    """
    def _tokens(raw):
        # Split on newlines first, then commas; strip and drop blanks.
        lines = [part.strip() for part in raw.strip().split('\n') if part.strip()]
        return [tok.strip() for line in lines for tok in line.split(',') if tok.strip()]

    list1 = _tokens(str1)
    list2 = _tokens(str2)
    # Fixed: membership tests against lists were O(n) each (quadratic
    # overall); sets give the same results in O(n) total.
    set1, set2 = set(list1), set(list2)
    diff1 = [item for item in list1 if item not in set2]  # extras in str1
    diff2 = [item for item in list2 if item not in set1]  # extras in str2
    result = []
    if diff1:
        result.append(f"第一个字符串比第二个字符串多了 {len(diff1)} 个元素:")
        result.extend(diff1)  # one element per line
    if diff2:
        result.append(f"第二个字符串比第一个字符串多了 {len(diff2)} 个元素:")
        result.extend(diff2)  # one element per line
    return '\n'.join(result) if result else "两个字符串相同"
# Read the two input strings from files and print the comparison result.
try:
    with open('str1.txt', 'r', encoding='utf-8') as f:
        str1 = f.read()
    with open('str2.txt', 'r', encoding='utf-8') as f:
        str2 = f.read()
    # Run the comparison and print the report.
    print(compare_strings(str1, str2))
except FileNotFoundError as e:
    print(f"错误:找不到文件 {e.filename}")
except Exception as e:
    print(f"发生错误:{str(e)}")

234
test/m.py Normal file
View File

@ -0,0 +1,234 @@
# Python script that generates SQL INSERT statements.
# Dataset (could instead be read from a file): one record per line in the
# form "operation_code,operation_name,operation_level".
data = """10.4101,睑球粘连羊膜移植修补术,LV3
10.4402,异体结膜移植术,LV3
11.3204,翼状胬肉切除术伴丝裂霉素注入,LV3
12.1401,虹膜全切除术,LV3
12.3504,瞳孔残膜切除术,LV3
12.4201,前房机化膜切除术,LV3
12.6700,眼房水引流装置置入,LV3
13.1100,经颞下入路晶状体囊内摘出术,LV3
13.9003,晶状体囊袋张力环植入术,LV3
14.0202,后段眼球壁异物取出术,LV3
14.7202,后入路玻璃体切割术伴替代物注入,LV4
14.7905,玻璃体气液交换术,LV3
15.2200,一条眼外肌的缩短术,LV3
10.4102,睑球粘连口唇黏膜移植修补术,LV3
10.4201,结膜穹窿羊膜移植重建术,LV3
10.4202,结膜穹窿口唇黏膜移植重建术,LV3
10.4401,自体结膜移植术,LV3
10.4901,结膜滤过泡瘘修补术,LV3
10.4903,结膜囊成形术,LV3
10.9901,结膜松弛矫正术,LV3
11.3200,胬肉切除术伴角膜移植术,LV3
11.3201,翼状胬肉切除伴自体干细胞移植术,LV3
11.3202,翼状胬肉切除术伴异体干细胞移植术,LV3
11.3203,翼状胬肉切除伴羊膜植片移植术,LV3
11.7903,羊膜移植眼表重建术,LV3
11.9901,角巩膜割烙术,LV3
12.1403,虹膜周边切除术,LV3
12.1404,虹膜周边激光切除术,LV3
12.3300,虹膜后粘连松解术,LV3
12.3301,虹膜粘连松解术,LV3
12.3500,瞳孔成形术,LV3
12.3501,瞳孔膜穿刺术,LV3
12.3502,瞳孔粘连松解术,LV3
12.3503,瞳孔切开术,LV3
12.3505,滤过泡针拨术,LV3
12.3901,虹膜离断缝合术,LV3
12.3902,虹膜复位术,LV3
12.4200,虹膜病损切除术,LV3
12.5300,眼前房角切开伴眼前房角穿刺,LV3
12.5400,外路小梁切开术,LV3
12.5501,睫状体切开术,LV3
12.6404,小梁切除术伴丝裂霉素注入,LV3
12.6405,非穿透性小梁切除术,LV3
12.6406,小梁切除术伴羊膜移植,LV3
12.6407,小梁切除术伴移植物,LV3
12.6601,滤泡修复术,LV3
12.6701,青光眼阀取出术,LV3
12.6702,青光眼阀修复调位术,LV3
12.6703,前房导管术,LV3
12.6704,青光眼阀置入术,LV3
12.8801,巩膜外加压术,LV3
13.0100,用磁吸法的去除晶状体异物,LV3
13.0201,晶状体切开异物取出术,LV3
13.1901,白内障囊内冷凝摘出术,LV3
13.1902,白内障囊内摘除术,LV3
13.6501,晶状体前囊膜切除术,LV3
13.6502,晶状体后囊膜切除术,LV3
13.6503,晶状体后囊膜激光切开术,LV3
13.6901,残留晶状体皮质切除术,LV3
13.7000,置入人工晶状体,LV3
13.9001,人工晶状体复位术,LV3
13.9002,人工晶状体悬吊术,LV3
14.0100,用磁吸法去除眼后节异物,LV3
14.0101,玻璃体异物磁吸术,LV3
14.0201,脉络膜切开异物取出术,LV3
14.2402,视网膜病损激光凝固术,LV3
14.4901,巩膜环扎术伴空气填塞,LV4
14.4902,巩膜环扎术伴巩膜切除术,LV4
14.7401,后入路玻璃体切割术,LV4
14.7501,玻璃体硅油填充术,LV3
14.7901,玻璃体腔探查术,LV3
14.7904,玻璃体腔残留晶体皮质取出术,LV3
15.0900,眼外肌和腱的其他诊断性操作,LV2
15.1100,一条眼外肌的后徙术,LV3
15.1200,一条眼外肌的前徙术,LV3
15.2100,一条眼外肌的延长术,LV3
15.2901,一条眼外肌的悬吊术,LV3
16.0101,外侧开眶术,LV4
10.4100x001,睑球粘连游离移植物修补术,LV3
12.3900x001,虹膜修补术,LV3
12.6400x010,滤过道再通术,LV3
13.1900x008,膜性白内障剪除术,LV3
13.4100x001,白内障超声乳化抽吸术,LV3
13.7200x001,人工晶体二期置入术,LV3
13.9000x007,人工晶体缝合术,LV3
14.0200x001,眼后节异物去除术,LV3
14.7500x003,玻璃体硅油置换术,LV3
15.3x01,两条或两条以上眼外肌的后徙术,LV3
15.9x00x001,眼肌部分切除术,LV3
16.0900x005,多个眶壁减压术,LV4
10.4200x001,结膜穹窿游离移植物重建术,LV3
10.4300x002,结膜穹窿成形术,LV3
10.4400x001,结膜移植术,LV3
10.4400x002,羊膜移植结膜修补术,LV3
10.4900x001,结膜成形术,LV3
10.4900x003,结膜修补术,LV3
10.4900x004,结膜瓣修补术,LV3
10.5x01,睑球粘连分离术,LV3
10.6x00x001,结膜缝合术,LV3
10.6x00x002,结膜撕裂修补术,LV3
10.9900x001,结膜瓣遮盖术,LV3
10.9900x004,结膜脱位复位术,LV3
11.6900x003,异体角膜缘干细胞移植术,LV3
11.9900x002,自体角膜缘干细胞取材术,LV3
12.1400x001,虹膜部分切除术,LV3
12.1400x008,瞳孔前膜激光切开术,LV3
12.3200x001,虹膜前粘连松解术,LV3
12.3900x004,虹膜还纳术,LV3
12.3900x005,虹膜周边激光成形术,LV3
12.4100x002,虹膜周边激光破坏术,LV3
12.4100x003,虹膜周边冷冻破坏术,LV3
12.4100x004,虹膜周边电灼破坏术,LV3
12.5200x001,前房角切开术,LV3
12.5900x001,房角分离术,LV3
12.6400x003,滤帘切除术[小梁切除术],LV3
12.6400x009,小梁切除术伴人造移植物,LV3
12.6700x010,眼压调节器再次置入术,LV3
12.8300x002,眼前节手术伤口修补术,LV3
12.8800x002,巩膜移植物加固术,LV3
13.1900x007,晶状体囊内摘除术,LV3
13.2x01,晶状体刮匙摘除术,LV3
13.3x00x001,晶状体单纯抽吸囊外摘除术,LV3
13.3x01,创伤性白内障冲洗术,LV3
13.4200x001,经后路白内障切割吸出术,LV3
13.4300x001,白内障切割吸出术,LV3
13.5900x001,白内障囊外摘除术,LV3
13.6400x001,后发性白内障切开术,LV3
13.6500x002,后发性白内障切除术,LV3
13.6900x002,激光后囊切开术[YAG],LV3
13.7100x001,白内障摘除伴人工晶体一期置入术,LV3
13.7200x002,人工晶体再置入术,LV3
13.8x00x003,人工晶体取出术,LV3
13.9000x004,后囊切开术,LV3
13.9000x005,张力环缝合术,LV3
13.9000x006,虹膜隔晶体置入术,LV3
13.9000x008,人工晶体前膜切除术,LV3
13.9000x009,人工晶体睫状沟固定术,LV3
13.9000x010,晶状体前囊切开术,LV3
13.9000x011,晶状体囊膜剪开术,LV3
13.9100x001,可植入式隐形眼镜置入术[ICL置入术],LV3
14.0200x002,玻璃体腔异物取出术,LV3
14.4900x001,巩膜环扎术,LV4
14.6x00x001,眼后节置入物取出术,LV3
14.6x01,巩膜环扎带取出术,LV3
14.6x02,玻璃体硅油取出术,LV3
14.7100x001,前入路玻璃体切除术,LV4
14.7300x001,前入路玻璃体切割术,LV4
14.7500x004,玻璃体重水置换术,LV3
14.7900x001,人工玻璃体球囊置入术,LV3
14.9x00x001,巩膜外环扎带调整术,LV3
15.1900x001,一条眼外肌离断术,LV3
15.3x02,两条或两条以上眼外肌的前徙术,LV3
15.4x01,两条或两条以上眼外肌缩短术,LV3
15.5x00,眼外肌移位术,LV3
15.6x00,眼外肌手术后的修复术,LV3
15.7x00,眼外肌损伤修补术,LV3
15.7x01,眼肌粘连松解术,LV3
15.9x00x007,眼肌探查术,LV3
15.9x00x008,眼睑轮匝肌切断术,LV3
15.9x00x009,眼外肌病损切除术,LV3
15.9x01,眼阔筋膜切除术,LV3
16.0200x001,眼内自膨胀水凝胶注入术,LV3
16.0900x004,一个眶壁减压术,LV3
12.3500x001,瞳孔激光成形术,LV3
12.4100x001,眼前房病损激光切除术,LV3
12.6400x001,激光小梁成形术[ALPKLP],LV3
12.8200x001,巩膜瘘修补术,LV3
13.1900x006,白内障针吸术,LV3
15.0100x001,眼外肌活检术,LV3
15.4x02,两条或两条以上眼外肌悬吊术,LV3
15.9x00x010,眼外肌本体感受器破坏术,LV3
12.1402,虹膜激光切除术,LV3
12.3100,虹膜前房角粘连松解术,LV3
12.3400,角膜玻璃体粘连松解术,LV3
12.8703,巩膜外加压术伴填充,LV3
13.4101,飞秒激光白内障超声乳化抽吸术,LV3
13.5100,经颞下入路晶状体囊外摘出术,LV3
13.6600,后发膜机械性碎裂术[复发性白内障],LV3
14.4100,巩膜环扎术伴有植入物,LV4
14.4903,巩膜环扎术伴玻璃体切除术,LV4
14.7203,后入路玻璃体切割术伴人工玻璃体置入术,LV4
14.7902,玻璃体腔脱位晶状体取出术,LV3
15.1300,一条眼外肌的部分切除术,LV3
16.0200,眼眶切开术伴置入眼眶植入物,LV4"""
# SQL template (Oracle dialect): COMPARE_ID is the fixed doctor code plus the
# operation code; OPER_DATE is filled with SYSDATE at insert time.
sql_template = """
INSERT INTO MET_ORDT_OPERATION_COMPARE
( COMPARE_ID, DOC_CODE, DOC_NAME, OPERATION_CODE, OPERATION_NAME, OPERATION_LEVEL, OPER_CODE, OPER_DATE)
VALUES
( '1992006-{operation_code}',
'1992006',
'刘岚',
'{operation_code}',
'{operation_name}',
'{operation_level}',
'admin',
SYSDATE);
"""
# Build one INSERT statement per CSV record in the dataset.
sql_statements = []
for row in data.strip().split('\n'):
    # Each record is "code,name,level".
    op_code, op_name, op_level = row.strip().split(',')
    sql_statements.append(sql_template.format(
        operation_code=op_code,
        operation_name=op_name.replace("'", "''"),  # escape single quotes for SQL
        operation_level=op_level,
    ))

# Echo the generated statements (optional).
for statement in sql_statements:
    print(statement)

# Persist the statements to a file, terminated by a COMMIT.
output_file = 'insert_operations.sql'
with open(output_file, 'w', encoding='utf-8') as out:
    out.writelines(statement + '\n' for statement in sql_statements)
    out.write('COMMIT;\n')
print(f"\nSQL 语句已保存到 {output_file}")

0
test/str1.txt Normal file
View File

0
test/str2.txt Normal file
View File