Isn’t it odd how subscribing to a channel on Youtube means telling Youtube to aggregate that content on your behalf? Perhaps I’m spoiled by RSS where I can configure my own software to download content listings from sites I subscribe to. But in the cloud model, I have to trust Google to manage my subscriptions. As far as I can tell there is no benefit for me the user in this arrangement, and in fact it is a detriment because I lose control and become dependent on some corporation to faithfully keep my best interests in the forefront.

To alleviate my ailments, I wrote a python script called utube (code included at the end of this page). The dependencies are youtube-dl and mplayer (though mplayer could be swapped out if need be). The intention of this project is multi-fold:

  • Move the management of subscriptions and marking content as watched from the cloud to the local machine to drastically improve independence and privacy.
  • Integrate multiple video platforms (e.g., youtube and bitchute) into one frontend to improve interoperability.
  • Eliminate dependence on the webapp for retrieval and playback of the video to improve performance.

The utube script maintains and updates a list of videos in a .utube file. Here are a few examples of subscribing to one of my favourite youtubers, The Hated One:

Subscribe on Youtube

The url for The Hated One’s channel on Youtube is https://www.youtube.com/channel/UCjr2bPAyPV7t35MvcgT3W8Q/videos. I can subscribe using the following command:

$ utube -a https://www.youtube.com/channel/UCjr2bPAyPV7t35MvcgT3W8Q/videos

Subscribe on Bitchute

The url for The Hated One’s channel on Bitchute is https://www.bitchute.com/channel/thehatedone/. Using this url the same way as subscribing on Youtube will work, but updating the listing of videos is slow using Bitchute’s API. Luckily, Bitchute provides a RSS feed at https://www.bitchute.com/feeds/rss/channel/thehatedone/ (this pattern of inserting /feeds/rss/ in the URL works in general on Bitchute).

$ utube -a https://www.bitchute.com/feeds/rss/channel/thehatedone/

Subscribe Elsewhere

If you wish to subscribe to someone on a platform supported by youtube-dl but which has a prohibitively slow API, then it is easy to set up your own RSS feed on your server!

root@server # utube -a https://www.prohibitively-slow-api.com/channel/someone/
root@server # mkdir /var/www/rss/
root@server # crontab -e
[inside editor]
30 * * * * utube -u && cd /var/www/rss && utube -x

Then, on your client machine:

$ utube -a https://subdomain.domain.com/rss/someone.xml

The source code

#!/usr/bin/env python3
#
# Version 1.3
# https://vance.homelinuxserver.org/coolstuff/youtube-without-youtube.html
#

import argparse
import subprocess
from pathlib import Path
import os
import getpass
import re
import html
import json
import xml.etree.ElementTree as ET

# State file holding subscriptions, video listings, watched flags and
# sync destinations; created on first run by readConf().
CONF= os.path.expanduser('~/.utube')

# Command-line interface: each flag maps to one of the handler sections
# at the bottom of the file. Flags are not mutually exclusive; several
# may run in one invocation.
parser = argparse.ArgumentParser(description='Play vids made by your favourite creators!')
add = parser.add_argument_group('add', 'Add a creator to be tracked by utube')
add.add_argument('-a', '--add', metavar='URL', help='The url of the channel/playlist/RSS feed to add')
parser.add_argument('-u', '--update', action='store_true', help='Update listing of videos')
parser.add_argument('-U', '--forceupdate', action='store_true', help='Clear and re-download listing of videos')
parser.add_argument('-x', '--xml', action='store_true', help='Dump listing of videos in XML format (for RSS)')
parser.add_argument('-l', '--list', action='store_true', help='List videos available')
parser.add_argument('-w', '--watch', nargs=1, type=int, help='Watch a specific video')
parser.add_argument('-L', '--listen', nargs=1, type=int, help='Listen to a specific video')
parser.add_argument('-d', '--download', nargs=1, type=int, help='Download a specific video')
parser.add_argument('-W', metavar='URL', help='Watch a specific url')
parser.add_argument('--markAllWatched', action='store_true', help='Mark all videos as watched')
parser.add_argument('--unwatch', nargs=1, type=int, help='Mark a specific video as unwatched')
parser.add_argument('--autosync', metavar='HOST', help='Name of host to sync .utube file with (operates over ssh)')
args = parser.parse_args()

def meAtMyself():
    """Return this machine's identity as a "user@hostname" string."""
    user = getpass.getuser()
    host = os.uname()[1]
    return f'{user}@{host}'

