import os
import sys
import re
import glob
import time
import json
import logging
import internetarchive
from internetarchive.config import parse_config_file
from datetime import datetime
from yt_dlp import YoutubeDL
from .utils import (sanitize_identifier, check_is_file_empty,
EMPTY_ANNOTATION_FILE)
from logging import getLogger
from urllib.parse import urlparse
from tubeup import __version__
# Name of the subdirectory (created under the tubeup root directory)
# where yt-dlp stores downloaded files.
DOWNLOAD_DIR_NAME = 'downloads'
class TubeUp(object):
def __init__(self,
             verbose=False,
             dir_path='~/.tubeup',
             ia_config_path=None,
             output_template=None,
             get_comments=False):
    """
    `tubeup` is a tool that archives videos by downloading them with
    yt-dlp and re-uploading them to archive.org.

    :param verbose:         A boolean; True means all log messages are
                            also printed to stdout.
    :param dir_path:        Directory used to store the downloaded
                            resources; created if missing. Defaults to
                            '~/.tubeup'.
    :param ia_config_path:  Path to an internetarchive config file, used
                            when uploading.
    :param output_template: Template string used to generate the output
                            filenames; defaults to '%(id)s.%(ext)s'.
    :param get_comments:    A boolean; True means comments are scraped
                            as well.
    """
    # The dir_path property setter expands the user path and creates
    # the download directory tree.
    self.dir_path = dir_path
    self.verbose = verbose
    self.ia_config_path = ia_config_path
    self.logger = getLogger(__name__)
    self.output_template = ('%(id)s.%(ext)s' if output_template is None
                            else output_template)
    self.get_comments = get_comments

    # Quiet mode: surface only errors through the logger.
    if not self.verbose:
        self.logger.setLevel(logging.ERROR)
@property
def dir_path(self):
return self._dir_path
@dir_path.setter
def dir_path(self, dir_path):
    """
    Configure the directory used to save downloaded resources.

    The path is user-expanded ('~' resolved) and the downloads
    subdirectory is created when it does not exist yet.

    :param dir_path: Path to a directory that will be used to save the
                     videos; created on demand.
    """
    root = os.path.expanduser(dir_path)
    downloads = os.path.join(root, DOWNLOAD_DIR_NAME)

    # Ensure the whole tree exists before any download starts.
    os.makedirs(downloads, exist_ok=True)

    self._dir_path = {
        'root': root,
        'downloads': downloads,
    }
