twoot/twoot.py

1044 lines
40 KiB
Python
Raw Normal View History

2019-07-31 20:42:38 +00:00
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""
2022-08-22 12:27:18 +00:00
Copyright (C) 2019-2022 Jean-Christophe Francois
2019-07-31 20:42:38 +00:00
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
2019-07-31 20:42:38 +00:00
2019-08-01 12:58:41 +00:00
import argparse
2022-12-11 10:15:50 +00:00
from datetime import datetime, timedelta
2022-11-17 19:18:42 +00:00
import logging
2019-07-31 20:42:38 +00:00
import os
2022-12-11 10:15:50 +00:00
import shutil
2019-08-01 10:31:26 +00:00
import random
2022-11-17 19:18:42 +00:00
import re
import shutil
2019-07-31 20:42:38 +00:00
import sqlite3
2022-11-17 19:18:42 +00:00
import sys
2020-12-19 08:21:39 +00:00
import time
from pathlib import Path
2023-01-03 10:00:41 +00:00
from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse, urljoin
2022-11-17 19:18:42 +00:00
import requests
from bs4 import BeautifulSoup, element
from mastodon import Mastodon, MastodonError, MastodonAPIError, MastodonIllegalArgumentError
2020-03-25 16:40:07 +00:00
2022-09-15 17:58:17 +00:00
# Number of records to keep in db table for each twitter account
MAX_REC_COUNT = 50
2022-11-06 10:50:08 +00:00
# How many seconds to wait before giving up on a download (except video download)
HTTPS_REQ_TIMEOUT = 10
NITTER_URLS = [
2022-11-18 12:04:30 +00:00
'https://nitter.lacontrevoie.fr',
2022-12-11 10:15:50 +00:00
'https://nitter.privacydev.net',
2021-06-01 09:05:33 +00:00
'https://nitter.fdn.fr',
'https://nitter.namazso.eu',
2022-12-11 10:15:50 +00:00
'https://twitter.beparanoid.de',
'https://n.l5.ca',
2022-12-11 10:15:50 +00:00
# 'https://nitter.hu',
2022-11-17 19:18:42 +00:00
]
2019-09-17 13:44:03 +00:00
# Update from https://www.whatismybrowser.com/guides/the-latest-user-agent/
2019-08-01 10:31:26 +00:00
USER_AGENTS = [
2022-11-17 19:56:21 +00:00
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 Edg/107.0.1418.42',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:107.0) Gecko/20100101 Firefox/107.0',
'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 Vivaldi/5.4.2753.51',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 Vivaldi/5.4.2753.51',
2022-11-17 19:18:42 +00:00
]
2022-11-23 08:59:06 +00:00
2022-12-11 10:15:50 +00:00
def build_config(args):
"""
Receives the arguments passed on the command line
populates the TOML global dict with default values for all 'options' keys
if a config file is provided, load the keys from the config file
if no config file is provided, use command-line args
verify that a valid config is available (all keys in 'config' present)
:param args: list of command line arguments
"""
# Create global struct containing configuration
global TOML
# Default options
options = {
'upload_videos': False,
'post_reply_to': False,
'skip_retweets': False,
'remove_link_redirections': False,
'remove_trackers_from_urls': False,
'footer': '',
'remove_original_tweet_ref': False,
'tweet_max_age': float(1),
'tweet_delay': float(0),
'toot_cap': int(0),
'subst_twitter': [],
'subst_youtube': [],
'subst_reddit': [],
'log_level': "WARNING",
'log_days': 3,
}
# Create default config object
TOML = {'config': {},'options': options}
# Load config file if it was provided
toml_file = args['f']
if toml_file is not None:
try: # Included in python from version 3.11
import tomllib
except ModuleNotFoundError:
# for python < 3.11, tomli module must be installed
import tomli as tomllib
loaded_toml = None
# Load toml file
try:
with open(toml_file, 'rb') as config_file:
loaded_toml = tomllib.load(config_file)
except FileNotFoundError:
print('config file not found')
terminate(-1)
except tomllib.TOMLDecodeError:
print('Malformed config file')
terminate(-1)
TOML['config'] = loaded_toml['config']
for k in TOML['options'].keys():
try: # Go through all valid keys
TOML['options'][k] = loaded_toml['options'][k]
except KeyError: # Key was not found in file
pass
else:
# Override config parameters with command-line values provided
if args['t'] is not None:
TOML['config']['twitter_account'] = args['t']
if args['i'] is not None:
TOML['config']['mastodon_instance'] = args['i']
if args['m'] is not None:
TOML['config']['mastodon_user'] = args['m']
if args['v'] is True:
TOML['options']['upload_videos'] = args['v']
if args['r'] is True:
TOML['options']['post_reply_to'] = args['r']
if args['s'] is True:
TOML['options']['skip_retweets'] = args['s']
if args['l'] is True:
TOML['options']['remove_link_redirections'] = args['l']
if args['u'] is True:
TOML['options']['remove_trackers_from_urls'] = args['u']
if args['o'] is True:
TOML['options']['remove_original_tweet_ref'] = args['o']
if args['a'] is not None:
TOML['options']['tweet_max_age'] = float(args['a'])
if args['d'] is not None:
TOML['options']['tweet_delay'] = float(args['d'])
if args['c'] is not None:
TOML['options']['toot_cap'] = int(args['c'])
# Verify that we have a minimum config to run
if 'twitter_account' not in TOML['config'].keys() or TOML['config']['twitter_account'] == "":
print('CRITICAL: Missing Twitter account')
terminate(-1)
if 'mastodon_instance' not in TOML['config'].keys() or TOML['config']['mastodon_instance'] == "":
print('CRITICAL: Missing Mastodon instance')
terminate(-1)
if 'mastodon_user' not in TOML['config'].keys() or TOML['config']['mastodon_user'] == "":
print('CRITICAL: Missing Mastodon user')
terminate(-1)
2022-11-22 10:05:16 +00:00
def deredir_url(url):
"""
Given a URL, return the URL that the page really downloads from
:param url: url to be de-redirected
:return: direct url
"""
2022-12-11 10:15:50 +00:00
# Check if we need to do anyting
if TOML['options']['remove_link_redirections'] is False:
return url
2022-11-22 10:05:16 +00:00
# Get a copy of the default headers that requests would use
headers = requests.utils.default_headers()
# Update default headers with randomly selected user agent
headers.update(
{
'User-Agent': USER_AGENTS[random.randint(0, len(USER_AGENTS) - 1)],
}
)
ret = None
try:
# Download the page
2023-01-31 20:58:07 +00:00
ret = requests.head(url, headers=headers, allow_redirects=True, timeout=5)
2022-11-22 10:05:16 +00:00
except:
# If anything goes wrong keep the URL intact
return url
2022-11-22 10:08:29 +00:00
if ret.url != url:
logging.debug("Removed redirection from: " + url + " to: " + ret.url)
2022-11-22 10:05:16 +00:00
# Return the URL that the page was downloaded from
return ret.url
2022-11-17 19:18:42 +00:00
2022-11-22 10:38:49 +00:00
2022-11-19 12:12:41 +00:00
def _remove_trackers_query(query_str):
2022-11-17 19:18:42 +00:00
"""
private function
Given a query string from a URL, strip out the known trackers
:param query_str: query to be cleaned
:return: query cleaned
"""
# Avalaible URL tracking parameters :
# UTM tags by Google Ads, M$ Ads, ...
# tag by TikTok
# tags by Snapchat
# tags by Facebook
params_to_remove = {
"gclid", "_ga", "gclsrc", "dclid",
2023-01-31 21:13:30 +00:00
"utm_source", "utm_medium", "utm_campaign", "utm_term", "utm_content", "utm_cid",
"utm_reader", "utm_name", "utm_referrer", "utm_social", "utm_social-type", "utm_brand"
2022-11-17 19:18:42 +00:00
"mkt_tok",
"campaign_name", "ad_set_name", "campaign_id", "ad_set_id",
"fbclid", "campaign_name", "ad_set_name", "ad_set_id", "media", "interest_group_name", "ad_set_id"
"igshid",
"cvid", "oicd", "msclkid",
"soc_src", "soc_trk",
"_openstat", "yclid",
"xtor", "xtref", "adid",
}
2022-11-17 19:18:42 +00:00
query_to_clean = dict(parse_qsl(query_str, keep_blank_values=True))
2022-11-23 08:59:06 +00:00
query_cleaned = [(k, v) for k, v in query_to_clean.items() if k not in params_to_remove]
2022-11-17 19:18:42 +00:00
return urlencode(query_cleaned, doseq=True)
2022-11-19 12:12:41 +00:00
def _remove_trackers_fragment(fragment_str):
"""
private function
Given a fragment string from a URL, strip out the known trackers
:param query_str: fragment to be cleaned
:return: cleaned fragment
"""
params_to_remove = {
"Echobox",
}
2022-11-23 08:59:06 +00:00
if '=' in fragment_str:
fragment_str = fragment_str.split('&')
query_cleaned = [i for i in fragment_str if i.split('=')[0] not in params_to_remove]
fragment_str = '&'.join(query_cleaned)
2022-11-19 12:12:41 +00:00
return fragment_str
2022-12-11 10:15:50 +00:00
def substitute_source(orig_url):
"""
param orig_url: url to check for substitutes
:return: url with replaced domains
"""
parsed_url = urlparse(orig_url)
domain = parsed_url.netloc
logging.debug("Checking domain %s for substitution ", domain)
# Handle twitter
twitter_subst = TOML["options"]["subst_twitter"]
# Do not substitiute if subdomain is present (e.g. i.twitter.com)
if (domain == 'twitter.com' or domain == 'www.twitter.com') and twitter_subst != []:
domain = twitter_subst[random.randint(0, len(twitter_subst) - 1)]
logging.debug("Replaced twitter.com by " + domain)
# Handle youtube
youtube_subst = TOML["options"]["subst_youtube"]
# Do not substitiute if subdomain is present (e.g. i.youtube.com)
if (domain == 'youtube.com' or domain == 'wwww.youtube.com') and youtube_subst != []:
domain = youtube_subst[random.randint(0, len(youtube_subst) - 1)]
logging.debug("Replaced youtube.com by " + domain)
# Handle reddit
reddit_subst = TOML["options"]["subst_reddit"]
# Do not substitiute if subdomain is present (e.g. i.reddit.com)
if (domain == 'reddit.com' or domain == 'www.reddit.com') and reddit_subst != []:
domain = reddit_subst[random.randint(0, len(reddit_subst) - 1)]
logging.debug("Replaced reddit.com by " + domain)
dest_url = urlunparse([
parsed_url.scheme,
domain,
parsed_url.path,
parsed_url.params,
parsed_url.query,
parsed_url.fragment
])
return dest_url
def clean_url(orig_url):
2022-11-17 19:18:42 +00:00
"""
Given a URL, return it with the UTM parameters removed from query and fragment
:param dirty_url: url to be cleaned
:return: url cleaned
2022-11-18 10:55:06 +00:00
>>> clean_url('https://example.com/video/this-aerial-ropeway?utm_source=Twitter&utm_medium=video&utm_campaign=organic&utm_content=Nov13&a=aaa&b=1#mkt_tok=tik&mkt_tik=tok')
'https://example.com/video/this-aerial-ropeway?a=aaa&b=1#mkt_tik=tok'
2022-11-17 19:18:42 +00:00
"""
2022-12-11 10:15:50 +00:00
# Check if we have to do anything
if TOML['options']['remove_trackers_from_urls'] is False:
return orig_url
2022-11-17 19:18:42 +00:00
2023-01-30 10:11:15 +00:00
# Parse a URL into 6 components:
# <scheme>://<netloc>/<path>;<params>?<query>#<fragment>
2022-12-11 10:15:50 +00:00
url_parsed = urlparse(orig_url)
2022-11-17 19:18:42 +00:00
2023-01-30 10:11:15 +00:00
# Reassemble URL after removal of trackers
2022-12-11 10:15:50 +00:00
dest_url = urlunparse([
2022-11-17 19:18:42 +00:00
url_parsed.scheme,
url_parsed.netloc,
url_parsed.path,
url_parsed.params,
2022-11-19 12:12:41 +00:00
_remove_trackers_query(url_parsed.query),
_remove_trackers_fragment(url_parsed.fragment)
2022-11-17 19:18:42 +00:00
])
2022-12-11 10:15:50 +00:00
if dest_url != orig_url:
logging.debug('Cleaned URL from: ' + orig_url + ' to: ' + dest_url)
2022-11-17 19:18:42 +00:00
2022-12-11 10:15:50 +00:00
return dest_url
2022-11-18 11:57:44 +00:00
2019-07-31 20:42:38 +00:00
2022-12-11 10:15:50 +00:00
def process_media_body(tt_iter):
2020-12-18 10:45:43 +00:00
"""
Receives an iterator over all the elements contained in the tweet-text container.
2020-12-17 21:08:43 +00:00
Processes them to make them suitable for posting on Mastodon
:param tt_iter: iterator over the HTML elements in the text of the tweet
2020-12-17 21:08:43 +00:00
:return: cleaned up text of the tweet
2020-12-18 10:45:43 +00:00
"""
2022-12-11 10:15:50 +00:00
2019-07-31 20:42:38 +00:00
tweet_text = ''
# Iterate elements
for tag in tt_iter:
# If element is plain text, copy it verbatim
if isinstance(tag, element.NavigableString):
tweet_text += tag.string
# If it is an 'a' html tag
2020-12-17 21:08:43 +00:00
elif tag.name == 'a':
tag_text = tag.get_text()
2020-12-18 13:57:22 +00:00
if tag_text.startswith('@'):
2020-12-17 21:08:43 +00:00
# Only keep user name
tweet_text += tag_text
2020-12-18 13:57:22 +00:00
elif tag_text.startswith('#'):
2020-12-17 21:08:43 +00:00
# Only keep hashtag text
tweet_text += tag_text
else:
2022-11-22 10:05:16 +00:00
# This is a real link
2022-12-11 10:15:50 +00:00
url = deredir_url(tag.get('href'))
url = substitute_source(url)
url = clean_url(url)
tweet_text += url
2019-07-31 20:42:38 +00:00
else:
2020-12-17 09:15:46 +00:00
logging.warning("No handler for tag in twitter text: " + tag.prettify())
2019-07-31 20:42:38 +00:00
return tweet_text
def process_card(nitter_url, card_container):
2020-12-18 10:45:43 +00:00
"""
2020-12-17 21:59:21 +00:00
Extract image from card in case mastodon does not do it
:param card_container: soup of 'a' tag containing card markup
:return: list with url of image
2020-12-18 10:45:43 +00:00
"""
2020-12-17 21:59:21 +00:00
list = []
2020-12-18 20:32:26 +00:00
img = card_container.div.div.img
if img is not None:
image_url = nitter_url + img.get('src')
2020-12-18 20:32:26 +00:00
list.append(image_url)
logging.debug('Extracted image from card')
2020-12-17 21:59:21 +00:00
return list
2020-12-18 10:45:43 +00:00
2022-12-11 10:15:50 +00:00
def process_attachments(nitter_url, attachments_container, status_id, author_account):
2020-12-18 10:45:43 +00:00
"""
Extract images or video from attachments. Videos are downloaded on the file system.
:param nitter_url: url of nitter mirror
:param attachments_container: soup of 'div' tag containing attachments markup
2020-12-18 12:26:26 +00:00
:param twit_account: name of twitter account
2020-12-18 16:55:12 +00:00
:param status_id: id of tweet being processed
2020-12-18 12:26:26 +00:00
:param author_account: author of tweet with video attachment
2020-12-18 10:45:43 +00:00
:return: list with url of images
"""
# Collect url of images
pics = []
images = attachments_container.find_all('a', class_='still-image')
for image in images:
pics.append(nitter_url + image.get('href'))
2020-12-18 13:57:22 +00:00
2022-12-11 10:15:50 +00:00
logging.debug('collected ' + str(len(pics)) + ' image(s) from attachments')
2020-12-18 10:45:43 +00:00
2020-12-18 12:26:26 +00:00
# Download nitter video (converted animated GIF)
gif_class = attachments_container.find('video', class_='gif')
if gif_class is not None:
gif_video_file = nitter_url + gif_class.source.get('src')
2020-12-18 12:26:26 +00:00
2022-12-11 10:15:50 +00:00
video_path = os.path.join('output', TOML['config']['twitter_account'], status_id, author_account, status_id)
2020-12-18 16:55:12 +00:00
os.makedirs(video_path, exist_ok=True)
2020-12-18 13:28:17 +00:00
# Open directory for writing file
2020-12-18 20:06:05 +00:00
orig_dir = os.getcwd()
os.chdir(video_path)
2022-11-06 10:50:08 +00:00
with requests.get(gif_video_file, stream=True, timeout=HTTPS_REQ_TIMEOUT) as r:
try:
# Raise exception if response code is not 200
r.raise_for_status()
# Download chunks and write them to file
with open('gif_video.mp4', 'wb') as f:
for chunk in r.iter_content(chunk_size=16 * 1024):
f.write(chunk)
logging.debug('Downloaded video of GIF animation from attachments')
except: # Don't do anything if video can't be found or downloaded
logging.debug('Could not download video of GIF animation from attachments')
pass
2020-12-18 13:28:17 +00:00
# Close directory
2020-12-18 20:06:05 +00:00
os.chdir(orig_dir)
2020-12-18 13:28:17 +00:00
# Download twitter video
2020-12-18 20:06:05 +00:00
vid_in_tweet = False
2020-12-18 13:28:17 +00:00
vid_class = attachments_container.find('div', class_='video-container')
if vid_class is not None:
2022-12-11 10:15:50 +00:00
if TOML['options']['upload_videos']:
2022-11-03 21:10:23 +00:00
import youtube_dl
2023-01-03 10:00:41 +00:00
video_path = f"{author_account}/status/{status_id}"
video_file = urljoin('https://twitter.com', video_path)
2022-11-03 21:10:23 +00:00
ydl_opts = {
2022-12-11 10:15:50 +00:00
'outtmpl': "output/" + TOML['config']['twitter_account'] + "/" + status_id + "/%(id)s.%(ext)s",
2022-11-03 21:10:23 +00:00
'format': "best[width<=500]",
'socket_timeout': 60,
2022-11-06 10:24:57 +00:00
'quiet': True,
2022-11-03 21:10:23 +00:00
}
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
try:
ydl.download([video_file])
except Exception as e:
2022-12-11 10:15:50 +00:00
logging.warning('Error downloading twitter video: ' + str(e))
2020-12-18 20:06:05 +00:00
vid_in_tweet = True
else:
logging.debug('downloaded twitter video from attachments')
2020-12-18 20:06:05 +00:00
return pics, vid_in_tweet
2020-12-18 10:45:43 +00:00
2020-02-15 14:39:01 +00:00
def contains_class(body_classes, some_class):
2020-12-18 10:45:43 +00:00
"""
2020-02-15 14:39:01 +00:00
:param body_classes: list of classes to search
:param some_class: class that we are interested in
:return: True if found, false otherwise
2020-12-18 10:45:43 +00:00
"""
2020-02-15 14:39:01 +00:00
found = False
for body_class in body_classes:
if body_class == some_class:
found = True
return found
2022-11-03 21:10:23 +00:00
2022-12-11 10:15:50 +00:00
def is_time_valid(timestamp):
ret = True
# Check that the tweet is not too young (might be deleted) or too old
age_in_hours = (time.time() - float(timestamp)) / 3600.0
2022-12-11 10:15:50 +00:00
min_delay_in_hours = TOML['options']['tweet_delay'] / 60.0
max_age_in_hours = TOML['options']['tweet_max_age'] * 24.0
if age_in_hours < min_delay_in_hours or age_in_hours > max_age_in_hours:
ret = False
return ret
2022-09-14 14:28:48 +00:00
2022-12-11 10:15:50 +00:00
def login(password):
"""
Login to Mastodon account and return mastodon object used to post content
:param password: Password associated to account. None if not provided
:return: mastodon object
"""
# Create Mastodon application if it does not exist yet
if not os.path.isfile(TOML['config']['mastodon_instance'] + '.secret'):
try:
Mastodon.create_app(
2022-12-11 10:15:50 +00:00
'feedtoot',
api_base_url='https://' + TOML['config']['mastodon_instance'],
to_file=TOML['config']['mastodon_instance'] + '.secret'
)
except MastodonError as me:
2022-12-11 10:15:50 +00:00
logging.fatal('failed to create app on ' + TOML['config']['mastodon_instance'])
logging.fatal(me)
2022-12-11 10:15:50 +00:00
terminate(-1)
2022-12-11 10:15:50 +00:00
mastodon = None
# Log in to Mastodon instance with password
if password is not None:
try:
mastodon = Mastodon(
client_id=TOML['config']['mastodon_instance'] + '.secret',
api_base_url='https://' + TOML['config']['mastodon_instance']
)
mastodon.log_in(
username=TOML['config']['mastodon_user'],
password=password,
to_file=TOML['config']['mastodon_user'] + ".secret"
)
logging.info('Logging in to ' + TOML['config']['mastodon_instance'])
except MastodonError as me:
logging.fatal('Login to ' + TOML['config']['mastodon_instance'] + ' Failed\n')
logging.fatal(me)
terminate(-1)
if os.path.isfile(TOML['config']['mastodon_user'] + '.secret'):
logging.warning('You successfully logged in using a password and an access token \
has been saved. The password can therefore be omitted from the \
command-line in future invocations')
else: # No password provided, login with token
# Using token in existing .secret file
if os.path.isfile(TOML['config']['mastodon_user'] + '.secret'):
try:
mastodon = Mastodon(
access_token=TOML['config']['mastodon_user'] + '.secret',
api_base_url='https://' + TOML['config']['mastodon_instance']
)
except MastodonError as me:
logging.fatal('Login to ' + TOML['config']['mastodon_instance'] + ' Failed\n')
logging.fatal(me)
terminate(-1)
else:
logging.fatal('No .secret file found. Password required to log in')
terminate(-1)
2022-09-14 14:28:48 +00:00
return mastodon
2020-12-16 18:43:17 +00:00
2022-12-11 10:15:50 +00:00
def terminate(exit_code):
"""
Cleanly stop execution with a message on execution duration
Remove log messages older that duration specified in config from log file
:param exit_code: return value to pass to shell when exiting
"""
logging.info('Run time : {t:2.1f} seconds.'.format(t=time.time() - START_TIME))
logging.info('_____________________________________________________________________________________')
# Close logger and log file
logging.shutdown()
# Remove older log messages
# Max allowed age of log message
max_delta = timedelta(TOML['options']['log_days'])
# Open log file
2022-12-11 11:01:32 +00:00
log_file_name = TOML['config']['twitter_account'].lower() + '.log'
new_log_file_name = TOML['config']['twitter_account'].lower() + '.log.new'
2022-12-11 10:15:50 +00:00
try:
log_file = open(log_file_name, 'r')
except FileNotFoundError:
# Nothing to do if there is no log file
exit(exit_code)
# Check each line
pos = log_file.tell()
2022-12-11 21:42:19 +00:00
while True:
2022-12-11 10:15:50 +00:00
line = log_file.readline()
2022-12-11 21:42:19 +00:00
# Check if we reached the end of the file
if not line:
exit(exit_code)
2022-12-11 10:15:50 +00:00
try:
# Extract date on log line
date = datetime.strptime(line[:10], '%Y-%m-%d')
except ValueError:
# date was not found on this line, try next one
continue
2022-12-11 21:42:19 +00:00
2022-12-11 10:15:50 +00:00
# Time difference between log message and now
log_delta = datetime.now() - date
# Only keep the number of days of the difference
log_delta = timedelta(days=log_delta.days)
if log_delta < max_delta:
2023-01-30 10:11:15 +00:00
logging.debug("Truncating log file")
2022-12-11 10:15:50 +00:00
# Reset file pointer to position before reading last line
log_file.seek(pos)
remainder = log_file.read()
output_file = open(new_log_file_name, 'w')
output_file.write(remainder)
output_file.close()
# replace log file by new one
shutil.move(new_log_file_name, log_file_name)
break # Exit while loop
# Update read pointer position
pos = log_file.tell()
exit(exit_code)
2019-08-01 12:58:41 +00:00
def main(argv):
2020-12-19 08:21:39 +00:00
# Start stopwatch
2022-12-11 10:15:50 +00:00
global START_TIME
START_TIME = time.time()
2019-08-01 12:58:41 +00:00
# Build parser for command line arguments
parser = argparse.ArgumentParser(description='toot tweets.')
2022-12-11 10:15:50 +00:00
parser.add_argument('-f', metavar='<.toml config file>', action='store')
parser.add_argument('-t', metavar='<twitter account>', action='store')
parser.add_argument('-i', metavar='<mastodon instance>', action='store')
parser.add_argument('-m', metavar='<mastodon account>', action='store')
parser.add_argument('-p', metavar='<mastodon password>', action='store')
parser.add_argument('-r', action='store_true', help='Also post replies to other tweets')
2022-11-13 21:17:43 +00:00
parser.add_argument('-s', action='store_true', help='Suppress retweets')
2022-11-23 08:59:06 +00:00
parser.add_argument('-l', action='store_true', help='Remove link redirection')
2022-11-18 10:55:06 +00:00
parser.add_argument('-u', action='store_true', help='Remove trackers from URLs')
parser.add_argument('-v', action='store_true', help='Ingest twitter videos and upload to Mastodon instance')
2022-12-11 10:15:50 +00:00
parser.add_argument('-o', action='store_true', help='Do not add reference to Original tweet')
parser.add_argument('-a', metavar='<max age (in days)>', action='store', type=float)
parser.add_argument('-d', metavar='<min delay (in mins)>', action='store', type=float)
parser.add_argument('-c', metavar='<max # of toots to post>', action='store', type=int)
2019-08-01 12:58:41 +00:00
# Parse command line
args = vars(parser.parse_args())
2022-12-11 10:15:50 +00:00
build_config(args)
2019-08-01 12:58:41 +00:00
mast_password = args['p']
2020-12-18 16:21:41 +00:00
2020-12-18 16:06:09 +00:00
# Setup logging to file
2021-06-01 13:49:11 +00:00
logging.basicConfig(
2022-12-11 11:01:32 +00:00
filename=TOML['config']['twitter_account'].lower() + '.log',
2021-06-01 14:12:05 +00:00
format='%(asctime)s %(levelname)-8s %(message)s',
2021-06-01 13:49:11 +00:00
datefmt='%Y-%m-%d %H:%M:%S',
)
2022-12-21 08:41:59 +00:00
# Set default level of logging
2022-12-11 10:15:50 +00:00
log_level = logging.WARNING
2022-12-21 08:41:59 +00:00
# log level as an uppercase string from config
ll_str = TOML['options']['log_level'].upper()
if ll_str == "DEBUG":
log_level = logging.DEBUG
elif ll_str == "INFO":
log_level = logging.INFO
elif ll_str == "WARNING":
log_level = logging.WARNING
elif ll_str == "ERROR":
log_level = logging.ERROR
elif ll_str == "CRITICAL":
log_level == logging.CRITICAL
elif ll_str == "OFF":
# Disable all logging
logging.disable(logging.CRITICAL)
else:
logging.error('Invalid log_level %s in config file. Using WARNING.', str(TOML['options']['log_level']))
# Set desired level of logging
2022-12-11 10:15:50 +00:00
logger = logging.getLogger()
logger.setLevel(log_level)
logging.info('Running with the following configuration:')
logging.info(' Config File : ' + str(args['f']))
logging.info(' twitter_account : ' + TOML['config']['twitter_account'])
logging.info(' mastodon_instance : ' + TOML['config']['mastodon_instance'])
logging.info(' mastodon_user : ' + TOML['config']['mastodon_user'])
logging.info(' upload_videos : ' + str(TOML['options']['upload_videos']))
logging.info(' post_reply_to : ' + str(TOML['options']['post_reply_to']))
logging.info(' skip_retweets : ' + str(TOML['options']['skip_retweets']))
logging.info(' remove_link_redirections : ' + str(TOML['options']['remove_link_redirections']))
logging.info(' remove_trackers_from_urls: ' + str(TOML['options']['remove_trackers_from_urls']))
logging.info(' footer : ' + TOML['options']['footer'])
logging.info(' remove_original_tweet_ref: ' + str(TOML['options']['remove_original_tweet_ref']))
logging.info(' tweet_max_age : ' + str(TOML['options']['tweet_max_age']))
logging.info(' tweet_delay : ' + str(TOML['options']['tweet_delay']))
logging.info(' toot_cap : ' + str(TOML['options']['toot_cap']))
logging.info(' subst_twitter : ' + str(TOML['options']['subst_twitter']))
logging.info(' subst_twitter : ' + str(TOML['options']['subst_youtube']))
logging.info(' subst_twitter : ' + str(TOML['options']['subst_reddit']))
logging.info(' log_level : ' + str(TOML['options']['log_level']))
logging.info(' log_days : ' + str(TOML['options']['log_days']))
# Try to open database. If it does not exist, create it
sql = sqlite3.connect('twoot.db')
db = sql.cursor()
db.execute('''CREATE TABLE IF NOT EXISTS toots (twitter_account TEXT, mastodon_instance TEXT,
mastodon_account TEXT, tweet_id TEXT, toot_id TEXT)''')
2022-08-22 12:50:03 +00:00
db.execute('''CREATE INDEX IF NOT EXISTS main_index ON toots (twitter_account,
mastodon_instance, mastodon_account, tweet_id)''')
# Select random nitter instance to fetch updates from
2022-09-14 14:28:48 +00:00
nitter_url = NITTER_URLS[random.randint(0, len(NITTER_URLS) - 1)]
2019-08-01 12:58:41 +00:00
# **********************************************************
# Load twitter page of user. Process all tweets and generate
# list of dictionaries ready to be posted on Mastodon
# **********************************************************
# To store content of all tweets from this user
tweets = []
# Initiate session
session = requests.Session()
2019-08-01 12:58:41 +00:00
# Get a copy of the default headers that requests would use
headers = requests.utils.default_headers()
# Update default headers with randomly selected user agent
headers.update(
{
2022-11-17 19:18:42 +00:00
'User-Agent': USER_AGENTS[random.randint(0, len(USER_AGENTS) - 1)],
2020-12-18 16:55:12 +00:00
'Cookie': 'replaceTwitter=; replaceYouTube=; hlsPlayback=on; proxyVideos=',
2019-08-01 12:58:41 +00:00
}
)
2022-12-11 10:15:50 +00:00
url = nitter_url + '/' + TOML['config']['twitter_account']
2020-12-17 16:50:10 +00:00
# Use different page if we need to handle replies
2022-12-11 10:15:50 +00:00
if TOML['options']['post_reply_to']:
2020-12-17 16:50:10 +00:00
url += '/with_replies'
2022-11-22 10:05:16 +00:00
# Download twitter page of user
try:
2022-11-06 10:50:08 +00:00
twit_account_page = session.get(url, headers=headers, timeout=HTTPS_REQ_TIMEOUT)
except requests.exceptions.ConnectionError:
logging.fatal('Host did not respond when trying to download ' + url)
2022-12-11 10:15:50 +00:00
terminate(-1)
2022-11-02 17:38:23 +00:00
except requests.exceptions.Timeout:
logging.fatal(nitter_url + ' took too long to respond')
2022-12-11 10:15:50 +00:00
terminate(-1)
2020-02-13 17:01:45 +00:00
2020-02-15 14:39:01 +00:00
# Verify that download worked
if twit_account_page.status_code != 200:
2022-11-17 19:18:42 +00:00
logging.fatal('The Nitter page did not download correctly from ' + url + ' (' + str(
twit_account_page.status_code) + '). Aborting')
2022-12-11 10:15:50 +00:00
terminate(-1)
2020-02-15 14:39:01 +00:00
2022-12-11 10:15:50 +00:00
logging.debug('Nitter page downloaded successfully from ' + url)
2020-02-13 17:01:45 +00:00
# DEBUG: Save page to file
2022-12-11 10:15:50 +00:00
# of = open(toml['config']['twitter_account'] + '.html', 'w')
2022-09-14 14:28:48 +00:00
# of.write(twit_account_page.text)
# of.close()
2020-12-16 18:43:17 +00:00
2020-02-15 14:39:01 +00:00
# Make soup
soup = BeautifulSoup(twit_account_page.text, 'html.parser')
2019-08-01 12:58:41 +00:00
2022-12-11 11:01:32 +00:00
# Replace twitter_account with version with correct capitalization
2022-12-11 20:57:10 +00:00
# ta = soup.find('meta', property='og:title').get('content')
# ta_match = re.search(r'\(@(.+)\)', ta)
# if ta_match is not None:
# TOML['config']['twitter_account'] = ta_match.group(1)
2020-12-16 19:48:00 +00:00
2019-08-01 12:58:41 +00:00
# Extract twitter timeline
2020-12-16 19:55:26 +00:00
timeline = soup.find_all('div', class_='timeline-item')
2020-12-17 16:50:10 +00:00
logging.info('Processing ' + str(len(timeline)) + ' tweets found in timeline')
# **********************************************************
# Process each tweets and generate dictionary
# with data ready to be posted on Mastodon
# **********************************************************
2020-12-18 21:09:34 +00:00
out_date_cnt = 0
in_db_cnt = 0
for status in timeline:
# Extract tweet ID and status ID
2020-12-16 20:55:13 +00:00
tweet_id = status.find('a', class_='tweet-link').get('href').strip('#m')
status_id = tweet_id.split('/')[3]
2020-10-14 19:51:00 +00:00
logging.debug('processing tweet %s', tweet_id)
# Extract time stamp
time_string = status.find('span', class_='tweet-date').a.get('title')
2022-01-03 17:11:40 +00:00
try:
2022-12-11 10:15:50 +00:00
timestamp = datetime.strptime(time_string, '%d/%m/%Y, %H:%M:%S').timestamp()
2022-01-03 17:11:40 +00:00
except:
# Dec 21, 2021 · 12:00 PM UTC
2022-12-11 10:15:50 +00:00
timestamp = datetime.strptime(time_string, '%b %d, %Y · %I:%M %p %Z').timestamp()
# Check if time is within acceptable range
2022-12-11 10:15:50 +00:00
if not is_time_valid(timestamp):
2020-12-18 21:09:34 +00:00
out_date_cnt += 1
logging.debug("Tweet outside valid time range, skipping")
continue
2022-11-13 21:17:43 +00:00
# Check if retweets must be skipped
2022-12-11 10:15:50 +00:00
if TOML['options']['skip_retweets']:
2022-11-13 21:17:43 +00:00
# Check if this tweet is a retweet
2022-11-13 21:35:46 +00:00
if len(status.select("div.tweet-body > div > div.retweet-header")) != 0:
2022-11-13 21:17:43 +00:00
logging.debug("Retweet ignored per command-line configuration")
continue
# Check in database if tweet has already been posted
2022-11-17 19:18:42 +00:00
db.execute(
"SELECT * FROM toots WHERE twitter_account=? AND mastodon_instance=? AND mastodon_account=? AND tweet_id=?",
2022-12-11 10:15:50 +00:00
(TOML['config']['twitter_account'], TOML['config']['mastodon_instance'], TOML['config']['mastodon_user'], tweet_id))
tweet_in_db = db.fetchone()
if tweet_in_db is not None:
2020-12-18 21:09:34 +00:00
in_db_cnt += 1
2020-10-14 19:51:00 +00:00
logging.debug("Tweet %s already in database", tweet_id)
# Skip to next tweet
continue
2020-11-09 14:55:42 +00:00
else:
logging.debug('Tweet %s not found in database', tweet_id)
2020-10-14 19:51:00 +00:00
2020-02-15 14:39:01 +00:00
# extract author
author = status.find('a', class_='fullname').get('title')
2019-08-01 12:58:41 +00:00
# Extract user name
author_account = status.find('a', class_='username').get('title').lstrip('@')
2019-08-01 12:58:41 +00:00
2020-12-16 21:46:01 +00:00
# Extract URL of full status page (for video download)
full_status_url = 'https://twitter.com' + tweet_id
2020-12-17 21:59:21 +00:00
# Initialize containers
2020-12-17 16:56:12 +00:00
tweet_text = ''
2020-12-17 21:59:21 +00:00
photos = []
2020-12-17 16:56:12 +00:00
2020-12-17 17:59:02 +00:00
# Add prefix if the tweet is a reply-to
# Only consider item of class 'replying-to' that is a direct child
2022-09-14 14:28:48 +00:00
# of class 'tweet-body' in status. Others can be in a quoted tweet.
replying_to_class = status.select("div.tweet-body > div.replying-to")
2022-08-22 07:33:27 +00:00
if len(replying_to_class) != 0:
2022-08-22 07:30:52 +00:00
tweet_text += 'Replying to ' + replying_to_class[0].a.get_text() + '\n\n'
# Check it the tweet is a retweet from somebody else
2022-11-13 21:35:46 +00:00
if len(status.select("div.tweet-body > div > div.retweet-header")) != 0:
tweet_text = 'RT from ' + author + ' (@' + author_account + ')\n\n'
2020-02-14 06:58:39 +00:00
2019-08-01 12:58:41 +00:00
# extract iterator over tweet text contents
2020-12-16 21:46:01 +00:00
tt_iter = status.find('div', class_='tweet-content media-body').children
2019-08-01 12:58:41 +00:00
2020-12-17 21:59:21 +00:00
# Process text of tweet
2022-12-11 10:15:50 +00:00
tweet_text += process_media_body(tt_iter)
2020-12-17 20:44:32 +00:00
2020-12-17 21:59:21 +00:00
# Process quote: append link to tweet_text
2020-12-18 21:41:57 +00:00
quote_div = status.find('a', class_='quote-link')
2020-12-17 21:59:21 +00:00
if quote_div is not None:
2022-12-11 10:15:50 +00:00
tweet_text += substitute_source('\n\nhttps://twitter.com' + quote_div.get('href').strip('#m'))
2020-12-17 20:44:32 +00:00
2020-12-17 21:59:21 +00:00
# Process card : extract image if necessary
card_class = status.find('a', class_='card-container')
if card_class is not None:
photos.extend(process_card(nitter_url, card_class))
2020-12-17 20:44:32 +00:00
# Process attachment: capture image or .mp4 url or download twitter video
2020-12-18 13:57:22 +00:00
attachments_class = status.find('div', class_='attachments')
2020-12-18 10:45:43 +00:00
if attachments_class is not None:
2022-12-11 10:15:50 +00:00
pics, vid_in_tweet = process_attachments(nitter_url,
attachments_class,
status_id, author_account
)
2020-12-18 20:06:05 +00:00
photos.extend(pics)
if vid_in_tweet:
tweet_text += '\n\n[Video embedded in original tweet]'
2019-08-01 12:58:41 +00:00
2022-12-11 10:15:50 +00:00
# Add custom footer from config file
if TOML['options']['footer'] != '':
tweet_text += '\n\n' + TOML['options']['footer']
2019-08-01 12:58:41 +00:00
# Add footer with link to original tweet
2022-12-11 10:15:50 +00:00
if TOML['options']['remove_original_tweet_ref'] == False:
if TOML['options']['footer'] != '':
tweet_text += '\nOriginal tweet : ' + substitute_source(full_status_url)
else:
tweet_text += '\n\nOriginal tweet : ' + substitute_source(full_status_url)
2019-08-01 12:58:41 +00:00
# If no media was specifically added in the tweet, try to get the first picture
# with "twitter:image" meta tag in first linked page in tweet text
if not photos:
m = re.search(r"http[^ \n\xa0]*", tweet_text)
if m is not None:
link_url = m.group(0)
if link_url.endswith(".html"): # Only process a web page
try:
2022-11-06 10:50:08 +00:00
r = requests.get(link_url, timeout=HTTPS_REQ_TIMEOUT)
if r.status_code == 200:
# Matches the first instance of either twitter:image or twitter:image:src meta tag
match = re.search(r'<meta name="twitter:image(?:|:src)" content="(.+?)".*?>', r.text)
if match is not None:
url = match.group(1).replace('&amp;', '&') # Remove HTML-safe encoding from URL if any
photos.append(url)
# Give up if anything goes wrong
except (requests.exceptions.ConnectionError,
requests.exceptions.Timeout,
requests.exceptions.ContentDecodingError,
requests.exceptions.TooManyRedirects,
requests.exceptions.MissingSchema):
pass
2020-12-19 09:53:11 +00:00
else:
logging.debug("downloaded twitter:image from linked page")
# Check if video was downloaded
video_file = None
2022-12-11 10:15:50 +00:00
video_path = Path('./output') / TOML['config']['twitter_account'] / status_id
if video_path.exists():
# list video files
video_file_list = list(video_path.glob('*.mp4'))
if len(video_file_list) != 0:
# Extract posix path of first video file in list
video_file = video_file_list[0].absolute().as_posix()
2019-08-01 12:58:41 +00:00
# Add dictionary with content of tweet to list
tweet = {
"author": author,
"author_account": author_account,
"timestamp": timestamp,
"tweet_id": tweet_id,
"tweet_text": tweet_text,
"video": video_file,
2019-08-01 12:58:41 +00:00
"photos": photos,
}
tweets.append(tweet)
logging.debug('Tweet %s added to list of toots to upload', tweet_id)
# Log summary stats
2020-12-18 21:09:34 +00:00
logging.info(str(out_date_cnt) + ' tweets outside of valid time range')
logging.info(str(in_db_cnt) + ' tweets already in database')
2020-10-14 19:51:00 +00:00
2022-09-15 17:58:17 +00:00
# Login to account on maston instance
mastodon = None
if len(tweets) != 0:
2022-12-11 10:15:50 +00:00
mastodon = login(mast_password)
2019-08-01 12:58:41 +00:00
# **********************************************************
# Iterate tweets in list.
# post each on Mastodon and record it in database
2019-08-01 12:58:41 +00:00
# **********************************************************
2020-12-18 21:09:34 +00:00
posted_cnt = 0
2019-08-01 12:58:41 +00:00
for tweet in reversed(tweets):
2021-06-01 09:54:08 +00:00
# Check if we have reached the cap on the number of toots to post
2022-12-11 10:15:50 +00:00
if TOML['options']['toot_cap'] != 0 and posted_cnt >= TOML['options']['toot_cap']:
logging.info('%d toots not posted due to configured cap', len(tweets) - TOML['options']['toot_cap'])
2021-06-01 09:54:08 +00:00
break
2020-10-14 19:51:00 +00:00
logging.debug('Uploading Tweet %s', tweet["tweet_id"])
2019-08-01 12:58:41 +00:00
media_ids = []
# Upload video if there is one
if tweet['video'] is not None:
try:
2020-12-19 09:59:23 +00:00
logging.debug("Uploading video to Mastodon")
media_posted = mastodon.media_post(tweet['video'])
media_ids.append(media_posted['id'])
2022-11-17 19:18:42 +00:00
except (MastodonAPIError, MastodonIllegalArgumentError,
TypeError): # Media cannot be uploaded (invalid format, dead link, etc.)
2020-11-09 14:55:42 +00:00
logging.debug("Uploading video failed")
pass
2019-08-01 12:58:41 +00:00
else: # Only upload pic if no video was uploaded
# Upload photos
for photo in tweet['photos']:
media = False
# Download picture
try:
2020-12-19 09:59:23 +00:00
logging.debug('downloading picture')
2022-11-06 10:50:08 +00:00
media = requests.get(photo, timeout=HTTPS_REQ_TIMEOUT)
except: # Picture cannot be downloaded for any reason
pass
# Upload picture to Mastodon instance
if media:
try:
2020-12-19 09:59:23 +00:00
logging.debug('uploading picture to Mastodon')
media_posted = mastodon.media_post(media.content, mime_type=media.headers['content-type'])
media_ids.append(media_posted['id'])
2022-11-17 19:18:42 +00:00
except (MastodonAPIError, MastodonIllegalArgumentError,
TypeError): # Media cannot be uploaded (invalid format, dead link, etc.)
pass
2019-08-01 12:58:41 +00:00
# Post toot
2022-10-08 08:25:04 +00:00
toot = {}
2019-08-01 12:58:41 +00:00
try:
mastodon = Mastodon(
2022-12-11 10:15:50 +00:00
access_token=TOML['config']['mastodon_user'] + '.secret',
api_base_url='https://' + TOML['config']['mastodon_instance']
2019-08-01 12:58:41 +00:00
)
if len(media_ids) == 0:
toot = mastodon.status_post(tweet['tweet_text'])
2019-08-01 12:58:41 +00:00
else:
toot = mastodon.status_post(tweet['tweet_text'], media_ids=media_ids)
2019-08-01 12:58:41 +00:00
except MastodonError as me:
2022-12-11 10:15:50 +00:00
logging.error('posting ' + tweet['tweet_text'] + ' to ' + TOML['config']['mastodon_instance'] + ' Failed')
2020-10-14 19:51:00 +00:00
logging.error(me)
2019-08-01 12:58:41 +00:00
2020-12-18 21:09:34 +00:00
else:
posted_cnt += 1
2022-12-11 10:15:50 +00:00
logging.debug('Tweet %s posted on %s', tweet['tweet_id'], TOML['config']['mastodon_user'])
2020-10-14 19:51:00 +00:00
2019-08-01 12:58:41 +00:00
# Insert toot id into database
if 'id' in toot:
db.execute("INSERT INTO toots VALUES ( ? , ? , ? , ? , ? )",
2022-12-11 10:15:50 +00:00
(TOML['config']['twitter_account'], TOML['config']['mastodon_instance'], TOML['config']['mastodon_user'], tweet['tweet_id'], toot['id']))
2019-08-01 12:58:41 +00:00
sql.commit()
2022-09-24 11:26:08 +00:00
logging.info(str(posted_cnt) + ' tweets posted to Mastodon')
2020-12-18 21:09:34 +00:00
# Cleanup downloaded video files
try:
2022-12-11 10:15:50 +00:00
shutil.rmtree('./output/' + TOML['config']['twitter_account'])
except FileNotFoundError: # The directory does not exist
pass
2019-08-01 12:58:41 +00:00
2022-09-15 17:58:17 +00:00
# Evaluate excess records in database
excess_count = 0
2022-12-11 10:15:50 +00:00
db.execute('SELECT count(*) FROM toots WHERE twitter_account=?', (TOML['config']['twitter_account'],))
2022-09-15 17:58:17 +00:00
db_count = db.fetchone()
if db_count is not None:
excess_count = db_count[0] - MAX_REC_COUNT
# Delete excess records
if excess_count > 0:
db.execute('''
WITH excess AS (
SELECT tweet_id
FROM toots
WHERE twitter_account=?
2022-09-15 18:12:20 +00:00
ORDER BY toot_id ASC
2022-09-15 17:58:17 +00:00
LIMIT ?
)
DELETE from toots
2022-12-11 10:15:50 +00:00
WHERE tweet_id IN excess''', (TOML['config']['twitter_account'], excess_count))
2022-09-15 17:58:17 +00:00
sql.commit()
2022-09-15 18:12:20 +00:00
logging.info('Deleted ' + str(excess_count) + ' old records from database.')
2022-12-11 10:15:50 +00:00
terminate(0)
2019-08-01 12:58:41 +00:00
if __name__ == "__main__":
main(sys.argv)