#! /usr/bin/env python3 # -*- coding: utf-8 -*- """ Copyright (C) 2019-2022 Jean-Christophe Francois This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . """ import sys import logging import argparse import os import random import requests from bs4 import BeautifulSoup, element import sqlite3 import datetime import time import re from pathlib import Path from mastodon import Mastodon, MastodonError, MastodonAPIError, MastodonIllegalArgumentError import subprocess import shutil # Number of records to keep in db table for each twitter account MAX_REC_COUNT = 50 # Set the desired verbosity of logging # One of logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL LOGGING_LEVEL = logging.DEBUG # How many seconds to wait before giving up on a download (except video download) HTTPS_REQ_TIMEOUT = 10 NITTER_URLS = [ 'https://nitter.42l.fr', 'https://nitter.pussthecat.org', 'https://nitter.fdn.fr', 'https://nitter.eu', 'https://nitter.namazso.eu', 'https://nitter.moomoo.me', 'https://n.ramle.be', ] # Update from https://www.whatismybrowser.com/guides/the-latest-user-agent/ USER_AGENTS = [ 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:103.0) Gecko/20100101 Firefox/103.0', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 12_5_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.6 Safari/605.1.15', 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36 Vivaldi/5.4.2753.37', ] def process_media_body(tt_iter): """ Receives an iterator over all the elements contained in the tweet-text container. Processes them to make them suitable for posting on Mastodon :param tt_iter: iterator over the HTML elements in the text of the tweet :return: cleaned up text of the tweet """ tweet_text = '' # Iterate elements for tag in tt_iter: # If element is plain text, copy it verbatim if isinstance(tag, element.NavigableString): tweet_text += tag.string # If it is an 'a' html tag elif tag.name == 'a': tag_text = tag.get_text() if tag_text.startswith('@'): # Only keep user name tweet_text += tag_text elif tag_text.startswith('#'): # Only keep hashtag text tweet_text += tag_text else: # This is a real link, keep url tweet_text += tag.get('href') else: logging.warning("No handler for tag in twitter text: " + tag.prettify()) return tweet_text def process_card(nitter_url, card_container): """ Extract image from card in case mastodon does not do it :param card_container: soup of 'a' tag containing card markup :return: list with url of image """ list = [] img = card_container.div.div.img if img is not None: image_url = nitter_url + img.get('src') list.append(image_url) logging.debug('Extracted image from card') return list def process_attachments(nitter_url, attachments_container, get_vids, twit_account, status_id, author_account): """ Extract images or video from attachments. Videos are downloaded on the file system. :param nitter_url: url of nitter mirror :param attachments_container: soup of 'div' tag containing attachments markup :param get_vids: whether to download videos or not :param twit_account: name of twitter account :param status_id: id of tweet being processed :param author_account: author of tweet with video attachment :return: list with url of images """ # Collect url of images pics = [] images = attachments_container.find_all('a', class_='still-image') for image in images: pics.append(nitter_url + image.get('href')) logging.debug('collected ' + str(len(pics)) + ' images from attachments') # Download nitter video (converted animated GIF) gif_class = attachments_container.find('video', class_='gif') if gif_class is not None: gif_video_file = nitter_url + gif_class.source.get('src') video_path = os.path.join('output', twit_account, status_id, author_account, status_id) os.makedirs(video_path, exist_ok=True) # Open directory for writing file orig_dir = os.getcwd() os.chdir(video_path) with requests.get(gif_video_file, stream=True, timeout=HTTPS_REQ_TIMEOUT) as r: try: # Raise exception if response code is not 200 r.raise_for_status() # Download chunks and write them to file with open('gif_video.mp4', 'wb') as f: for chunk in r.iter_content(chunk_size=16 * 1024): f.write(chunk) logging.debug('Downloaded video of GIF animation from attachments') except: # Don't do anything if video can't be found or downloaded logging.debug('Could not download video of GIF animation from attachments') pass # Close directory os.chdir(orig_dir) # Download twitter video vid_in_tweet = False vid_class = attachments_container.find('div', class_='video-container') if vid_class is not None: if get_vids: import youtube_dl video_file = os.path.join('https://twitter.com', author_account, 'status', status_id) ydl_opts = { 'outtmpl': "output/" + twit_account + "/" + status_id + "/%(id)s.%(ext)s", 'format': "best[width<=500]", 'socket_timeout': 60, 'quiet': True, } with youtube_dl.YoutubeDL(ydl_opts) as ydl: try: ydl.download([video_file]) except Exception as e: logging.warn('Error downloading twitter video: ' + str(e)) vid_in_tweet = True else: logging.debug('downloaded twitter video from attachments') return pics, vid_in_tweet def contains_class(body_classes, some_class): """ :param body_classes: list of classes to search :param some_class: class that we are interested in :return: True if found, false otherwise """ found = False for body_class in body_classes: if body_class == some_class: found = True return found def is_time_valid(timestamp, max_age, min_delay): ret = True # Check that the tweet is not too young (might be deleted) or too old age_in_hours = (time.time() - float(timestamp)) / 3600.0 min_delay_in_hours = min_delay / 60.0 max_age_in_hours = max_age * 24.0 if age_in_hours < min_delay_in_hours or age_in_hours > max_age_in_hours: ret = False return ret def login(instance, account, password): # Create Mastodon application if it does not exist yet if not os.path.isfile(instance + '.secret'): try: Mastodon.create_app( 'twoot', api_base_url='https://' + instance, to_file=instance + '.secret' ) except MastodonError as me: logging.fatal('failed to create app on ' + instance) logging.fatal(me) sys.exit(-1) # Log in to Mastodon instance try: mastodon = Mastodon( client_id=instance + '.secret', api_base_url='https://' + instance ) mastodon.log_in( username=account, password=password, to_file=account + ".secret" ) logging.info('Logging in to ' + instance) except MastodonError as me: logging.fatal('ERROR: Login to ' + instance + ' Failed') logging.fatal(me) sys.exit(-1) # Check ratelimit status logging.debug('Ratelimit allowed requests: ' + str(mastodon.ratelimit_limit)) logging.debug('Ratelimit remaining requests: ' + str(mastodon.ratelimit_remaining)) logging.debug('Ratelimit reset time: ' + time.asctime(time.localtime(mastodon.ratelimit_reset))) logging.debug('Ratelimit last call: ' + time.asctime(time.localtime(mastodon.ratelimit_lastcall))) return mastodon def main(argv): # Start stopwatch start_time = time.time() # Build parser for command line arguments parser = argparse.ArgumentParser(description='toot tweets.') parser.add_argument('-t', metavar='', action='store', required=True) parser.add_argument('-i', metavar='', action='store', required=True) parser.add_argument('-m', metavar='', action='store', required=True) parser.add_argument('-p', metavar='', action='store', required=True) parser.add_argument('-r', action='store_true', help='Also post replies to other tweets') parser.add_argument('-v', action='store_true', help='Ingest twitter videos and upload to Mastodon instance') parser.add_argument('-a', metavar='', action='store', type=float, default=1) parser.add_argument('-d', metavar='', action='store', type=float, default=0) parser.add_argument('-c', metavar='', action='store', type=int, default=0) # Parse command line args = vars(parser.parse_args()) twit_account = args['t'] mast_instance = args['i'] mast_account = args['m'] mast_password = args['p'] tweets_and_replies = args['r'] get_vids = args['v'] max_age = float(args['a']) min_delay = float(args['d']) cap = int(args['c']) # Remove previous log file # try: # os.remove(twit_account + '.log') #except FileNotFoundError: # pass # Setup logging to file logging.basicConfig( filename=twit_account + '.log', level=LOGGING_LEVEL, format='%(asctime)s %(levelname)-8s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', ) logging.info('Running with the following parameters:') logging.info(' -t ' + twit_account) logging.info(' -i ' + mast_instance) logging.info(' -m ' + mast_account) logging.info(' -r ' + str(tweets_and_replies)) logging.info(' -v ' + str(get_vids)) logging.info(' -a ' + str(max_age)) logging.info(' -d ' + str(min_delay)) logging.info(' -c ' + str(cap)) # Try to open database. If it does not exist, create it sql = sqlite3.connect('twoot.db') db = sql.cursor() db.execute('''CREATE TABLE IF NOT EXISTS toots (twitter_account TEXT, mastodon_instance TEXT, mastodon_account TEXT, tweet_id TEXT, toot_id TEXT)''') db.execute('''CREATE INDEX IF NOT EXISTS main_index ON toots (twitter_account, mastodon_instance, mastodon_account, tweet_id)''') # Select random nitter instance to fetch updates from nitter_url = NITTER_URLS[random.randint(0, len(NITTER_URLS) - 1)] # ********************************************************** # Load twitter page of user. Process all tweets and generate # list of dictionaries ready to be posted on Mastodon # ********************************************************** # To store content of all tweets from this user tweets = [] # Initiate session session = requests.Session() # Get a copy of the default headers that requests would use headers = requests.utils.default_headers() # Update default headers with randomly selected user agent headers.update( { 'User-Agent': USER_AGENTS[random.randint(0, len(USER_AGENTS)-1)], 'Cookie': 'replaceTwitter=; replaceYouTube=; hlsPlayback=on; proxyVideos=', } ) url = nitter_url + '/' + twit_account # Use different page if we need to handle replies if tweets_and_replies: url += '/with_replies' # Download twitter page of user. try: twit_account_page = session.get(url, headers=headers, timeout=HTTPS_REQ_TIMEOUT) except requests.exceptions.ConnectionError: logging.fatal('Host did not respond when trying to download ' + url) exit(-1) except requests.exceptions.Timeout: logging.fatal(nitter_url + ' took too long to respond') exit(-1) # Verify that download worked if twit_account_page.status_code != 200: logging.fatal('The Nitter page did not download correctly from ' + url + ' (' + str(twit_account_page.status_code) + '). Aborting') exit(-1) logging.info('Nitter page downloaded successfully from ' + url) # DEBUG: Save page to file # of = open(twit_account + '.html', 'w') # of.write(twit_account_page.text) # of.close() # Make soup soup = BeautifulSoup(twit_account_page.text, 'html.parser') # Replace twit_account with version with correct capitalization ta = soup.find('meta', property='og:title').get('content') ta_match = re.search(r'\(@(.+)\)', ta) if ta_match is not None: twit_account = ta_match.group(1) # Extract twitter timeline timeline = soup.find_all('div', class_='timeline-item') logging.info('Processing ' + str(len(timeline)) + ' tweets found in timeline') # ********************************************************** # Process each tweets and generate dictionary # with data ready to be posted on Mastodon # ********************************************************** out_date_cnt = 0 in_db_cnt = 0 for status in timeline: # Extract tweet ID and status ID tweet_id = status.find('a', class_='tweet-link').get('href').strip('#m') status_id = tweet_id.split('/')[3] logging.debug('processing tweet %s', tweet_id) # Extract time stamp time_string = status.find('span', class_='tweet-date').a.get('title') try: timestamp = datetime.datetime.strptime(time_string, '%d/%m/%Y, %H:%M:%S').timestamp() except: # Dec 21, 2021 · 12:00 PM UTC timestamp = datetime.datetime.strptime(time_string, '%b %d, %Y · %I:%M %p %Z').timestamp() # Check if time is within acceptable range if not is_time_valid(timestamp, max_age, min_delay): out_date_cnt += 1 logging.debug("Tweet outside valid time range, skipping") continue # Check in database if tweet has already been posted db.execute("SELECT * FROM toots WHERE twitter_account=? AND mastodon_instance=? AND mastodon_account=? AND tweet_id=?", (twit_account, mast_instance, mast_account, tweet_id)) tweet_in_db = db.fetchone() if tweet_in_db is not None: in_db_cnt += 1 logging.debug("Tweet %s already in database", tweet_id) # Skip to next tweet continue else: logging.debug('Tweet %s not found in database', tweet_id) # extract author author = status.find('a', class_='fullname').get('title') # Extract user name author_account = status.find('a', class_='username').get('title').lstrip('@') # Extract URL of full status page (for video download) full_status_url = 'https://twitter.com' + tweet_id # Initialize containers tweet_text = '' photos = [] # Add prefix if the tweet is a reply-to # Only consider item of class 'replying-to' that is a direct child # of class 'tweet-body' in status. Others can be in a quoted tweet. replying_to_class = status.select("div.tweet-body > div.replying-to") if len(replying_to_class) != 0: tweet_text += 'Replying to ' + replying_to_class[0].a.get_text() + '\n\n' # Check it the tweet is a retweet from somebody else if author_account.lower() != twit_account.lower(): tweet_text = 'RT from ' + author + ' (@' + author_account + ')\n\n' # extract iterator over tweet text contents tt_iter = status.find('div', class_='tweet-content media-body').children # Process text of tweet tweet_text += process_media_body(tt_iter) # Process quote: append link to tweet_text quote_div = status.find('a', class_='quote-link') if quote_div is not None: tweet_text += '\n\nhttps://twitter.com' + quote_div.get('href').strip('#m') # Process card : extract image if necessary card_class = status.find('a', class_='card-container') if card_class is not None: photos.extend(process_card(nitter_url, card_class)) # Process attachment: capture image or .mp4 url or download twitter video attachments_class = status.find('div', class_='attachments') if attachments_class is not None: pics, vid_in_tweet = process_attachments(nitter_url, attachments_class, get_vids, twit_account, status_id, author_account) photos.extend(pics) if vid_in_tweet: tweet_text += '\n\n[Video embedded in original tweet]' # Add footer with link to original tweet tweet_text += '\n\nOriginal tweet : ' + full_status_url # If no media was specifically added in the tweet, try to get the first picture # with "twitter:image" meta tag in first linked page in tweet text if not photos: m = re.search(r"http[^ \n\xa0]*", tweet_text) if m is not None: link_url = m.group(0) if link_url.endswith(".html"): # Only process a web page try: r = requests.get(link_url, timeout=HTTPS_REQ_TIMEOUT) if r.status_code == 200: # Matches the first instance of either twitter:image or twitter:image:src meta tag match = re.search(r'', r.text) if match is not None: url = match.group(1).replace('&', '&') # Remove HTML-safe encoding from URL if any photos.append(url) # Give up if anything goes wrong except (requests.exceptions.ConnectionError, requests.exceptions.Timeout, requests.exceptions.ContentDecodingError, requests.exceptions.TooManyRedirects, requests.exceptions.MissingSchema): pass else: logging.debug("downloaded twitter:image from linked page") # Check if video was downloaded video_file = None video_path = Path('./output') / twit_account / status_id if video_path.exists(): # list video files video_file_list = list(video_path.glob('*.mp4')) if len(video_file_list) != 0: # Extract posix path of first video file in list video_file = video_file_list[0].absolute().as_posix() # Add dictionary with content of tweet to list tweet = { "author": author, "author_account": author_account, "timestamp": timestamp, "tweet_id": tweet_id, "tweet_text": tweet_text, "video": video_file, "photos": photos, } tweets.append(tweet) logging.debug('Tweet %s added to list of toots to upload', tweet_id) # Log summary stats logging.info(str(out_date_cnt) + ' tweets outside of valid time range') logging.info(str(in_db_cnt) + ' tweets already in database') # DEBUG: Print extracted tweets # for t in tweets: # print(t) # Login to account on maston instance mastodon = None if len(tweets) != 0: mastodon = login(mast_instance, mast_account, mast_password) # ********************************************************** # Iterate tweets in list. # post each on Mastodon and record it in database # ********************************************************** posted_cnt = 0 for tweet in reversed(tweets): # Check if we have reached the cap on the number of toots to post if cap != 0 and posted_cnt >= cap: logging.info('%d toots not posted due to configured cap', len(tweets) - cap) break logging.debug('Uploading Tweet %s', tweet["tweet_id"]) media_ids = [] # Upload video if there is one if tweet['video'] is not None: try: logging.debug("Uploading video to Mastodon") media_posted = mastodon.media_post(tweet['video']) media_ids.append(media_posted['id']) except (MastodonAPIError, MastodonIllegalArgumentError, TypeError): # Media cannot be uploaded (invalid format, dead link, etc.) logging.debug("Uploading video failed") pass else: # Only upload pic if no video was uploaded # Upload photos for photo in tweet['photos']: media = False # Download picture try: logging.debug('downloading picture') media = requests.get(photo, timeout=HTTPS_REQ_TIMEOUT) except: # Picture cannot be downloaded for any reason pass # Upload picture to Mastodon instance if media: try: logging.debug('uploading picture to Mastodon') media_posted = mastodon.media_post(media.content, mime_type=media.headers['content-type']) media_ids.append(media_posted['id']) except (MastodonAPIError, MastodonIllegalArgumentError, TypeError): # Media cannot be uploaded (invalid format, dead link, etc.) pass # Post toot toot = {} try: mastodon = Mastodon( access_token=mast_account + '.secret', api_base_url='https://' + mast_instance ) if len(media_ids) == 0: toot = mastodon.status_post(tweet['tweet_text'], visibility='public') else: toot = mastodon.status_post(tweet['tweet_text'], media_ids=media_ids, visibility='public') except MastodonError as me: logging.error('posting ' + tweet['tweet_text'] + ' to ' + mast_instance + ' Failed') logging.error(me) else: posted_cnt += 1 logging.debug('Tweet %s posted on %s', tweet['tweet_id'], mast_account) # Insert toot id into database if 'id' in toot: db.execute("INSERT INTO toots VALUES ( ? , ? , ? , ? , ? )", (twit_account, mast_instance, mast_account, tweet['tweet_id'], toot['id'])) sql.commit() logging.info(str(posted_cnt) + ' tweets posted to Mastodon') # Cleanup downloaded video files try: shutil.rmtree('./output/' + twit_account) except FileNotFoundError: # The directory does not exist pass # Evaluate excess records in database excess_count = 0 db.execute('SELECT count(*) FROM toots WHERE twitter_account=?', (twit_account,)) db_count = db.fetchone() if db_count is not None: excess_count = db_count[0] - MAX_REC_COUNT # Delete excess records if excess_count > 0: db.execute(''' WITH excess AS ( SELECT tweet_id FROM toots WHERE twitter_account=? ORDER BY toot_id ASC LIMIT ? ) DELETE from toots WHERE tweet_id IN excess''', (twit_account, excess_count)) sql.commit() logging.info('Deleted ' + str(excess_count) + ' old records from database.') logging.info('Run time : %2.1f seconds' % (time.time() - start_time)) logging.info('_____________________________________________________________________________________') if __name__ == "__main__": main(sys.argv)