def get_resource_basenames(self, urls,
                           cookie_file=None, proxy_url=None,
                           ydl_username=None, ydl_password=None,
                           use_download_archive=False,
                           ignore_existing_item=False):
    """
    Download the given urls with yt-dlp and collect the basenames of
    the files that were produced.

    :param urls: A list of urls that will be downloaded with
                 youtubedl.
    :param cookie_file: A cookie file for YoutubeDL.
    :param proxy_url: A proxy url for YoutubeDL.
    :param ydl_username: Username that will be used to download the
                         resources with youtube_dl.
    :param ydl_password: Password of the related username, will be used
                         to download the resources with youtube_dl.
    :param use_download_archive: Record the video url to the download
                                 archive. This will download only videos
                                 not listed in the archive file. Record
                                 the IDs of all downloaded videos in it.
    :param ignore_existing_item: Ignores the check for existing items on
                                 archive.org.
    :return: Set of videos basename that has been downloaded.
    """
    downloaded_files_basename = set()

    def check_if_ia_item_exists(infodict):
        # Return 1 when an archive.org item for this video already
        # exists, 0 otherwise.
        itemname = sanitize_identifier('%s-%s' % (infodict['extractor'],
                                                  infodict['display_id']))
        item = internetarchive.get_item(itemname)
        # BUGFIX: this used to test `item.exists and self.verbose`, so
        # non-verbose runs ignored existing items and re-downloaded
        # them. Existence now decides the result; verbosity only
        # controls whether the notice is printed.
        if item.exists:
            if self.verbose:
                print("\n:: Item already exists. Not downloading.")
                print('Title: %s' % infodict['title'])
                print('Video URL: %s\n' % infodict['webpage_url'])
            return 1
        return 0

    def ydl_progress_hook(d):
        # Console progress reporting for yt-dlp (verbose mode only).
        if d['status'] == 'downloading' and self.verbose:
            if d.get('_total_bytes_str') is not None:
                msg_template = ('%(_percent_str)s of %(_total_bytes_str)s '
                                'at %(_speed_str)s ETA %(_eta_str)s')
            elif d.get('_total_bytes_estimate_str') is not None:
                msg_template = ('%(_percent_str)s of '
                                '~%(_total_bytes_estimate_str)s at '
                                '%(_speed_str)s ETA %(_eta_str)s')
            elif d.get('_downloaded_bytes_str') is not None:
                if d.get('_elapsed_str'):
                    msg_template = ('%(_downloaded_bytes_str)s at '
                                    '%(_speed_str)s (%(_elapsed_str)s)')
                else:
                    msg_template = ('%(_downloaded_bytes_str)s '
                                    'at %(_speed_str)s')
            else:
                # BUGFIX: this template contained a stray lone '%'
                # ('%(_percent_str)s % at ...'), which made the
                # %-interpolation below raise
                # "ValueError: unsupported format character".
                msg_template = ('%(_percent_str)s at '
                                '%(_speed_str)s ETA %(_eta_str)s')
            process_msg = '\r[download] ' + (msg_template % d) + '\033[K'
            sys.stdout.write(process_msg)
            sys.stdout.flush()

        if d['status'] == 'finished':
            msg = 'Downloaded %s' % d['filename']
            self.logger.debug(d)
            self.logger.info(msg)
            if self.verbose:
                print(msg)

        if d['status'] == 'error':
            # TODO: Complete the error message
            msg = 'Error when downloading the video'
            self.logger.error(msg)
            if self.verbose:
                print(msg)

    ydl_opts = self.generate_ydl_options(ydl_progress_hook,
                                         cookie_file, proxy_url,
                                         ydl_username, ydl_password,
                                         use_download_archive)

    with YoutubeDL(ydl_opts) as ydl:
        for url in urls:
            if not ignore_existing_item:
                # Get the info dict of the url without scraping
                # comments -- comments are slow to fetch and are not
                # needed for the existence check.
                ydl_opts["getcomments"] = False
                with YoutubeDL(ydl_opts) as ydl_nocomments:
                    info_dict = ydl_nocomments.extract_info(url, download=False)

                if info_dict.get('_type', 'video') == 'playlist':
                    for entry in info_dict['entries']:
                        if ydl.in_download_archive(entry):
                            continue
                        if check_if_ia_item_exists(entry) == 0:
                            ydl.extract_info(entry['webpage_url'])
                            downloaded_files_basename.update(self.create_basenames_from_ydl_info_dict(ydl, entry))
                        else:
                            # Already on archive.org: record it in the
                            # download archive so it is skipped next run.
                            ydl.record_download_archive(entry)
                else:
                    if ydl.in_download_archive(info_dict):
                        continue
                    if check_if_ia_item_exists(info_dict) == 0:
                        ydl.extract_info(url)
                        downloaded_files_basename.update(self.create_basenames_from_ydl_info_dict(ydl, info_dict))
                    else:
                        ydl.record_download_archive(info_dict)
            else:
                info_dict = ydl.extract_info(url)
                downloaded_files_basename.update(self.create_basenames_from_ydl_info_dict(ydl, info_dict))

            self.logger.debug(
                'Basenames obtained from url (%s): %s'
                % (url, downloaded_files_basename))

    return downloaded_files_basename
