192 lines
7.0 KiB
Python
192 lines
7.0 KiB
Python
![]() |
import requests
|
||
|
import re
|
||
|
import json
|
||
|
import pandas as pd
|
||
|
import argparse
|
||
|
|
||
|
# Endpoint used to obtain an anonymous guest token (no login required).
GUEST_TOKEN_ENDPOINT = "https://api.twitter.com/1.1/guest/activate.json"
# Base URL of Twitter's GraphQL API; a per-operation query id is appended.
STATUS_ENDPOINT = "https://twitter.com/i/api/graphql/"

# Raw strings so backslashes reach the regex engine verbatim: the non-raw
# forms '\"' and '\}' are invalid string escapes that trigger
# SyntaxWarning/DeprecationWarning on modern Python while matching the
# same text.
# Extracts the pagination cursor for the bottom of the timeline.
CURSOR_PATTERN = re.compile(r'TimelineCursor","value":"([^\"]+)"[^\}]+Bottom"')
# Extracts a user's numeric id from the UserByScreenName response.
ID_PATTERN = re.compile(r'"rest_id":"([^"]+)"')
# Extracts the user's reported total tweet count.
COUNT_PATTERN = re.compile(r'"statuses_count":([0-9]+)')
|
||
|
|
||
|
# Default GraphQL variables for the UserTweetsAndReplies query.
# "userId" and "cursor" are filled in at runtime before each request.
variables = dict(
    count=200,
    withTweetQuoteCount=True,
    includePromotedContent=True,
    withQuickPromoteEligibilityTweetFields=False,
    withSuperFollowsUserFields=True,
    withUserResults=True,
    withBirdwatchPivots=False,
    withDownvotePerspective=False,
    withReactionsMetadata=False,
    withReactionsPerspective=False,
    withSuperFollowsTweetFields=True,
    withVoice=True,
    withV2Timeline=False,
)
|
||
|
|
||
|
def send_request(url, session_method, headers, params=None):
    """Issue an HTTP request and return the response body as one string.

    Args:
        url: Full URL to request.
        session_method: Bound requests method to call (e.g. ``session.get``
            or ``session.post``).
        headers: Header dict forwarded verbatim.
        params: Optional dict sent JSON-encoded as the single "variables"
            query parameter (the shape Twitter's GraphQL endpoints expect).

    Returns:
        The response body decoded as UTF-8, with line breaks removed.

    Raises:
        AssertionError: if the response status is not 200.
    """
    if params:
        # GraphQL endpoints take one "variables" param holding JSON.
        response = session_method(url, headers=headers, stream=True, params={"variables": json.dumps(params)})
    else:
        response = session_method(url, headers=headers, stream=True)

    if response.status_code != 200:
        # Print diagnostics for the failure before bailing out.
        print(response.request.url)
        print(response.status_code)
        # Raise explicitly instead of using `assert` so the check also
        # runs under `python -O`, which strips assert statements.
        raise AssertionError(f"Failed request to {url}. {response.status_code}. Please submit an issue including this information.")

    return "".join(line.decode("utf-8") for line in response.iter_lines())
|
||
|
|
||
|
|
||
|
def search_json(j, target_key, result):
    """Recursively collect every value stored under ``target_key``.

    Walks arbitrarily nested dicts and lists, appending each match to
    ``result`` (which is mutated in place) and returning it.

    Args:
        j: Any JSON-decoded value (dict, list, or scalar).
        target_key: Dict key to collect values for.
        result: List accumulator; pass ``[]`` on the initial call.

    Returns:
        The same ``result`` list, for convenience.
    """
    # isinstance instead of `type(x) == dict` so dict/list subclasses
    # (e.g. from json object hooks) are handled too.
    if isinstance(j, dict):
        for key, value in j.items():
            if key == target_key:
                result.append(value)
            # Recurse into the value as well, so matches nested inside a
            # matching value are also found.
            search_json(value, target_key, result)
        return result

    if isinstance(j, list):
        for item in j:
            search_json(item, target_key, result)
        return result

    # Scalar leaf: nothing to search.
    return result
