Compare commits
5 Commits
8e3962db65
...
c40a1cb0dc
Author | SHA1 | Date | |
---|---|---|---|
![]() |
c40a1cb0dc | ||
![]() |
1abc7e0b7c | ||
![]() |
d7976d598e | ||
![]() |
681f501c1c | ||
![]() |
abf820fe08 |
@ -26,12 +26,15 @@ interval_options = [
|
||||
{'label': '5 minutes', 'value': 5},
|
||||
{'label': '10 minutes', 'value': 10},
|
||||
{'label': '30 minutes', 'value': 30},
|
||||
{'label': '60 minutes', 'value': 60},
|
||||
{'label': '⅓ day', 'value': 480},
|
||||
{'label': '1 hour', 'value': 60},
|
||||
{'label': '6 hours(1/4 day)', 'value': 360},
|
||||
{'label': '8 hours(1/3 day)', 'value': 480},
|
||||
{'label': '12 hours(1/2 day)', 'value': 720},
|
||||
{'label': '1 day', 'value': 1440}
|
||||
]
|
||||
days_options = [
|
||||
{'label': '7 days', 'value': 7},
|
||||
{'label': '14 days', 'value': 14},
|
||||
{'label': '30 days', 'value': 30},
|
||||
{'label': '90 days', 'value': 90},
|
||||
{'label': '120 days', 'value': 120},
|
||||
|
@ -2,6 +2,7 @@ from datetime import timedelta, datetime
|
||||
from dash import dcc, html
|
||||
from pkg.config import interval_options, days_options, render_data
|
||||
|
||||
|
||||
def layout_config(app):
|
||||
app.layout = html.Div([
|
||||
html.Div(
|
||||
@ -112,27 +113,16 @@ def layout_config(app):
|
||||
'zIndex': 1000
|
||||
}
|
||||
),
|
||||
# Main content
|
||||
html.Div([
|
||||
html.H1("Elon Musk Tweet Time Analysis (EST)"),
|
||||
html.Div(id='date-picker-container', children=[
|
||||
html.Div(id='multi-interval-container', children=[
|
||||
dcc.Dropdown(
|
||||
id='multi-date-picker',
|
||||
options=[{'label': str(date), 'value': str(date)} for date in render_data.all_dates],
|
||||
value=render_data.default_date,
|
||||
multi=True,
|
||||
searchable=True,
|
||||
placeholder="Search and select dates (YYYY-MM-DD)",
|
||||
style={'width': '100%'}
|
||||
id='multi-interval-picker',
|
||||
options=interval_options,
|
||||
value=10,
|
||||
style={'width': '50%', 'marginTop': '10px'}
|
||||
)
|
||||
]),
|
||||
dcc.Dropdown(
|
||||
id='multi-interval-picker',
|
||||
options=interval_options,
|
||||
value=10,
|
||||
style={'width': '50%', 'marginTop': '10px'}
|
||||
),
|
||||
html.Div(id='days-display-container', style={'display': 'none'}, children=[
|
||||
html.Div(id='days-display-container', children=[
|
||||
dcc.Dropdown(
|
||||
id='days-display-picker',
|
||||
options=days_options,
|
||||
@ -141,18 +131,8 @@ def layout_config(app):
|
||||
)
|
||||
]),
|
||||
html.Div(id='multi-day-warning', style={'color': 'red', 'margin': '10px'}),
|
||||
dcc.Checklist(
|
||||
id='time-zone-checklist',
|
||||
options=[
|
||||
{'label': 'California Time (PST)', 'value': 'PST'},
|
||||
{'label': 'Texas Time (CST)', 'value': 'CST'}
|
||||
],
|
||||
value=['PST'],
|
||||
style={'margin': '10px'}
|
||||
),
|
||||
html.Div(id='multi-tweet-summary', style={'fontSize': '20px', 'margin': '10px'}),
|
||||
dcc.Tabs(id='tabs', value='line', children=[
|
||||
dcc.Tab(label='Line', value='line'),
|
||||
dcc.Tabs(id='tabs', value='heatmap', children=[
|
||||
dcc.Tab(label='Heatmap', value='heatmap'),
|
||||
dcc.Tab(label='Heatmap(1-day)', value='one_day_heatmap'),
|
||||
]),
|
||||
@ -193,83 +173,7 @@ def layout_config(app):
|
||||
style={'width': '100%'}
|
||||
)
|
||||
)
|
||||
]),
|
||||
html.Tr([
|
||||
html.Td("Predict Tweets Start:", style={'paddingRight': '10px'}),
|
||||
html.Td(
|
||||
dcc.Input(
|
||||
id='prob-start-input',
|
||||
type='number',
|
||||
placeholder='输入 Probability Start 值',
|
||||
value=525,
|
||||
style={'width': '100%'}
|
||||
)
|
||||
)
|
||||
]),
|
||||
html.Tr([
|
||||
html.Td("Predict Tweets End:", style={'paddingRight': '10px'}),
|
||||
html.Td(
|
||||
dcc.Input(
|
||||
id='prob-end-input',
|
||||
type='number',
|
||||
placeholder='输入 Probability End 值',
|
||||
value=549,
|
||||
style={'width': '100%'}
|
||||
)
|
||||
)
|
||||
]),
|
||||
html.Tr([
|
||||
html.Td("Calculate Probability:", style={'paddingRight': '10px'}),
|
||||
html.Td(
|
||||
html.Button('Calculate', id='update-button', n_clicks=0)
|
||||
)
|
||||
]),
|
||||
html.Tr(id='manual-info-tooltip', style={'margin': '10px'})
|
||||
], style={
|
||||
'width': '50%',
|
||||
'marginTop': '10px',
|
||||
'borderCollapse': 'collapse'
|
||||
}),
|
||||
# 新增测试区域
|
||||
html.H2("Historical Probability Test", style={'marginTop': '20px'}),
|
||||
html.Table([
|
||||
html.Tr([
|
||||
html.Td("Test Date:", style={'paddingRight': '10px'}),
|
||||
html.Td(
|
||||
dcc.DatePickerSingle(
|
||||
id='test-date-input',
|
||||
date=(datetime.now().date() - timedelta(days=1)).strftime('%Y-%m-%d'), # 默认昨天
|
||||
display_format='YYYY-MM-DD',
|
||||
style={'width': '100%'}
|
||||
)
|
||||
)
|
||||
]),
|
||||
html.Tr([
|
||||
html.Td("Test Time:", style={'paddingRight': '10px'}),
|
||||
html.Td(
|
||||
html.Div([
|
||||
dcc.Input(
|
||||
id='test-time-input',
|
||||
type='text',
|
||||
placeholder='HH:MM:SS (e.g., 12:00:00)', # 增强提示
|
||||
value='12:00:00',
|
||||
pattern='[0-2][0-9]:[0-5][0-9]:[0-5][0-9]', # 限制格式
|
||||
style={'width': '100%'}
|
||||
),
|
||||
html.Span(
|
||||
"Enter time in HH:MM:SS format (e.g., 12:00:00)",
|
||||
style={'fontSize': '12px', 'color': 'gray', 'marginTop': '5px', 'display': 'block'}
|
||||
)
|
||||
])
|
||||
)
|
||||
]),
|
||||
html.Tr([
|
||||
html.Td("Test Probability:", style={'paddingRight': '10px'}),
|
||||
html.Td(
|
||||
html.Button('Test', id='test-button', n_clicks=0)
|
||||
)
|
||||
]),
|
||||
html.Tr(id='test-info-tooltip', style={'margin': '10px'})
|
||||
])
|
||||
], style={
|
||||
'width': '50%',
|
||||
'marginTop': '10px',
|
||||
@ -279,4 +183,4 @@ def layout_config(app):
|
||||
|
||||
dcc.Interval(id='clock-interval', interval=1000, n_intervals=0)
|
||||
])
|
||||
return app
|
||||
return app
|
||||
|
@ -1,37 +0,0 @@
|
||||
from pkg.dash.func.info_func import *
|
||||
from pkg.dash.app_init import app
|
||||
from dash.dependencies import Input, Output
|
||||
from dash import html
|
||||
|
||||
@app.callback(
|
||||
[Output('manual-info-tooltip', 'children')],
|
||||
[Input('update-button', 'n_clicks'),
|
||||
Input('prob-start-input', 'value'),
|
||||
Input('prob-end-input', 'value')]
|
||||
)
|
||||
def update_info_manual(n_clicks, prob_start, prob_end):
|
||||
if n_clicks == 0:
|
||||
return [html.Div("Click 'Manual Update' to see results.")]
|
||||
|
||||
tweet_count, days_to_next_friday = get_pace_params()
|
||||
prob_start = int(prob_start) if prob_start is not None else 525
|
||||
prob_end = int(prob_end) if prob_end is not None else 549
|
||||
|
||||
probability = calculate_tweet_probability(tweet_count, days_to_next_friday, prob_start, prob_end)
|
||||
|
||||
prob_low, prob_high = map(float, probability.split(" - "))
|
||||
formatted_probability = f"{prob_low * 100:.2f}% - {prob_high * 100:.2f}%"
|
||||
|
||||
pace_table_rows = [
|
||||
html.Tr([
|
||||
html.Th(f"Probability ({prob_start}-{prob_end})", colSpan=2, style={'paddingRight': '10px'}),
|
||||
html.Td(formatted_probability, colSpan=6, style={'paddingRight': '10px'})
|
||||
])
|
||||
]
|
||||
pace_table = html.Table(pace_table_rows, style={
|
||||
'width': '100%',
|
||||
'textAlign': 'left',
|
||||
'borderCollapse': 'collapse'
|
||||
})
|
||||
return [pace_table]
|
||||
|
@ -1,219 +0,0 @@
|
||||
from pkg.dash.func.info_func import *
|
||||
from pkg.dash.app_init import app
|
||||
from dash.dependencies import Input, Output
|
||||
from dash import html
|
||||
import os
|
||||
import csv
|
||||
import pandas as pd
|
||||
import re
|
||||
from datetime import timedelta
|
||||
from tqdm import tqdm
|
||||
from concurrent.futures import ProcessPoolExecutor, as_completed
|
||||
import multiprocessing as mp
|
||||
|
||||
# 全局数据,避免重复加载
|
||||
global_data = None
|
||||
|
||||
def initialize_global_data():
|
||||
global global_data
|
||||
if global_data is None:
|
||||
global_data = render_data.global_agg_df.copy()
|
||||
# 预计算常用列,避免重复操作
|
||||
global_data['hours'] = global_data['minute_of_day'] // 60
|
||||
global_data['minutes'] = global_data['minute_of_day'] % 60
|
||||
global_data['datetime_est'] = pd.to_datetime(
|
||||
global_data['date'].astype(str) + ' ' +
|
||||
global_data['hours'].astype(str) + ':' +
|
||||
global_data['minutes'].astype(str) + ':00',
|
||||
errors='coerce'
|
||||
).dt.tz_localize('US/Eastern', ambiguous='NaT')
|
||||
|
||||
@app.callback(
|
||||
[Output('test-info-tooltip', 'children')],
|
||||
[Input('test-button', 'n_clicks'),
|
||||
Input('test-date-input', 'date'),
|
||||
Input('test-time-input', 'value')]
|
||||
)
|
||||
def update_test_info(n_clicks, test_date, test_time, data=None):
|
||||
if n_clicks == 0:
|
||||
return [html.Div("Click 'Test' to see historical probability results.")]
|
||||
|
||||
est = pytz.timezone('US/Eastern')
|
||||
data = data if data is not None else render_data.global_agg_df.copy()
|
||||
|
||||
if not test_date or not test_time:
|
||||
return [html.Div("Date or time input is empty. Please provide both date (YYYY-MM-DD) and time (HH:MM:SS).")]
|
||||
|
||||
time_pattern = r'^(?:[01]\d|2[0-3]):[0-5]\d:[0-5]\d$'
|
||||
if not re.match(time_pattern, test_time):
|
||||
return [html.Div("Invalid time format. Use HH:MM:SS (e.g., 12:00:00).")]
|
||||
|
||||
try:
|
||||
test_datetime = pd.to_datetime(f"{test_date} {test_time}", format='%Y-%m-%d %H:%M:%S').tz_localize(est, ambiguous=True)
|
||||
except ValueError:
|
||||
return [html.Div("Invalid date or time format. Use YYYY-MM-DD and HH:MM:SS (e.g., 12:00:00).")]
|
||||
|
||||
test_day_of_week = test_datetime.weekday()
|
||||
test_hour = test_datetime.hour
|
||||
days_since_last_friday = (test_day_of_week - 4) % 7
|
||||
if test_hour < 12 and test_day_of_week == 4:
|
||||
cycle_start = test_datetime - timedelta(days=days_since_last_friday + 7)
|
||||
else:
|
||||
cycle_start = test_datetime - timedelta(days=days_since_last_friday)
|
||||
cycle_start = cycle_start.replace(hour=12, minute=0, second=0, microsecond=0)
|
||||
|
||||
cycle_end = cycle_start + timedelta(days=7)
|
||||
first_day = cycle_end.replace(day=1)
|
||||
second_sunday = first_day + timedelta(days=((6 - first_day.weekday()) % 7) + 7)
|
||||
if cycle_end.month == 3 and cycle_end >= second_sunday.replace(hour=2):
|
||||
cycle_end = cycle_end.tz_convert(est)
|
||||
else:
|
||||
cycle_end = cycle_end.tz_convert(est)
|
||||
|
||||
days_to_next_friday = (cycle_end - test_datetime).total_seconds() / (24 * 60 * 60)
|
||||
if days_to_next_friday <= 0:
|
||||
return [html.Div(f"Test time {test_datetime} is at or past cycle end {cycle_end}.")]
|
||||
|
||||
cycle_data = data[(data['datetime_est'] >= cycle_start) & (data['datetime_est'] <= test_datetime)]
|
||||
if cycle_data.empty:
|
||||
return [html.Div(f"No data available from {cycle_start} to {test_datetime}")]
|
||||
|
||||
tweet_count = cycle_data['tweet_count'].sum()
|
||||
|
||||
actual_data = data[(data['datetime_est'] >= cycle_start) & (data['datetime_est'] <= cycle_end)]
|
||||
if actual_data.empty:
|
||||
return [html.Div(f"No data available for cycle ending {cycle_end}")]
|
||||
actual_end_count = actual_data['tweet_count'].sum()
|
||||
|
||||
days_elapsed = (test_datetime - cycle_start).total_seconds() / (24 * 60 * 60)
|
||||
if days_elapsed <= 0:
|
||||
return [html.Div(f"Test time {test_datetime} is before cycle start {cycle_start}.")]
|
||||
|
||||
daily_avg = tweet_count / days_elapsed
|
||||
predicted_end_count = daily_avg * 7
|
||||
prob_start = predicted_end_count * 0.9
|
||||
prob_end = predicted_end_count * 1.1
|
||||
|
||||
try:
|
||||
probability = calculate_tweet_probability(tweet_count, days_to_next_friday, prob_start, prob_end)
|
||||
prob_min, prob_max = map(float, probability.split(" - "))
|
||||
formatted_probability = f"{prob_min * 100:.2f}% - {prob_max * 100:.2f}%"
|
||||
except Exception as e:
|
||||
return [html.Div(f"Error calculating probability: {str(e)}")]
|
||||
|
||||
test_table_rows = [
|
||||
html.Tr([html.Th("Cycle Start:", colSpan=4), html.Td(str(cycle_start), colSpan=6)]),
|
||||
html.Tr([html.Th("Test Date and Time:", colSpan=4), html.Td(str(test_datetime), colSpan=6)]),
|
||||
html.Tr([html.Th("Cycle End:", colSpan=4), html.Td(str(cycle_end), colSpan=6)]),
|
||||
html.Tr([html.Th("Tweet Count at Test Time:", colSpan=4), html.Td(str(tweet_count), colSpan=6)]),
|
||||
html.Tr([html.Th("Actual Final Tweet Count:", colSpan=4), html.Td(str(actual_end_count), colSpan=6)]),
|
||||
html.Tr([html.Th(f"Predicted Range ({int(prob_start)}-{int(prob_end)}):", colSpan=4), html.Td(formatted_probability, colSpan=6)]),
|
||||
html.Tr([html.Th("Does Actual Fall in Range?", colSpan=4),
|
||||
html.Td("Yes" if prob_start <= actual_end_count <= prob_end else "No",
|
||||
colSpan=6, style={'color': 'green' if prob_start <= actual_end_count <= prob_end else 'red'})])
|
||||
]
|
||||
if prob_start <= actual_end_count <= prob_end:
|
||||
expected_prob = (prob_max + prob_min) / 2
|
||||
test_table_rows.append(html.Tr([html.Th("Expected Probability:", colSpan=4),
|
||||
html.Td(f"~{expected_prob * 100:.2f}%", colSpan=6)]))
|
||||
else:
|
||||
test_table_rows.append(html.Tr([html.Th("Note:", colSpan=4),
|
||||
html.Td("Prediction does not match actual outcome.", colSpan=6, style={'color': 'red'})]))
|
||||
|
||||
test_table = html.Table(test_table_rows, style={'width': '100%', 'textAlign': 'left', 'borderCollapse': 'collapse'})
|
||||
return [test_table]
|
||||
|
||||
|
||||
def process_test_case(args):
|
||||
test_datetime, data = args
|
||||
test_date = test_datetime.date().strftime('%Y-%m-%d')
|
||||
test_time = test_datetime.time().strftime('%H:%M:%S')
|
||||
n_clicks = 1
|
||||
|
||||
result = update_test_info(n_clicks, test_date, test_time, data)
|
||||
|
||||
if isinstance(result[0], html.Table):
|
||||
table = result[0]
|
||||
rows = table.children
|
||||
|
||||
cycle_start = str(rows[0].children[1].children)
|
||||
test_dt = str(rows[1].children[1].children)
|
||||
cycle_end = str(rows[2].children[1].children)
|
||||
tweet_count = int(rows[3].children[1].children)
|
||||
actual_end_count = int(rows[4].children[1].children)
|
||||
prob_range = rows[5].children[1].children
|
||||
prob_min, prob_max = [float(x.strip('%')) for x in prob_range.split(" - ")]
|
||||
prob_start, prob_end = map(int, rows[5].children[0].children.split("(")[1].split(")")[0].split("-"))
|
||||
in_range = rows[6].children[1].children == "Yes"
|
||||
expected_prob = None
|
||||
note = ""
|
||||
if len(rows) > 7:
|
||||
if "Expected" in rows[7].children[0].children:
|
||||
expected_prob = float(rows[7].children[1].children.split()[0][1:-1])
|
||||
elif "Note" in rows[7].children[0].children:
|
||||
note = rows[7].children[1].children
|
||||
|
||||
return [
|
||||
test_date, test_time, cycle_start, cycle_end, tweet_count,
|
||||
actual_end_count, prob_start, prob_end, prob_min, prob_max,
|
||||
"Yes" if in_range else "No", expected_prob if expected_prob is not None else "", note
|
||||
]
|
||||
else:
|
||||
return [test_date, test_time, "", "", "", "", "", "", "", "", "", "", result[0].children]
|
||||
|
||||
|
||||
def run_loop_test(start_date="2024-10-01", end_date="2025-03-12", interval_hours=1, output_file="test_results.csv", max_workers=None, chunk_size=1000):
|
||||
est = pytz.timezone('US/Eastern')
|
||||
start_dt = pd.to_datetime(start_date).tz_localize(est)
|
||||
end_dt = pd.to_datetime(end_date).tz_localize(est)
|
||||
time_points = []
|
||||
current_dt = start_dt
|
||||
while current_dt <= end_dt:
|
||||
time_points.append(current_dt)
|
||||
current_dt += timedelta(hours=interval_hours)
|
||||
|
||||
headers = [
|
||||
"Test Date", "Test Time", "Cycle Start", "Cycle End", "Tweet Count at Test Time",
|
||||
"Actual Final Tweet Count", "Predicted Range Start", "Predicted Range End",
|
||||
"Probability Min (%)", "Probability Max (%)", "Actual in Range", "Expected Probability (%)", "Note"
|
||||
]
|
||||
|
||||
if not os.path.exists(output_file):
|
||||
with open(output_file, 'w', newline='') as f:
|
||||
writer = csv.writer(f)
|
||||
writer.writerow(headers)
|
||||
|
||||
# 预加载数据
|
||||
initialize_global_data()
|
||||
data = global_data
|
||||
|
||||
total_steps = len(time_points)
|
||||
max_workers = max_workers or os.cpu_count() or 4
|
||||
chunk_size = min(chunk_size, total_steps) # 确保 chunk_size 不超过总任务数
|
||||
|
||||
# 分块处理时间点
|
||||
chunks = [time_points[i:i + chunk_size] for i in range(0, total_steps, chunk_size)]
|
||||
|
||||
with tqdm(total=total_steps, desc="Processing Test Cases", unit="step") as pbar:
|
||||
for chunk in chunks:
|
||||
results = []
|
||||
with ProcessPoolExecutor(max_workers=max_workers) as executor:
|
||||
futures = {executor.submit(process_test_case, (test_datetime, data)): test_datetime for test_datetime in chunk}
|
||||
for future in as_completed(futures):
|
||||
try:
|
||||
result = future.result()
|
||||
results.append(result)
|
||||
pbar.update(1)
|
||||
except Exception as e:
|
||||
test_datetime = futures[future]
|
||||
results.append([test_datetime.date().strftime('%Y-%m-%d'),
|
||||
test_datetime.time().strftime('%H:%M:%S'),
|
||||
"", "", "", "", "", "", "", "", "", "", f"Error: {str(e)}"])
|
||||
pbar.update(1)
|
||||
|
||||
with open(output_file, 'a', newline='') as f:
|
||||
writer = csv.writer(f)
|
||||
writer.writerows(results)
|
||||
|
||||
if __name__ == "__main__":
|
||||
run_loop_test(start_date="2024-10-01", end_date="2025-03-12", interval_hours=1, output_file="test_results.csv", chunk_size=1000)
|
@ -2,37 +2,27 @@ from datetime import datetime, timedelta
|
||||
from dash.dependencies import Input, Output
|
||||
from pkg.dash.app_init import app
|
||||
from pkg.config import render_data
|
||||
from pkg.tool import aggregate_data, generate_xticks, minutes_to_time, get_tweets_since_last_friday
|
||||
from pkg.tool import aggregate_data, minutes_to_time, get_tweets_since_last_friday,get_pace_and_total_tweets
|
||||
from dash import dcc
|
||||
import plotly.graph_objs as go
|
||||
import pandas as pd
|
||||
|
||||
import pytz
|
||||
import numpy as np
|
||||
|
||||
@app.callback(
|
||||
[Output('tabs-content', 'children'),
|
||||
Output('multi-day-warning', 'children'),
|
||||
Output('multi-tweet-summary', 'children')],
|
||||
[Input('tabs', 'value'),
|
||||
Input('multi-date-picker', 'value'),
|
||||
Input('multi-interval-picker', 'value'),
|
||||
Input('time-zone-checklist', 'value'),
|
||||
Input('days-display-picker', 'value')]
|
||||
)
|
||||
def render_tab_content(tab, selected_dates, interval, time_zones, days_to_display):
|
||||
def render_tab_content(tab, interval, days_to_display):
|
||||
warning = ""
|
||||
if tab == 'line':
|
||||
if not selected_dates: # Handle None or empty list
|
||||
selected_dates = [datetime.now().date()] # Default to today
|
||||
warning = "No dates selected. Showing today’s data."
|
||||
if len(selected_dates) > 10:
|
||||
selected_dates = selected_dates[:10]
|
||||
warning = "Maximum of 10 days can be selected. Showing first 10 selected days."
|
||||
selected_dates = [datetime.strptime(date, '%Y-%m-%d').date() for date in selected_dates]
|
||||
else:
|
||||
available_dates = sorted(render_data.global_agg_df['date'].unique(), reverse=True)
|
||||
selected_dates = available_dates[:days_to_display] if available_dates else [datetime.now().date()]
|
||||
if not available_dates:
|
||||
warning = "No data available. Showing today’s date with zero tweets."
|
||||
available_dates = sorted(render_data.global_agg_df['date'].unique(), reverse=True)
|
||||
selected_dates = available_dates[:days_to_display] if available_dates else [datetime.now().date()]
|
||||
if not available_dates:
|
||||
warning = "No data available. Showing today’s date with zero tweets."
|
||||
|
||||
multi_data_agg = render_data.global_agg_df[render_data.global_agg_df['date'].isin(selected_dates)].copy()
|
||||
if multi_data_agg.empty:
|
||||
@ -47,23 +37,8 @@ def render_tab_content(tab, selected_dates, interval, time_zones, days_to_displa
|
||||
tweet_count_total = 0
|
||||
|
||||
agg_data = aggregate_data(multi_data_agg, interval)
|
||||
xticks, xtick_labels = generate_xticks(interval)
|
||||
|
||||
if tab == 'line':
|
||||
fig = go.Figure()
|
||||
for date in selected_dates:
|
||||
day_data = agg_data[agg_data['date'] == date]
|
||||
hover_times = [f"{date} {minutes_to_time(minute)} EST" for minute in day_data['interval_group']]
|
||||
fig.add_trace(go.Scatter(
|
||||
x=day_data['interval_group'],
|
||||
y=day_data['tweet_count'],
|
||||
mode='lines',
|
||||
name=str(date),
|
||||
customdata=hover_times,
|
||||
hovertemplate='%{customdata}<br>Tweets: %{y}<extra></extra>'
|
||||
))
|
||||
|
||||
elif tab == 'heatmap':
|
||||
if tab == 'heatmap':
|
||||
pivot_data = agg_data.pivot(index='date', columns='interval_group', values='tweet_count').fillna(0)
|
||||
pivot_data.index = pivot_data.index.astype(str)
|
||||
fig = go.Figure(data=go.Heatmap(
|
||||
@ -77,7 +52,7 @@ def render_tab_content(tab, selected_dates, interval, time_zones, days_to_displa
|
||||
|
||||
for i, date_str in enumerate(pivot_data.index):
|
||||
date = datetime.strptime(date_str, '%Y-%m-%d').date()
|
||||
if date.weekday() == 4: # Friday
|
||||
if date.weekday() == 4:
|
||||
prev_date = date - timedelta(days=1)
|
||||
if str(prev_date) in pivot_data.index:
|
||||
y_position = i / len(pivot_data.index)
|
||||
@ -105,8 +80,8 @@ def render_tab_content(tab, selected_dates, interval, time_zones, days_to_displa
|
||||
|
||||
for _, row in one_day_data.iterrows():
|
||||
minute = row['interval_group']
|
||||
hour = int(minute // 60) # Convert to integer
|
||||
interval_idx = int((minute % 60) // interval) # Convert to integer
|
||||
hour = int(minute // 60)
|
||||
interval_idx = int((minute % 60) // interval)
|
||||
if hour < 24:
|
||||
z_values[hour][interval_idx] = row['tweet_count']
|
||||
|
||||
@ -126,21 +101,13 @@ def render_tab_content(tab, selected_dates, interval, time_zones, days_to_displa
|
||||
hovertemplate='%{y}:%{x} EST<br>Tweets: %{z}<br>Rate: %{customdata:.2%}<extra></extra>'
|
||||
))
|
||||
|
||||
if tab in ['line', 'one_day_heatmap']:
|
||||
fig.update_layout(
|
||||
title=f'{"Line" if tab == "line" else "One-Day Heatmap"} Tweet Frequency (Interval: {interval} minutes, EST, {len(selected_dates)} days)',
|
||||
xaxis_title='Minutes' if tab == 'one_day_heatmap' else 'Eastern Time (HH:MM)',
|
||||
yaxis_title='Hour of Day' if tab == 'one_day_heatmap' else 'Tweet Count',
|
||||
xaxis=dict(
|
||||
range=[0, 1440] if tab == 'line' else None,
|
||||
tickvals=xticks if tab == 'line' else None,
|
||||
ticktext=xtick_labels if tab == 'line' else None,
|
||||
tickangle=45 if tab == 'line' else 0
|
||||
),
|
||||
title=f'One-Day Heatmap Tweet Frequency (Interval: {interval} minutes, EST, {len(selected_dates)} days)',
|
||||
xaxis_title='Minutes',
|
||||
yaxis_title='Hour of Day',
|
||||
height=600,
|
||||
showlegend=(tab == 'line'),
|
||||
yaxis=dict(autorange='reversed') if tab == 'one_day_heatmap' else None
|
||||
yaxis=dict(autorange='reversed')
|
||||
)
|
||||
|
||||
summary = f"Total tweets for selected dates: {int(tweet_count_total)}Total tweets: {get_tweets_since_last_friday()}"
|
||||
summary = f"Total tweets: {get_tweets_since_last_friday()}"
|
||||
return dcc.Graph(figure=fig), warning, summary
|
@ -3,12 +3,9 @@ from dash.dependencies import Input, Output
|
||||
|
||||
|
||||
@app.callback(
|
||||
[Output('date-picker-container', 'style'),
|
||||
Output('days-display-container', 'style'),
|
||||
Output('time-zone-checklist', 'style')],
|
||||
[Output('days-display-container', 'style'),
|
||||
Output('multi-interval-container', 'style')],
|
||||
[Input('tabs', 'value')]
|
||||
)
|
||||
def toggle_controls_visibility(tab):
|
||||
if tab == 'heatmap' or tab == 'one_day_heatmap':
|
||||
return {'display': 'none'}, {'display': 'block'}, {'display': 'none'}
|
||||
return {'display': 'block'}, {'display': 'none'}, {'display': 'block'}
|
||||
return {'display': 'block'},{'display': 'block'}
|
||||
|
51
pkg/tool.py
51
pkg/tool.py
@ -96,3 +96,54 @@ def format_time_str(days_to_next_friday):
|
||||
return f"{days}d {hours:02d}h {minutes:02d}m {seconds:02d}s ({total_hours}h)"
|
||||
|
||||
|
||||
def get_pace_and_total_tweets(target_time: datetime) -> tuple[float, int]:
|
||||
est = pytz.timezone('US/Eastern')
|
||||
|
||||
# 如果 target_time 没有时区信息,假设为 EST
|
||||
if target_time.tzinfo is None:
|
||||
target_time = est.localize(target_time)
|
||||
|
||||
# 计算上周五 12:00 AM EST
|
||||
target_date = target_time.date()
|
||||
days_since_last_friday = (target_date.weekday() + 3) % 7 # 距离上周五的天数
|
||||
last_friday = target_time - timedelta(days=days_since_last_friday)
|
||||
last_friday_midnight = last_friday.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
|
||||
# 计算下周五 12:00 AM EST
|
||||
days_to_next_friday = (4 - target_date.weekday()) % 7
|
||||
next_friday = target_time + timedelta(days=days_to_next_friday)
|
||||
next_friday_midnight = next_friday.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
if target_time > next_friday_midnight:
|
||||
next_friday_midnight += timedelta(days=7)
|
||||
|
||||
# 从 global_agg_df 中筛选从上周五 12:00 AM 到 target_time 的数据
|
||||
if hasattr(render_data, 'global_agg_df') and not render_data.global_agg_df.empty:
|
||||
multi_data_agg = render_data.global_agg_df[
|
||||
(render_data.global_agg_df['date'] >= last_friday_midnight.date()) &
|
||||
(render_data.global_agg_df['date'] <= target_date)
|
||||
].copy()
|
||||
else:
|
||||
multi_data_agg = pd.DataFrame()
|
||||
|
||||
if multi_data_agg.empty:
|
||||
total_tweets = 0
|
||||
else:
|
||||
# 使用 minute_of_day 转换为时间戳并筛选到 target_time 之前
|
||||
multi_data_agg['timestamp'] = pd.to_datetime(multi_data_agg['date'].astype(str)) + \
|
||||
pd.to_timedelta(multi_data_agg['minute_of_day'], unit='m')
|
||||
multi_data_agg['timestamp'] = multi_data_agg['timestamp'].dt.tz_localize(est)
|
||||
multi_data_agg = multi_data_agg[multi_data_agg['timestamp'] <= target_time]
|
||||
total_tweets = multi_data_agg['tweet_count'].sum() if 'tweet_count' in multi_data_agg else 0
|
||||
|
||||
# 计算 Pace
|
||||
days_elapsed = (target_time - last_friday_midnight).total_seconds() / (24 * 60 * 60)
|
||||
days_remaining = (next_friday_midnight - target_time).total_seconds() / (24 * 60 * 60)
|
||||
|
||||
if days_elapsed > 0 and total_tweets > 0:
|
||||
daily_avg = total_tweets / days_elapsed
|
||||
pace = daily_avg * days_remaining + total_tweets
|
||||
else:
|
||||
pace = float(total_tweets) # 如果没有数据或时间未开始,Pace 等于当前推文数
|
||||
|
||||
return round(pace, 2), int(total_tweets)
|
||||
|
||||
|
240
test/aola.py
Normal file
240
test/aola.py
Normal file
@ -0,0 +1,240 @@
|
||||
import sqlite3
|
||||
import tkinter as tk
|
||||
from tkinter import ttk
|
||||
import os
|
||||
import json
|
||||
|
||||
|
||||
class TeamSwitchPacketGenerator:
|
||||
def __init__(self):
|
||||
# 检查数据库文件是否存在
|
||||
db_file = 'aola.sqlite'
|
||||
if not os.path.exists(db_file):
|
||||
raise FileNotFoundError(f"数据库文件 {db_file} 未找到,请确保文件在 {os.getcwd()} 目录下!")
|
||||
|
||||
print(f"数据库文件路径: {os.path.abspath(db_file)}")
|
||||
self.conn = sqlite3.connect(db_file)
|
||||
self.cursor = self.conn.cursor()
|
||||
self.root = tk.Tk()
|
||||
self.root.title("阵容切换封包生成器")
|
||||
|
||||
# 创建下拉框
|
||||
self.team_var = tk.StringVar()
|
||||
self.team_dropdown = ttk.Combobox(self.root, textvariable=self.team_var)
|
||||
self.team_dropdown['values'] = self.get_team_names()
|
||||
self.team_dropdown.pack(pady=10)
|
||||
self.team_dropdown.set("请选择阵容")
|
||||
|
||||
# 创建按钮框架,包含“刷新”和“生成封包”按钮
|
||||
button_frame = tk.Frame(self.root)
|
||||
button_frame.pack(pady=5)
|
||||
|
||||
# 刷新按钮
|
||||
tk.Button(button_frame, text="刷新", command=self.refresh_dropdown).pack(side=tk.LEFT, padx=5)
|
||||
|
||||
# 生成按钮
|
||||
tk.Button(button_frame, text="生成封包(第一次生成请先解锁二级密码)", command=self.generate_packets).pack(side=tk.LEFT, padx=5)
|
||||
|
||||
# 创建 Frame 包含封包输出框和复制按钮
|
||||
output_frame = tk.Frame(self.root)
|
||||
output_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
|
||||
|
||||
# 封包输出框
|
||||
tk.Label(output_frame, text="封包输出:").pack(anchor=tk.W)
|
||||
self.output_text = tk.Text(output_frame, height=15, width=60)
|
||||
self.output_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
|
||||
|
||||
# 复制按钮(放在右侧)
|
||||
tk.Button(output_frame, text="复制脚本", command=self.copy_output).pack(side=tk.RIGHT, padx=5)
|
||||
|
||||
# 信息提示框
|
||||
tk.Label(self.root, text="提示信息:").pack(anchor=tk.W)
|
||||
self.info_text = tk.Text(self.root, height=5, width=80, fg="red")
|
||||
self.info_text.pack(pady=5)
|
||||
|
||||
def get_team_names(self):
|
||||
"""从数据库获取所有阵容名称"""
|
||||
try:
|
||||
self.cursor.execute("SELECT team_name FROM aolaer_team order by team_name")
|
||||
return [row[0] for row in self.cursor.fetchall()]
|
||||
except sqlite3.Error as e:
|
||||
self.info_text.insert(tk.END, f"获取阵容名称失败: {e}\n")
|
||||
return []
|
||||
|
||||
def refresh_dropdown(self):
|
||||
"""刷新下拉框数据"""
|
||||
# 重新获取阵容名称
|
||||
team_names = self.get_team_names()
|
||||
# 更新下拉框选项
|
||||
self.team_dropdown['values'] = team_names
|
||||
# 重置选择
|
||||
self.team_dropdown.set("请选择阵容")
|
||||
# 在提示框中显示刷新成功
|
||||
self.info_text.insert(tk.END, "下拉框数据已刷新!\n")
|
||||
|
||||
def copy_output(self):
|
||||
"""复制脚本输出框的内容到剪贴板"""
|
||||
content = self.output_text.get(1.0, tk.END).strip()
|
||||
if content:
|
||||
self.root.clipboard_clear()
|
||||
self.root.clipboard_append(content)
|
||||
self.info_text.insert(tk.END, "脚本内容已复制到剪贴板!\n")
|
||||
else:
|
||||
self.info_text.insert(tk.END, "脚本输出框为空,无法复制!\n")
|
||||
|
||||
def validate_ids(self, ids_str):
|
||||
"""验证 ids 数组,确保格式正确"""
|
||||
try:
|
||||
ids = json.loads(ids_str)
|
||||
if not isinstance(ids, list) or len(ids) != 5:
|
||||
return False, "ids 必须是长度为 5 的数组"
|
||||
for x in ids:
|
||||
if not isinstance(x, int) or x < 0:
|
||||
return False, "ids 中的值必须是非负整数"
|
||||
return True, ids
|
||||
except json.JSONDecodeError:
|
||||
return False, "ids 格式不正确"
|
||||
|
||||
def generate_packets(self):
|
||||
"""生成封包代码"""
|
||||
self.output_text.delete(1.0, tk.END)
|
||||
self.info_text.delete(1.0, tk.END)
|
||||
packets = []
|
||||
|
||||
selected_team = self.team_var.get()
|
||||
if not selected_team or selected_team == "请选择阵容":
|
||||
self.info_text.insert(tk.END, "请先选择一个阵容!\n")
|
||||
return
|
||||
|
||||
try:
|
||||
# 获取 team_code 和 team_set
|
||||
self.cursor.execute("SELECT team_code, team_set FROM aolaer_team WHERE team_name = ?", (selected_team,))
|
||||
result = self.cursor.fetchone()
|
||||
if not result:
|
||||
self.info_text.insert(tk.END, f"未找到阵容 {selected_team} 的 team_code!\n")
|
||||
return
|
||||
team_code, team_set = result
|
||||
packets.append(f'|#send={{"id":13,"param":{{"pms":"{team_code}"}},"cmd":"1222"}}|')
|
||||
|
||||
# 处理 is_set = 1 的记录(重置魂卡)
|
||||
self.cursor.execute("SELECT id FROM aolaer_id WHERE is_set = 1")
|
||||
set_ids = [row[0] for row in self.cursor.fetchall()]
|
||||
|
||||
for set_id in set_ids:
|
||||
packets.append(
|
||||
f'|#send={{"id":42,"param":{{"bk":1,"petId":{set_id},"ids":[0,0,0,0,0]}},"cmd":"ASC221104_2"}}|')
|
||||
self.cursor.execute("UPDATE aolaer_id SET is_set = 0 WHERE id = ?", (set_id,))
|
||||
self.conn.commit()
|
||||
|
||||
# 处理 team_code 中的 ID
|
||||
team_ids = team_code.split('#')
|
||||
team_ids = [id for id in team_ids if id]
|
||||
|
||||
# 处理 team_set
|
||||
team_set_codes = None
|
||||
if team_set:
|
||||
team_set_codes = team_set.split('#')
|
||||
if len(team_set_codes) != len(team_ids):
|
||||
self.info_text.insert(tk.END,
|
||||
f"警告:team_set 的长度 ({len(team_set_codes)}) 与 team_code 的 ID 数量 ({len(team_ids)}) 不匹配!\n")
|
||||
return
|
||||
|
||||
# 查询 aolaer_id 表
|
||||
self.cursor.execute(
|
||||
"SELECT id, div_weapon, aolaer_typecode FROM aolaer_id WHERE id IN ({})".format(
|
||||
','.join('?' for _ in team_ids)
|
||||
),
|
||||
team_ids
|
||||
)
|
||||
|
||||
id_to_typecode = {row[0]: row for row in self.cursor.fetchall()}
|
||||
|
||||
# 分组生成封包,确保顺序:子宠物 → 魂艺 → 魂卡
|
||||
sub_pet_packets = []
|
||||
soul_art_packets = []
|
||||
soul_card_packets = []
|
||||
|
||||
for idx, id in enumerate(team_ids):
|
||||
id = int(id)
|
||||
if id not in id_to_typecode:
|
||||
self.info_text.insert(tk.END, f"警告:ID {id} 在 aolaer_id 表中不存在!\n")
|
||||
continue
|
||||
|
||||
div_weapon = id_to_typecode[id][1]
|
||||
typecode = id_to_typecode[id][2]
|
||||
if team_set_codes and team_set_codes[idx] != '0':
|
||||
typecode = team_set_codes[idx]
|
||||
|
||||
# 生成子宠物封包(即使 div_weapon 为空,也生成占位封包)
|
||||
if div_weapon is not None and str(div_weapon).strip():
|
||||
sub_pet_packets.append(
|
||||
f'|#send={{"id":42,"param":{{"subPetId":{id},"petId":{div_weapon}}},"cmd":"ASBS230623_con"}}|')
|
||||
else:
|
||||
sub_pet_packets.append(
|
||||
f'|#send={{"id":42,"param":{{"subPetId":{id},"petId":0}},"cmd":"ASBS230623_con"}}|')
|
||||
self.info_text.insert(tk.END, f"提示:ID {id} 的 div_weapon 为空,使用默认值 0 生成子宠物封包!\n")
|
||||
|
||||
# 获取 soul_art 和 soul_card
|
||||
self.cursor.execute("SELECT soul_art, soul_card FROM aolaer_type WHERE aolaer_typecode = ?",
|
||||
(typecode,))
|
||||
result = self.cursor.fetchone()
|
||||
if not result:
|
||||
self.info_text.insert(tk.END, f"警告:typecode {typecode} 在 aolaer_type 表中不存在!\n")
|
||||
continue
|
||||
soul_art, soul_card = result
|
||||
|
||||
# 生成魂艺封包(即使 soul_art 为空,也生成占位封包)
|
||||
if soul_art and soul_art.strip():
|
||||
try:
|
||||
sid1, sid2 = soul_art.split(',')
|
||||
soul_art_packets.append(
|
||||
f'|#send={{"id":42,"param":{{"ui":{sid1},"petId":{id}}},"cmd":"ATT231229_1"}}|')
|
||||
soul_art_packets.append(
|
||||
f'|#send={{"id":42,"param":{{"ui":{sid2},"petId":{id}}},"cmd":"ATT231229_1"}}|')
|
||||
except ValueError:
|
||||
self.info_text.insert(tk.END, f"错误:soul_art 格式不正确 for typecode {typecode}!\n")
|
||||
continue
|
||||
else:
|
||||
soul_art_packets.append(f'|#send={{"id":42,"param":{{"ui":0,"petId":{id}}},"cmd":"ATT231229_1"}}|')
|
||||
soul_art_packets.append(f'|#send={{"id":42,"param":{{"ui":0,"petId":{id}}},"cmd":"ATT231229_1"}}|')
|
||||
self.info_text.insert(tk.END,
|
||||
f"提示:typecode {typecode} 的 soul_art 为空,使用默认值 0 生成魂艺封包!\n")
|
||||
|
||||
# 生成魂卡封包
|
||||
if soul_card and soul_card.strip():
|
||||
is_valid, ids_or_error = self.validate_ids(soul_card)
|
||||
if is_valid:
|
||||
soul_card_packets.append(
|
||||
f'|#send={{"id":42,"param":{{"bk":1,"petId":{id},"ids":{soul_card}}},"cmd":"ASC221104_2"}}|')
|
||||
self.cursor.execute("UPDATE aolaer_id SET is_set = 1 WHERE id = ?", (id,))
|
||||
self.conn.commit()
|
||||
else:
|
||||
self.info_text.insert(tk.END,
|
||||
f"警告:typecode {typecode} 的 soul_card {soul_card} 无效({ids_or_error}),已跳过处理!\n")
|
||||
else:
|
||||
self.info_text.insert(tk.END, f"提示:typecode {typecode} 的 soul_card 为空,已跳过处理!\n")
|
||||
|
||||
# 按顺序添加封包:子宠物 → 魂艺 → 魂卡
|
||||
packets.extend(sub_pet_packets)
|
||||
packets.extend(soul_art_packets)
|
||||
packets.extend(soul_card_packets)
|
||||
|
||||
if packets:
|
||||
self.output_text.insert(tk.END, '\n'.join(packets))
|
||||
else:
|
||||
self.info_text.insert(tk.END, "未生成任何封包,请检查数据!\n")
|
||||
except sqlite3.Error as e:
|
||||
self.info_text.insert(tk.END, f"数据库操作失败: {e}\n")
|
||||
|
||||
def run(self):
|
||||
"""运行程序"""
|
||||
self.root.mainloop()
|
||||
|
||||
def __del__(self):
|
||||
"""清理数据库连接"""
|
||||
self.conn.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app = TeamSwitchPacketGenerator()
|
||||
app.run()
|
BIN
test/aola.sqlite
Normal file
BIN
test/aola.sqlite
Normal file
Binary file not shown.
38
test/compaare.py
Normal file
38
test/compaare.py
Normal file
@ -0,0 +1,38 @@
|
||||
def compare_strings(str1, str2):
|
||||
# 清理字符串:去除首尾空白,按换行分割后去除每行空白,按逗号分割
|
||||
list1 = [item.strip() for item in str1.strip().split('\n') if item.strip()]
|
||||
list1 = [sub_item.strip() for item in list1 for sub_item in item.split(',') if sub_item.strip()]
|
||||
list2 = [item.strip() for item in str2.strip().split('\n') if item.strip()]
|
||||
list2 = [sub_item.strip() for item in list2 for sub_item in item.split(',') if sub_item.strip()]
|
||||
|
||||
# 找出差异
|
||||
diff1 = [item for item in list1 if item not in list2] # str1 比 str2 多的元素
|
||||
diff2 = [item for item in list2 if item not in list1] # str2 比 str1 多的元素
|
||||
|
||||
# 构建输出
|
||||
result = []
|
||||
if diff1:
|
||||
result.append(f"第一个字符串比第二个字符串多了 {len(diff1)} 个元素:")
|
||||
result.extend(diff1) # 每个元素占一行
|
||||
if diff2:
|
||||
result.append(f"第二个字符串比第一个字符串多了 {len(diff2)} 个元素:")
|
||||
result.extend(diff2) # 每个元素占一行
|
||||
|
||||
# 返回结果
|
||||
return '\n'.join(result) if result else "两个字符串相同"
|
||||
|
||||
|
||||
# 从文件读取数据
|
||||
try:
|
||||
with open('str1.txt', 'r', encoding='utf-8') as f:
|
||||
str1 = f.read()
|
||||
with open('str2.txt', 'r', encoding='utf-8') as f:
|
||||
str2 = f.read()
|
||||
|
||||
# 运行比较
|
||||
print(compare_strings(str1, str2))
|
||||
|
||||
except FileNotFoundError as e:
|
||||
print(f"错误:找不到文件 {e.filename}")
|
||||
except Exception as e:
|
||||
print(f"发生错误:{str(e)}")
|
234
test/m.py
Normal file
234
test/m.py
Normal file
@ -0,0 +1,234 @@
|
||||
# 生成 SQL 插入语句的 Python 脚本
|
||||
|
||||
# 数据集(可以替换为从文件读取)
|
||||
data = """10.4101,睑球粘连羊膜移植修补术,LV3
|
||||
10.4402,异体结膜移植术,LV3
|
||||
11.3204,翼状胬肉切除术伴丝裂霉素注入,LV3
|
||||
12.1401,虹膜全切除术,LV3
|
||||
12.3504,瞳孔残膜切除术,LV3
|
||||
12.4201,前房机化膜切除术,LV3
|
||||
12.6700,眼房水引流装置置入,LV3
|
||||
13.1100,经颞下入路晶状体囊内摘出术,LV3
|
||||
13.9003,晶状体囊袋张力环植入术,LV3
|
||||
14.0202,后段眼球壁异物取出术,LV3
|
||||
14.7202,后入路玻璃体切割术伴替代物注入,LV4
|
||||
14.7905,玻璃体气液交换术,LV3
|
||||
15.2200,一条眼外肌的缩短术,LV3
|
||||
10.4102,睑球粘连口唇黏膜移植修补术,LV3
|
||||
10.4201,结膜穹窿羊膜移植重建术,LV3
|
||||
10.4202,结膜穹窿口唇黏膜移植重建术,LV3
|
||||
10.4401,自体结膜移植术,LV3
|
||||
10.4901,结膜滤过泡瘘修补术,LV3
|
||||
10.4903,结膜囊成形术,LV3
|
||||
10.9901,结膜松弛矫正术,LV3
|
||||
11.3200,胬肉切除术伴角膜移植术,LV3
|
||||
11.3201,翼状胬肉切除伴自体干细胞移植术,LV3
|
||||
11.3202,翼状胬肉切除术伴异体干细胞移植术,LV3
|
||||
11.3203,翼状胬肉切除伴羊膜植片移植术,LV3
|
||||
11.7903,羊膜移植眼表重建术,LV3
|
||||
11.9901,角巩膜割烙术,LV3
|
||||
12.1403,虹膜周边切除术,LV3
|
||||
12.1404,虹膜周边激光切除术,LV3
|
||||
12.3300,虹膜后粘连松解术,LV3
|
||||
12.3301,虹膜粘连松解术,LV3
|
||||
12.3500,瞳孔成形术,LV3
|
||||
12.3501,瞳孔膜穿刺术,LV3
|
||||
12.3502,瞳孔粘连松解术,LV3
|
||||
12.3503,瞳孔切开术,LV3
|
||||
12.3505,滤过泡针拨术,LV3
|
||||
12.3901,虹膜离断缝合术,LV3
|
||||
12.3902,虹膜复位术,LV3
|
||||
12.4200,虹膜病损切除术,LV3
|
||||
12.5300,眼前房角切开伴眼前房角穿刺,LV3
|
||||
12.5400,外路小梁切开术,LV3
|
||||
12.5501,睫状体切开术,LV3
|
||||
12.6404,小梁切除术伴丝裂霉素注入,LV3
|
||||
12.6405,非穿透性小梁切除术,LV3
|
||||
12.6406,小梁切除术伴羊膜移植,LV3
|
||||
12.6407,小梁切除术伴移植物,LV3
|
||||
12.6601,滤泡修复术,LV3
|
||||
12.6701,青光眼阀取出术,LV3
|
||||
12.6702,青光眼阀修复调位术,LV3
|
||||
12.6703,前房导管术,LV3
|
||||
12.6704,青光眼阀置入术,LV3
|
||||
12.8801,巩膜外加压术,LV3
|
||||
13.0100,用磁吸法的去除晶状体异物,LV3
|
||||
13.0201,晶状体切开异物取出术,LV3
|
||||
13.1901,白内障囊内冷凝摘出术,LV3
|
||||
13.1902,白内障囊内摘除术,LV3
|
||||
13.6501,晶状体前囊膜切除术,LV3
|
||||
13.6502,晶状体后囊膜切除术,LV3
|
||||
13.6503,晶状体后囊膜激光切开术,LV3
|
||||
13.6901,残留晶状体皮质切除术,LV3
|
||||
13.7000,置入人工晶状体,LV3
|
||||
13.9001,人工晶状体复位术,LV3
|
||||
13.9002,人工晶状体悬吊术,LV3
|
||||
14.0100,用磁吸法去除眼后节异物,LV3
|
||||
14.0101,玻璃体异物磁吸术,LV3
|
||||
14.0201,脉络膜切开异物取出术,LV3
|
||||
14.2402,视网膜病损激光凝固术,LV3
|
||||
14.4901,巩膜环扎术伴空气填塞,LV4
|
||||
14.4902,巩膜环扎术伴巩膜切除术,LV4
|
||||
14.7401,后入路玻璃体切割术,LV4
|
||||
14.7501,玻璃体硅油填充术,LV3
|
||||
14.7901,玻璃体腔探查术,LV3
|
||||
14.7904,玻璃体腔残留晶体皮质取出术,LV3
|
||||
15.0900,眼外肌和腱的其他诊断性操作,LV2
|
||||
15.1100,一条眼外肌的后徙术,LV3
|
||||
15.1200,一条眼外肌的前徙术,LV3
|
||||
15.2100,一条眼外肌的延长术,LV3
|
||||
15.2901,一条眼外肌的悬吊术,LV3
|
||||
16.0101,外侧开眶术,LV4
|
||||
10.4100x001,睑球粘连游离移植物修补术,LV3
|
||||
12.3900x001,虹膜修补术,LV3
|
||||
12.6400x010,滤过道再通术,LV3
|
||||
13.1900x008,膜性白内障剪除术,LV3
|
||||
13.4100x001,白内障超声乳化抽吸术,LV3
|
||||
13.7200x001,人工晶体二期置入术,LV3
|
||||
13.9000x007,人工晶体缝合术,LV3
|
||||
14.0200x001,眼后节异物去除术,LV3
|
||||
14.7500x003,玻璃体硅油置换术,LV3
|
||||
15.3x01,两条或两条以上眼外肌的后徙术,LV3
|
||||
15.9x00x001,眼肌部分切除术,LV3
|
||||
16.0900x005,多个眶壁减压术,LV4
|
||||
10.4200x001,结膜穹窿游离移植物重建术,LV3
|
||||
10.4300x002,结膜穹窿成形术,LV3
|
||||
10.4400x001,结膜移植术,LV3
|
||||
10.4400x002,羊膜移植结膜修补术,LV3
|
||||
10.4900x001,结膜成形术,LV3
|
||||
10.4900x003,结膜修补术,LV3
|
||||
10.4900x004,结膜瓣修补术,LV3
|
||||
10.5x01,睑球粘连分离术,LV3
|
||||
10.6x00x001,结膜缝合术,LV3
|
||||
10.6x00x002,结膜撕裂修补术,LV3
|
||||
10.9900x001,结膜瓣遮盖术,LV3
|
||||
10.9900x004,结膜脱位复位术,LV3
|
||||
11.6900x003,异体角膜缘干细胞移植术,LV3
|
||||
11.9900x002,自体角膜缘干细胞取材术,LV3
|
||||
12.1400x001,虹膜部分切除术,LV3
|
||||
12.1400x008,瞳孔前膜激光切开术,LV3
|
||||
12.3200x001,虹膜前粘连松解术,LV3
|
||||
12.3900x004,虹膜还纳术,LV3
|
||||
12.3900x005,虹膜周边激光成形术,LV3
|
||||
12.4100x002,虹膜周边激光破坏术,LV3
|
||||
12.4100x003,虹膜周边冷冻破坏术,LV3
|
||||
12.4100x004,虹膜周边电灼破坏术,LV3
|
||||
12.5200x001,前房角切开术,LV3
|
||||
12.5900x001,房角分离术,LV3
|
||||
12.6400x003,滤帘切除术[小梁切除术],LV3
|
||||
12.6400x009,小梁切除术伴人造移植物,LV3
|
||||
12.6700x010,眼压调节器再次置入术,LV3
|
||||
12.8300x002,眼前节手术伤口修补术,LV3
|
||||
12.8800x002,巩膜移植物加固术,LV3
|
||||
13.1900x007,晶状体囊内摘除术,LV3
|
||||
13.2x01,晶状体刮匙摘除术,LV3
|
||||
13.3x00x001,晶状体单纯抽吸囊外摘除术,LV3
|
||||
13.3x01,创伤性白内障冲洗术,LV3
|
||||
13.4200x001,经后路白内障切割吸出术,LV3
|
||||
13.4300x001,白内障切割吸出术,LV3
|
||||
13.5900x001,白内障囊外摘除术,LV3
|
||||
13.6400x001,后发性白内障切开术,LV3
|
||||
13.6500x002,后发性白内障切除术,LV3
|
||||
13.6900x002,激光后囊切开术[YAG],LV3
|
||||
13.7100x001,白内障摘除伴人工晶体一期置入术,LV3
|
||||
13.7200x002,人工晶体再置入术,LV3
|
||||
13.8x00x003,人工晶体取出术,LV3
|
||||
13.9000x004,后囊切开术,LV3
|
||||
13.9000x005,张力环缝合术,LV3
|
||||
13.9000x006,虹膜隔晶体置入术,LV3
|
||||
13.9000x008,人工晶体前膜切除术,LV3
|
||||
13.9000x009,人工晶体睫状沟固定术,LV3
|
||||
13.9000x010,晶状体前囊切开术,LV3
|
||||
13.9000x011,晶状体囊膜剪开术,LV3
|
||||
13.9100x001,可植入式隐形眼镜置入术[ICL置入术],LV3
|
||||
14.0200x002,玻璃体腔异物取出术,LV3
|
||||
14.4900x001,巩膜环扎术,LV4
|
||||
14.6x00x001,眼后节置入物取出术,LV3
|
||||
14.6x01,巩膜环扎带取出术,LV3
|
||||
14.6x02,玻璃体硅油取出术,LV3
|
||||
14.7100x001,前入路玻璃体切除术,LV4
|
||||
14.7300x001,前入路玻璃体切割术,LV4
|
||||
14.7500x004,玻璃体重水置换术,LV3
|
||||
14.7900x001,人工玻璃体球囊置入术,LV3
|
||||
14.9x00x001,巩膜外环扎带调整术,LV3
|
||||
15.1900x001,一条眼外肌离断术,LV3
|
||||
15.3x02,两条或两条以上眼外肌的前徙术,LV3
|
||||
15.4x01,两条或两条以上眼外肌缩短术,LV3
|
||||
15.5x00,眼外肌移位术,LV3
|
||||
15.6x00,眼外肌手术后的修复术,LV3
|
||||
15.7x00,眼外肌损伤修补术,LV3
|
||||
15.7x01,眼肌粘连松解术,LV3
|
||||
15.9x00x007,眼肌探查术,LV3
|
||||
15.9x00x008,眼睑轮匝肌切断术,LV3
|
||||
15.9x00x009,眼外肌病损切除术,LV3
|
||||
15.9x01,眼阔筋膜切除术,LV3
|
||||
16.0200x001,眼内自膨胀水凝胶注入术,LV3
|
||||
16.0900x004,一个眶壁减压术,LV3
|
||||
12.3500x001,瞳孔激光成形术,LV3
|
||||
12.4100x001,眼前房病损激光切除术,LV3
|
||||
12.6400x001,激光小梁成形术[ALP、KLP],LV3
|
||||
12.8200x001,巩膜瘘修补术,LV3
|
||||
13.1900x006,白内障针吸术,LV3
|
||||
15.0100x001,眼外肌活检术,LV3
|
||||
15.4x02,两条或两条以上眼外肌悬吊术,LV3
|
||||
15.9x00x010,眼外肌本体感受器破坏术,LV3
|
||||
12.1402,虹膜激光切除术,LV3
|
||||
12.3100,虹膜前房角粘连松解术,LV3
|
||||
12.3400,角膜玻璃体粘连松解术,LV3
|
||||
12.8703,巩膜外加压术伴填充,LV3
|
||||
13.4101,飞秒激光白内障超声乳化抽吸术,LV3
|
||||
13.5100,经颞下入路晶状体囊外摘出术,LV3
|
||||
13.6600,后发膜机械性碎裂术[复发性白内障],LV3
|
||||
14.4100,巩膜环扎术伴有植入物,LV4
|
||||
14.4903,巩膜环扎术伴玻璃体切除术,LV4
|
||||
14.7203,后入路玻璃体切割术伴人工玻璃体置入术,LV4
|
||||
14.7902,玻璃体腔脱位晶状体取出术,LV3
|
||||
15.1300,一条眼外肌的部分切除术,LV3
|
||||
16.0200,眼眶切开术伴置入眼眶植入物,LV4"""
|
||||
|
||||
# SQL 模板(Oracle)
|
||||
sql_template = """
|
||||
INSERT INTO MET_ORDT_OPERATION_COMPARE
|
||||
( COMPARE_ID, DOC_CODE, DOC_NAME, OPERATION_CODE, OPERATION_NAME, OPERATION_LEVEL, OPER_CODE, OPER_DATE)
|
||||
VALUES
|
||||
( '1992006-{operation_code}',
|
||||
'1992006',
|
||||
'刘岚',
|
||||
'{operation_code}',
|
||||
'{operation_name}',
|
||||
'{operation_level}',
|
||||
'admin',
|
||||
SYSDATE);
|
||||
"""
|
||||
|
||||
# 存储生成的 SQL 语句
|
||||
sql_statements = []
|
||||
|
||||
# 解析数据集
|
||||
for line in data.strip().split('\n'):
|
||||
# 按逗号分割,提取字段
|
||||
operation_code, operation_name, operation_level = line.strip().split(',')
|
||||
|
||||
# 替换模板中的占位符
|
||||
sql = sql_template.format(
|
||||
operation_code=operation_code,
|
||||
operation_name=operation_name.replace("'", "''"), # 防止 SQL 注入,处理单引号
|
||||
operation_level=operation_level
|
||||
)
|
||||
|
||||
# 添加到语句列表
|
||||
sql_statements.append(sql)
|
||||
|
||||
# 打印 SQL 语句(可选)
|
||||
for sql in sql_statements:
|
||||
print(sql)
|
||||
|
||||
# 保存 SQL 语句到文件
|
||||
output_file = 'insert_operations.sql'
|
||||
with open(output_file, 'w', encoding='utf-8') as f:
|
||||
for sql in sql_statements:
|
||||
f.write(sql + '\n')
|
||||
f.write('COMMIT;\n')
|
||||
|
||||
print(f"\nSQL 语句已保存到 {output_file}")
|
||||
|
0
test/str1.txt
Normal file
0
test/str1.txt
Normal file
0
test/str2.txt
Normal file
0
test/str2.txt
Normal file
Loading…
x
Reference in New Issue
Block a user