Merged contribution from

mathdatech
This commit is contained in:
jeancf 2022-11-17 20:18:42 +01:00
parent ed38f6f4d5
commit 6a20c257e5
2 changed files with 73 additions and 20 deletions

2
.gitignore vendored
View File

@ -4,5 +4,7 @@ venv/
*.secret
*.sh
*.log
*.png
*.xcf
twoot.db
__pycache__

83
twoot.py Executable file → Normal file
View File

@ -18,21 +18,22 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import sys
import logging
import argparse
import datetime
import logging
import os
import random
import re
import shutil
import sqlite3
import sys
import time
from pathlib import Path
from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse
import requests
from bs4 import BeautifulSoup, element
import sqlite3
import datetime
import time
import re
from pathlib import Path
from mastodon import Mastodon, MastodonError, MastodonAPIError, MastodonIllegalArgumentError
import subprocess
import shutil
# Number of records to keep in db table for each twitter account
MAX_REC_COUNT = 50
@ -63,6 +64,52 @@ USER_AGENTS = [
]
def _remove_tracker_params(query_str):
"""
private function
Given a query string from a URL, strip out the known trackers
:param query_str: query to be cleaned
:return: query cleaned
"""
# Avalaible URL tracking parameters :
# UTM tags by Google Ads, M$ Ads, ...
# tag by TikTok
# tags by Snapchat
# tags by Facebook
params_to_remove = [
"utm_source", "utm_medium", "utm_campaign", "utm_term", "utm_content",
"mkt_tok",
"campaign_name", "ad_set_name", "campaign_id", "ad_set_id",
"campaign_name", "ad_set_name", "ad_set_id", "media", "interest_group_name", "ad_set_id"
]
query_to_clean = dict(parse_qsl(query_str, keep_blank_values=True))
query_cleaned = [(k, v) for k, v in query_to_clean.items() if not k in params_to_remove]
return urlencode(query_cleaned, doseq=True)
def clean_url(dirty_url):
"""
Given a URL, return it with the UTM parameters removed from query and fragment
:param dirty_url: url to be cleaned
:return: url cleaned
>>> clean_url('https://exemple.com/video/this-aerial-ropeway?utm_source=Twitter&utm_medium=video&utm_campaign=organic&utm_content=Nov13&a=aaa&b=1#mkt_tok=tik&mkt_tik=tok')
'https://exemple.com/video/this-aerial-ropeway?a=aaa&b=1#mkt_tik=tok'
"""
url_parsed = urlparse(dirty_url)
cleaned_url = urlunparse([
url_parsed.scheme,
url_parsed.netloc,
url_parsed.path,
url_parsed.params,
_remove_tracker_params(url_parsed.query),
_remove_tracker_params(url_parsed.fragment)
])
return cleaned_url
def process_media_body(tt_iter):
"""
Receives an iterator over all the elements contained in the tweet-text container.
@ -88,7 +135,7 @@ def process_media_body(tt_iter):
tweet_text += tag_text
else:
# This is a real link, keep url
tweet_text += tag.get('href')
tweet_text += clean_url(tag.get('href'))
else:
logging.warning("No handler for tag in twitter text: " + tag.prettify())
@ -156,7 +203,6 @@ def process_attachments(nitter_url, attachments_container, get_vids, twit_accoun
logging.debug('Could not download video of GIF animation from attachments')
pass
# Close directory
os.chdir(orig_dir)
@ -362,7 +408,8 @@ def main(argv):
# Verify that download worked
if twit_account_page.status_code != 200:
logging.fatal('The Nitter page did not download correctly from ' + url + ' (' + str(twit_account_page.status_code) + '). Aborting')
logging.fatal('The Nitter page did not download correctly from ' + url + ' (' + str(
twit_account_page.status_code) + '). Aborting')
exit(-1)
logging.info('Nitter page downloaded successfully from ' + url)
@ -421,7 +468,8 @@ def main(argv):
continue
# Check in database if tweet has already been posted
db.execute("SELECT * FROM toots WHERE twitter_account=? AND mastodon_instance=? AND mastodon_account=? AND tweet_id=?",
db.execute(
"SELECT * FROM toots WHERE twitter_account=? AND mastodon_instance=? AND mastodon_account=? AND tweet_id=?",
(twit_account, mast_instance, mast_account, tweet_id))
tweet_in_db = db.fetchone()
@ -476,7 +524,8 @@ def main(argv):
# Process attachment: capture image or .mp4 url or download twitter video
attachments_class = status.find('div', class_='attachments')
if attachments_class is not None:
pics, vid_in_tweet = process_attachments(nitter_url, attachments_class, get_vids, twit_account, status_id, author_account)
pics, vid_in_tweet = process_attachments(nitter_url, attachments_class, get_vids, twit_account, status_id,
author_account)
photos.extend(pics)
if vid_in_tweet:
tweet_text += '\n\n[Video embedded in original tweet]'
@ -569,7 +618,8 @@ def main(argv):
logging.debug("Uploading video to Mastodon")
media_posted = mastodon.media_post(tweet['video'])
media_ids.append(media_posted['id'])
except (MastodonAPIError, MastodonIllegalArgumentError, TypeError): # Media cannot be uploaded (invalid format, dead link, etc.)
except (MastodonAPIError, MastodonIllegalArgumentError,
TypeError): # Media cannot be uploaded (invalid format, dead link, etc.)
logging.debug("Uploading video failed")
pass
@ -590,7 +640,8 @@ def main(argv):
logging.debug('uploading picture to Mastodon')
media_posted = mastodon.media_post(media.content, mime_type=media.headers['content-type'])
media_ids.append(media_posted['id'])
except (MastodonAPIError, MastodonIllegalArgumentError, TypeError): # Media cannot be uploaded (invalid format, dead link, etc.)
except (MastodonAPIError, MastodonIllegalArgumentError,
TypeError): # Media cannot be uploaded (invalid format, dead link, etc.)
pass
# Post toot