|
||
|
|
||
|
def tweet_subset(d):
    """Project a raw "legacy" tweet dict down to the fields of interest."""
    # Fields copied through under their original names, in output order.
    passthrough = (
        "created_at",
        "retweet_count",
        "favorite_count",
        "reply_count",
        "quote_count",
        "retweeted",
        "is_quote_status",
    )
    subset = {"id": d["id_str"], "text": d["full_text"]}
    subset.update((name, d[name]) for name in passthrough)
    # Not every tweet carries this field, so fall back to a placeholder.
    subset["possibly_sensitive"] = d.get("possibly_sensitive", "No data")
    return subset
|
||
|
|
||
|
def get_tweets(query_id, session, headers, variables, expected_total):
    """Page through a user's timeline and return their raw tweets.

    Args:
        query_id: GraphQL query id for the UserTweetsAndReplies operation.
        session: requests.Session to issue calls on.
        headers: Headers carrying the bearer and guest tokens.
        variables: GraphQL variables dict; must contain "userId". Mutated
            in place ("cursor" is updated for each page).
        expected_total: The user's reported tweet count (used only for the
            progress display; the number actually returned may differ).

    Returns:
        List of "legacy" tweet dicts authored by ``variables["userId"]``.
    """
    url = f"{STATUS_ENDPOINT}{query_id}/UserTweetsAndReplies"
    resp = send_request(url, session.get, headers, variables)
    j = json.loads(resp)

    # Every tweet payload lives under a "legacy" key somewhere in the
    # response tree; entries without "id_str" are other "legacy" objects
    # (e.g. user records), not tweets.
    all_tweets = [t for t in search_json(j, "legacy", []) if "id_str" in t]
    ids = {tweet["id_str"] for tweet in all_tweets}

    while True:
        # Follow the "Bottom" cursor to the next page. Stop when the
        # response carries no cursor at all -- previously this indexed
        # findall(...)[0] unconditionally and raised IndexError there.
        cursors = CURSOR_PATTERN.findall(resp)
        if not cursors:
            break
        variables["cursor"] = cursors[0]
        resp = send_request(url, session.get, headers, variables)
        j = json.loads(resp)

        next_tweets = [t for t in search_json(j, "legacy", []) if "id_str" in t]
        next_ids = {tweet["id_str"] for tweet in next_tweets}

        # A page with no unseen ids means we've reached the end.
        old_id_size = len(ids)
        ids.update(next_ids)
        if old_id_size == len(ids):
            break

        all_tweets.extend(next_tweets)
        print(f"{len(all_tweets)} / {expected_total}", end="\r")

    # Keep only full tweets authored by the target user (drops retweet
    # sources, quoted tweets from other users, etc.).
    return [
        tweet for tweet in all_tweets
        if "full_text" in tweet and tweet.get("user_id_str", "") == variables["userId"]
    ]