def readConf():
    """Load the ~/.utube state file.

    Returns (utubrs, syncDests): utubrs maps (url, name) tuples to lists
    of {'title', 'url', 'watched'} dicts; syncDests is the list of
    user@host sync targets, always starting with this machine.
    """
    subs = {}
    dests = [meAtMyself()]
    # Create the file on first run so open() below cannot fail.
    Path(CONF).touch()
    with open(CONF) as f:
        lines = f.read().split('\n')
    # Optional header: "SYNCDESTS", one host per line, ended by a blank line.
    if lines[0] == 'SYNCDESTS':
        end = lines.index('')
        for host in lines[1:end]:
            if host not in dests:
                dests.append(host)
        lines = lines[end + 1:]
    current = None
    for line in lines:
        if current is None and line:
            # Section header: "url&name" identifies one creator.
            current = tuple(line.rsplit('&', 1))
            subs[current] = []
        elif not line:
            # Blank line terminates the current creator's section.
            current = None
        else:
            # Video entry: "title&url&watched" (rsplit tolerates '&' in titles).
            title, url, watched = line.rsplit('&', 2)
            subs[current].append({'title': title,
                                  'url': url,
                                  'watched': watched == 'True'})
    return subs, dests

def writeConf(utubrs, syncDests):
    """Persist state to ~/.utube, then push a copy to every other sync host.

    File layout mirrors what readConf() parses: a SYNCDESTS header block,
    a blank line, then one "url&name" section per creator containing
    "title&url&watched" lines and a terminating blank line.
    """
    with open(CONF, 'w') as f:
        f.write('SYNCDESTS\n')
        f.writelines('{}\n'.format(host) for host in syncDests)
        f.write('\n')
        for key in sorted(utubrs):
            f.write('{}&{}\n'.format(key[0], key[1]))
            for vid in utubrs[key]:
                f.write('{}&{}&{}\n'.format(vid['title'], vid['url'], vid['watched']))
            f.write('\n') # blank line

    # Mirror the file to every configured destination other than ourselves.
    me = meAtMyself()
    for host in syncDests:
        if host != me:
            subprocess.run('scp {} {}:.utube 1> /dev/null'.format(CONF, host), shell=True)

def getVid(index):
    """Return the video dict at flat position *index*.

    Videos are numbered in the same sorted-creator order that
    printFancy() displays; returns None when the index is out of range.
    """
    position = 0
    for creator in sorted(utubrs):
        for vid in utubrs[creator]:
            if position == index:
                return vid
            position += 1

def printFancy(vids=None):
    """Pretty-print the stored video listing to the terminal.

    vids: optional list of video dicts; when given, only videos whose
    url appears in *vids* are shown (used to display just new arrivals).
    Unwatched videos are printed in red; the printed indices match the
    flat numbering used by getVid().

    Note: default changed from a mutable `[]` to the None sentinel
    (mutable-default anti-pattern); `not vids` treats both identically.
    """
    index = 0
    for utubr in sorted(utubrs):
        name = utubr[1]
        hasPrintedName = False
        for vid in utubrs[utubr]:
            color = '\033[91m'  # red for unwatched
            if vid['watched']:
                color = ''
            if not vids or any(v['url'] == vid['url'] for v in vids):
                if not hasPrintedName:
                    # Creator header in bold cyan, printed lazily so creators
                    # with no matching videos are skipped entirely.
                    print('\033[36m\033[1m{}\033[0m:'.format(name))
                    hasPrintedName = True
                print('\t{}\033[1m{}\033[0m) {}'.format(color, index, vid['title']))
            index += 1  # advance even for filtered-out videos to keep numbering stable
        if hasPrintedName:
            print()

def watchUrl(url):
    """Stream the video at *url*: youtube-dl writes the raw stream to
    stdout, which is piped into mplayer.

    Built as an explicit two-process pipeline with argv lists instead of
    a shell=True format string, so characters in *url* can never be
    interpreted by a shell (command-injection fix).
    """
    dl = subprocess.Popen(['youtube-dl', '--no-check-certificate', '-o', '-', url],
                          stdout=subprocess.PIPE)
    subprocess.run(['mplayer', '-'], stdin=dl.stdout)
    dl.stdout.close()  # let youtube-dl receive SIGPIPE if mplayer exits first
    dl.wait()

def listenUrl(url):
    """Play only the audio of the video at *url* (mplayer with null video out).

    Same argv-list pipeline as watchUrl(): no shell=True string
    interpolation, so *url* cannot inject shell commands.
    """
    dl = subprocess.Popen(['youtube-dl', '--no-check-certificate', '-o', '-', url],
                          stdout=subprocess.PIPE)
    subprocess.run(['mplayer', '-vo', 'null', '-'], stdin=dl.stdout)
    dl.stdout.close()  # let youtube-dl receive SIGPIPE if mplayer exits first
    dl.wait()