def create_basenames_from_ydl_info_dict(self, ydl, info_dict):
"""
Create basenames from YoutubeDL info_dict.
:param ydl: A `youtube_dl.YoutubeDL` instance.
:param info_dict: A ydl info_dict that will be used to create
the basenames.
:return: A set that contains basenames that created from
the `info_dict`.
"""
info_type = info_dict.get('_type', 'video')
self.logger.debug('Creating basenames from ydl info dict with type %s'
% info_type)
filenames = set()
if info_type == 'playlist':
# Iterate and get the filenames through the playlist
for video in info_dict['entries']:
filenames.add(ydl.prepare_filename(video))
else:
filenames.add(ydl.prepare_filename(info_dict))
basenames = set()
for filename in filenames:
filename_without_ext = os.path.splitext(filename)[0]
file_basename = re.sub(r'(\.f\d+)', '', filename_without_ext)
basenames.add(file_basename)
return basenames
def generate_ydl_options(self,
ydl_progress_hook,
cookie_file=None,
proxy_url=None,
ydl_username=None,
ydl_password=None,
use_download_archive=False,
ydl_output_template=None):
"""
Generate a dictionary that contains options that will be used
by yt-dlp.
:param ydl_progress_hook: A function that will be called during the
download process by youtube_dl.
:param proxy_url: A proxy url for YoutubeDL.
:param ydl_username: Username that will be used to download the
resources with youtube_dl.
:param ydl_password: Password of the related username, will be
used to download the resources with
youtube_dl.
:param use_download_archive: Record the video url to the download archive.
This will download only videos not listed in
the archive file. Record the IDs of all
downloaded videos in it.
:return: A dictionary that contains options that will
be used by youtube_dl.
"""
ydl_opts = {
'outtmpl': os.path.join(self.dir_path['downloads'],
self.output_template),
'restrictfilenames': True,
'quiet': not self.verbose,
'verbose': self.verbose,
'progress_with_newline': True,
'forcetitle': True,
'continuedl': True,
'retries': 9001,
'fragment_retries': 9001,
'forcejson': False,
'writeinfojson': True,
'writedescription': True,
'getcomments': self.get_comments,
'writethumbnail': True,
'writeannotations': True,
'writesubtitles': True,
'allsubtitles': True,
'ignoreerrors': True, # Geo-blocked,
# copyrighted/private/deleted
# will be printed to STDOUT and channel
# ripping will continue uninterupted,
# use with verbose off
'fixup': 'warn', # Slightly more verbosity for debugging
# problems
'nooverwrites': True, # Don't touch what's already been
# downloaded speeds things
'consoletitle': True, # Download percentage in console title
'prefer_ffmpeg': True, # `ffmpeg` is better than `avconv`,
# let's prefer it's use
# Warns on out of date youtube-dl script, helps debugging for
# youtube-dl devs
'call_home': False,
'logger': self.logger,
'progress_hooks': [ydl_progress_hook]
}
if cookie_file is not None:
ydl_opts['cookiefile'] = cookie_file
if proxy_url is not None:
ydl_opts['proxy'] = proxy_url
if ydl_username is not None:
ydl_opts['username'] = ydl_username
if ydl_password is not None:
ydl_opts['password'] = ydl_password
if use_download_archive:
ydl_opts['download_archive'] = os.path.join(self.dir_path['root'],
'.ytdlarchive')
return ydl_opts
def upload_ia(self, videobasename, custom_meta=None):
    """
    Upload video to archive.org.

    :param videobasename: A video base name.
    :param custom_meta:   A custom meta, will be used by internetarchive
                          library when uploading to archive.org.
    :return:              A tuple (itemname, metadata) describing the
                          item that was uploaded to archive.org.
    :raises Exception:    When the download is incomplete (leftover
                          .part/.temp files) or the internetarchive
                          config has no S3 keys.
    """
    # The .info.json written by yt-dlp is the source of truth for the
    # video's metadata.
    json_metadata_filepath = videobasename + '.info.json'
    with open(json_metadata_filepath, 'r', encoding='utf-8') as f:
        vid_meta = json.load(f)

    # Item identifier is "<extractor>-<display_id>"; sanitized below.
    itemname = ('%s-%s' % (vid_meta['extractor'],
                           vid_meta['display_id']))

    # Exit if video download did not complete, don't upload .part files to IA
    for ext in ['*.part', '*.f303*', '*.f302*', '*.ytdl', '*.f251*', '*.248*', '*.f247*', '*.temp']:
        if glob.glob(videobasename + ext):
            msg = 'Video download incomplete, re-attempt archival attempt, exiting...'
            raise Exception(msg)

    # Replace illegal characters within identifer
    itemname = sanitize_identifier(itemname)

    metadata = self.create_archive_org_metadata_from_youtubedl_meta(
        vid_meta)

    # Delete empty description file so it isn't uploaded.
    description_file_path = videobasename + '.description'
    if (os.path.exists(description_file_path) and
        (('description' in vid_meta and
          vid_meta['description'] == '') or
         check_is_file_empty(description_file_path))):
        os.remove(description_file_path)

    # Delete empty annotations.xml file so it isn't uploaded
    annotations_file_path = videobasename + '.annotations.xml'
    if (os.path.exists(annotations_file_path) and
        (('annotations' in vid_meta and
          vid_meta['annotations'] in {'', EMPTY_ANNOTATION_FILE}) or
         check_is_file_empty(annotations_file_path))):
        os.remove(annotations_file_path)

    # Upload all files with videobase name: e.g. video.mp4,
    # video.info.json, video.srt, etc.
    # NOTE(review): videobasename is not glob-escaped, so basenames
    # containing glob metacharacters ('[', '*', '?') may match the
    # wrong files -- confirm against real downloads.
    files_to_upload = glob.glob(videobasename + '*')

    # Upload the item to the Internet Archive
    item = internetarchive.get_item(itemname)

    if custom_meta:
        metadata.update(custom_meta)

    # Parse internetarchive configuration file.
    parsed_ia_s3_config = parse_config_file(self.ia_config_path)[2]['s3']
    s3_access_key = parsed_ia_s3_config['access']
    s3_secret_key = parsed_ia_s3_config['secret']

    if None in {s3_access_key, s3_secret_key}:
        msg = ('`internetarchive` configuration file is not configured'
               ' properly.')
        self.logger.error(msg)
        if self.verbose:
            print(msg)
        raise Exception(msg)

    # delete=True removes the local files after a successful upload;
    # the huge retry/timeout values keep slow IA S3 endpoints from
    # aborting long-running archival jobs.
    item.upload(files_to_upload, metadata=metadata, retries=9001,
                request_kwargs=dict(timeout=9001), delete=True,
                verbose=self.verbose, access_key=s3_access_key,
                secret_key=s3_secret_key)

    return itemname, metadata
