Merge branch 'susbt'

This commit is contained in:
jeancf 2022-12-11 11:15:50 +01:00
parent 8dbce09530
commit daaf15a6c3
4 changed files with 447 additions and 181 deletions

.gitignore

@ -7,4 +7,5 @@ venv/
*.png
*.xcf
twoot.db
__pycache__
*.toml
!default.toml


@ -1,3 +1,5 @@
# Changelog
**23 NOV 2022** VERSION 2.5 Added command-line option (`-l`) to remove
redirection from links included in tweets. Obfuscated links are replaced
by the URL that the resource is directly downloaded from. Also improved


@ -1,12 +1,20 @@
# Twoot
**Twoot is a python script that mirrors tweets from a twitter account to a Mastodon account.
It is simple to set-up on a local machine, configurable and feature-rich.**
Twoot is a python script that mirrors tweets from a twitter account to a Mastodon account.
It is simple to set-up on a local machine, configurable and feature-rich.
**UPDATE 23 NOV 2022** VERSION 2.5 Added command-line option (`-l`) to remove redirection
from links included in tweets. Obfuscated links are replaced by the URL that the resource
is directly downloaded from. Also improved tracker removal by cleaning URL fragments as well
(contrib: mathdatech, thanks!).
**UPDATE 11 DEC 2022** VERSION 3.0 brings some important changes and new features:
* Only potentially breaking change: **If you are using a version of python < 3.11 you need to install the `tomli` module**
* Twoot can be configured with a config file in [TOML](https://toml.io/) format. Check `default.toml` for details
* Domain substitution can be configured in the config file to replace links to Twitter, Youtube and
Reddit domains with alternatives (e.g. [Nitter](https://github.com/zedeus/nitter/wiki/Instances),
[Invidious](https://redirect.invidious.io/) and [teddit](https://teddit.net/) respectively)
* A footer line can be specified in the config file that gets added to all toots (with e.g. tags)
* Added option to not add reference to "Original tweet" at the bottom of toots
* A password must be provided with `-p` on the command-line for the first run only. After that it is no longer required.
* The verbosity of logging messages can be set in the config file with `log_level=`.
* Config file option `log_days =` specifies how long to keep log messages in file. Older messages are deleted.
> Previous updates can be found in CHANGELOG.
@ -26,48 +34,67 @@ is directly downloaded from. Also improved tracker removal by cleaning URL fragm
## Usage
```
twoot.py [-h] -t <twitter account> -i <mastodon instance> -m <mastodon account>
-p <mastodon password> [-r] [-s] [-u] [-v] [-a <max age in days)>]
[-d <min delay (in mins)>] [-c <max # of toots to post>]
```sh
twoot.py [-h] [-f <.toml config file>] [-t <twitter account>] [-i <mastodon instance>]
[-m <mastodon account>] [-p <mastodon password>] [-r] [-s] [-l] [-u] [-v] [-o]
[-a <max age (in days)>] [-d <min delay (in mins)>] [-c <max # of toots to post>]
```
## Arguments
Assuming that the Twitter handle is @SuperDuper and the Mastodon account
is @superduperbot@botsin.space
is sd@example.com on instance masto.space:
|Switch |Description | Example | Req |
|-------|--------------------------------------------------|--------------------|-----|
| -t | twitter account name without '@' | `SuperDuper` | Yes |
| -i | Mastodon instance domain name | `botsin.space` | Yes |
| -m | Mastodon username | `sd@example.com` | Yes |
| -p | Mastodon password | `my_Sup3r-S4f3*pw` | Yes |
| -v | upload videos to Mastodon | *N/A* | No |
| -r | Post reply-to tweets (ignored by default) | *N/A* | No |
| -s | Skip retweets (posted by default) | *N/A* | No |
| -l | Remove link redirection | *N/A* | No |
| -u | Remove trackers from URLs | *N/A* | No |
| -a | Max. age of tweet to post (in days) | `5` | No |
| -d | Min. age before posting new tweet (in minutes) | `15` | No |
| -c | Max number of toots allowed to post (cap) | `1` | No |
|Switch |Description | Example | Required |
|-------|--------------------------------------------------|--------------------|--------------------|
| -f | path of `.toml` file with configuration | `SuperDuper.toml` | No |
| -t | twitter account name without '@' | `SuperDuper` | If no config file |
| -i | Mastodon instance domain name | `masto.space` | If no config file |
| -m | Mastodon username | `sd@example.com` | If no config file |
| -p | Mastodon password | `my_Sup3r-S4f3*pw` | Once at first run |
| -v | Upload videos to Mastodon | *N/A* | No |
| -o | Do not add "Original tweet" line | *N/A* | No |
| -r | Post reply-to tweets (ignored by default) | *N/A* | No |
| -s | Skip retweets (posted by default) | *N/A* | No |
| -l | Remove link redirections | *N/A* | No |
| -u | Remove trackers from URLs | *N/A* | No |
| -a | Max. age of tweet to post (in days) | `5` | No |
| -d | Min. age before posting new tweet (in minutes) | `15` | No |
| -c | Max number of toots allowed to post (cap) | `1` | No |
## Notes
### Password
A password must be provided for the first run only. Once twoot has connected successfully to the
Mastodon host, an access token is saved in a `.secret` file named after the Mastodon account,
and a password is no longer necessary (the command-line switch `-p` is no longer required).
### Config file
A `default.toml` file is provided to be used as a template. If `-f` is used to specify a config file,
all other command-line parameters are ignored, except `-p` (password) if provided.
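Based on the keys that twoot reads from the config file (see `build_config` in `twoot.py`), a minimal config could look like the following sketch; all values here are illustrative, not defaults:

```toml
[config]
twitter_account = "SuperDuper"
mastodon_instance = "masto.space"
mastodon_user = "sd@example.com"

[options]
upload_videos = true
skip_retweets = true
remove_link_redirections = true
remove_trackers_from_urls = true
subst_twitter = [ "nitter.net", ]
footer = "#twitter #bot"
tweet_max_age = 5
tweet_delay = 15
toot_cap = 1
log_level = "INFO"
log_days = 3
```

Any `[options]` key left out keeps its built-in default; the three `[config]` keys are mandatory.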
### Removing redirected links
`-l` will follow every link included in the tweet and replace it with the URL that the
resource is directly downloaded from (if applicable), e.g. bit.ly/xxyyyzz -> example.com.
Every link visit can take up to 5 seconds (timeout), so this option slows down
tweet processing.
If you are interested in tracker removal (`-u`) you should also select redirection removal (`-l`)
If you are interested in tracker removal (`-u`) you should also select redirection removal
as trackers are often hidden behind the redirection of a short URL.
### Uploading videos
When using the `-v` switch consider:
* whether the copyright of the content that you want to cross-post allows it
* the storage / transfer limitations of the Mastodon instance that you are posting to
* the upstream bandwidth that you may consume on your internet connection
### Rate control
Default max age is 1 day. Decimal values are OK.
Default min delay is 0 minutes.
@ -78,7 +105,8 @@ No limitation is applied to the number of toots uploaded if `-c` is not specifie
Make sure python3 is installed.
Twoot depends on `beautifulsoup4` and `Mastodon.py` python modules.
Twoot depends on `beautifulsoup4` and `Mastodon.py` python modules. Additionally, if you are using
a version of python < 3.11 you also need to install the `tomli` module.
**Only if you plan to download videos** with the `-v` switch are additional dependencies required:
@ -96,17 +124,15 @@ Add command line to crontab. For example, to run every 15 minutes starting at mi
and process the tweets posted in the last 5 days but at least 15 minutes
ago:
```
1-59/15 * * * * /path/to/twoot.py -t SuperDuperBot -i botsin.space -m superduperbot -p my_Sup3r-S4f3*pw -a 5 -d 15
```crontab
1-59/15 * * * * /path/to/twoot.py -t SuperDuper -i masto.space -m sd@example.com -p my_Sup3r-S4f3*pw -a 5 -d 15
```
## Examples
Twoot is known to be used for the following feeds (older first):
* [@internetofshit@botsin.space](https://botsin.space/@internetofshit)
* [@hackaday@botsin.space](https://botsin.space/@hackaday)
* [@todayilearned@botsin.space](https://botsin.space/@todayilearned)
* [@todayilearned@botsin.space](https://noc.social/@todayilearned)
* [@moznews@noc.social](https://noc.social/@moznews)
* [@hackster_io@noc.social](https://noc.social/@hackster_io)
* [@cnxsoft@noc.social](https://noc.social/@cnxsoft)
@ -116,6 +142,6 @@ Twoot is known to be used for the following feeds (older first):
## Background
I started twoot when [tootbot](https://github.com/cquest/tootbot)
stopped working. Tootbot relied on RSS feeds from https://twitrss.me
that broke when Twitter refreshed their web UI in July 2019.
I started twoot when [tootbot](https://github.com/cquest/tootbot) stopped working.
Tootbot relied on RSS feeds from [https://twitrss.me](https://twitrss.me) that broke when Twitter
refreshed their web UI in July 2019.

twoot.py

@ -19,9 +19,10 @@
"""
import argparse
import datetime
from datetime import datetime, timedelta
import logging
import os
import shutil
import random
import re
import shutil
@ -38,21 +39,17 @@ from mastodon import Mastodon, MastodonError, MastodonAPIError, MastodonIllegalA
# Number of records to keep in db table for each twitter account
MAX_REC_COUNT = 50
# Set the desired verbosity of logging
# One of logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL
LOGGING_LEVEL = logging.INFO
# How many seconds to wait before giving up on a download (except video download)
HTTPS_REQ_TIMEOUT = 10
NITTER_URLS = [
'https://nitter.lacontrevoie.fr',
'https://nitter.pussthecat.org',
'https://nitter.privacydev.net',
'https://nitter.fdn.fr',
'https://nitter.eu',
'https://nitter.namazso.eu',
'https://twitter.beparanoid.de',
'https://n.l5.ca',
'https://nitter.bus-hit.me',
# 'https://nitter.hu',
]
# Update from https://www.whatismybrowser.com/guides/the-latest-user-agent/
@ -67,12 +64,115 @@ USER_AGENTS = [
]
def build_config(args):
"""
Receives the arguments passed on the command line
populates the TOML global dict with default values for all 'options' keys
if a config file is provided, load the keys from the config file
if no config file is provided, use command-line args
verify that a valid config is available (all keys in 'config' present)
:param args: list of command line arguments
"""
# Create global struct containing configuration
global TOML
# Default options
options = {
'upload_videos': False,
'post_reply_to': False,
'skip_retweets': False,
'remove_link_redirections': False,
'remove_trackers_from_urls': False,
'footer': '',
'remove_original_tweet_ref': False,
'tweet_max_age': float(1),
'tweet_delay': float(0),
'toot_cap': int(0),
'subst_twitter': [],
'subst_youtube': [],
'subst_reddit': [],
'log_level': "WARNING",
'log_days': 3,
}
# Create default config object
TOML = {'config': {},'options': options}
# Load config file if it was provided
toml_file = args['f']
if toml_file is not None:
try: # Included in python from version 3.11
import tomllib
except ModuleNotFoundError:
# for python < 3.11, tomli module must be installed
import tomli as tomllib
loaded_toml = None
# Load toml file
try:
with open(toml_file, 'rb') as config_file:
loaded_toml = tomllib.load(config_file)
except FileNotFoundError:
print('config file not found')
terminate(-1)
except tomllib.TOMLDecodeError:
print('Malformed config file')
terminate(-1)
TOML['config'] = loaded_toml['config']
for k in TOML['options'].keys():
try: # Go through all valid keys
TOML['options'][k] = loaded_toml['options'][k]
except KeyError: # Key was not found in file
pass
else:
# Override config parameters with command-line values provided
if args['t'] is not None:
TOML['config']['twitter_account'] = args['t']
if args['i'] is not None:
TOML['config']['mastodon_instance'] = args['i']
if args['m'] is not None:
TOML['config']['mastodon_user'] = args['m']
if args['v'] is True:
TOML['options']['upload_videos'] = args['v']
if args['r'] is True:
TOML['options']['post_reply_to'] = args['r']
if args['s'] is True:
TOML['options']['skip_retweets'] = args['s']
if args['l'] is True:
TOML['options']['remove_link_redirections'] = args['l']
if args['u'] is True:
TOML['options']['remove_trackers_from_urls'] = args['u']
if args['o'] is True:
TOML['options']['remove_original_tweet_ref'] = args['o']
if args['a'] is not None:
TOML['options']['tweet_max_age'] = float(args['a'])
if args['d'] is not None:
TOML['options']['tweet_delay'] = float(args['d'])
if args['c'] is not None:
TOML['options']['toot_cap'] = int(args['c'])
# Verify that we have a minimum config to run
if 'twitter_account' not in TOML['config'].keys() or TOML['config']['twitter_account'] == "":
print('CRITICAL: Missing Twitter account')
terminate(-1)
if 'mastodon_instance' not in TOML['config'].keys() or TOML['config']['mastodon_instance'] == "":
print('CRITICAL: Missing Mastodon instance')
terminate(-1)
if 'mastodon_user' not in TOML['config'].keys() or TOML['config']['mastodon_user'] == "":
print('CRITICAL: Missing Mastodon user')
terminate(-1)
def deredir_url(url):
"""
Given a URL, return the URL that the page really downloads from
:param url: url to be de-redirected
:return: direct url
"""
# Check if we need to do anything
if TOML['options']['remove_link_redirections'] is False:
return url
# Get a copy of the default headers that requests would use
headers = requests.utils.default_headers()
@ -87,7 +187,7 @@ def deredir_url(url):
ret = None
try:
# Download the page
ret = requests.get(url, headers=headers, timeout=5)
ret = requests.head(url, headers=headers, timeout=5)
except:
# If anything goes wrong keep the URL intact
return url
@ -135,7 +235,6 @@ def _remove_trackers_fragment(fragment_str):
:param fragment_str: fragment to be cleaned
:return: cleaned fragment
"""
params_to_remove = {
"Echobox",
}
@ -147,7 +246,50 @@ def _remove_trackers_fragment(fragment_str):
return fragment_str
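The same idea applied to the query string can be sketched as a standalone function; the prefix list below is illustrative, twoot maintains its own sets of tracker parameters:

```python
from urllib.parse import parse_qsl, urlencode

# Illustrative prefixes; twoot's actual parameter lists differ
TRACKER_PREFIXES = ('utm_', 'mkt_tok')

def strip_tracker_params(query_str):
    """Drop query parameters whose name matches a known tracker prefix."""
    kept = [(k, v) for k, v in parse_qsl(query_str, keep_blank_values=True)
            if not k.startswith(TRACKER_PREFIXES)]
    return urlencode(kept)
```

For example, `strip_tracker_params('utm_source=Twitter&a=aaa&b=1')` returns `'a=aaa&b=1'`, consistent with the doctest in `clean_url` below.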
def clean_url(dirty_url):
def substitute_source(orig_url):
"""
:param orig_url: url to check for substitutes
:return: url with replaced domains
"""
parsed_url = urlparse(orig_url)
domain = parsed_url.netloc
logging.debug("Checking domain %s for substitution ", domain)
# Handle twitter
twitter_subst = TOML["options"]["subst_twitter"]
# Do not substitute if subdomain is present (e.g. i.twitter.com)
if (domain == 'twitter.com' or domain == 'www.twitter.com') and twitter_subst != []:
domain = twitter_subst[random.randint(0, len(twitter_subst) - 1)]
logging.debug("Replaced twitter.com by " + domain)
# Handle youtube
youtube_subst = TOML["options"]["subst_youtube"]
# Do not substitute if subdomain is present (e.g. i.youtube.com)
if (domain == 'youtube.com' or domain == 'www.youtube.com') and youtube_subst != []:
domain = youtube_subst[random.randint(0, len(youtube_subst) - 1)]
logging.debug("Replaced youtube.com by " + domain)
# Handle reddit
reddit_subst = TOML["options"]["subst_reddit"]
# Do not substitute if subdomain is present (e.g. i.reddit.com)
if (domain == 'reddit.com' or domain == 'www.reddit.com') and reddit_subst != []:
domain = reddit_subst[random.randint(0, len(reddit_subst) - 1)]
logging.debug("Replaced reddit.com by " + domain)
dest_url = urlunparse([
parsed_url.scheme,
domain,
parsed_url.path,
parsed_url.params,
parsed_url.query,
parsed_url.fragment
])
return dest_url
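Stripped of the twoot-specific configuration lookups, the substitution above reduces to a small standalone function (name and argument shapes are illustrative):

```python
import random
from urllib.parse import urlparse, urlunparse

def substitute_domain(url, targets, substitutes):
    """Replace the domain of `url` with a random pick from `substitutes`
    when it matches one of `targets` exactly (subdomains are left alone)."""
    parts = urlparse(url)
    if substitutes and parts.netloc in targets:
        parts = parts._replace(netloc=random.choice(substitutes))
    return urlunparse(parts)
```

Scheme, path, query and fragment are carried over unchanged; only the netloc is swapped.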
def clean_url(orig_url):
"""
Given a URL, return it with the UTM parameters removed from query and fragment
:param orig_url: url to be cleaned
@ -155,10 +297,13 @@ def clean_url(dirty_url):
>>> clean_url('https://example.com/video/this-aerial-ropeway?utm_source=Twitter&utm_medium=video&utm_campaign=organic&utm_content=Nov13&a=aaa&b=1#mkt_tok=tik&mkt_tik=tok')
'https://example.com/video/this-aerial-ropeway?a=aaa&b=1#mkt_tik=tok'
"""
# Check if we have to do anything
if TOML['options']['remove_trackers_from_urls'] is False:
return orig_url
url_parsed = urlparse(dirty_url)
url_parsed = urlparse(orig_url)
cleaned_url = urlunparse([
dest_url = urlunparse([
url_parsed.scheme,
url_parsed.netloc,
url_parsed.path,
@ -166,22 +311,20 @@ def clean_url(dirty_url):
_remove_trackers_query(url_parsed.query),
_remove_trackers_fragment(url_parsed.fragment)
])
if dest_url != orig_url:
logging.debug('Cleaned URL from: ' + orig_url + ' to: ' + dest_url)
if cleaned_url != dirty_url:
logging.debug('Cleaned URL from: ' + dirty_url + ' to: ' + cleaned_url)
return cleaned_url
return dest_url
def process_media_body(tt_iter, remove_redir, remove_trackers):
def process_media_body(tt_iter):
"""
Receives an iterator over all the elements contained in the tweet-text container.
Processes them to make them suitable for posting on Mastodon
:param tt_iter: iterator over the HTML elements in the text of the tweet
:param remove_redir: bool to indicate if redirections should be removed
:param remove_trackers: bool to indicate if trackers should be removed
:return: cleaned up text of the tweet
"""
tweet_text = ''
# Iterate elements
for tag in tt_iter:
@ -200,15 +343,11 @@ def process_media_body(tt_iter, remove_redir, remove_trackers):
tweet_text += tag_text
else:
# This is a real link
if remove_redir:
url = deredir_url(tag.get('href'))
else:
url = tag.get('href')
url = deredir_url(tag.get('href'))
url = substitute_source(url)
url = clean_url(url)
if remove_trackers:
tweet_text += clean_url(url)
else:
tweet_text += url
tweet_text += url
else:
logging.warning("No handler for tag in twitter text: " + tag.prettify())
@ -232,12 +371,11 @@ def process_card(nitter_url, card_container):
return list
def process_attachments(nitter_url, attachments_container, get_vids, twit_account, status_id, author_account):
def process_attachments(nitter_url, attachments_container, status_id, author_account):
"""
Extract images or video from attachments. Videos are downloaded on the file system.
:param nitter_url: url of nitter mirror
:param attachments_container: soup of 'div' tag containing attachments markup
:param get_vids: whether to download videos or not
:param twit_account: name of twitter account
:param status_id: id of tweet being processed
:param author_account: author of tweet with video attachment
@ -249,14 +387,14 @@ def process_attachments(nitter_url, attachments_container, get_vids, twit_accoun
for image in images:
pics.append(nitter_url + image.get('href'))
logging.debug('collected ' + str(len(pics)) + ' images from attachments')
logging.debug('collected ' + str(len(pics)) + ' image(s) from attachments')
# Download nitter video (converted animated GIF)
gif_class = attachments_container.find('video', class_='gif')
if gif_class is not None:
gif_video_file = nitter_url + gif_class.source.get('src')
video_path = os.path.join('output', twit_account, status_id, author_account, status_id)
video_path = os.path.join('output', TOML['config']['twitter_account'], status_id, author_account, status_id)
os.makedirs(video_path, exist_ok=True)
# Open directory for writing file
@ -283,12 +421,12 @@ def process_attachments(nitter_url, attachments_container, get_vids, twit_accoun
vid_in_tweet = False
vid_class = attachments_container.find('div', class_='video-container')
if vid_class is not None:
if get_vids:
if TOML['options']['upload_videos']:
import youtube_dl
video_file = os.path.join('https://twitter.com', author_account, 'status', status_id)
ydl_opts = {
'outtmpl': "output/" + twit_account + "/" + status_id + "/%(id)s.%(ext)s",
'outtmpl': "output/" + TOML['config']['twitter_account'] + "/" + status_id + "/%(id)s.%(ext)s",
'format': "best[width<=500]",
'socket_timeout': 60,
'quiet': True,
@ -298,7 +436,7 @@ def process_attachments(nitter_url, attachments_container, get_vids, twit_accoun
try:
ydl.download([video_file])
except Exception as e:
logging.warn('Error downloading twitter video: ' + str(e))
logging.warning('Error downloading twitter video: ' + str(e))
vid_in_tweet = True
else:
logging.debug('downloaded twitter video from attachments')
@ -320,12 +458,12 @@ def contains_class(body_classes, some_class):
return found
def is_time_valid(timestamp, max_age, min_delay):
def is_time_valid(timestamp):
ret = True
# Check that the tweet is not too young (might be deleted) or too old
age_in_hours = (time.time() - float(timestamp)) / 3600.0
min_delay_in_hours = min_delay / 60.0
max_age_in_hours = max_age * 24.0
min_delay_in_hours = TOML['options']['tweet_delay'] / 60.0
max_age_in_hours = TOML['options']['tweet_max_age'] * 24.0
if age_in_hours < min_delay_in_hours or age_in_hours > max_age_in_hours:
ret = False
@ -333,110 +471,205 @@ def is_time_valid(timestamp, max_age, min_delay):
return ret
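With the configuration lookups replaced by plain parameters, the time-window check reads as follows (a sketch; the function and parameter names are illustrative):

```python
import time

def is_age_acceptable(timestamp, min_delay_mins, max_age_days, now=None):
    """A tweet qualifies only if its age lies between the minimum delay
    (a very fresh tweet might still be deleted) and the maximum age."""
    now = time.time() if now is None else now
    age_in_hours = (now - float(timestamp)) / 3600.0
    return min_delay_mins / 60.0 <= age_in_hours <= max_age_days * 24.0
```

With the defaults (`tweet_delay = 0`, `tweet_max_age = 1`) this accepts any tweet up to 24 hours old.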
def login(instance, account, password):
# Create Mastodon application if it does not exist yet
if not os.path.isfile(instance + '.secret'):
def login(password):
"""
Login to Mastodon account and return mastodon object used to post content
:param password: Password associated to account. None if not provided
:return: mastodon object
"""
# Create Mastodon application if it does not exist yet
if not os.path.isfile(TOML['config']['mastodon_instance'] + '.secret'):
try:
Mastodon.create_app(
'twoot',
api_base_url='https://' + instance,
to_file=instance + '.secret'
'feedtoot',
api_base_url='https://' + TOML['config']['mastodon_instance'],
to_file=TOML['config']['mastodon_instance'] + '.secret'
)
except MastodonError as me:
logging.fatal('failed to create app on ' + instance)
logging.fatal('failed to create app on ' + TOML['config']['mastodon_instance'])
logging.fatal(me)
sys.exit(-1)
terminate(-1)
# Log in to Mastodon instance
try:
mastodon = Mastodon(
client_id=instance + '.secret',
api_base_url='https://' + instance
)
mastodon = None
mastodon.log_in(
username=account,
password=password,
to_file=account + ".secret"
)
logging.info('Logging in to ' + instance)
# Log in to Mastodon instance with password
if password is not None:
try:
mastodon = Mastodon(
client_id=TOML['config']['mastodon_instance'] + '.secret',
api_base_url='https://' + TOML['config']['mastodon_instance']
)
except MastodonError as me:
logging.fatal('ERROR: Login to ' + instance + ' Failed')
logging.fatal(me)
sys.exit(-1)
mastodon.log_in(
username=TOML['config']['mastodon_user'],
password=password,
to_file=TOML['config']['mastodon_user'] + ".secret"
)
logging.info('Logging in to ' + TOML['config']['mastodon_instance'])
# Check ratelimit status
logging.debug('Ratelimit allowed requests: ' + str(mastodon.ratelimit_limit))
logging.debug('Ratelimit remaining requests: ' + str(mastodon.ratelimit_remaining))
logging.debug('Ratelimit reset time: ' + time.asctime(time.localtime(mastodon.ratelimit_reset)))
logging.debug('Ratelimit last call: ' + time.asctime(time.localtime(mastodon.ratelimit_lastcall)))
except MastodonError as me:
logging.fatal('Login to ' + TOML['config']['mastodon_instance'] + ' Failed\n')
logging.fatal(me)
terminate(-1)
if os.path.isfile(TOML['config']['mastodon_user'] + '.secret'):
logging.warning('You successfully logged in using a password and an access token \
has been saved. The password can therefore be omitted from the \
command-line in future invocations')
else: # No password provided, login with token
# Using token in existing .secret file
if os.path.isfile(TOML['config']['mastodon_user'] + '.secret'):
try:
mastodon = Mastodon(
access_token=TOML['config']['mastodon_user'] + '.secret',
api_base_url='https://' + TOML['config']['mastodon_instance']
)
except MastodonError as me:
logging.fatal('Login to ' + TOML['config']['mastodon_instance'] + ' Failed\n')
logging.fatal(me)
terminate(-1)
else:
logging.fatal('No .secret file found. Password required to log in')
terminate(-1)
return mastodon
def terminate(exit_code):
"""
Cleanly stop execution with a message on execution duration
Remove log messages older than the duration specified in config from the log file
:param exit_code: return value to pass to shell when exiting
"""
logging.info('Run time : {t:2.1f} seconds.'.format(t=time.time() - START_TIME))
logging.info('_____________________________________________________________________________________')
# Close logger and log file
logging.shutdown()
# Remove older log messages
# Max allowed age of log message
max_delta = timedelta(TOML['options']['log_days'])
# Open log file
log_file_name = TOML['config']['twitter_account'] + '.log'
new_log_file_name = TOML['config']['twitter_account'] + '.log.new'
try:
log_file = open(log_file_name, 'r')
except FileNotFoundError:
# Nothing to do if there is no log file
exit(exit_code)
# Check each line
pos = log_file.tell()
while log_file:
line = log_file.readline()
if line == '':
break # end of file reached without finding a recent message
try:
# Extract date on log line
date = datetime.strptime(line[:10], '%Y-%m-%d')
except ValueError:
# date was not found on this line, try next one
continue
# Time difference between log message and now
log_delta = datetime.now() - date
# Only keep the number of days of the difference
log_delta = timedelta(days=log_delta.days)
if log_delta < max_delta:
# Reset file pointer to position before reading last line
log_file.seek(pos)
remainder = log_file.read()
output_file = open(new_log_file_name, 'w')
output_file.write(remainder)
output_file.close()
# replace log file by new one
shutil.move(new_log_file_name, log_file_name)
break # Exit while loop
# Update read pointer position
pos = log_file.tell()
exit(exit_code)
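The retention test at the heart of this loop can be isolated into a small helper (hypothetical name): a line is kept only if its `YYYY-MM-DD` prefix is within the configured number of days.

```python
from datetime import datetime, timedelta

def line_is_recent(line, log_days, now=None):
    """True if the line starts with a date within the last `log_days` days."""
    now = now or datetime.now()
    try:
        date = datetime.strptime(line[:10], '%Y-%m-%d')
    except ValueError:
        return False  # no date prefix on this line: cannot decide, treat as old
    return timedelta(days=(now - date).days) < timedelta(days=log_days)
```

Once the first recent line is found, everything from that position onward is copied to the new log file, so lines without a date prefix (e.g. continuation lines) survive as long as a dated line before them does.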
def main(argv):
# Start stopwatch
start_time = time.time()
global START_TIME
START_TIME = time.time()
# Build parser for command line arguments
parser = argparse.ArgumentParser(description='toot tweets.')
parser.add_argument('-t', metavar='<twitter account>', action='store', required=True)
parser.add_argument('-i', metavar='<mastodon instance>', action='store', required=True)
parser.add_argument('-m', metavar='<mastodon account>', action='store', required=True)
parser.add_argument('-p', metavar='<mastodon password>', action='store', required=True)
parser.add_argument('-f', metavar='<.toml config file>', action='store')
parser.add_argument('-t', metavar='<twitter account>', action='store')
parser.add_argument('-i', metavar='<mastodon instance>', action='store')
parser.add_argument('-m', metavar='<mastodon account>', action='store')
parser.add_argument('-p', metavar='<mastodon password>', action='store')
parser.add_argument('-r', action='store_true', help='Also post replies to other tweets')
parser.add_argument('-s', action='store_true', help='Suppress retweets')
parser.add_argument('-l', action='store_true', help='Remove link redirection')
parser.add_argument('-u', action='store_true', help='Remove trackers from URLs')
parser.add_argument('-v', action='store_true', help='Ingest twitter videos and upload to Mastodon instance')
parser.add_argument('-a', metavar='<max age (in days)>', action='store', type=float, default=1)
parser.add_argument('-d', metavar='<min delay (in mins)>', action='store', type=float, default=0)
parser.add_argument('-c', metavar='<max # of toots to post>', action='store', type=int, default=0)
parser.add_argument('-o', action='store_true', help='Do not add reference to Original tweet')
parser.add_argument('-a', metavar='<max age (in days)>', action='store', type=float)
parser.add_argument('-d', metavar='<min delay (in mins)>', action='store', type=float)
parser.add_argument('-c', metavar='<max # of toots to post>', action='store', type=int)
# Parse command line
args = vars(parser.parse_args())
twit_account = args['t']
mast_instance = args['i']
mast_account = args['m']
mast_password = args['p']
tweets_and_replies = args['r']
suppress_retweets = args['s']
remove_redir = args['l']
remove_trackers = args['u']
get_vids = args['v']
max_age = float(args['a'])
min_delay = float(args['d'])
cap = int(args['c'])
build_config(args)
# Remove previous log file
# try:
# os.remove(twit_account + '.log')
# except FileNotFoundError:
# pass
mast_password = args['p']
# Setup logging to file
logging.basicConfig(
filename=twit_account + '.log',
level=LOGGING_LEVEL,
filename=TOML['config']['twitter_account'] + '.log',
format='%(asctime)s %(levelname)-8s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
)
logging.info('Running with the following parameters:')
logging.info(' -t ' + twit_account)
logging.info(' -i ' + mast_instance)
logging.info(' -m ' + mast_account)
logging.info(' -r ' + str(tweets_and_replies))
logging.info(' -s ' + str(suppress_retweets))
logging.info(' -l ' + str(remove_redir))
logging.info(' -u ' + str(remove_trackers))
logging.info(' -v ' + str(get_vids))
logging.info(' -a ' + str(max_age))
logging.info(' -d ' + str(min_delay))
logging.info(' -c ' + str(cap))
# Set level of logging
log_level = logging.WARNING
match TOML['options']['log_level'].upper():
case 'DEBUG':
log_level = logging.DEBUG
case 'INFO':
log_level = logging.INFO
case 'WARNING':
log_level = logging.WARNING
case 'ERROR':
log_level = logging.ERROR
case 'CRITICAL':
log_level = logging.CRITICAL
case 'OFF':
# Disable all logging
logging.disable(logging.CRITICAL)
case _:
logging.error('Invalid log_level %s in config file. Using WARNING.', str(TOML['options']['log_level']))
logger = logging.getLogger()
logger.setLevel(log_level)
logging.info('Running with the following configuration:')
logging.info(' Config File : ' + str(args['f']))
logging.info(' twitter_account : ' + TOML['config']['twitter_account'])
logging.info(' mastodon_instance : ' + TOML['config']['mastodon_instance'])
logging.info(' mastodon_user : ' + TOML['config']['mastodon_user'])
logging.info(' upload_videos : ' + str(TOML['options']['upload_videos']))
logging.info(' post_reply_to : ' + str(TOML['options']['post_reply_to']))
logging.info(' skip_retweets : ' + str(TOML['options']['skip_retweets']))
logging.info(' remove_link_redirections : ' + str(TOML['options']['remove_link_redirections']))
logging.info(' remove_trackers_from_urls: ' + str(TOML['options']['remove_trackers_from_urls']))
logging.info(' footer : ' + TOML['options']['footer'])
logging.info(' remove_original_tweet_ref: ' + str(TOML['options']['remove_original_tweet_ref']))
logging.info(' tweet_max_age : ' + str(TOML['options']['tweet_max_age']))
logging.info(' tweet_delay : ' + str(TOML['options']['tweet_delay']))
logging.info(' toot_cap : ' + str(TOML['options']['toot_cap']))
logging.info(' subst_twitter : ' + str(TOML['options']['subst_twitter']))
logging.info(' subst_youtube : ' + str(TOML['options']['subst_youtube']))
logging.info(' subst_reddit : ' + str(TOML['options']['subst_reddit']))
logging.info(' log_level : ' + str(TOML['options']['log_level']))
logging.info(' log_days : ' + str(TOML['options']['log_days']))
# Try to open database. If it does not exist, create it
sql = sqlite3.connect('twoot.db')
@ -470,9 +703,9 @@ def main(argv):
}
)
url = nitter_url + '/' + twit_account
url = nitter_url + '/' + TOML['config']['twitter_account']
# Use different page if we need to handle replies
if tweets_and_replies:
if TOML['options']['post_reply_to']:
url += '/with_replies'
# Download twitter page of user
@ -480,21 +713,21 @@ def main(argv):
twit_account_page = session.get(url, headers=headers, timeout=HTTPS_REQ_TIMEOUT)
except requests.exceptions.ConnectionError:
logging.fatal('Host did not respond when trying to download ' + url)
exit(-1)
terminate(-1)
except requests.exceptions.Timeout:
logging.fatal(nitter_url + ' took too long to respond')
exit(-1)
terminate(-1)
# Verify that download worked
if twit_account_page.status_code != 200:
logging.fatal('The Nitter page did not download correctly from ' + url + ' (' + str(
twit_account_page.status_code) + '). Aborting')
exit(-1)
terminate(-1)
logging.info('Nitter page downloaded successfully from ' + url)
logging.debug('Nitter page downloaded successfully from ' + url)
# DEBUG: Save page to file
# of = open(twit_account + '.html', 'w')
# of = open(TOML['config']['twitter_account'] + '.html', 'w')
# of.write(twit_account_page.text)
# of.close()
@ -505,7 +738,7 @@ def main(argv):
ta = soup.find('meta', property='og:title').get('content')
ta_match = re.search(r'\(@(.+)\)', ta)
if ta_match is not None:
twit_account = ta_match.group(1)
TOML['config']['twitter_account'] = ta_match.group(1)
# Extract twitter timeline
timeline = soup.find_all('div', class_='timeline-item')
@ -528,19 +761,19 @@ def main(argv):
# Extract time stamp
time_string = status.find('span', class_='tweet-date').a.get('title')
try:
timestamp = datetime.datetime.strptime(time_string, '%d/%m/%Y, %H:%M:%S').timestamp()
timestamp = datetime.strptime(time_string, '%d/%m/%Y, %H:%M:%S').timestamp()
except:
# Dec 21, 2021 · 12:00 PM UTC
timestamp = datetime.datetime.strptime(time_string, '%b %d, %Y · %I:%M %p %Z').timestamp()
timestamp = datetime.strptime(time_string, '%b %d, %Y · %I:%M %p %Z').timestamp()
# Check if time is within acceptable range
if not is_time_valid(timestamp, max_age, min_delay):
if not is_time_valid(timestamp):
out_date_cnt += 1
logging.debug("Tweet outside valid time range, skipping")
continue
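The two-format date handling above can be isolated into a small helper. This is a sketch of the same logic, not the function as it appears in twoot.py; the bare `except:` from the diff is narrowed to `ValueError` here:

```python
from datetime import datetime

def parse_nitter_timestamp(time_string):
    # Nitter renders tweet dates in one of two formats, so try both:
    #   "21/12/2021, 12:00:00"         (numeric form)
    #   "Dec 21, 2021 · 12:00 PM UTC"  (textual form with middle dot)
    for fmt in ('%d/%m/%Y, %H:%M:%S', '%b %d, %Y · %I:%M %p %Z'):
        try:
            return datetime.strptime(time_string, fmt).timestamp()
        except ValueError:
            continue
    raise ValueError('unrecognized Nitter date format: ' + time_string)
```

Note that `%Z` accepts the literal `UTC`, which is why the second format parses without extra handling.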
# Check if retweets must be skipped
if suppress_retweets:
if TOML['options']['skip_retweets']:
# Check if this tweet is a retweet
if len(status.select("div.tweet-body > div > div.retweet-header")) != 0:
logging.debug("Retweet ignored per command-line configuration")
@@ -549,7 +782,7 @@ def main(argv):
# Check in database if tweet has already been posted
db.execute(
"SELECT * FROM toots WHERE twitter_account=? AND mastodon_instance=? AND mastodon_account=? AND tweet_id=?",
(twit_account, mast_instance, mast_account, tweet_id))
(TOML['config']['twitter_account'], TOML['config']['mastodon_instance'], TOML['config']['mastodon_user'], tweet_id))
tweet_in_db = db.fetchone()
if tweet_in_db is not None:
@@ -588,12 +821,12 @@ def main(argv):
tt_iter = status.find('div', class_='tweet-content media-body').children
# Process text of tweet
tweet_text += process_media_body(tt_iter, remove_redir, remove_trackers)
tweet_text += process_media_body(tt_iter)
# Process quote: append link to tweet_text
quote_div = status.find('a', class_='quote-link')
if quote_div is not None:
tweet_text += '\n\nhttps://twitter.com' + quote_div.get('href').strip('#m')
tweet_text += substitute_source('\n\nhttps://twitter.com' + quote_div.get('href').strip('#m'))
# Process card : extract image if necessary
card_class = status.find('a', class_='card-container')
@@ -603,14 +836,24 @@ def main(argv):
# Process attachment: capture image or .mp4 url or download twitter video
attachments_class = status.find('div', class_='attachments')
if attachments_class is not None:
pics, vid_in_tweet = process_attachments(nitter_url, attachments_class, get_vids, twit_account, status_id,
author_account)
pics, vid_in_tweet = process_attachments(nitter_url,
attachments_class,
status_id, author_account
)
photos.extend(pics)
if vid_in_tweet:
tweet_text += '\n\n[Video embedded in original tweet]'
# Add custom footer from config file
if TOML['options']['footer'] != '':
tweet_text += '\n\n' + TOML['options']['footer']
# Add footer with link to original tweet
tweet_text += '\n\nOriginal tweet : ' + full_status_url
if TOML['options']['remove_original_tweet_ref'] == False:
if TOML['options']['footer'] != '':
tweet_text += '\nOriginal tweet : ' + substitute_source(full_status_url)
else:
tweet_text += '\n\nOriginal tweet : ' + substitute_source(full_status_url)
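The `substitute_source()` helper called above is defined elsewhere in twoot.py and takes its domain mappings from the TOML config. A hypothetical sketch of what such a substitution looks like; the instance names are examples only, not what the real function uses:

```python
import re

# Hypothetical mapping; the real substitute_source() reads these pairs
# from the TOML config file, and the instances below are examples only
SUBSTITUTIONS = {
    'twitter.com': 'nitter.net',
    'youtube.com': 'yewtu.be',
    'reddit.com': 'teddit.net',
}

def substitute_source(text):
    for src, alt in SUBSTITUTIONS.items():
        # Replace the domain (with or without a leading "www.") but keep the scheme
        text = re.sub(r'(https?://)(www\.)?' + re.escape(src), r'\1' + alt, text)
    return text
```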
# If no media was specifically added in the tweet, try to get the first picture
# with "twitter:image" meta tag in first linked page in tweet text
@@ -640,7 +883,7 @@ def main(argv):
# Check if video was downloaded
video_file = None
video_path = Path('./output') / twit_account / status_id
video_path = Path('./output') / TOML['config']['twitter_account'] / status_id
if video_path.exists():
# list video files
video_file_list = list(video_path.glob('*.mp4'))
@@ -666,14 +909,10 @@ def main(argv):
logging.info(str(out_date_cnt) + ' tweets outside of valid time range')
logging.info(str(in_db_cnt) + ' tweets already in database')
# DEBUG: Print extracted tweets
# for t in tweets:
# print(t)
# Login to account on mastodon instance
mastodon = None
if len(tweets) != 0:
mastodon = login(mast_instance, mast_account, mast_password)
mastodon = login(mast_password)
# **********************************************************
# Iterate tweets in list.
@@ -683,8 +922,8 @@ def main(argv):
posted_cnt = 0
for tweet in reversed(tweets):
# Check if we have reached the cap on the number of toots to post
if cap != 0 and posted_cnt >= cap:
logging.info('%d toots not posted due to configured cap', len(tweets) - cap)
if TOML['options']['toot_cap'] != 0 and posted_cnt >= TOML['options']['toot_cap']:
logging.info('%d toots not posted due to configured cap', len(tweets) - TOML['options']['toot_cap'])
break
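The cap check above amounts to posting only the oldest `toot_cap` pending tweets, since the list is collected newest-first and iterated in reverse. A sketch of the selection logic, with a cap of 0 meaning unlimited:

```python
def select_tweets_to_post(tweets, toot_cap):
    # Tweets are collected newest-first; posting walks them oldest-first,
    # and a toot_cap of 0 disables the limit entirely
    ordered = list(reversed(tweets))
    if toot_cap != 0:
        return ordered[:toot_cap]
    return ordered
```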
logging.debug('Uploading Tweet %s', tweet["tweet_id"])
@@ -727,8 +966,8 @@ def main(argv):
toot = {}
try:
mastodon = Mastodon(
access_token=mast_account + '.secret',
api_base_url='https://' + mast_instance
access_token=TOML['config']['mastodon_user'] + '.secret',
api_base_url='https://' + TOML['config']['mastodon_instance']
)
if len(media_ids) == 0:
@@ -737,31 +976,31 @@ def main(argv):
toot = mastodon.status_post(tweet['tweet_text'], media_ids=media_ids, visibility='public')
except MastodonError as me:
logging.error('posting ' + tweet['tweet_text'] + ' to ' + mast_instance + ' Failed')
logging.error('posting ' + tweet['tweet_text'] + ' to ' + TOML['config']['mastodon_instance'] + ' Failed')
logging.error(me)
else:
posted_cnt += 1
logging.debug('Tweet %s posted on %s', tweet['tweet_id'], mast_account)
logging.debug('Tweet %s posted on %s', tweet['tweet_id'], TOML['config']['mastodon_user'])
# Insert toot id into database
if 'id' in toot:
db.execute("INSERT INTO toots VALUES ( ? , ? , ? , ? , ? )",
(twit_account, mast_instance, mast_account, tweet['tweet_id'], toot['id']))
(TOML['config']['twitter_account'], TOML['config']['mastodon_instance'], TOML['config']['mastodon_user'], tweet['tweet_id'], toot['id']))
sql.commit()
logging.info(str(posted_cnt) + ' tweets posted to Mastodon')
# Cleanup downloaded video files
try:
shutil.rmtree('./output/' + twit_account)
shutil.rmtree('./output/' + TOML['config']['twitter_account'])
except FileNotFoundError: # The directory does not exist
pass
# Evaluate excess records in database
excess_count = 0
db.execute('SELECT count(*) FROM toots WHERE twitter_account=?', (twit_account,))
db.execute('SELECT count(*) FROM toots WHERE twitter_account=?', (TOML['config']['twitter_account'],))
db_count = db.fetchone()
if db_count is not None:
excess_count = db_count[0] - MAX_REC_COUNT
@@ -777,14 +1016,12 @@ def main(argv):
LIMIT ?
)
DELETE from toots
WHERE tweet_id IN excess''', (twit_account, excess_count))
WHERE tweet_id IN excess''', (TOML['config']['twitter_account'], excess_count))
sql.commit()
logging.info('Deleted ' + str(excess_count) + ' old records from database.')
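The pruning step above keeps at most `MAX_REC_COUNT` rows per twitter account. A self-contained sketch of the same pattern; the top of the CTE is outside this hunk, so the ordering below uses `rowid` as a stand-in for whatever column the real query sorts on:

```python
import sqlite3

MAX_REC_COUNT = 2  # illustrative; twoot.py keeps many more records

sql = sqlite3.connect(':memory:')
db = sql.cursor()
db.execute('CREATE TABLE toots (twitter_account, mastodon_instance, mastodon_account, tweet_id, toot_id)')
for i in range(5):
    db.execute("INSERT INTO toots VALUES ('jdoe', 'inst', 'user', ?, ?)", (str(i), str(i)))

# Count this account's records and delete the oldest ones beyond the cap
db.execute('SELECT count(*) FROM toots WHERE twitter_account=?', ('jdoe',))
excess_count = db.fetchone()[0] - MAX_REC_COUNT
if excess_count > 0:
    db.execute('''WITH excess AS (
                      SELECT tweet_id FROM toots
                      WHERE twitter_account=?
                      ORDER BY rowid ASC
                      LIMIT ?
                  )
                  DELETE FROM toots WHERE tweet_id IN excess''', ('jdoe', excess_count))
    sql.commit()
```

SQLite accepts a bare CTE name on the right-hand side of `IN`, which is the same `WHERE tweet_id IN excess` form the script uses.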
logging.info('Run time : %2.1f seconds' % (time.time() - start_time))
logging.info('_____________________________________________________________________________________')
terminate(0)
if __name__ == "__main__":
main(sys.argv)