def downloadUrl(url):
    """Download the video at *url* into the current directory as "title.ext".

    Uses an argv list (shell=False) so the output template needs no shell
    quoting and *url* cannot inject shell commands.
    """
    subprocess.run(['youtube-dl', '--no-check-certificate', url,
                    '-o', '%(title)s.%(ext)s'])

# Load subscriptions and sync destinations once at startup; all the
# command handlers below operate on these module-level structures.
utubrs, syncDests = readConf()

def group(lst, n):
    """Yield successive complete n-tuples from *lst*.

    A trailing chunk shorter than n is silently dropped.
    """
    for start in range(0, len(lst), n):
        chunk = lst[start:start + n]
        if len(chunk) == n:
            yield tuple(chunk)

# Optimized for RSS downloads
def processRss(xmlData):
    """Parse an RSS 2.0 feed into a video listing.

    Returns (index, name): index is a list of
    {'title', 'url', 'watched': False} dicts in feed order, name is the
    channel title. Returns ([], '') when *xmlData* is not a usable feed,
    which signals downloadMetadata() to try the next strategy.
    """
    try:
        root = ET.fromstring(xmlData)
        channel = root.find('channel')
        name = channel.find('title').text
        index = []
        for vid in channel.findall('item'):
            title = html.unescape(vid.find('title').text)
            url = vid.find('link').text
            index.append({'title': title, 'url': url, 'watched': False})
        return index, name
    except (ET.ParseError, AttributeError, TypeError):
        # Narrowed from a bare `except:` (which also swallowed
        # KeyboardInterrupt/SystemExit): ParseError for non-XML input,
        # AttributeError/TypeError for XML missing channel/title/link nodes.
        return [], ''

# Optimized for Youtube downloads
def processYoutube(httpData):
    """Scrape a Youtube channel/playlist HTML page for its video listing.

    Returns (index, name) in the same shape as processRss(); ([], '')
    when the page does not carry a <meta name="title"> tag (signals
    downloadMetadata() to fall back to youtube-dl). Entries without a
    Duration (live streams) are skipped.

    NOTE(review): this matches the old desktop HTML layout
    (yt-lockup-title); it will return no videos on pages rendered with
    newer markup — the youtube-dl fallback then takes over.
    """
    search = re.search('(?<=<meta name="title" content=").*(?=">)', httpData)
    if not search:
        return [], ''
    name = search.group(0)
    index = []
    for line in re.findall('^.*<h3 class="yt-lockup-title ">.*$', httpData, re.MULTILINE):
        if not re.search('(?<=Duration: ).*(?=</span>)', line):
            continue # It's a live stream
        # Guard the sub-matches: the original called .group(0) directly and
        # crashed with AttributeError on rows missing the anchor markup.
        titleMatch = re.search('(?<=rel="nofollow">).*(?=</a>)', line)
        urlMatch = re.search('(?<=href=").*(?=" rel=)', line)
        if not titleMatch or not urlMatch:
            continue # malformed row; skip rather than crash
        title = html.unescape(titleMatch.group(0))
        url = "https://www.youtube.com" + urlMatch.group(0)
        index.append({'title': title, 'url': url, 'watched': False})
    return index, name

def downloadMetadata(utubrUrl, maxDownloads):
    """Fetch up to *maxDownloads* video entries for a channel/playlist/feed url.

    Tries the cheap strategies first — RSS parsing, then Youtube HTML
    scraping of the curl'd page — and only falls back to youtube-dl's
    slow JSON output when neither recognises the data. Returns
    (index, name) like processRss().

    Fix: external commands are invoked with argv lists instead of
    shell=True format strings, so a crafted url cannot inject shell
    commands.
    """
    # Let's first try some optimized download strategies
    data = subprocess.run(['curl', '-sL', utubrUrl],
                          stdout=subprocess.PIPE).stdout.decode('utf-8')
    index, name = processRss(data)
    if not name:
        index, name = processYoutube(data)
    if not name:
        # Generic fallback: youtube-dl emits one JSON object per line.
        out = subprocess.run(['youtube-dl', utubrUrl, '-i', '-j',
                              '--no-check-certificate',
                              '--max-downloads', str(maxDownloads)],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.DEVNULL).stdout.decode('utf-8')
        for line in out.split('\n'):
            if not line:
                continue
            vid = json.loads(line)
            title = ''
            url = 'err'
            if 'title' in vid:
                title = vid['title']
            if 'webpage_url' in vid:
                url = vid['webpage_url']
            elif 'url' in vid:
                url = vid['url']
            index.append({'title': title, 'url': url, 'watched': False})
            # Derive a channel name from the first entry that carries one.
            if not name and 'uploader_id' in vid:
                name = vid['uploader_id']
            if not name and 'playlist' in vid:
                name = vid['playlist']
    # RSS/HTML paths ignore maxDownloads while parsing; enforce it here.
    if len(index) > maxDownloads:
        index = index[0:maxDownloads]
    return index, name

