twoot/twoot.py

470 lines
17 KiB
Python
Raw Normal View History

2019-07-31 20:42:38 +00:00
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Copyright (C) 2019 Jean-Christophe Francois
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
2019-07-31 20:42:38 +00:00
import sys
2020-10-14 19:51:00 +00:00
import logging
2019-08-01 12:58:41 +00:00
import argparse
2019-07-31 20:42:38 +00:00
import os
2019-08-01 10:31:26 +00:00
import random
2019-07-31 20:42:38 +00:00
import requests
from bs4 import BeautifulSoup, element
import sqlite3
2020-02-15 14:39:01 +00:00
import datetime, time
2019-07-31 20:42:38 +00:00
import re
from pathlib import Path
from mastodon import Mastodon, MastodonError, MastodonAPIError, MastodonIllegalArgumentError
2020-03-29 11:41:49 +00:00
import subprocess
2020-03-26 19:50:59 +00:00
import shutil
2020-03-25 16:40:07 +00:00
2019-09-17 13:44:03 +00:00
# Pool of browser identification strings; one of them is picked at random for
# each run when the nitter page is requested.
# Refresh from https://www.whatismybrowser.com/guides/the-latest-user-agent/
USER_AGENTS = [
    # Chrome on Windows
    'Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36',
    # Firefox on Windows
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:84.0) Gecko/20100101 Firefox/84.0',
    # Safari on macOS
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 11_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.1 Safari/605.1.15',
    # Edge on Windows
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36 Edg/87.0.664.60',
]
2019-07-31 20:42:38 +00:00
2020-10-14 19:51:00 +00:00
# Setup logging to file
2020-12-16 18:43:17 +00:00
# Runs at import time: every logging call in this module goes to the file
# twoot.log (relative path, so in the current working directory).
# With level INFO, the logging.debug() messages emitted further down are
# not written to the file.
logging.basicConfig(filename="twoot.log", level=logging.INFO)
# Marker that separates successive runs in the log file
logging.info('*********** NEW RUN ***********')
2020-12-17 21:08:43 +00:00
def process_media_body(tt_iter):
    """
    Receives an iterator over all the elements contained in the tweet-text container.
    Processes them to make them suitable for posting on Mastodon:
    plain text is copied verbatim, mentions and hashtags are reduced to their
    visible text and any other link is replaced by its target url.
    Elements of any other kind are dropped and reported in the log.
    :param tt_iter: iterator over the HTML elements in the text of the tweet
    :return: cleaned up text of the tweet
    """
    fragments = []
    # Iterate elements
    for tag in tt_iter:
        # If element is plain text, copy it verbatim
        if isinstance(tag, element.NavigableString):
            fragments.append(tag.string)
        # If it is an 'a' html tag
        elif tag.name == 'a':
            tag_text = tag.get_text()
            if tag_text.startswith(('@', '#')):
                # Mention or hashtag: only keep user name / hashtag text
                fragments.append(tag_text)
            else:
                # This is a real link, keep url.
                # Fall back on the visible text if the tag carries no href
                # so that a malformed link cannot abort the whole run.
                href = tag.get('href')
                fragments.append(href if href is not None else tag_text)
        else:
            logging.warning("No handler for tag in twitter text: " + tag.prettify())

    return ''.join(fragments)
2020-12-17 21:59:21 +00:00
def process_card(card_container):
    """
    Extract image from card in case mastodon does not do it.
    Only cards linking to dailymotion.com are handled; any other card
    yields an empty list.
    :param card_container: soup of 'a' tag containing card markup
    :return: list with url of image
    """
    image_urls = []

    link = card_container.get('href')

    # Dailymotion
    # A card without href yields no image instead of raising
    if link is not None and 'dailymotion.com' in link:
        # NOTE(review): the result has no scheme ('twitter.com' + src);
        # confirm that this is the url that is meant to be downloaded later.
        image_url = 'twitter.com' + card_container.div.div.img.get('src')
        image_urls.append(image_url)

    return image_urls
2020-12-18 10:45:43 +00:00
def process_attachments(attachments_container):
    """
    Collect the still images found in the attachments of a tweet.
    Video attachments are not handled yet (see TODOs below).
    :param attachments_container: soup of the tag containing attachments markup
    :return: list with the 'href' of every 'a' tag of class 'still-image'
    """
    # TODO Download nitter video (converted animated GIF)
    # TODO Download twitter video
    return [
        still.get('href')
        for still in attachments_container.find_all('a', class_='still-image')
    ]
2020-02-15 14:39:01 +00:00
def contains_class(body_classes, some_class):
    """
    Look for a class name among a collection of class names.
    :param body_classes: iterable of classes to search
    :param some_class: class that we are interested in
    :return: True if one element equals some_class, False otherwise
    """
    return any(candidate == some_class for candidate in body_classes)
