Implemented command line parsing

This commit is contained in:
JC Francois 2019-08-01 14:58:41 +02:00
parent 9b8b748b5a
commit 32f3eccc70

378
twoot.py Normal file → Executable file
View File

@ -19,6 +19,7 @@
'''
import sys
import argparse
import os
import random
import requests
@ -29,14 +30,6 @@ import re
from mastodon import Mastodon, MastodonError
#TODO manage command line
TWIT_ACCOUNT = 'hackaday'
MAST_ACCOUNT = 'twoot@noirextreme.com'
MAST_PASSWORD = 'AcX/ZK5Ml6fRVDFi'
MAST_INSTANCE = 'mastodon.host'
MAX_AGE = 5 # in days
MIN_DELAY = 0 # in minutes
USER_AGENTS = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.87 Safari/537.36',
'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:54.0) Gecko/20100101 Firefox/68.0',
@ -50,6 +43,7 @@ USER_AGENTS = [
#TODO log to file
def cleanup_tweet_text(tt_iter):
'''
Receives an iterator over all the elements contained in the tweet-text container
@ -109,200 +103,224 @@ def cleanup_tweet_text(tt_iter):
return tweet_text
# **********************************************************
# Load twitter page of user. Process all tweets and generate
# list of dictionaries ready to be posted on Mastodon
# **********************************************************
# To store content of all tweets from this user
tweets = []
def main(argv):
# Get a copy of the default headers that requests would use
headers = requests.utils.default_headers()
# Build parser for command line arguments
parser = argparse.ArgumentParser(description='toot tweets.')
parser.add_argument('-t', metavar='<twitter account>', action='store', required=True)
parser.add_argument('-i', metavar='<mastodon instance>', action='store', required=True)
parser.add_argument('-m', metavar='<mastodon account>', action='store', required=True)
parser.add_argument('-p', metavar='<mastodon password>', action='store', required=True)
parser.add_argument('-a', metavar='<max age in days>', action='store', type=float, default=1)
parser.add_argument('-d', metavar='<min delay in mins>', action='store', type=float, default=0)
# Update default headers with randomly selected user agent
headers.update(
{
'User-Agent': USER_AGENTS[random.randint(0, len(USER_AGENTS)-1)],
}
)
# Parse command line
args = vars(parser.parse_args())
# Download twitter page of user
response = requests.get('https://twitter.com/' + TWIT_ACCOUNT, headers=headers)
twit_account = args['t']
mast_instance = args['i']
mast_account = args['m']
mast_password = args['p']
max_age = float(args['a'])
min_delay = float(args['d'])
# DEBUG: Save page to file
of = open('twitter.html', 'w')
of.write(response.text)
of.close()
# **********************************************************
# Load twitter page of user. Process all tweets and generate
# list of dictionaries ready to be posted on Mastodon
# **********************************************************
# To store content of all tweets from this user
tweets = []
# Verify that download worked
if response.status_code != 200:
print("Could not download twitter timeline. Aborting.")
exit(-1)
# Get a copy of the default headers that requests would use
headers = requests.utils.default_headers()
# Build tree of html elements for processing
soup = BeautifulSoup(response.text, 'html.parser')
# Extract twitter timeline
results = soup.find_all('div', class_='content')
for result in results:
# Isolate tweet header
sih = result.find('div', class_='stream-item-header')
# extract author
author = sih.find('strong', class_='fullname').get_text()
# Extract author's logo
author_logo_url = sih.find('img', class_='avatar')['src']
# Extract time stamp
timestamp = sih.find('a', class_='tweet-timestamp').find('span', class_='_timestamp')['data-time']
# Extract tweet id
tweet_id = sih.find('a', class_='tweet-timestamp')['href']
# Extract user name
author_account = re.search('^/(.+?)/', tweet_id).group(1)
# Isolate tweet text container
ttc = result.find('div', class_='js-tweet-text-container')
# extract iterator over tweet text contents
tt_iter = ttc.find('p', class_='tweet-text').children
tweet_text = cleanup_tweet_text(tt_iter)
# Check it the tweet is a retweet from somebody else
if author_account.lower() != TWIT_ACCOUNT.lower():
tweet_text = 'RT from ' + author + ' @' + author_account + '\n\n' + tweet_text
# Add footer with link to original tweet
tweet_text += '\n\nOriginal tweet : https://twitter.com' + tweet_id
# Isolate attached media container
amoc = result.find('div', class_='AdaptiveMediaOuterContainer')
photos = []
if amoc:
# Extract photos
photo_conts = amoc.find_all('div', class_='AdaptiveMedia-photoContainer')
for p in photo_conts:
photos.append(p['data-image-url'])
# Mention presence in videos in tweet
videos = amoc.find_all('div', class_='AdaptiveMedia-videoContainer')
if len(videos) != 0:
tweet_text += '\n\n[Embedded video in original tweet]'
# Add dictionary with content of tweet to list
tweet = {
"author": author,
"author_account": author_account,
"author_logo_url": author_logo_url,
"timestamp": timestamp,
"tweet_id": tweet_id,
"tweet_text": tweet_text,
"photos": photos,
}
tweets.append(tweet)
# DEBUG: Print extracted tweets
for t in tweets:
print(t)
# **********************************************************
# Iterate tweets. Check if the tweet has already been posted
# on Mastodon. If not, post it and add it to database
# **********************************************************
# Try to open database. If it does not exist, create it
sql = sqlite3.connect('twoot.db')
db = sql.cursor()
db.execute('''CREATE TABLE IF NOT EXISTS toots (twitter_account TEXT, mastodon_instance TEXT,
mastodon_account TEXT, tweet_id TEXT, toot_id TEXT)''')
# Create Mastodon application if it does not exist yet
if not os.path.isfile(MAST_INSTANCE + '.secret'):
try:
Mastodon.create_app(
'twoot',
api_base_url='https://' + MAST_INSTANCE,
to_file=MAST_INSTANCE + '.secret'
)
except MastodonError as me:
print('failed to create app on ' + MAST_INSTANCE)
sys.exit(1)
# Log in to Mastodon instance
try:
mastodon = Mastodon(
client_id=MAST_INSTANCE + '.secret',
api_base_url='https://' + MAST_INSTANCE
# Update default headers with randomly selected user agent
headers.update(
{
'User-Agent': USER_AGENTS[random.randint(0, len(USER_AGENTS)-1)],
}
)
mastodon.log_in(
username=MAST_ACCOUNT,
password=MAST_PASSWORD,
to_file=MAST_ACCOUNT + ".secret"
)
# Download twitter page of user
response = requests.get('https://twitter.com/' + twit_account, headers=headers)
except MastodonError as me:
print('ERROR: Login to ' + MAST_INSTANCE + ' Failed')
print(me)
sys.exit(1)
# DEBUG: Save page to file
of = open('twitter.html', 'w')
of.write(response.text)
of.close()
# Upload tweets
for tweet in reversed(tweets):
# Check in database if tweet has already been posted
db.execute('''SELECT * FROM toots WHERE twitter_account = ? AND mastodon_instance = ? AND
mastodon_account = ? AND tweet_id = ?''',
(TWIT_ACCOUNT, MAST_INSTANCE, MAST_ACCOUNT, tweet['tweet_id']))
tweet_in_db = db.fetchone()
# Verify that download worked
if response.status_code != 200:
print("Could not download twitter timeline. Aborting.")
exit(-1)
if tweet_in_db is not None:
# Skip to next tweet
continue
# Build tree of html elements for processing
soup = BeautifulSoup(response.text, 'html.parser')
# Check that the tweet is not too young (might be deleted) or too old
age_in_hours = (time.time() - float(tweet['timestamp'])) / 3600.0
min_delay_in_hours = float(MIN_DELAY) / 60.0
max_age_in_hours = float(MAX_AGE) * 24.0
# Extract twitter timeline
results = soup.find_all('div', class_='content')
if age_in_hours < min_delay_in_hours or age_in_hours > max_age_in_hours:
# Skip to next tweet
continue
for result in results:
# Isolate tweet header
sih = result.find('div', class_='stream-item-header')
# Upload photos
media_ids = []
for photo in tweet['photos']:
# Download picture
media = requests.get(photo)
# extract author
author = sih.find('strong', class_='fullname').get_text()
# Upload picture to Mastodon instance
media_posted = mastodon.media_post(media.content, mime_type=media.headers.get('content-type'))
media_ids.append(media_posted['id'])
# Extract author's logo
author_logo_url = sih.find('img', class_='avatar')['src']
# Post toot
# Extract time stamp
timestamp = sih.find('a', class_='tweet-timestamp').find('span', class_='_timestamp')['data-time']
# Extract tweet id
tweet_id = sih.find('a', class_='tweet-timestamp')['href']
# Extract user name
author_account = re.search('^/(.+?)/', tweet_id).group(1)
# Isolate tweet text container
ttc = result.find('div', class_='js-tweet-text-container')
# extract iterator over tweet text contents
tt_iter = ttc.find('p', class_='tweet-text').children
tweet_text = cleanup_tweet_text(tt_iter)
# Check it the tweet is a retweet from somebody else
if author_account.lower() != twit_account.lower():
tweet_text = 'RT from ' + author + ' @' + author_account + '\n\n' + tweet_text
# Add footer with link to original tweet
tweet_text += '\n\nOriginal tweet : https://twitter.com' + tweet_id
# Isolate attached media container
amoc = result.find('div', class_='AdaptiveMediaOuterContainer')
photos = []
if amoc:
# Extract photos
photo_conts = amoc.find_all('div', class_='AdaptiveMedia-photoContainer')
for p in photo_conts:
photos.append(p['data-image-url'])
# Mention presence in videos in tweet
videos = amoc.find_all('div', class_='AdaptiveMedia-videoContainer')
if len(videos) != 0:
tweet_text += '\n\n[Embedded video in original tweet]'
# Add dictionary with content of tweet to list
tweet = {
"author": author,
"author_account": author_account,
"author_logo_url": author_logo_url,
"timestamp": timestamp,
"tweet_id": tweet_id,
"tweet_text": tweet_text,
"photos": photos,
}
tweets.append(tweet)
# DEBUG: Print extracted tweets
for t in tweets:
print(t)
# **********************************************************
# Iterate tweets. Check if the tweet has already been posted
# on Mastodon. If not, post it and add it to database
# **********************************************************
# Try to open database. If it does not exist, create it
sql = sqlite3.connect('twoot.db')
db = sql.cursor()
db.execute('''CREATE TABLE IF NOT EXISTS toots (twitter_account TEXT, mastodon_instance TEXT,
mastodon_account TEXT, tweet_id TEXT, toot_id TEXT)''')
# Create Mastodon application if it does not exist yet
if not os.path.isfile(mast_instance + '.secret'):
try:
Mastodon.create_app(
'twoot',
api_base_url='https://' + mast_instance,
to_file=mast_instance + '.secret'
)
except MastodonError as me:
print('failed to create app on ' + mast_instance)
sys.exit(1)
# Log in to Mastodon instance
try:
mastodon = Mastodon(
access_token=MAST_ACCOUNT + '.secret',
api_base_url='https://' + MAST_INSTANCE
client_id=mast_instance + '.secret',
api_base_url='https://' + mast_instance
)
if len(media_ids) == 0:
toot = mastodon.status_post(tweet['tweet_text'], visibility='public')
else:
toot = mastodon.status_post(tweet['tweet_text'], media_ids=media_ids, visibility='public')
mastodon.log_in(
username=mast_account,
password=mast_password,
to_file=mast_account + ".secret"
)
except MastodonError as me:
print('ERROR: posting ' + tweet['tweet_text'] + ' to ' + MAST_INSTANCE + ' Failed')
print('ERROR: Login to ' + mast_instance + ' Failed')
print(me)
sys.exit(1)
# Insert toot id into database
if 'id' in toot:
db.execute("INSERT INTO toots VALUES ( ? , ? , ? , ? , ? )",
(TWIT_ACCOUNT, MAST_INSTANCE, MAST_ACCOUNT, tweet['tweet_id'], toot['id']))
sql.commit()
# Upload tweets
for tweet in reversed(tweets):
# Check in database if tweet has already been posted
db.execute('''SELECT * FROM toots WHERE twitter_account = ? AND mastodon_instance = ? AND
mastodon_account = ? AND tweet_id = ?''',
(twit_account, mast_instance, mast_account, tweet['tweet_id']))
tweet_in_db = db.fetchone()
if tweet_in_db is not None:
# Skip to next tweet
continue
# Check that the tweet is not too young (might be deleted) or too old
age_in_hours = (time.time() - float(tweet['timestamp'])) / 3600.0
min_delay_in_hours = min_delay / 60.0
max_age_in_hours = max_age * 24.0
if age_in_hours < min_delay_in_hours or age_in_hours > max_age_in_hours:
# Skip to next tweet
continue
# Upload photos
media_ids = []
for photo in tweet['photos']:
# Download picture
media = requests.get(photo)
# Upload picture to Mastodon instance
media_posted = mastodon.media_post(media.content, mime_type=media.headers.get('content-type'))
media_ids.append(media_posted['id'])
# Post toot
try:
mastodon = Mastodon(
access_token=mast_account + '.secret',
api_base_url='https://' + mast_instance
)
if len(media_ids) == 0:
toot = mastodon.status_post(tweet['tweet_text'], visibility='public')
else:
toot = mastodon.status_post(tweet['tweet_text'], media_ids=media_ids, visibility='public')
except MastodonError as me:
print('ERROR: posting ' + tweet['tweet_text'] + ' to ' + mast_instance + ' Failed')
print(me)
sys.exit(1)
# Insert toot id into database
if 'id' in toot:
db.execute("INSERT INTO toots VALUES ( ? , ? , ? , ? , ? )",
(twit_account, mast_instance, mast_account, tweet['tweet_id'], toot['id']))
sql.commit()
if __name__ == "__main__":
main(sys.argv)