def mergeVids(index, utubr):
    """Fold freshly downloaded entries into the stored list for *utubr*.

    New videos are inserted into utubrs[utubr] so that the downloaded
    ordering is preserved relative to entries already present (the
    insertion cursor jumps to the position of each already-known video).
    Returns the list of entries that were genuinely new.
    """
    known = utubrs[utubr]
    fresh = []
    cursor = 0  # where the next unseen entry gets inserted
    for entry in index:
        match = next((i for i, vid in enumerate(known)
                      if vid['url'] == entry['url']), None)
        if match is None:
            fresh.append(entry)
            known.insert(cursor, entry)
            cursor += 1
        else:
            # Already stored: continue inserting around its position.
            cursor = match
    # Remember at most 10 items per utubr
    if len(known) > 10:
        utubrs[utubr] = known[0:10]
    return fresh

# -a URL: subscribe to a new creator/feed, show its current listing,
# and persist the updated state.
if args.add:
    index, name = downloadMetadata(args.add, 10)
    utubr = (args.add, name)
    utubrs[utubr] = []
    mergeVids(index, utubr)
    printFancy(index)
    writeConf(utubrs, syncDests)

# -U: forget all remembered videos so the -u pass below re-downloads
# every listing from scratch (everything will show as new/unwatched).
if args.forceupdate:
    for utubr in utubrs:
        utubrs[utubr] = []
    args.update = True #hacky, but whatever

# -u: refresh every subscription, report anything new, and persist.
if args.update:
    alldaNewStuff = []
    for utubr in utubrs:
        newstuff, name = downloadMetadata(utubr[0], 10)
        alldaNewStuff += mergeVids(newstuff, utubr)
    if alldaNewStuff:
        print('New stuff emerges!')
        printFancy(alldaNewStuff)
    else:
        print('No new vids.')
    writeConf(utubrs, syncDests)

# -x: dump each creator's listing as a minimal RSS 2.0 file in the
# current directory, named after the creator with spaces stripped
# (used to self-host feeds, as shown in the article above).
if args.xml:
    for utubr in utubrs:
        # Create an output file
        with open('{}.xml'.format(utubr[1].replace(' ', '')), 'w') as f:
            f.write('<?xml version="1.0" encoding="ISO-8859-1" ?>\n<rss version="2.0">\n')
            f.write(' <channel>\n <title>{}</title>\n'.format(html.escape(utubr[1])))
            for vid in utubrs[utubr]:
                f.write('  <item><title>{}</title>'.format(html.escape(vid['title'])))
                f.write('<link>{}</link>'.format(vid['url']))
                f.write('<description></description></item>\n')
            f.write(' </channel>\n')
            f.write('</rss>\n')

# -l: print the full listing with selection indices.
if args.list:
    printFancy()

# -w N: mark video N watched, persist, then stream it.
# (Removed dead code: each branch assigned `utubr = next(...)` and never
# used it; getVid() already returns the video dict directly.)
if args.watch:
    vid = getVid(args.watch[0])
    vid['watched'] = True
    writeConf(utubrs, syncDests)
    watchUrl(vid['url'])

# -L N: same as -w but audio-only playback.
if args.listen:
    vid = getVid(args.listen[0])
    vid['watched'] = True
    writeConf(utubrs, syncDests)
    listenUrl(vid['url'])

# -W URL: stream an arbitrary url without touching subscription state.
if args.W:
    watchUrl(args.W)

# -d N: mark video N watched, persist, then download it to the cwd.
if args.download:
    vid = getVid(args.download[0])
    vid['watched'] = True
    writeConf(utubrs, syncDests)
    downloadUrl(vid['url'])

# --markAllWatched: clear the "new" (red) highlight on every video.
if args.markAllWatched:
    for utubr in utubrs:
        for vid in utubrs[utubr]:
            vid['watched'] = True
    writeConf(utubrs, syncDests)

# --unwatch N: flip video N back to unwatched.
if args.unwatch:
    vid = getVid(args.unwatch[0])
    vid['watched'] = False
    writeConf(utubrs, syncDests)

# --autosync HOST: remember HOST as an scp destination; writeConf()
# will push the state file there from now on. A "host:path" spec is
# trimmed to just the host part.
if args.autosync:
    host=args.autosync
    if ':' in host:
        host = host.split(':')[0]
    syncDests.append(host)
    writeConf(utubrs, syncDests)