# Repository-mirror listing metadata (not part of the program):
#   synced 2025-03-18 19:50:02 +00:00
#   221 lines, 7.4 KiB
import argparse
import requests
import json
import urllib.parse
import m3u8
from pathlib import Path
import re
import ffmpeg
import shutil
import copy
class TwitterDownloader:
    """
    tw-dl offers the ability to download videos from Twitter feeds.

    **Disclaimer** I wrote this to recover a video for which the original was lost. Consider copyright before downloading
    content you do not own.
    """

    video_player_prefix = 'https://twitter.com/i/videos/tweet/'
    video_api = 'https://api.twitter.com/1.1/videos/tweet/config/'
    # Kept as a class attribute for backward compatibility only; __init__ gives
    # every instance its own dict so two downloaders never share tweet state.
    tweet_data = {}

    def __init__(self, tweet_url, output_dir='./output', target_width=0, debug=0):
        """
        Parse the tweet URL, create the storage directory and open an HTTP session.

        Args:
            tweet_url: Tweet URL of the form https://twitter.com/<user>/status/<id>.
            output_dir: Base directory; files are stored in <output_dir>/<user>/<id>.
            target_width: Desired video width in pixels. 0 means "download every
                resolution"; anything convertible with int() is accepted.
            debug: Verbosity level (0 = silent, 1 = messages, 2 = messages plus
                response bodies). Values above 2 are treated as 2.

        Raises:
            ValueError: If the URL does not contain both a user and a tweet id.
        """
        self.tweet_url = tweet_url
        self.output_dir = output_dir
        self.target_width = int(target_width)
        self.debug = min(debug, 2)

        # We split on ? to clean up the URL. Sharing tweets, for example,
        # will add ? with data about which device shared it.
        # The rest is just getting the user and ID to work with.
        clean_url = tweet_url.split('?', 1)[0]
        url_parts = clean_url.split('/')
        if len(url_parts) < 6:
            raise ValueError(
                'Expected a tweet URL like https://twitter.com/<user>/status/<id>, got: ' + str(tweet_url)
            )

        self.tweet_data = {
            'tweet_url': clean_url,
            'user': url_parts[3],
            'id': url_parts[5],
        }

        storage_dir = Path(output_dir) / self.tweet_data['user'] / self.tweet_data['id']
        storage_dir.mkdir(parents=True, exist_ok=True)
        self.storage = str(storage_dir)
        self.requests = requests.Session()

    def download(self):
        """
        Download the tweet's video into the storage directory.

        When the master playlist offers several resolutions, either all of them
        are downloaded (target_width == 0) or only the one whose width is closest
        to target_width. Each resolution ends up as <width>x<height>.mp4.
        Single-resolution playlists are not supported; a message is printed
        and nothing is downloaded.
        """
        self.__debug('Tweet URL', self.tweet_data['tweet_url'])

        # Get the bearer token
        token = self.__get_bearer_token()

        # Get the M3u8 file - this is where rate limiting has been happening
        video_host, playlist = self.__get_playlist(token)

        if not playlist.is_variant:
            print('[-] Sorry, single resolution video download is not yet implemented. Please submit a bug report with the link to the tweet.')
            return

        if self.target_width == 0:
            print('[+] Multiple resolutions found. Slurping all resolutions.')
        else:
            print('[+] Multiple resolutions found. Selecting the one closest to target width of ' + str(self.target_width))
            playlist = self.__filter_playlist(playlist)

        for plist in playlist.playlists:
            self.__download_variant(video_host, plist)

    def __download_variant(self, video_host, plist):
        """Fetch every segment of one resolution, join them and remux the result to MP4."""
        width, height = plist.stream_info.resolution
        resolution = str(width) + 'x' + str(height)
        storage = Path(self.storage)
        resolution_file = storage / (resolution + '.mp4')

        print('[+] Downloading ' + resolution)

        playlist_url = video_host + plist.uri

        # The Authorization header set on the session is blanked for this request.
        ts_m3u8_response = self.requests.get(playlist_url, headers={'Authorization': None})
        ts_m3u8_parse = m3u8.loads(ts_m3u8_response.text)

        # Save each transport-stream segment to disk, remembering the order.
        ts_list = []
        for ts_uri in ts_m3u8_parse.segments.uri:
            # Plain requests.get (not the session), so no session headers are sent.
            ts_file = requests.get(video_host + ts_uri)
            ts_path = storage / ts_uri.split('/')[-1]
            ts_path.write_bytes(ts_file.content)
            ts_list.append(ts_path)

        # Concatenate the segments, in playlist order, into one .ts file.
        # Shamelessly taken from https://stackoverflow.com/questions/13613336/python-concatenate-text-files/27077437#27077437
        ts_full_file = storage / (resolution + '.ts')
        with open(str(ts_full_file), 'wb') as wfd:
            for segment in ts_list:
                with open(str(segment), 'rb') as fd:
                    shutil.copyfileobj(fd, wfd, 1024 * 1024 * 10)

        print('\t[*] Doing the magic ...')
        (
            ffmpeg
            .input(str(ts_full_file))
            .output(str(resolution_file), acodec='copy', vcodec='libx264', format='mp4', loglevel='error')
            # Replace an MP4 left over from an earlier run instead of prompting.
            .overwrite_output()
            .run()
        )

        print('\t[+] Doing cleanup')
        for leftover in ts_list + [ts_full_file]:
            leftover.unlink()

    def __get_bearer_token(self):
        """
        Scrape the bearer token from the video player's JavaScript and install it.

        The token is stored as the session's Authorization header, then a guest
        token is requested so later API calls carry both headers.

        Returns:
            The matched text, including the leading 'Bearer ' prefix.

        Raises:
            RuntimeError: If the player page references no script or the script
                contains no bearer token.
        """
        video_player_url = self.video_player_prefix + self.tweet_data['id']
        video_player_response = self.requests.get(video_player_url).text
        self.__debug('Video Player Body', '', video_player_response)

        js_file_urls = re.findall(r'src="(.*js)', video_player_response)
        if not js_file_urls:
            raise RuntimeError('Could not find the player script URL in the video player page.')

        js_file_response = self.requests.get(js_file_urls[0]).text
        self.__debug('JS File Body', '', js_file_response)

        token_match = re.search(r'Bearer ([a-zA-Z0-9%-])+', js_file_response)
        if token_match is None:
            raise RuntimeError('Could not find a bearer token in the player script.')

        # group(0) is the whole match, i.e. 'Bearer <token>', which is exactly
        # the value the Authorization header needs.
        bearer_token = token_match.group(0)
        self.requests.headers.update({'Authorization': bearer_token})
        self.__debug('Bearer Token', bearer_token)

        # The guest token request runs on the same session, after the
        # Authorization header has been set.
        self.__get_guest_token()

        return bearer_token

    def __get_playlist(self, token):
        """
        Fetch the player configuration and the master M3U8 playlist it points to.

        Args:
            token: Bearer token. Unused here because the session already carries
                it; kept so existing call sites stay valid.

        Returns:
            A two-item list: the playlist host as '<scheme>://<hostname>' and the
            parsed M3U8 playlist.

        Raises:
            SystemExit: If the API response contains an 'errors' entry.
        """
        player_config_req = self.requests.get(self.video_api + self.tweet_data['id'] + '.json')
        player_config = json.loads(player_config_req.text)

        if 'errors' in player_config:
            self.__debug('Player Config JSON - Error', json.dumps(player_config['errors']))
            print('[-] Rate limit exceeded. Could not recover. Try again later.')
            # There is no playback URL to continue with.
            raise SystemExit(1)

        self.__debug('Player Config JSON', '', json.dumps(player_config))
        m3u8_url = player_config['track']['playbackUrl']

        # Get m3u8
        m3u8_response = self.requests.get(m3u8_url)
        self.__debug('M3U8 Response', '', m3u8_response.text)

        m3u8_url_parse = urllib.parse.urlparse(m3u8_url)
        video_host = m3u8_url_parse.scheme + '://' + m3u8_url_parse.hostname

        m3u8_parse = m3u8.loads(m3u8_response.text)

        return [video_host, m3u8_parse]

    # Thanks to @devkarim for this fix: https://github.com/h4ckninja/twitter-video-downloader/issues/2#issuecomment-538773026
    def __get_guest_token(self):
        """Request a guest token and store it as the session's x-guest-token header."""
        res = self.requests.post("https://api.twitter.com/1.1/guest/activate.json")
        res_json = json.loads(res.text)
        self.requests.headers.update({'x-guest-token': res_json.get('guest_token')})

    def __filter_playlist(self, playlist):
        """
        Return a copy of `playlist` reduced to the variant closest to target_width.

        Closeness is the absolute difference between a variant's width and
        self.target_width; on a tie the variant listed first wins. The input
        playlist is not modified. An input without variants yields a copy
        without variants.
        """
        # Make a copy of the playlist object and reset 'playlists' member
        new_playlist = copy.deepcopy(playlist)
        new_playlist.playlists = []

        candidates = list(playlist.playlists)
        if candidates:
            # min() returns the first of several equally close variants.
            closest = min(
                candidates,
                key=lambda variant: abs(variant.stream_info.resolution[0] - self.target_width),
            )
            new_playlist.playlists = [closest]

        return new_playlist

    def __debug(self, msg_prefix, msg_body, msg_body_full=''):
        """
        Print a debug message according to the configured verbosity.

        Level 0 prints nothing, level 1 prints prefix and body, level 2 also
        appends the full body (typically a raw response).
        """
        if self.debug == 0:
            return

        if self.debug == 1:
            print('[Debug] ' + '[' + msg_prefix + ']' + ' ' + msg_body)

        if self.debug == 2:
            print('[Debug+] ' + '[' + msg_prefix + ']' + ' ' + msg_body + ' - ' + msg_body_full)
# Command-line entry point: parse the arguments, build the downloader and run it.
if __name__ == '__main__':
    import sys

    # Stop here on Python 2 instead of carrying on after the message.
    if sys.version_info[0] == 2:
        print('Python3 is required.')
        sys.exit(1)

    parser = argparse.ArgumentParser()
    parser.add_argument('tweet_url', help='The video URL on Twitter (https://twitter.com/<user>/status/<id>).')
    parser.add_argument('-o', '--output', dest='output', default='./output', help='The directory to output to. The structure will be: <output>/<user>/<id>.')
    parser.add_argument('-d', '--debug', default=0, action='count', dest='debug', help='Debug. Add more to print out response bodies (maximum 2).')
    # type=int lets argparse reject a non-numeric width with a usage error.
    parser.add_argument('-w', '--target_width', dest='target_width', default=0, type=int, help='In pixels. Download only the video resolution closest to this value')

    args = parser.parse_args()

    twitter_dl = TwitterDownloader(args.tweet_url, output_dir=args.output, target_width=args.target_width, debug=args.debug)
    # Constructing the downloader only prepares the output directory and the
    # session; the download itself has to be started explicitly.
    twitter_dl.download()