From 32f3eccc709b791f3ed5d9b5d06b8d21baa32c38 Mon Sep 17 00:00:00 2001 From: JC Francois Date: Thu, 1 Aug 2019 14:58:41 +0200 Subject: [PATCH] Implemented command line parsing --- twoot.py | 378 +++++++++++++++++++++++++++++-------------------------- 1 file changed, 198 insertions(+), 180 deletions(-) mode change 100644 => 100755 twoot.py diff --git a/twoot.py b/twoot.py old mode 100644 new mode 100755 index f3cc2de..ce16ad3 --- a/twoot.py +++ b/twoot.py @@ -19,6 +19,7 @@ ''' import sys +import argparse import os import random import requests @@ -29,14 +30,6 @@ import re from mastodon import Mastodon, MastodonError -#TODO manage command line -TWIT_ACCOUNT = 'hackaday' -MAST_ACCOUNT = 'twoot@noirextreme.com' -MAST_PASSWORD = 'AcX/ZK5Ml6fRVDFi' -MAST_INSTANCE = 'mastodon.host' -MAX_AGE = 5 # in days -MIN_DELAY = 0 # in minutes - USER_AGENTS = [ 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.87 Safari/537.36', 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:54.0) Gecko/20100101 Firefox/68.0', @@ -50,6 +43,7 @@ USER_AGENTS = [ #TODO log to file + def cleanup_tweet_text(tt_iter): ''' Receives an iterator over all the elements contained in the tweet-text container @@ -109,200 +103,224 @@ def cleanup_tweet_text(tt_iter): return tweet_text -# ********************************************************** -# Load twitter page of user. Process all tweets and generate -# list of dictionaries ready to be posted on Mastodon -# ********************************************************** -# To store content of all tweets from this user -tweets = [] +def main(argv): -# Get a copy of the default headers that requests would use -headers = requests.utils.default_headers() + # Build parser for command line arguments + parser = argparse.ArgumentParser(description='toot tweets.') + parser.add_argument('-t', metavar='', action='store', required=True) + parser.add_argument('-i', metavar='', action='store', required=True) + parser.add_argument('-m', metavar='', action='store', required=True) + parser.add_argument('-p', metavar='', action='store', required=True) + parser.add_argument('-a', metavar='', action='store', type=float, default=1) + parser.add_argument('-d', metavar='', action='store', type=float, default=0) -# Update default headers with randomly selected user agent -headers.update( - { - 'User-Agent': USER_AGENTS[random.randint(0, len(USER_AGENTS)-1)], - } -) + # Parse command line + args = vars(parser.parse_args()) -# Download twitter page of user -response = requests.get('https://twitter.com/' + TWIT_ACCOUNT, headers=headers) + twit_account = args['t'] + mast_instance = args['i'] + mast_account = args['m'] + mast_password = args['p'] + max_age = float(args['a']) + min_delay = float(args['d']) -# DEBUG: Save page to file -of = open('twitter.html', 'w') -of.write(response.text) -of.close() + # ********************************************************** + # Load twitter page of user. Process all tweets and generate + # list of dictionaries ready to be posted on Mastodon + # ********************************************************** + # To store content of all tweets from this user + tweets = [] -# Verify that download worked -if response.status_code != 200: - print("Could not download twitter timeline. Aborting.") - exit(-1) + # Get a copy of the default headers that requests would use + headers = requests.utils.default_headers() -# Build tree of html elements for processing -soup = BeautifulSoup(response.text, 'html.parser') - -# Extract twitter timeline -results = soup.find_all('div', class_='content') - -for result in results: - # Isolate tweet header - sih = result.find('div', class_='stream-item-header') - - # extract author - author = sih.find('strong', class_='fullname').get_text() - - # Extract author's logo - author_logo_url = sih.find('img', class_='avatar')['src'] - - # Extract time stamp - timestamp = sih.find('a', class_='tweet-timestamp').find('span', class_='_timestamp')['data-time'] - - # Extract tweet id - tweet_id = sih.find('a', class_='tweet-timestamp')['href'] - - # Extract user name - author_account = re.search('^/(.+?)/', tweet_id).group(1) - - # Isolate tweet text container - ttc = result.find('div', class_='js-tweet-text-container') - - # extract iterator over tweet text contents - tt_iter = ttc.find('p', class_='tweet-text').children - - tweet_text = cleanup_tweet_text(tt_iter) - - # Check it the tweet is a retweet from somebody else - if author_account.lower() != TWIT_ACCOUNT.lower(): - tweet_text = 'RT from ' + author + ' @' + author_account + '\n\n' + tweet_text - - # Add footer with link to original tweet - tweet_text += '\n\nOriginal tweet : https://twitter.com' + tweet_id - - # Isolate attached media container - amoc = result.find('div', class_='AdaptiveMediaOuterContainer') - - photos = [] - if amoc: - # Extract photos - photo_conts = amoc.find_all('div', class_='AdaptiveMedia-photoContainer') - for p in photo_conts: - photos.append(p['data-image-url']) - - # Mention presence in videos in tweet - videos = amoc.find_all('div', class_='AdaptiveMedia-videoContainer') - if len(videos) != 0: - tweet_text += '\n\n[Embedded video in original tweet]' - - # Add dictionary with content of tweet to list - tweet = { - "author": author, - "author_account": author_account, - "author_logo_url": author_logo_url, - "timestamp": timestamp, - "tweet_id": tweet_id, - "tweet_text": tweet_text, - "photos": photos, - } - tweets.append(tweet) - -# DEBUG: Print extracted tweets -for t in tweets: - print(t) - - -# ********************************************************** -# Iterate tweets. Check if the tweet has already been posted -# on Mastodon. If not, post it and add it to database -# ********************************************************** - -# Try to open database. If it does not exist, create it -sql = sqlite3.connect('twoot.db') -db = sql.cursor() -db.execute('''CREATE TABLE IF NOT EXISTS toots (twitter_account TEXT, mastodon_instance TEXT, - mastodon_account TEXT, tweet_id TEXT, toot_id TEXT)''') - -# Create Mastodon application if it does not exist yet -if not os.path.isfile(MAST_INSTANCE + '.secret'): - try: - Mastodon.create_app( - 'twoot', - api_base_url='https://' + MAST_INSTANCE, - to_file=MAST_INSTANCE + '.secret' - ) - - except MastodonError as me: - print('failed to create app on ' + MAST_INSTANCE) - sys.exit(1) - -# Log in to Mastodon instance -try: - mastodon = Mastodon( - client_id=MAST_INSTANCE + '.secret', - api_base_url='https://' + MAST_INSTANCE + # Update default headers with randomly selected user agent + headers.update( + { + 'User-Agent': USER_AGENTS[random.randint(0, len(USER_AGENTS)-1)], + } ) - mastodon.log_in( - username=MAST_ACCOUNT, - password=MAST_PASSWORD, - to_file=MAST_ACCOUNT + ".secret" - ) + # Download twitter page of user + response = requests.get('https://twitter.com/' + twit_account, headers=headers) -except MastodonError as me: - print('ERROR: Login to ' + MAST_INSTANCE + ' Failed') - print(me) - sys.exit(1) + # DEBUG: Save page to file + of = open('twitter.html', 'w') + of.write(response.text) + of.close() -# Upload tweets -for tweet in reversed(tweets): - # Check in database if tweet has already been posted - db.execute('''SELECT * FROM toots WHERE twitter_account = ? AND mastodon_instance = ? AND - mastodon_account = ? AND tweet_id = ?''', - (TWIT_ACCOUNT, MAST_INSTANCE, MAST_ACCOUNT, tweet['tweet_id'])) - tweet_in_db = db.fetchone() + # Verify that download worked + if response.status_code != 200: + print("Could not download twitter timeline. Aborting.") + exit(-1) - if tweet_in_db is not None: - # Skip to next tweet - continue + # Build tree of html elements for processing + soup = BeautifulSoup(response.text, 'html.parser') - # Check that the tweet is not too young (might be deleted) or too old - age_in_hours = (time.time() - float(tweet['timestamp'])) / 3600.0 - min_delay_in_hours = float(MIN_DELAY) / 60.0 - max_age_in_hours = float(MAX_AGE) * 24.0 + # Extract twitter timeline + results = soup.find_all('div', class_='content') - if age_in_hours < min_delay_in_hours or age_in_hours > max_age_in_hours: - # Skip to next tweet - continue + for result in results: + # Isolate tweet header + sih = result.find('div', class_='stream-item-header') - # Upload photos - media_ids = [] - for photo in tweet['photos']: - # Download picture - media = requests.get(photo) + # extract author + author = sih.find('strong', class_='fullname').get_text() - # Upload picture to Mastodon instance - media_posted = mastodon.media_post(media.content, mime_type=media.headers.get('content-type')) - media_ids.append(media_posted['id']) + # Extract author's logo + author_logo_url = sih.find('img', class_='avatar')['src'] - # Post toot + # Extract time stamp + timestamp = sih.find('a', class_='tweet-timestamp').find('span', class_='_timestamp')['data-time'] + + # Extract tweet id + tweet_id = sih.find('a', class_='tweet-timestamp')['href'] + + # Extract user name + author_account = re.search('^/(.+?)/', tweet_id).group(1) + + # Isolate tweet text container + ttc = result.find('div', class_='js-tweet-text-container') + + # extract iterator over tweet text contents + tt_iter = ttc.find('p', class_='tweet-text').children + + tweet_text = cleanup_tweet_text(tt_iter) + + # Check it the tweet is a retweet from somebody else + if author_account.lower() != twit_account.lower(): + tweet_text = 'RT from ' + author + ' @' + author_account + '\n\n' + tweet_text + + # Add footer with link to original tweet + tweet_text += '\n\nOriginal tweet : https://twitter.com' + tweet_id + + # Isolate attached media container + amoc = result.find('div', class_='AdaptiveMediaOuterContainer') + + photos = [] + if amoc: + # Extract photos + photo_conts = amoc.find_all('div', class_='AdaptiveMedia-photoContainer') + for p in photo_conts: + photos.append(p['data-image-url']) + + # Mention presence in videos in tweet + videos = amoc.find_all('div', class_='AdaptiveMedia-videoContainer') + if len(videos) != 0: + tweet_text += '\n\n[Embedded video in original tweet]' + + # Add dictionary with content of tweet to list + tweet = { + "author": author, + "author_account": author_account, + "author_logo_url": author_logo_url, + "timestamp": timestamp, + "tweet_id": tweet_id, + "tweet_text": tweet_text, + "photos": photos, + } + tweets.append(tweet) + + # DEBUG: Print extracted tweets + for t in tweets: + print(t) + + # ********************************************************** + # Iterate tweets. Check if the tweet has already been posted + # on Mastodon. If not, post it and add it to database + # ********************************************************** + + # Try to open database. If it does not exist, create it + sql = sqlite3.connect('twoot.db') + db = sql.cursor() + db.execute('''CREATE TABLE IF NOT EXISTS toots (twitter_account TEXT, mastodon_instance TEXT, + mastodon_account TEXT, tweet_id TEXT, toot_id TEXT)''') + + # Create Mastodon application if it does not exist yet + if not os.path.isfile(mast_instance + '.secret'): + try: + Mastodon.create_app( + 'twoot', + api_base_url='https://' + mast_instance, + to_file=mast_instance + '.secret' + ) + + except MastodonError as me: + print('failed to create app on ' + mast_instance) + sys.exit(1) + + # Log in to Mastodon instance try: mastodon = Mastodon( - access_token=MAST_ACCOUNT + '.secret', - api_base_url='https://' + MAST_INSTANCE + client_id=mast_instance + '.secret', + api_base_url='https://' + mast_instance ) - if len(media_ids) == 0: - toot = mastodon.status_post(tweet['tweet_text'], visibility='public') - else: - toot = mastodon.status_post(tweet['tweet_text'], media_ids=media_ids, visibility='public') + mastodon.log_in( + username=mast_account, + password=mast_password, + to_file=mast_account + ".secret" + ) except MastodonError as me: - print('ERROR: posting ' + tweet['tweet_text'] + ' to ' + MAST_INSTANCE + ' Failed') + print('ERROR: Login to ' + mast_instance + ' Failed') print(me) sys.exit(1) - # Insert toot id into database - if 'id' in toot: - db.execute("INSERT INTO toots VALUES ( ? , ? , ? , ? , ? )", - (TWIT_ACCOUNT, MAST_INSTANCE, MAST_ACCOUNT, tweet['tweet_id'], toot['id'])) - sql.commit() + # Upload tweets + for tweet in reversed(tweets): + # Check in database if tweet has already been posted + db.execute('''SELECT * FROM toots WHERE twitter_account = ? AND mastodon_instance = ? AND + mastodon_account = ? AND tweet_id = ?''', + (twit_account, mast_instance, mast_account, tweet['tweet_id'])) + tweet_in_db = db.fetchone() + + if tweet_in_db is not None: + # Skip to next tweet + continue + + # Check that the tweet is not too young (might be deleted) or too old + age_in_hours = (time.time() - float(tweet['timestamp'])) / 3600.0 + min_delay_in_hours = min_delay / 60.0 + max_age_in_hours = max_age * 24.0 + + if age_in_hours < min_delay_in_hours or age_in_hours > max_age_in_hours: + # Skip to next tweet + continue + + # Upload photos + media_ids = [] + for photo in tweet['photos']: + # Download picture + media = requests.get(photo) + + # Upload picture to Mastodon instance + media_posted = mastodon.media_post(media.content, mime_type=media.headers.get('content-type')) + media_ids.append(media_posted['id']) + + # Post toot + try: + mastodon = Mastodon( + access_token=mast_account + '.secret', + api_base_url='https://' + mast_instance + ) + + if len(media_ids) == 0: + toot = mastodon.status_post(tweet['tweet_text'], visibility='public') + else: + toot = mastodon.status_post(tweet['tweet_text'], media_ids=media_ids, visibility='public') + + except MastodonError as me: + print('ERROR: posting ' + tweet['tweet_text'] + ' to ' + mast_instance + ' Failed') + print(me) + sys.exit(1) + + # Insert toot id into database + if 'id' in toot: + db.execute("INSERT INTO toots VALUES ( ? , ? , ? , ? , ? )", + (twit_account, mast_instance, mast_account, tweet['tweet_id'], toot['id'])) + sql.commit() + + +if __name__ == "__main__": + main(sys.argv)