|
||
|
|
||
|
def get_id_and_tweet_count(session, headers, query_id, username):
    """Look up a user's numeric id and total tweet count by screen name.

    Args:
        session: requests.Session to issue calls on.
        headers: Headers carrying the bearer and guest tokens.
        query_id: GraphQL query id for the UserByScreenName operation.
        username: Screen name (handle) to resolve.

    Returns:
        Tuple of (user id string, tweet count int).

    Raises:
        AssertionError: if exactly one id or count cannot be extracted.
    """
    resp = send_request(
        f"{STATUS_ENDPOINT}{query_id}/UserByScreenName",
        session.get,
        headers,
        params={
            "screen_name": username,
            "withSafetyModeUserFields": True,
            "withSuperFollowsUserFields": True
        }
    )

    # Raise explicitly rather than via `assert` so the validation also
    # runs under `python -O`, which strips assert statements.
    ids = ID_PATTERN.findall(resp)
    if len(ids) != 1:
        raise AssertionError(f"Failed to find user id for {username}. Please open this as an issue including this message.")

    counts = COUNT_PATTERN.findall(resp)
    if len(counts) != 1:
        raise AssertionError(f"Failed to find tweet count for {username}. Please open this as an issue including this message.")

    return ids[0], int(counts[0])
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":

    parser = argparse.ArgumentParser(description="Get tweets for a user.")
    parser.add_argument("username", help="The username of the user to get tweets for.")
    parser.add_argument("--output", help="The output file to write to. If not specified, prints to stdout.")
    args = parser.parse_args()

    # One session is reused for every request so cookies/connections persist.
    session = requests.Session()
    headers = {}

    username = args.username

    # One of the js files from original url holds the bearer token and query id.
    container = send_request(f"https://twitter.com/{username}", session.get, headers)
    # Collect the URLs of all referenced .js bundles from the profile page.
    js_files = re.findall("src=['\"]([^'\"()]*js)['\"]", container)

    bearer_token = None
    query_id = None
    user_query_id = None

    # Search the javascript files for a bearer token and UserTweets queryId
    for f in js_files:
        file_content = send_request(f, session.get, headers)
        # Bearer tokens are long "AAA..." strings containing %-escapes.
        bt = re.search('["\'](AAA[a-zA-Z0-9%-]+%[a-zA-Z0-9%-]+)["\']', file_content)

        # Operation descriptors look like {queryId:"...",...,"UserTweetsAndReplies"}.
        ops = re.findall('\{queryId:"[a-zA-Z0-9_]+[^\}]+UserTweetsAndReplies"', file_content)
        query_op = [op for op in ops if "UserTweetsAndReplies" in op]

        # Only accept an unambiguous (single) match per file.
        if len(query_op) == 1:
            query_id = re.findall('queryId:"([^"]+)"', query_op[0])[0]

        if bt:
            bearer_token = bt.group(1)

        # Same extraction for the UserByScreenName operation id.
        ops = re.findall('\{queryId:"[a-zA-Z0-9_]+[^\}]+UserByScreenName"', file_content)
        user_query_op = [op for op in ops if "UserByScreenName" in op]

        if len(user_query_op) == 1:
            user_query_id = re.findall('queryId:"([^"]+)"', user_query_op[0])[0]

    assert bearer_token, f"Did not find bearer token. Are you sure you used the right username? {username}"
    assert query_id, f"Did not find query id. Are you sure you used the right twitter username? {username}"
    assert user_query_id, f"Did not find user query id. Are you sure you used the right twitter username? {username}"

    headers['authorization'] = f"Bearer {bearer_token}"

    # Activate a guest token; this authorizes the GraphQL calls without login.
    guest_token_resp = send_request(GUEST_TOKEN_ENDPOINT, session.post, headers)
    guest_token = json.loads(guest_token_resp)['guest_token']
    assert guest_token, f"Did not find guest token. Probably means the script is broken. Please submit an issue. Include this message in your issue: {username}"
    headers['x-guest-token'] = guest_token

    # Resolve the handle to a numeric id and get the expected tweet count.
    user_id, total_count = get_id_and_tweet_count(session, headers, user_query_id, username)

    variables["userId"] = user_id

    # Download all pages of the timeline and trim each tweet to a subset
    # of fields.
    resp = get_tweets(query_id, session, headers, variables, total_count)
    all_tweets = [tweet_subset(tweet) for tweet in resp]

    # Build the output table, add a per-tweet permalink, and index by id.
    df = pd.DataFrame(all_tweets, index=range(len(all_tweets)))
    url_series = df.apply(lambda row: f"https://twitter.com/{username}/status/{row['id']}", axis=1)
    df["url"] = url_series
    df.set_index("id", inplace=True)

    # Write CSV if --output was given, otherwise dump the table to stdout.
    if args.output:
        df.to_csv(args.output)
    else:
        print(df.to_string())
|