def is_time_valid(timestamp, max_age, min_delay):
    """
    Tell whether a tweet is neither too young (might be deleted) nor too old.
    :param timestamp: time of the tweet in seconds since the epoch (anything float() accepts)
    :param max_age: maximum accepted age of the tweet, in days
    :param min_delay: minimum accepted age of the tweet, in minutes
    :return: True if the age of the tweet is within both limits, False otherwise
    """
    # Everything is converted to hours before being compared
    hours_since_tweet = (time.time() - float(timestamp)) / 3600.0
    too_young = hours_since_tweet < min_delay / 60.0
    too_old = hours_since_tweet > max_age * 24.0
    return not (too_young or too_old)
2020-12-16 18:43:17 +00:00
2019-08-01 12:58:41 +00:00
def main(argv):
    """
    Mirror the tweets of a twitter account to a Mastodon account.

    Downloads the nitter page of the twitter account, builds a toot for every
    tweet that is within the accepted age range and is not yet recorded in the
    'twoot.db' sqlite database, posts the toots in reverse page order and
    records each posted toot in the database.
    :param argv: command line of the script. It is not read here: argparse
                 parses sys.argv on its own
    :return: None. The process exits with status 1 if the nitter page cannot
             be downloaded, or if creating the Mastodon app, logging in or
             posting a toot fails
    """
    # Build parser for command line arguments
    parser = argparse.ArgumentParser(description='toot tweets.')
    parser.add_argument('-t', metavar='<twitter account>', action='store', required=True)
    parser.add_argument('-i', metavar='<mastodon instance>', action='store', required=True)
    parser.add_argument('-m', metavar='<mastodon account>', action='store', required=True)
    parser.add_argument('-p', metavar='<mastodon password>', action='store', required=True)
    parser.add_argument('-r', action='store_true', help='Also post replies to other tweets')
    # The -v flag is accepted but its value is not used anywhere in this function
    parser.add_argument('-v', action='store_true', help='Ingest twitter videos and upload to Mastodon instance')
    parser.add_argument('-a', metavar='<max age (in days)>', action='store', type=float, default=1)
    parser.add_argument('-d', metavar='<min delay (in mins)>', action='store', type=float, default=0)

    # Parse command line
    args = vars(parser.parse_args())

    twit_account = args['t']
    mast_instance = args['i']
    mast_account = args['m']
    mast_password = args['p']
    tweets_and_replies = args['r']
    # The defaults are ints and argparse does not convert defaults, hence float()
    max_age = float(args['a'])
    min_delay = float(args['d'])

    logging.info('Updating ' + twit_account + ' on ' + mast_instance)

    # Try to open database. If it does not exist, create it
    sql = sqlite3.connect('twoot.db')
    db = sql.cursor()
    db.execute('''CREATE TABLE IF NOT EXISTS toots (twitter_account TEXT, mastodon_instance TEXT,
               mastodon_account TEXT, tweet_id TEXT, toot_id TEXT)''')

    # **********************************************************
    # Load twitter page of user. Process all tweets and generate
    # list of dictionaries ready to be posted on Mastodon
    # **********************************************************
    # To store content of all tweets from this user
    tweets = []

    # Initiate session
    session = requests.Session()

    # Get a copy of the default headers that requests would use
    headers = requests.utils.default_headers()

    # Update default headers with randomly selected user agent
    headers.update(
        {
            'User-Agent': random.choice(USER_AGENTS),
            # NOTE(review): presumably empties the nitter link replacement
            # preferences so that original links are served; confirm
            'Cookie': 'replaceTwitter=; replaceYouTube=',
        }
    )

    url = 'https://nitter.net/' + twit_account
    # Use different page if we need to handle replies
    if tweets_and_replies:
        url += '/with_replies'

    # Download twitter page of user.
    twit_account_page = session.get(url, headers=headers)

    # Verify that download worked. An explicit test is used instead of an
    # assert because asserts are removed when python runs with -O
    if twit_account_page.status_code != 200:
        logging.fatal('The nitter page did not download correctly. Aborting')
        sys.exit('The nitter page did not download correctly. Aborting')

    logging.info('Page downloaded successfully')

    # DEBUG: Save page to file
    # The context manager closes the file even if writing fails; the explicit
    # encoding avoids encoding errors on systems whose default is not UTF-8
    with open(twit_account + '.html', 'w', encoding='utf-8') as of:
        of.write(twit_account_page.text)

    # Make soup
    soup = BeautifulSoup(twit_account_page.text, 'html.parser')

    # Replace twit_account with version with correct capitalization
    # (taken from the '(@name)' part of the og:title meta tag, when present)
    ta_meta = soup.find('meta', property='og:title')
    if ta_meta is not None:
        ta_match = re.search(r'\(@(.+)\)', ta_meta.get('content') or '')
        if ta_match is not None:
            twit_account = ta_match.group(1)

    # Extract twitter timeline
    timeline = soup.find_all('div', class_='timeline-item')

    logging.info('Processing ' + str(len(timeline)) + ' tweets found in timeline')

    # **********************************************************
    # Process each tweets and generate dictionary
    # with data ready to be posted on Mastodon
    # **********************************************************
    for status in timeline:
        # Extract tweet ID and status ID
        # NOTE: strip() removes any leading/trailing '#' and 'm' characters,
        # not the literal suffix '#m'
        tweet_id = status.find('a', class_='tweet-link').get('href').strip('#m')
        status_id = tweet_id.split('/')[3]

        logging.debug('processing tweet %s', tweet_id)

        # Extract time stamp
        # NOTE(review): the parsed time is naive, so timestamp() interprets it
        # as local time; confirm which time zone nitter reports
        time_string = status.find('span', class_='tweet-date').a.get('title')
        timestamp = datetime.datetime.strptime(time_string, '%d/%m/%Y, %H:%M:%S').timestamp()

        # Check if time is within acceptable range
        if not is_time_valid(timestamp, max_age, min_delay):
            logging.debug("Tweet outside valid time range, skipping")
            continue

        # Check in database if tweet has already been posted
        db.execute("SELECT * FROM toots WHERE twitter_account=? AND mastodon_instance=? AND mastodon_account=? AND tweet_id=?",
                   (twit_account, mast_instance, mast_account, tweet_id))
        tweet_in_db = db.fetchone()

        if tweet_in_db is not None:
            logging.debug("Tweet %s already in database", tweet_id)
            # Skip to next tweet
            continue
        logging.debug('Tweet %s not found in database', tweet_id)

        # extract author
        author = status.find('a', class_='fullname').get('title')

        # Extract user name
        author_account = status.find('a', class_='username').get('title').lstrip('@')

        # Extract URL of full status page (for video download)
        full_status_url = 'https://twitter.com' + tweet_id

        # Initialize containers
        tweet_text = ''
        photos = []

        # Add prefix if the tweet is a reply-to
        replying_to_class = status.find('div', class_='replying-to')
        if replying_to_class is not None:
            tweet_text += 'Replying to ' + replying_to_class.a.get_text()

        # Check it the tweet is a retweet from somebody else
        # NOTE(review): this assignment replaces the 'Replying to' prefix set
        # just above; confirm that this is intended
        if author_account.lower() != twit_account.lower():
            tweet_text = 'RT from ' + author + ' (@' + author_account + ')\n\n'

        # extract iterator over tweet text contents
        tt_iter = status.find('div', class_='tweet-content media-body').children

        # Process text of tweet
        tweet_text += process_media_body(tt_iter)

        # Process quote: append link to tweet_text
        # NOTE(review): a 'div' is searched but its 'href' is read; confirm
        # the tag name against the nitter markup
        quote_div = status.find('div', class_='quote-link')
        if quote_div is not None:
            tweet_text += '\n twitter.com' + quote_div.get('href').strip('#m')

        # Process card : extract image if necessary
        card_class = status.find('a', class_='card-container')
        if card_class is not None:
            photos.extend(process_card(card_class))

        # TODO Process attachment: capture image or .mp4 url or download twitter video
        # NOTE(review): confirm that the attachments container is an 'a' tag
        attachments_class = status.find('a', class_='attachments')
        if attachments_class is not None:
            photos.extend(process_attachments(attachments_class))

        # Add footer with link to original tweet
        tweet_text += '\n\nOriginal tweet : ' + full_status_url

        # If no media was specifically added in the tweet, try to get the first picture
        # with "twitter:image" meta tag in first linked page in tweet text
        if not photos:
            m = re.search(r"http[^ \n\xa0]*", tweet_text)
            if m is not None:
                link_url = m.group(0)
                if link_url.endswith(".html"):  # Only process a web page
                    try:
                        r = requests.get(link_url, timeout=10)
                        if r.status_code == 200:
                            # Matches the first instance of either twitter:image or twitter:image:src meta tag
                            match = re.search(r'<meta name="twitter:image(?:|:src)" content="(.+?)".*?>', r.text)
                            if match is not None:
                                image_url = match.group(1).replace('&amp;', '&')  # Remove HTML-safe encoding from URL if any
                                photos.append(image_url)
                    # Give up if anything goes wrong
                    except (requests.exceptions.ConnectionError,
                            requests.exceptions.Timeout,
                            requests.exceptions.ContentDecodingError,
                            requests.exceptions.TooManyRedirects,
                            requests.exceptions.MissingSchema):
                        pass

        # Check if video was downloaded
        video_file = None

        video_path = Path('./output') / twit_account / status_id
        if video_path.exists():
            # Take the first subdirectory of video path (named after original poster of video)
            video_path = [p for p in video_path.iterdir() if p.is_dir()][0]
            # Take again the first subdirectory of video path (named after status id of original post where video is attached)
            video_path = [p for p in video_path.iterdir() if p.is_dir()][0]
            # list video files
            video_file_list = list(video_path.glob('*.mp4'))
            if video_file_list:
                # Extract posix path of first video file in list
                video_file = video_file_list[0].absolute().as_posix()

        # Add dictionary with content of tweet to list
        tweet = {
            "author": author,
            "author_account": author_account,
            "timestamp": timestamp,
            "tweet_id": tweet_id,
            "tweet_text": tweet_text,
            "video": video_file,
            "photos": photos,
        }
        tweets.append(tweet)

        logging.debug('Tweet %s added to list of toots to upload', tweet_id)

    # TODO Log summary stats: how many not in db, how many in valid timeframe

    # **********************************************************
    # Iterate tweets in list.
    # post each on Mastodon and record it in database
    # **********************************************************

    # Create Mastodon application if it does not exist yet
    if not os.path.isfile(mast_instance + '.secret'):
        try:
            Mastodon.create_app(
                'twoot',
                api_base_url='https://' + mast_instance,
                to_file=mast_instance + '.secret'
            )

        except MastodonError as me:
            print('failed to create app on ' + mast_instance)
            logging.fatal('failed to create app on %s: %s', mast_instance, me)
            sys.exit(1)

    # Log in to Mastodon instance
    try:
        mastodon = Mastodon(
            client_id=mast_instance + '.secret',
            api_base_url='https://' + mast_instance
        )

        mastodon.log_in(
            username=mast_account,
            password=mast_password,
            to_file=mast_account + ".secret"
        )

    except MastodonError as me:
        # The exception is passed as a logging argument: concatenating it to
        # a str raises TypeError and hides the actual login error
        logging.fatal('ERROR: Login to %s Failed\n%s', mast_instance, me)
        sys.exit(1)

    # Upload tweets
    for tweet in reversed(tweets):
        logging.debug('Uploading Tweet %s', tweet["tweet_id"])

        media_ids = []

        # Upload video if there is one
        if tweet['video'] is not None:
            try:
                logging.debug("Uploading video")
                media_posted = mastodon.media_post(tweet['video'])
                media_ids.append(media_posted['id'])
            except (MastodonAPIError, MastodonIllegalArgumentError, TypeError):  # Media cannot be uploaded (invalid format, dead link, etc.)
                logging.debug("Uploading video failed")

        else:  # Only upload pic if no video was uploaded
            # Upload photos
            for photo in tweet['photos']:
                media = False
                # Download picture
                try:
                    media = requests.get(photo)
                except requests.exceptions.RequestException as e:
                    # Picture cannot be downloaded: post the toot without it
                    logging.debug('Could not download picture %s: %s', photo, e)

                # Upload picture to Mastodon instance
                if media:
                    try:
                        media_posted = mastodon.media_post(media.content, mime_type=media.headers['content-type'])
                        media_ids.append(media_posted['id'])
                    except (MastodonAPIError, MastodonIllegalArgumentError, TypeError):  # Media cannot be uploaded (invalid format, dead link, etc.)
                        pass

        # Post toot
        try:
            mastodon = Mastodon(
                access_token=mast_account + '.secret',
                api_base_url='https://' + mast_instance
            )

            if not media_ids:
                toot = mastodon.status_post(tweet['tweet_text'], visibility='public')
            else:
                toot = mastodon.status_post(tweet['tweet_text'], media_ids=media_ids, visibility='public')

        except MastodonError as me:
            logging.error('posting ' + tweet['tweet_text'] + ' to ' + mast_instance + ' Failed')
            logging.error(me)
            sys.exit(1)

        # Use the id of the tweet being uploaded, not the variable left over
        # from the parsing loop above
        logging.debug('Tweet %s posted on %s', tweet['tweet_id'], mast_account)

        # Insert toot id into database
        if 'id' in toot:
            db.execute("INSERT INTO toots VALUES ( ? , ? , ? , ? , ? )",
                       (twit_account, mast_instance, mast_account, tweet['tweet_id'], toot['id']))
            sql.commit()

    # Cleanup downloaded video files
    try:
        shutil.rmtree('./output/' + twit_account)
    except FileNotFoundError:  # The directory does not exist
        pass

    # Every insert has been committed above: release the database
    sql.close()
2019-08-01 12:58:41 +00:00
2019-08-01 12:58:41 +00:00
# Script entry point: only runs when the file is executed directly.
# main() is handed sys.argv, but the command line is actually parsed by
# argparse inside main(), which reads sys.argv on its own.
if __name__ == "__main__":
    main(sys.argv)