diff options
Diffstat (limited to 'shadowtube.py')
-rw-r--r-- | shadowtube.py | 348 |
1 files changed, 348 insertions, 0 deletions
diff --git a/shadowtube.py b/shadowtube.py new file mode 100644 index 000000000000..ea445a93f73f --- /dev/null +++ b/shadowtube.py @@ -0,0 +1,348 @@ +#!/usr/bin/python3 + +### Dependencies + +from __future__ import print_function +import itertools, threading, subprocess +import socket, shutil +import time, json, html +import sys +import re, io, os + +try: + from lxml.cssselect import CSSSelector + from stem.control import Controller + from requests import get + from stem import Signal + from stem.connection import IncorrectPassword + from stem import SocketError + import lxml.html + import requests + import argparse + import socket + import socks +except ImportError: + subprocess.check_call([sys.executable, '-m', 'pip', '-r', 'install', 'requirements.txt']) + sys.exit(1) + +### Global variables/Settings + +YOUTUBE_VIDEO_URL = "https://www.youtube.com/watch?v={youtube_id}" +YOUTUBE_COMMENTS_AJAX_URL = "https://www.youtube.com/comment_service_ajax" +USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36" + +settings_dict = None +with open('/etc/shadowtube/settings.json') as f: + settings_dict = json.load(f); + +use_control_pass = settings_dict["use_control_pass"] +control_pass = settings_dict["control_pass"] +control_port = settings_dict["control_port"] +socks_port = settings_dict["socks_port"] + +### Tor + +def tor_authenticate(): + s = requests.Session() + s.proxies = {"http": "socks5://localhost:" + str(socks_port), "https": "socks5://localhost:" + str(socks_port)} + return s + +def tor_rotate(): + time.sleep(10) + try: + with Controller.from_port(port = 9151) as c: + if use_control_pass: + c.authenticate(password = control_pass) + c.signal(Signal.NEWNYM) + else: + c.authenticate() + c.signal(Signal.NEWNYM) + except IncorrectPassword: + print("Error: Failed to authenticate. Control port password incorrect.") + sys.exit(1) + except SocketError: + print("Error: Connection refused. Ensure cookie authentication/control port are enabled.") + sys.exit(1) + +def tor_validate(): + att = 0 + print("Reaching Tor service...") + while True: + try: + tor_authenticate().get("https://ip.seeip.org") + print("Success") + time.sleep(1.2) + os.system("clear") + break + except IOError: + print("Error: Failed to establish conection. Trying again in 5 seconds.") + att += 1 + time.sleep(5) + if att == 10: + print("User idle. Exiting.") + sys.exit(1) + +### Output + +def out_geoip(): + try: + r = tor_authenticate().get("https://ip.seeip.org/geoip") + r_dict = r.json() + print(r_dict["country"] + " — " + r_dict["ip"]) + except IOError: + print("Unknown") + +def out_conclude(att, acc): + if att == 0: + print("\nInterrupted before granted sufficient time") + elif acc == 0 and att > 0: + print("\nAlarming behavior detected") + elif att > acc: + print("\nQuestionable behavior detected") + elif att == acc and att > 0: + print("\nNo abnormal behavior detected") + +### Video - https://www.youtube.com/watch?v=Y6ljFaKRTrI + +def video_init(youtube_id): + att = 0 + acc = 0 + url = "https://www.youtube.com/watch?v=" + youtube_id + try: + while True: + try: + page_data = tor_authenticate().get(url).text + parse_title = str(re.findall('<title>(.*?) - YouTube</title><meta name="title" content=', page_data)) + title = html.unescape(parse_title.split("'")[1]) + break + except IndexError: + tor_rotate() + if title == "": + print("\nVideo unavailable") + sys.exit(1) + else: + print("\n" + title) + print("Interrupt (CTRL+C) to conclude the session\n") + while True: + tor_rotate() + q = tor_authenticate().get("https://www.youtube.com/results?search_query=" + "+".join(title.split())).text + if q.find('"title":{"runs":[{"text":"') >= 0: + if q.find(title) >= 0: + acc += 1 + print("[✓]", end=" ") + else: + print("[x]", end=" ") + out_geoip() + att += 1 + out_conclude(att, acc) + except KeyboardInterrupt: + out_conclude(att, acc) + +### Comments - https://www.youtube.com/feed/history/comment_history +### Comment url template (removed) - https://www.youtube.com/watch?v=OfsojVaqyAA&lc=Ugx5BtG_-N5pwDyvOiF4AaABAg.9NEWMl2CCJR9NI73GZeCDa + +def comments_init(): + att = 0 + acc = 0 + ind = 1 + print("\nInterrupt (CTRL+C) to conclude the session") + try: + with io.open("Google - My Activity.html", "r", encoding = "utf-8") as raw_html: + html = raw_html.read().replace("\n", "").replace("'", "`") + text_list = str(re.findall('<div class="QTGV3c" jsname="r4nke">(.*?)</div><div class="SiEggd">', html)) + uuid_list = str(re.findall('data-token="(.*?)" data-date', html)) + url_list = str(re.findall('<div class="iXL6O"><a href="(.*?)" jslog="65086; track:click"', html)) + for i in range(int(url_list.count("'") / 2)): + text = text_list.split("'")[ind] + uuid = uuid_list.split("'")[ind] + url = url_list.split("'")[ind] + comment_url = url + "&lc=" + uuid + ins = 0 + ind += 2 + print('\n"' + text.replace("`", "'") + '"') + print(url + "\n") + for i in range(0, 3, 1): + comments_fetch(url.replace("https://www.youtube.com/watch?v=", "")) + if private == True: + break + with open("temp.json", "r") as json: + j = json.read() + if j.find(uuid) >= 0: + print("[✓]", end=" ") + ins += 1 + else: + print("[x]", end=" ") + if ins > 0: + ins -= 1 + out_geoip() + tor_rotate() + if private == False: + if ins == 3: + acc += 1 + attempts += 1 + out_conclude(att, acc) + except KeyboardInterrupt: + out_conclude(att, acc) + +def comments_fetch(youtube_id): + parser = argparse.ArgumentParser() + try: + args, unknown = parser.parse_known_args() + output = "temp.json" + limit = 1000 + if not youtube_id or not output: + parser.print_usage() + raise ValueError('Error: Faulty video I.D.') + if os.sep in output: + if not os.path.exists(outdir): + os.makedirs(outdir) + count = 0 + with io.open(output, 'w', encoding='utf8') as fp: + for comment in comments_download(youtube_id): + comment_json = json.dumps(comment, ensure_ascii=False) + print(comment_json.decode('utf-8') if isinstance(comment_json, bytes) else comment_json, file=fp) + count += 1 + if limit and count >= limit: + break + except Exception as e: + print('Error:', str(e)) + sys.exit(1) + +def find_value(html, key, num_chars=2, separator='"'): + pos_begin = html.find(key) + len(key) + num_chars + pos_end = html.find(separator, pos_begin) + return html[pos_begin: pos_end] + +def ajax_request(session, url, params=None, data=None, headers=None, retries=5, sleep=20): + for _ in range(retries): + response = session.post(url, params=params, data=data, headers=headers) + if response.status_code == 200: + return response.json() + if response.status_code in [403, 413]: + return {} + else: + time.sleep(sleep) + +def comments_download(youtube_id, sleep=.1): + global private + private = False + session = requests.Session() + session.headers['User-Agent'] = USER_AGENT + + response = session.get(YOUTUBE_VIDEO_URL.format(youtube_id=youtube_id)) + html = response.text + + session_token = find_value(html, 'XSRF_TOKEN', 3) + session_token = session_token.encode('ascii').decode('unicode-escape') + + data = json.loads(find_value(html, 'var ytInitialData = ', 0, '};') + '}') + for renderer in search_dict(data, 'itemSectionRenderer'): + ncd = next(search_dict(renderer, 'nextContinuationData'), None) + if ncd: + break + try: + if not ncd: + private = False + return + except UnboundLocalError: + private = True + print("Private video") + return + continuations = [(ncd['continuation'], ncd['clickTrackingParams'], 'action_get_comments')] + while continuations: + continuation, itct, action = continuations.pop() + response = ajax_request(session, YOUTUBE_COMMENTS_AJAX_URL, + params={action: 1, + 'pbj': 1, + 'ctoken': continuation, + 'continuation': continuation, + 'itct': itct}, + data={'session_token': session_token}, + headers={'X-YouTube-Client-Name': '1', + 'X-YouTube-Client-Version': '2.20201202.06.01'}) + + if not response: + break + if list(search_dict(response, 'externalErrorMessage')): + raise RuntimeError('Error returned from server: ' + next(search_dict(response, 'externalErrorMessage'))) + + if action == 'action_get_comments': + section = next(search_dict(response, 'itemSectionContinuation'), {}) + for continuation in section.get('continuations', []): + ncd = continuation['nextContinuationData'] + continuations.append((ncd['continuation'], ncd['clickTrackingParams'], 'action_get_comments')) + for item in section.get('contents', []): + continuations.extend([(ncd['continuation'], ncd['clickTrackingParams'], 'action_get_comment_replies') + for ncd in search_dict(item, 'nextContinuationData')]) + + elif action == 'action_get_comment_replies': + continuations.extend([(ncd['continuation'], ncd['clickTrackingParams'], 'action_get_comment_replies') + for ncd in search_dict(response, 'nextContinuationData')]) + + for comment in search_dict(response, 'commentRenderer'): + yield {'cid': comment['commentId'],'text': ''.join([c['text'] for c in comment['contentText']['runs']])} + + time.sleep(sleep) + +def search_dict(partial, search_key): + stack = [partial] + while stack: + current_item = stack.pop() + if isinstance(current_item, dict): + for key, value in current_item.items(): + if key == search_key: + yield value + else: + stack.append(value) + elif isinstance(current_item, list): + for value in current_item: + stack.append(value) + +### Init + +def main(): + parser = argparse.ArgumentParser(description="A YouTube shadowban detection program.") + group = parser.add_mutually_exclusive_group() + group.add_argument("-v", "--video", help="analyze individual video URLs", action="store_true") + group.add_argument("-c", "--comments", help="analyze locally available comment history", action="store_true") + args = parser.parse_args() + if args.video: + tor_validate() + while True: + print("Complete the video URL in question") + youtube_id = input("https://www.youtube.com/watch?v=") + count = 0 + for c in youtube_id: + if c.isspace() != True: + count = count + 1 + if count == 11: + response = tor_authenticate().get("https://www.youtube.com/watch?v=" + youtube_id) + break + else: + os.system("clear") + video_init(youtube_id) + elif args.comments: + tor_validate() + while True: + try: + print('The basic HTML page data from https://www.youtube.com/feed/history/comment_history must be locally available to the script as "Google - My Activity.html"') + confirm = input("Confirm? (Y) ") + if confirm == "Y" or confirm == "y": + try: + io.open("Google - My Activity.html", "r") + break + except IOError: + print("Error: File does not exist.") + elif confirm == "N" or confirm == "n": + print("Exiting.") + sys.exit(1) + else: + os.system("clear") + except ValueError: + continue + comments_init() + else: + os.system("python3 shadowtube.py -h") + +if __name__ == "__main__": + main()
\ No newline at end of file |