def archive_urls(self, urls, custom_meta=None,
cookie_file=None, proxy=None,
ydl_username=None, ydl_password=None,
use_download_archive=False,
ignore_existing_item=False):
"""
Download and upload videos from youtube_dl supported sites to
archive.org
:param urls: List of url that will be downloaded and uploaded
to archive.org
:param custom_meta: A custom metadata that will be used when
uploading the file with archive.org.
:param cookie_file: A cookie file for YoutubeDL.
:param proxy_url: A proxy url for YoutubeDL.
:param ydl_username: Username that will be used to download the
resources with youtube_dl.
:param ydl_password: Password of the related username, will be used
to download the resources with youtube_dl.
:param use_download_archive: Record the video url to the download archive.
This will download only videos not listed in
the archive file. Record the IDs of all
downloaded videos in it.
:param ignore_existing_item: Ignores the check for existing items on archive.org.
:return: Tuple containing identifier and metadata of the
file that has been uploaded to archive.org.
"""
downloaded_file_basenames = self.get_resource_basenames(
urls, cookie_file, proxy, ydl_username, ydl_password, use_download_archive,
ignore_existing_item)
for basename in downloaded_file_basenames:
identifier, meta = self.upload_ia(basename, custom_meta)
yield identifier, meta
@staticmethod
def determine_collection_type(url):
"""
Determine collection type for an url.
:param url: URL that the collection type will be determined.
:return: String, name of a collection.
"""
if urlparse(url).netloc == 'soundcloud.com':
return 'opensource_audio'
return 'opensource_movies'
@staticmethod
def determine_licenseurl(vid_meta):
"""
Determine licenseurl for an url
:param vid_meta:
:return:
"""
licenseurl = ''
licenses = {
"Creative Commons Attribution license (reuse allowed)": "https://creativecommons.org/licenses/by/3.0/",
"Attribution-NonCommercial-ShareAlike": "https://creativecommons.org/licenses/by-nc-sa/2.0/",
"Attribution-NonCommercial": "https://creativecommons.org/licenses/by-nc/2.0/",
"Attribution-NonCommercial-NoDerivs": "https://creativecommons.org/licenses/by-nc-nd/2.0/",
"Attribution": "https://creativecommons.org/licenses/by/2.0/",
"Attribution-ShareAlike": "https://creativecommons.org/licenses/by-sa/2.0/",
"Attribution-NoDerivs": "https://creativecommons.org/licenses/by-nd/2.0/"
}
if 'license' in vid_meta and vid_meta['license']:
licenseurl = licenses.get(vid_meta['license'])
return licenseurl
@staticmethod
def create_archive_org_metadata_from_youtubedl_meta(vid_meta):
    """
    Create an archive.org metadata dict from youtubedl-generated metadata.

    :param vid_meta: A dict containing youtubedl-generated metadata.
    :return: A dict containing metadata to be used by
             internetarchive library.
    """
    title = '%s' % (vid_meta['title'])
    videourl = vid_meta['webpage_url']

    collection = TubeUp.determine_collection_type(videourl)

    # Some video services don't tell you the uploader,
    # use our program's name in that case.
    try:
        if vid_meta['extractor_key'] == 'TwitchClips' and 'creator' in vid_meta and vid_meta['creator']:
            uploader = vid_meta['creator']
        elif 'uploader' in vid_meta and vid_meta['uploader']:
            uploader = vid_meta['uploader']
        elif 'uploader_url' in vid_meta and vid_meta['uploader_url']:
            uploader = vid_meta['uploader_url']
        else:
            uploader = 'tubeup.py'
    except TypeError:  # apparently uploader is null as well
        uploader = 'tubeup.py'

    uploader_url = vid_meta.get('uploader_url', videourl)

    try:  # some videos don't give an upload date
        d = datetime.strptime(vid_meta['upload_date'], '%Y%m%d')
        upload_date = d.isoformat().split('T')[0]
        upload_year = upload_date[:4]  # 20150614 -> 2015
    except (KeyError, TypeError):
        # Use current date and time as default values
        upload_date = time.strftime("%Y-%m-%d")
        upload_year = time.strftime("%Y")

    # load up tags into an IA compatible semicolon-separated string
    # example: Youtube;video;
    tags_string = '%s;video;' % vid_meta['extractor_key']

    if 'categories' in vid_meta:
        # add categories as tags as well, if they exist
        try:
            for category in vid_meta['categories']:
                tags_string += '%s;' % category
        except Exception:
            # categories may be None; iterating it raises TypeError
            print("No categories found.")

    if 'tags' in vid_meta:  # some video services don't have tags
        try:
            # BUGFIX: was `if 'tags' in vid_meta is None:` -- a chained
            # comparison that is always False, which made this
            # null-tags fallback unreachable.
            if vid_meta['tags'] is None:
                tags_string += '%s;' % vid_meta['id']
                tags_string += '%s;' % 'video'
            else:
                for tag in vid_meta['tags']:
                    tags_string += '%s;' % tag
        except Exception:
            print("Unable to process tags successfully.")

    # license
    licenseurl = TubeUp.determine_licenseurl(vid_meta)

    # if there is no description don't upload the empty .description file
    description_text = vid_meta.get('description', '')
    if description_text is None:
        description_text = ''
    # archive.org does not display raw newlines -- convert them to <br>.
    # BUGFIX: the '<br>' replacement string and the HTML in the template
    # below had been stripped out, leaving literal newlines inside the
    # string literals (a SyntaxError) and dropping format arguments
    # {1} and {3}.
    description_text = re.sub('\r?\n', '<br>', description_text)

    description = ('{0} <br/><br/>Source: <a href="{1}">{2}</a>'
                   '<br/>Uploader: <a href="{3}">{4}</a>').format(
        description_text, videourl, videourl, uploader_url, uploader)

    metadata = dict(
        mediatype=('audio' if collection == 'opensource_audio'
                   else 'movies'),
        creator=uploader,
        collection=collection,
        title=title,
        description=description,
        date=upload_date,
        year=upload_year,
        subject=tags_string,
        originalurl=videourl,
        licenseurl=licenseurl,
        # Set 'scanner' metadata pair to allow tracking of TubeUp
        # powered uploads, per request from archive.org
        scanner='TubeUp Video Stream Mirroring Application {}'.format(__version__))

    return metadata