Python Sample Code for Intrusion Detection and Prevention
1. Secure Password Generation: Use 'secrets' module.
import string
import secrets
# Build a 16-character password from letters and digits, one secrets.choice() draw per character.
alphabet = string.ascii_letters + string.digits
picked_chars = [secrets.choice(alphabet) for _ in range(16)]
password = ''.join(picked_chars)
print(password)
2. Hashing Passwords: Use 'hashlib' module hashing passwords with SHA-256, SHA-512, etc.
Example 1:
import hashlib
import os

password = "mysecret"
# A bare SHA-256 digest is unsalted and cheap to brute-force, so the hash is
# derived with PBKDF2-HMAC-SHA256 using a random per-password salt.
salt = os.urandom(16)
iterations = 600_000
derived_key = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations)
hashed_password = derived_key.hex()
# The salt and iteration count are needed again to verify a login attempt,
# so they are emitted together with the hash.
print(f"{salt.hex()}${iterations}${hashed_password}")
Example 2: Use bcrypt library hashing password and store the hashed password in a MySQL database.
import getpass

import bcrypt
import mysql.connector

# Connect to MySQL database
db = mysql.connector.connect(
    host="localhost",
    user="yourusername",
    password="yourpassword",
    database="yourdatabase",
)
try:
    # Read the password without echoing it to the terminal
    password = getpass.getpass("Enter your password: ")
    # Generate a salt and hash the password with it
    salt = bcrypt.gensalt()
    hashed_password = bcrypt.hashpw(password.encode('utf-8'), salt)
    # Store the hashed password using a parameterized query
    cursor = db.cursor()
    try:
        sql = "INSERT INTO users (username, password) VALUES (%s, %s)"
        values = ("user123", hashed_password.decode('utf-8'))
        cursor.execute(sql, values)
        db.commit()
    finally:
        cursor.close()
finally:
    # Release the connection even if hashing or the insert fails
    db.close()
print("Password hashed and stored in database!")
3. Input Validation: Use 'pyinputsecurity', 'validators', 're', 'wtforms' and 'django-forms' libraries.
Example 1: Use 're'
import re

# Something before "@", something after it, and a "." in the part after "@".
_EMAIL_RE = re.compile(r"[^@]+@[^@]+\.[^@]+")


def is_valid_email(address):
    """Return True if *address* matches the simple email pattern in full.

    ``fullmatch`` is used because ``re.match`` anchors only the start of the
    string, so input such as ``"a@b.c@"`` would pass on its valid prefix.
    """
    return _EMAIL_RE.fullmatch(address) is not None


email = "john@example.com"
if not is_valid_email(email):
    print("Invalid email address")
else:
    print("Valid email address")
Example 2:
# NOTE(review): 'pyinputsecurity' could not be confirmed as a published
# package — verify it exists before relying on this example.
import pyinputsecurity
# Each helper is called with the prompt text to show; the names suggest they
# return validated input of the given kind — TODO confirm against the library.
username = pyinputsecurity.get_input_string('Enter username: ')
email = pyinputsecurity.get_input_email('Enter email: ')
password = pyinputsecurity.get_input_password('Enter password: ')
Example 3:
import validators

email = 'example@example.com'
# The truthiness of the validators.email() result decides the message.
if validators.email(email):
    print('Valid email address')
else:
    print('Invalid email address')
Example 4:
from flask import Flask, render_template_string, request
from wtforms import Form, StringField, validators

app = Flask(__name__)

# NOTE(review): MyForm derives from the plain wtforms ``Form``; confirm that
# ``form.csrf_token`` actually renders a token in this setup.
FORM_TEMPLATE = '''
<form method="post">
{{ form.csrf_token }}
{{ form.name.label }} {{ form.name }}
<input type="submit" value="Submit">
</form>
'''


class MyForm(Form):
    """Form with a single required text field."""

    name = StringField('Name', validators=[validators.InputRequired()])


@app.route('/', methods=['GET', 'POST'])
def index():
    """Accept a valid POSTed form; otherwise render the form."""
    form = MyForm(request.form)
    if request.method == 'POST' and form.validate():
        # Process form data here
        return 'Form submitted successfully'
    return render_template_string(FORM_TEMPLATE, form=form)


if __name__ == '__main__':
    app.run()
4. Scans an access.log file for intrusion detection:
import logging
import re


def scan_access_log(log_file):
    """Scan a log file for known intrusion patterns and alert on each hit.

    Every line matching at least one pattern (case-insensitively) is logged
    as a warning to 'intrusion.log' and passed to send_alert() exactly once,
    even when several patterns match the same line.

    :param log_file: Path of the log file to scan.
    :return: Number of lines in which an intrusion pattern was found.
    """
    intrusion_patterns = [
        r'(Unauthorized access)',
        r'(SQL injection)',
        r'(Cross-Site Scripting)',
        # Add more intrusion patterns as needed
    ]
    # One combined regex, compiled once, instead of one search per pattern
    detector = re.compile('|'.join(intrusion_patterns), re.IGNORECASE)
    logging.basicConfig(filename='intrusion.log', level=logging.INFO,
                        format='%(asctime)s - %(levelname)s - %(message)s')
    hits = 0
    try:
        with open(log_file, 'r') as file:
            for line_number, line in enumerate(file, start=1):
                if detector.search(line):
                    entry = line.strip()
                    # Intrusion detected, handle it accordingly
                    logging.warning(f"Intrusion detected in log line {line_number}: {entry}")
                    send_alert(entry)  # Function to send an alert
                    hits += 1
    except FileNotFoundError:
        logging.error(f"Log file '{log_file}' not found")
    except Exception as e:
        # Top-level boundary: record the failure and report what was found so far
        logging.error(f"An error occurred while scanning the log file: {str(e)}")
    return hits


def send_alert(message):
    """Send an alert for one suspicious log line (currently prints it)."""
    # Implement the logic to send an alert, such as an email, SMS, or notification.
    print("Alert: ", message)


if __name__ == '__main__':
    # Usage
    access_log_file = 'access.log'
    scan_access_log(access_log_file)
5. Encryption and Decryption: Use 'cryptography' module for encryption and decryption.
Example 1: Encrypt and decrypt a string
from cryptography.fernet import Fernet

# Round-trip a short string through Fernet with a freshly generated key.
key = Fernet.generate_key()
cipher_suite = Fernet(key)
data = "my secret data".encode()
cipher_text = cipher_suite.encrypt(data)
print(cipher_text)
plain_text = cipher_suite.decrypt(cipher_text)
recovered = plain_text.decode()
print(recovered)
Example 2: Encrypt a text file
import os

from cryptography.fernet import Fernet

# Generate a key for encryption
key = Fernet.generate_key()
# Save the key to a file that only the owner may read or write
# (the mode applies when the file is newly created)
key_fd = os.open('key.key', os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
with os.fdopen(key_fd, 'wb') as key_file:
    key_file.write(key)
# Load the key from the file
with open('key.key', 'rb') as key_file:
    key = key_file.read()
# Initialize the Fernet cipher with the key
cipher = Fernet(key)
# Open the plaintext file to be encrypted
with open('plaintext.txt', 'rb') as plaintext_file:
    plaintext = plaintext_file.read()
# Encrypt the plaintext using the Fernet cipher
encrypted_data = cipher.encrypt(plaintext)
# Save the encrypted data to a file
with open('encrypted.txt', 'wb') as encrypted_file:
    encrypted_file.write(encrypted_data)
print("File encrypted successfully!")
Example 3: Decrypt file encrypted.txt
from cryptography.fernet import Fernet

# Load the key from the file
with open('key.key', 'rb') as key_file:
    key = key_file.read()
# Initialize the Fernet cipher with the key
cipher = Fernet(key)
# Open the encrypted file to be decrypted
with open('encrypted.txt', 'rb') as encrypted_file:
    encrypted_data = encrypted_file.read()
# Decrypt the encrypted data using the Fernet cipher
decrypted_data = cipher.decrypt(encrypted_data)
# Save the decrypted data to a file
with open('decrypted.txt', 'wb') as decrypted_file:
    decrypted_file.write(decrypted_data)
print("File decrypted successfully!")
Example 4: Encrypt and decrypt a directory.
import os
import shutil
from cryptography.fernet import Fernet
def encrypt_directory(input_dir, output_dir, key):
    """Encrypts a directory tree using Fernet symmetric encryption.

    Each file is written as '<name>.encrypted' under *output_dir*, in the
    same sub-directory it had under *input_dir*, so equally named files in
    different folders do not overwrite each other.

    :param input_dir: The path to the directory to be encrypted.
    :param output_dir: The path to the output directory for the encrypted files.
    :param key: The encryption key to be used.
    """
    # makedirs also creates missing parent directories; mkdir would fail on them
    os.makedirs(output_dir, exist_ok=True)
    # create the Fernet instance with the given key
    fernet = Fernet(key)
    # encrypt each file in the input directory
    for root, _dirs, files in os.walk(input_dir):
        target_root = os.path.normpath(
            os.path.join(output_dir, os.path.relpath(root, input_dir)))
        os.makedirs(target_root, exist_ok=True)
        for filename in files:
            input_file = os.path.join(root, filename)
            output_file = os.path.join(target_root, filename + '.encrypted')
            with open(input_file, 'rb') as f:
                data = f.read()
            encrypted_data = fernet.encrypt(data)
            with open(output_file, 'wb') as f:
                f.write(encrypted_data)
def decrypt_directory(input_dir, output_dir, key):
    """Decrypts a directory tree that was encrypted using Fernet.

    Only files ending in '.encrypted' are processed; the suffix is removed
    and the sub-directory layout of *input_dir* is reproduced in *output_dir*.

    :param input_dir: The path to the encrypted directory.
    :param output_dir: The path to the output directory for the decrypted files.
    :param key: The encryption key that was used.
    """
    suffix = '.encrypted'
    # makedirs also creates missing parent directories; mkdir would fail on them
    os.makedirs(output_dir, exist_ok=True)
    # create the Fernet instance with the given key
    fernet = Fernet(key)
    # decrypt each file in the input directory
    for root, _dirs, files in os.walk(input_dir):
        target_root = os.path.normpath(
            os.path.join(output_dir, os.path.relpath(root, input_dir)))
        os.makedirs(target_root, exist_ok=True)
        for filename in files:
            if not filename.endswith(suffix):
                continue
            input_file = os.path.join(root, filename)
            output_file = os.path.join(target_root, filename[:-len(suffix)])
            with open(input_file, 'rb') as f:
                encrypted_data = f.read()
            decrypted_data = fernet.decrypt(encrypted_data)
            with open(output_file, 'wb') as f:
                f.write(decrypted_data)
if __name__ == '__main__':
    # specify the input and output directories
    input_dir = 'path/to/input/dir'
    output_dir = 'path/to/output/dir'
    # generate a new Fernet key; the same key is needed again for decryption
    key = Fernet.generate_key()
    # encrypt the input directory
    encrypt_directory(input_dir, output_dir, key)
    # decrypt the output directory
    decrypt_directory(output_dir, input_dir + '_decrypted', key)
6. Cross-Site Request Forgery (CSRF) Protection: Use the 'Flask-WTF' extension ('flask_wtf.csrf.CSRFProtect' and 'FlaskForm').
Example 1:
import os

from flask import Flask, render_template_string
from flask_wtf.csrf import CSRFProtect

app = Flask(__name__)
# Prefer a key from the environment; the literal is only a demo fallback
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'secret key')
csrf = CSRFProtect(app)


@app.route('/', methods=['GET', 'POST'])
def index():
    """Render a form that carries the CSRF token CSRFProtect checks on POST."""
    template = (
        '<form method="POST">'
        '<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">'
        '<input type="submit" value="Submit">'
        '</form>'
    )
    return render_template_string(template)


if __name__ == '__main__':
    app.run()
Example 2:
import os
import secrets

from flask import Flask, render_template, request
from flask_wtf.csrf import CSRFProtect

app = Flask(__name__)
# CSRFProtect needs a secret key to sign tokens; use the environment value,
# or a random one for a single local run.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY') or secrets.token_hex(32)
csrf = CSRFProtect(app)


@app.route('/', methods=['GET', 'POST'])
def index():
    """Handle the form post (placeholder) and render index.html."""
    if request.method == 'POST':
        # process form data
        pass
    return render_template('index.html')


if __name__ == '__main__':
    app.run()
Example 3:
from flask import Flask, render_template_string, session
from flask_wtf import FlaskForm
from wtforms import SubmitField

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret-key'

FORM_TEMPLATE = '''
<form method="post">
{{ form.csrf_token }}
{{ form.submit }}
</form>
'''


class MyForm(FlaskForm):
    """Form containing only a submit button."""

    submit = SubmitField('Submit')


@app.route('/', methods=['GET', 'POST'])
def index():
    """Accept a submitted, validated form; otherwise render the form."""
    form = MyForm()
    if form.validate_on_submit():
        # Perform form submission here
        return 'Form submitted successfully'
    return render_template_string(FORM_TEMPLATE, form=form)


if __name__ == '__main__':
    app.run()
7. Input Sanitization: Use 'html', 'bleach' and 'html-sanitizer' libraries.
Example 1:
import html

# Escape the markup characters of a sample payload so it displays as text.
user_input = '<script>alert("Hello")</script>'
sanitized_input = html.escape(user_input, quote=True)
print(sanitized_input)
Example 2:
import bleach

# Clean a sample payload with bleach, stripping disallowed tags (strip=True).
text = '<script>alert("Hello, World!")</script>'
clean_text = bleach.clean(text=text, strip=True)
print(clean_text)
Example 3: Sanitize input data using regular expressions.
import re
def sanitize_input(input_str):
    """Sanitizes input data by removing characters that are not alphanumeric or whitespace.

    :param input_str: The input string to be sanitized.
    :return: The sanitized input string.
    """
    # Raw string: '\s' in a normal string literal is an invalid escape sequence
    sanitized_str = re.sub(r'[^a-zA-Z0-9\s]', '', input_str)
    return sanitized_str
8. Two-Factor Authentication (2FA): Use 'pyotp' and 'django-two-factor-auth' libraries.
Example 1:
import pyotp

# Demo secret only; Example 2 shows how to generate a random one.
totp = pyotp.TOTP('JBSWY3DPEHPK3PXP')
print('Current OTP:', totp.now())
# Verify OTP (surrounding whitespace from the prompt is ignored)
user_input = input('Enter OTP:').strip()
if totp.verify(user_input):
    print('OTP is valid')
else:
    print('OTP is invalid')
Example 2:
import pyotp

# Create a random base32 secret and print the current code derived from it.
otp_secret = pyotp.random_base32()
otp = pyotp.TOTP(otp_secret)
current_code = otp.now()
print('OTP:', current_code)
9. SSL/TLS Certificate Verification: Use 'ssl' module.
import ssl
import socket

hostname = 'example.com'
context = ssl.create_default_context()
try:
    with socket.create_connection((hostname, 443), timeout=10) as sock:
        with context.wrap_socket(sock, server_hostname=hostname) as ssock:
            cert = ssock.getpeercert()
except ssl.SSLCertVerificationError:
    # With the default context a bad certificate aborts the handshake with
    # this exception, so it has to be caught rather than tested with `if`.
    print('Certificate is invalid')
else:
    if cert:
        print('Certificate is valid')
    else:
        print('Certificate is invalid')
10. Secure File Deletion: Use 'shutil' module to securely delete files.
Example 1:
import os
import shutil

filename = 'my_secret_file.txt'
# The original removed the file first, so the following move always failed,
# and '~' was never expanded to the home directory.
# NOTE(review): moving a file to the trash leaves its contents recoverable;
# confirm this is the intended meaning of "secure" here.
trash_dir = os.path.expanduser(os.path.join('~', '.Trash'))
os.makedirs(trash_dir, exist_ok=True)
shutil.move(filename, trash_dir)
Example 2: Delete a file by overwriting its contents before deleting it.
import os
import random
def secure_delete(filename, passes=1):
    """Securely delete a file by overwriting its contents before deleting it.

    :param filename: The name of the file to be deleted.
    :param passes: The number of times to overwrite the file's contents (default is 1).
    """
    chunk_size = 1024 * 1024
    try:
        # Measure before opening: mode 'wb' would truncate the file to zero
        # bytes first, leaving nothing to overwrite.
        size = os.path.getsize(filename)
        with open(filename, 'r+b') as f:
            for _ in range(passes):
                # Every pass starts at the beginning instead of appending
                f.seek(0)
                remaining = size
                while remaining > 0:
                    step = min(chunk_size, remaining)
                    f.write(os.urandom(step))
                    remaining -= step
                f.flush()
                os.fsync(f.fileno())
        # delete the file
        os.remove(filename)
    except OSError as e:
        print(f"Error deleting file '{filename}': {e}")
Example 3: Securely wipe a hard disk or SD card
import os
import random
import string
def secure_wipe(device_path, passes=3):
    """Securely wipes a hard drive or SD card by overwriting it with random data multiple times.

    The whole device is overwritten from offset 0 on every pass, then the
    first sector is zeroed and a GPT table with one partition is created.

    :param device_path: The path to the device to be wiped (e.g. '/dev/sdb').
    :param passes: The number of times to overwrite the device's contents (default is 3).
    :raises OSError: If the device cannot be opened or written.
    :raises subprocess.CalledProcessError: If sync, dd or parted fails.
    """
    import subprocess  # local import so the function is self-contained

    # Write in chunks of at least 1 MiB
    chunk_size = max(os.stat(device_path).st_blksize, 1024 * 1024)
    with open(device_path, 'r+b') as f:
        # Seeking to the end yields the size to overwrite
        device_size = f.seek(0, os.SEEK_END)
        for _ in range(passes):
            f.seek(0)
            remaining = device_size
            while remaining > 0:
                step = min(chunk_size, remaining)
                f.write(os.urandom(step))
                remaining -= step
            f.flush()
            os.fsync(f.fileno())
    # sync the device to ensure all data has been written
    subprocess.run(['sync'], check=True)
    # Argument lists (no shell) keep device_path from being interpreted as shell syntax.
    # delete the device partition table and create a new one with a single partition
    subprocess.run(['dd', 'if=/dev/zero', f'of={device_path}', 'bs=512',
                    'count=1', 'conv=notrunc'], check=True)
    subprocess.run(['parted', '--script', device_path, 'mktable', 'gpt'], check=True)
    subprocess.run(['parted', '--script', device_path, 'mkpart', 'primary',
                    'ext4', '0%', '100%'], check=True)
    # sync the device again to ensure the partition table has been written
    subprocess.run(['sync'], check=True)
11. Automated Vulnerability Scanning: Use 'Nmap', 'PyCurl', 'OpenVAS', 'scapy', and 'socket' for automated vulnerability scanning:
Example 1:
import nmap

nm = nmap.PortScanner()
nm.scan('example.com', arguments='-p 1-65535')
print(nm.all_hosts())
# Scan results are indexed by the host entries returned by all_hosts()
# (IP addresses), so nm['example.com'] would raise KeyError.
for host in nm.all_hosts():
    print(host, nm[host].all_tcp())
Example 2:
import nmap

# Scan ports 22-443 on localhost and list the TCP ports present in the result.
nm = nmap.PortScanner()
result = nm.scan('127.0.0.1', arguments='-p 22-443')
tcp_results = result['scan']['127.0.0.1']['tcp']
print('Open ports:', tcp_results.keys())
Example 3: Use Scapy to perform automated vulnerability scanning.
from scapy.all import *
import argparse
def scan_vulnerabilities(target_ip, target_port):
    """Scans for vulnerabilities on the target IP and port.

    Sends one TCP segment carrying an HTTP GET and classifies the first reply
    by its TCP flags.

    :param target_ip: The IP address of the target to be scanned.
    :param target_port: The port number to be scanned.
    """
    # craft the packet with the appropriate payload for the vulnerability being scanned
    payload = 'GET / HTTP/1.1\r\nHost: {}\r\n\r\n'.format(target_ip)
    packet = IP(dst=target_ip)/TCP(dport=target_port)/payload
    # send the packet and capture the response
    response = sr1(packet, timeout=5, verbose=0)
    # analyze the response to determine if a vulnerability exists
    if response is None:
        print('Port {} is closed on {}'.format(target_port, target_ip))
    elif response.haslayer(TCP) and response[TCP].flags == 'SA':
        print('Port {} is open on {}, but does not appear to be vulnerable'.format(target_port, target_ip))
    elif response.haslayer(TCP) and response[TCP].flags == 'FA':
        # NOTE(review): a FIN-ACK reply is treated as "vulnerable" here;
        # confirm this heuristic against the vulnerability being tested.
        print('Port {} is open on {} and appears to be vulnerable'.format(target_port, target_ip))
    elif response.haslayer(TCP) and response[TCP].flags & 0x04:
        # RST bit set: the port actively refused the segment
        print('Port {} is closed on {}'.format(target_port, target_ip))
    else:
        print('Unexpected response on port {} from {}'.format(target_port, target_ip))
if __name__ == '__main__':
    # parse the command-line arguments
    parser = argparse.ArgumentParser(description='Automated vulnerability scanner using Scapy')
    parser.add_argument('target_ip', type=str, help='the IP address of the target to be scanned')
    parser.add_argument('target_port', type=int, help='the port number to be scanned')
    args = parser.parse_args()
    # scan for vulnerabilities on the target IP and port
    scan_vulnerabilities(args.target_ip, args.target_port)
12. Firewall Management: Use 'python-iptables' and 'ufw' for firewall management:
import ipaddress
import subprocess

ip_address = '191.168.10.4'
# Reject a malformed address (ValueError) before handing it to ufw
ipaddress.ip_address(ip_address)
# Use ufw to block incoming traffic from a specific IP address;
# check=True raises if ufw reports failure.
subprocess.run(['sudo', 'ufw', 'deny', 'from', ip_address], check=True)
13. Automated Backup: Use 'rsync', 'scp', and 'paramiko' for automated backups:
import os
import posixpath
import stat

import paramiko

hostname = 'example.com'
username = 'user'
password = 'password'
remote_path = '/path/to/remote/folder'
local_path = '/path/to/local/folder'

ssh = paramiko.SSHClient()
# Trust only hosts already in known_hosts; AutoAddPolicy would accept any
# server key and allow a man-in-the-middle.
ssh.load_system_host_keys()
ssh.set_missing_host_key_policy(paramiko.RejectPolicy())
try:
    ssh.connect(hostname=hostname, username=username, password=password)
    sftp = ssh.open_sftp()
    try:
        for entry in sftp.listdir_attr(remote_path):
            # sftp.get() copies single files, so sub-directories are skipped
            if stat.S_ISDIR(entry.st_mode):
                continue
            # Remote paths use '/', whatever the local operating system is
            remote_file = posixpath.join(remote_path, entry.filename)
            local_file = os.path.join(local_path, entry.filename)
            sftp.get(remote_file, local_file)
    finally:
        sftp.close()
finally:
    ssh.close()
14. HTTP Header Security: Use libraries 'Flask', 'http.server', 'Flask-Talisman' and 'django-csp':
Example 1:
from flask import Flask, request, make_response

app = Flask(__name__)

# Security headers attached to the response
SECURITY_HEADERS = {
    'X-Frame-Options': 'SAMEORIGIN',
    'X-XSS-Protection': '1; mode=block',
    'X-Content-Type-Options': 'nosniff',
}


@app.route('/')
def index():
    """Return a greeting with the security headers set."""
    response = make_response('Hello, world!')
    for header_name, header_value in SECURITY_HEADERS.items():
        response.headers[header_name] = header_value
    return response


if __name__ == '__main__':
    app.run()
Example 2:
import http.server
import socketserver


class MyHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
    """Static file handler that adds security headers to every response."""

    def end_headers(self):
        """Append the security headers, then finish the header block."""
        self.send_header('Content-Security-Policy', "default-src 'self'")
        self.send_header('X-XSS-Protection', '1; mode=block')
        self.send_header('X-Content-Type-Options', 'nosniff')
        super().end_headers()


PORT = 8000
with socketserver.TCPServer(('', PORT), MyHTTPRequestHandler) as httpd:
    print('Serving on port', PORT)
    httpd.serve_forever()
Example 3:
from flask import Flask
from flask_talisman import Talisman

app = Flask(__name__)
# Content Security Policy passed to Talisman
csp = {
    'default-src': [
        '\'self\'',
        '*.example.com',
    ],
}
talisman = Talisman(app, content_security_policy=csp)


@app.route('/')
def index():
    """Return a plain greeting."""
    return 'Hello, World!'


if __name__ == '__main__':
    app.run()
15. SQL Injection Prevention: Use libraries 'sqlalchemy', 'pymysql', 'django-db-protect', and 'SQLite3':
Example 1:
import sqlite3

conn = sqlite3.connect('my_database.db')
try:
    cursor = conn.cursor()
    user_id = 1
    # The '?' placeholder passes user_id as data, never as SQL text
    cursor.execute('SELECT * FROM users WHERE id=?', (user_id,))
    print(cursor.fetchone())
finally:
    # Close the connection even if the query raises
    conn.close()
Example 2:
from sqlalchemy import create_engine, text

engine = create_engine('postgresql://user:password@localhost/mydatabase')
sql = text('SELECT * FROM users WHERE id = :id')
# Engine.execute() was removed in SQLAlchemy 2.0; run the statement on a
# connection and pass the bound parameters as a dictionary.
with engine.connect() as connection:
    result = connection.execute(sql, {'id': 1})
    for row in result:
        print(row)
Example 3:
import pymysql

# 'database' is the current keyword; 'db' is a deprecated alias
connection = pymysql.connect(host='localhost',
                             user='user',
                             password='password',
                             database='database')
try:
    with connection.cursor() as cursor:
        # %s placeholders let the driver escape the values
        sql = "SELECT * FROM users WHERE username = %s AND password = %s"
        cursor.execute(sql, ('admin', 'password123'))
        result = cursor.fetchone()
        print(result)
finally:
    connection.close()
16. Code Signing: Use 'signing' module:
# NOTE(review): no standard-library module named 'signing' exists, and the
# third-party package intended here could not be identified — verify before use.
import signing
code = 'print("Hello, world!")'
# sign() is called with the code and a private-key argument — presumably key
# material or a path; TODO confirm the expected format.
signed_code = signing.sign(code, 'my_private_key')
print('Signed code:', signed_code)
# verify() is called with the signed value and a public-key argument.
verified_code = signing.verify(signed_code, 'my_public_key')
print('Verified code:', verified_code)
17. Password Strength Analysis: Use 'zxcvbn-python' and 'password_strength' library
from password_strength import PasswordPolicy, PasswordStats

policy = PasswordPolicy.from_names(length=14, uppercase=2, numbers=4, special=1)
password = 'my_secret_password'
# policy.test() returns the list of rules the password fails; indexing [0]
# raises IndexError for a compliant password and is not a strength value.
failed_rules = policy.test(password)
if failed_rules:
    print('Password violates policy rules:', failed_rules)
else:
    print('Password satisfies the policy')
strength = PasswordStats(password).strength()
print('Password strength:', strength)
18. Man-in-the-Middle (MitM) attacks prevention:
Example 1: Using HTTPS: HTTPS is a secure communication protocol that encrypts communication between a web server and a client. By using HTTPS, you can prevent MitM attacks that attempt to intercept communication between the server and the client. Here's an example of how to use HTTPS in Python using the 'requests' library:
import requests

# A timeout keeps the call from hanging forever on an unresponsive server
response = requests.get('https://www.example.com', timeout=10)
# Raise for 4xx/5xx instead of printing an error page as if it were content
response.raise_for_status()
print(response.text)
Example 2: Use Certificate pinning, a security mechanism ensuring that the client only accepts a specific certificate for a given website.
import certifi
import requests
# verify= is given the value of certifi.where(); cert= is given a
# (certificate path, key path) pair for the client side.
# NOTE(review): nothing here restricts the connection to one specific server
# certificate — confirm whether real pinning is intended for this example.
response = requests.get('https://www.example.com',
verify=certifi.where(),
cert=('path/to/client/cert', 'path/to/client/key'))
print(response.text)
Example 3: Public Key Pinning, a security mechanism ensuring that the client only accepts a specific public key for a given website.
import ssl
import socket

hostname = 'www.example.com'
port = 443
context = ssl.create_default_context()
context.check_hostname = True
context.verify_mode = ssl.CERT_REQUIRED
# Add the certificates in this file to the set trusted for verification
context.load_verify_locations('path/to/cert/file')
with socket.create_connection((hostname, port)) as sock:
    with context.wrap_socket(sock, server_hostname=hostname) as sslsock:
        print(sslsock.version())
Example 4: Public-Key Cryptography uses a pair of keys (public and private) to encrypt and decrypt data
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import serialization, hashes

# generate a public/private key pair
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key()

# OAEP with SHA-256 for both the hash and the MGF1 mask; one object serves
# encryption and decryption because both sides must use the same padding.
oaep_padding = padding.OAEP(
    mgf=padding.MGF1(algorithm=hashes.SHA256()),
    algorithm=hashes.SHA256(),
    label=None,
)

# encrypt a message using the public key
message = b'This is a secret message'
ciphertext = public_key.encrypt(message, oaep_padding)
# decrypt the message using the private key
plaintext = private_key.decrypt(ciphertext, oaep_padding)
print(plaintext.decode('utf-8'))
Example 5: Use TLS 1.3 protocol
import socket
import ssl

# Require TLS 1.3 through the context. wrap_socket() has no ssl_version
# argument, ssl.PROTOCOL_TLSv1_3 does not exist, and set_ciphers() does not
# select TLS 1.3 cipher suites.
context = ssl.create_default_context()
context.minimum_version = ssl.TLSVersion.TLSv1_3
context.set_alpn_protocols(['h2', 'http/1.1'])
# create a socket, connect to the server and wrap it with TLS
with socket.create_connection(('example.com', 443)) as sock:
    with context.wrap_socket(sock, server_hostname='example.com') as ssl_sock:
        # send and receive data over the secure connection
        ssl_sock.send(b'Hello, World!')
        data = ssl_sock.recv(1024)
        print(data)
# both sockets are closed on leaving the with-blocks
19. Preventions for Denial of Service (DoS), and packet sniffing: Use 'scapy', 'pypcap', and 'pcapy' libraries:
Example 1: Use scapy to detect network anomalies:
from scapy.all import *


def detect_anomalies(packet):
    """Print a notice for TCP packets that have both SYN and FIN set."""
    if packet.haslayer(TCP):
        # 0x01 is FIN and 0x02 is SYN, so mask 0x03 selects SYN+FIN
        # (SYN-ACK would be 0x12); the message now names what is tested.
        if packet[TCP].flags & 0x03 == 0x03:
            print('SYN-FIN packet detected:', packet.summary())


sniff(prn=detect_anomalies)
Example 2: Increase the size of the connection queue and decrease the timeout on open connections:
import socket

# Tunables: connection queue size and socket timeout (in seconds)
backlog = 100
timeout = 10

# Listen on localhost:8080 with the larger connection queue
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('localhost', 8080))
sock.listen(backlog)
# Apply the shorter timeout to the socket
sock.settimeout(timeout)
Example 3: Limit the number of requests from a single IP address within a certain time frame:
from flask import Flask, request, abort
import time

app = Flask(__name__)
# maps IP address -> (start time of the current window, requests in that window)
ip_dict = {}
# maximum number of requests allowed within the time frame
max_requests = 10
# time frame for the maximum number of requests (in seconds)
time_frame = 60


@app.route('/')
def index():
    """Serve the page unless the client exceeded max_requests per time_frame."""
    # get the IP address of the request
    ip_address = request.remote_addr
    current_time = time.time()
    # The original kept only a timestamp and incremented it as if it were a
    # counter, so every second request was rejected; track both values.
    window_start, request_count = ip_dict.get(ip_address, (current_time, 0))
    if current_time - window_start > time_frame:
        # the previous window has expired: start a new one
        window_start, request_count = current_time, 0
    request_count += 1
    ip_dict[ip_address] = (window_start, request_count)
    if request_count > max_requests:
        # deny the request and return a 429 status code (Too Many Requests)
        abort(429)
    return 'Hello, World!'
Example 4: Implement SYN cookies to validate legitimate connection requests and mitigate the effects of a flood attack:
import socket
def syn_cookies_enabled():
    """Report whether the Linux kernel has SYN cookies switched on.

    SYN cookies are a kernel setting (sysctl net.ipv4.tcp_syncookies), not a
    per-socket option; Python's socket module has no TCP_SYNCOOKIES constant.

    :return: True if the setting is readable and non-zero, else False.
    """
    try:
        with open('/proc/sys/net/ipv4/tcp_syncookies') as f:
            return f.read().strip() != '0'
    except OSError:
        return False


def validate_connection_request():
    """Accept connections on port 1234 forever, processing the valid ones."""
    if not syn_cookies_enabled():
        print('Warning: SYN cookies are not enabled (sysctl net.ipv4.tcp_syncookies)')
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(('0.0.0.0', 1234))
        s.listen(10)
        while True:
            conn, addr = s.accept()
            print('Connection request from:', addr)
            # Validate the connection request
            if validate_request(conn):
                # Process the connection request
                process_request(conn)
            else:
                # Drop the connection request
                conn.close()


def validate_request(conn):
    """Return True if *conn* is still a connected socket.

    The socket module has no TCP_OPTIONS / TCP_SYNCOOKIES constants to query,
    so the check is limited to whether the socket still has a peer.
    """
    try:
        conn.getpeername()
    except OSError:
        return False
    return True


def process_request(conn):
    """Handle the connection request (placeholder)."""
    pass


if __name__ == '__main__':
    validate_connection_request()
Example 5: Prevent Teardrop Attack in Python by implementing a packet filter using the scapy library.
from scapy.all import *

# (src, dst, IP id) -> highest payload end offset seen so far for that datagram
_fragment_ends = {}


def filter_func(packet):
    """Return False for an IP fragment that overlaps an earlier fragment.

    NOTE(review): fragments arriving out of order are also reported as
    overlapping; sniffing only observes traffic and cannot drop packets.
    """
    if not packet.haslayer(IP):
        return True
    ip = packet[IP]
    more_fragments = bool(ip.flags & 0x1)  # MF bit
    if ip.frag == 0 and not more_fragments:
        # not part of a fragmented datagram
        return True
    key = (ip.src, ip.dst, ip.id)
    start = ip.frag * 8  # the offset field counts 8-byte units
    end = start + ip.len - ip.ihl * 4
    previous_end = _fragment_ends.get(key, 0)
    if more_fragments:
        _fragment_ends[key] = max(previous_end, end)
    else:
        # last fragment: forget this datagram
        _fragment_ends.pop(key, None)
    return start >= previous_end


# start sniffing packets; a Python callable goes in lfilter=, since filter=
# expects a BPF expression string
sniff(lfilter=filter_func)
Example 6: Prevent Smurf Attack in Python by implementing a packet filter
from scapy.all import *


def filter_func(packet):
    """Return False for ICMP echo requests sent to a broadcast/224.0.0.x address."""
    # check if the packet is an ICMP echo request carried over IP
    if packet.haslayer(IP) and packet.haslayer(ICMP) and packet[ICMP].type == 8:
        destination = packet[IP].dst
        # check if the destination address is a broadcast address
        if destination == '255.255.255.255' or destination.startswith('224.0.0.'):
            # drop the packet
            return False
    # accept the packet
    return True


# start sniffing packets; a Python callable goes in lfilter=, since filter=
# expects a BPF expression string
sniff(lfilter=filter_func)
Example 7: Prevent Ping of Death Attack
import socket

# create a socket and bind it to a local address
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('0.0.0.0', 1234))
# request a 64 KiB receive buffer for the socket
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
# start receiving packets
while True:
    # at most 1024 bytes of each datagram are read
    data, addr = sock.recvfrom(1024)
    # handle the received data
Example 8: Anti-spoofing
import ipaddress

from scapy.all import *

# RFC 1918 private ranges (the original prefix test covered only 172.16.x.x
# of the 172.16.0.0/12 block)
_PRIVATE_NETWORKS = (
    ipaddress.ip_network('10.0.0.0/8'),
    ipaddress.ip_network('172.16.0.0/12'),
    ipaddress.ip_network('192.168.0.0/16'),
)


# define the anti-spoofing filter function
def anti_spoofing_filter(packet):
    """Return False for private-source packets not sent from our own interface address."""
    if not packet.haslayer(IP):
        return True
    source = packet[IP].src
    # check if the source IP address is in a private IP address range
    if any(ipaddress.ip_address(source) in net for net in _PRIVATE_NETWORKS):
        # packet.sniffed_on is the interface name, so resolve it to that
        # interface's address before comparing
        if source != get_if_addr(packet.sniffed_on):
            # block the packet
            return False
    # accept the packet
    return True


# start sniffing packets with the anti-spoofing filter; a Python callable
# goes in lfilter=, since filter= expects a BPF expression string
sniff(lfilter=anti_spoofing_filter)
Example 9: Use socket to block traffic from known botnet IPs
import socket


def load_botnet_ips(path):
    """Read a blocklist file and return its IP addresses as a set.

    Text after '#' is a comment; blank and comment-only lines are skipped.

    :param path: Path of the blocklist file, one address per line.
    :return: Set of address strings.
    """
    addresses = set()
    with open(path, 'r') as f:
        for line in f:
            # cut the comment first, then trim, so "1.2.3.4  # note" works
            address = line.split('#', 1)[0].strip()
            if address:
                addresses.add(address)
    return addresses


# read a list of known botnet IPs from a file (a set gives O(1) lookups)
botnet_ips = load_botnet_ips('botnet_ips.txt')
# create a socket and bind it to a port
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind(('localhost', 12345))
while True:
    # receive data from the socket
    data, addr = s.recvfrom(1024)
    # check if the source IP is in the botnet list
    if addr[0] in botnet_ips:
        # drop the packet
        continue
    # process the packet normally
    # ...
Example 10: Rate Limiting: Use 'flask-limiter' and 'django-ratelimit' libraries.
from flask import Flask, jsonify
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

app = Flask(__name__)
# Keyword arguments work on both old and new Flask-Limiter releases; newer
# ones take key_func as the first positional argument instead of app.
limiter = Limiter(key_func=get_remote_address, app=app)


@app.route('/api/')
@limiter.limit('10 per minute')
def api():
    """Return a success payload, limited to 10 calls per minute per client."""
    return jsonify({'status': 'success'})


if __name__ == '__main__':
    app.run()
20. Email Security: Use 'smtplib', 'email', and 'dkimpy' library:
Example 1:
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

import dkim

msg = MIMEMultipart()
msg['From'] = 'sender@example.com'
msg['To'] = 'recipient@example.com'
msg['Subject'] = 'Test email'
body = 'This is a test email'
msg.attach(MIMEText(body, 'plain'))

# dkim.sign(message, selector, domain, privkey) takes the key material as
# bytes and returns the DKIM-Signature header to prepend to the message.
message_bytes = msg.as_bytes()
with open('my_private_key.pem', 'rb') as key_file:
    private_key = key_file.read()
signature = dkim.sign(message_bytes, b'default', b'example.com', private_key)
signed_message = signature + message_bytes

# the with-block sends QUIT and closes the connection even on error
with smtplib.SMTP('smtp.gmail.com', 587) as server:
    server.starttls()
    server.login('sender@gmail.com', 'password')
    server.sendmail('sender@example.com', 'recipient@example.com', signed_message)
Example 2: Use email and imaplib to filter out suspicious emails
import email
import email.utils
import imaplib

TRUSTED_SENDERS = {'known_sender@example.com', 'trusted_sender@example.com'}

# login to the email server
mail = imaplib.IMAP4_SSL('mail.example.com')
try:
    mail.login('username', 'password')
    # select the inbox folder
    mail.select('inbox')
    # search for emails
    status, messages = mail.search(None, 'ALL')
    for msg_id in messages[0].split():
        # fetch the email content
        status, msg_data = mail.fetch(msg_id, '(RFC822)')
        msg = email.message_from_bytes(msg_data[0][1])
        # A From header usually looks like 'Name <addr>', so extract the bare
        # address before comparing it with the trusted list
        sender = email.utils.parseaddr(msg['From'] or '')[1].lower()
        # check if the email is suspicious
        if sender not in TRUSTED_SENDERS:
            # move the email to the spam folder
            mail.store(msg_id, '+X-GM-LABELS', '\\Spam')
finally:
    # end the IMAP session even if a fetch or store fails
    mail.logout()
Example 3: Email with TLS 1.3 using the smtplib library
import smtplib
import ssl
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

# create a message
msg = MIMEMultipart()
msg['From'] = 'sender@example.com'
msg['To'] = 'recipient@example.com'
msg['Subject'] = 'Test email'
body = 'This is a test email.'
msg.attach(MIMEText(body, 'plain'))
# create a secure SMTP connection
smtp_server = 'smtp.example.com'
port = 587
# 'ssl' was never imported, and set_ciphers() does not select TLS 1.3 cipher
# suites; requiring TLS 1.3 is done with minimum_version.
context = ssl.create_default_context()
context.minimum_version = ssl.TLSVersion.TLSv1_3
with smtplib.SMTP(smtp_server, port) as smtp:
    smtp.starttls(context=context)
    # login to the SMTP server
    username = 'sender@example.com'
    password = 'yourpassword'
    smtp.login(username, password)
    # send the message
    smtp.sendmail(msg['From'], msg['To'], msg.as_string())
print('Email sent successfully')
21. Web Application Firewall (WAF): Use 'http.server' module to create a simple WAF:
import http.server
import socketserver
class MyHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
    """Static file handler that refuses any request whose path mentions 'admin'."""

    def _is_blocked(self):
        """Return True if the decoded, lower-cased request path contains 'admin'."""
        from urllib.parse import unquote  # local import keeps the block self-contained

        # Decode percent-escapes and ignore case so '/%41dmin' or '/ADMIN'
        # cannot slip past the check.
        return 'admin' in unquote(self.path).lower()

    def _deny(self, include_body=True):
        """Send a 403 response, with the body unless the request was HEAD."""
        body = b'Access denied'
        self.send_response(403)
        self.send_header('Content-Type', 'text/plain; charset=utf-8')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        if include_body:
            self.wfile.write(body)

    def do_GET(self):
        """Serve the file unless the path is blocked."""
        if self._is_blocked():
            self._deny()
        else:
            super().do_GET()

    def do_HEAD(self):
        """Apply the same rule to HEAD so it cannot be used to probe blocked paths."""
        if self._is_blocked():
            self._deny(include_body=False)
        else:
            super().do_HEAD()
PORT = 8000
# Serve on all interfaces until interrupted; the with-block closes the socket
with socketserver.TCPServer(('', PORT), MyHTTPRequestHandler) as httpd:
    print('Serving on port', PORT)
    httpd.serve_forever()
22. Brute-Force Protection: Use Libraries like 'ratelimiter' 'throttle', 'time' and 'threading'
Example 1:
from ratelimiter import RateLimiter


@RateLimiter(max_calls=5, period=1)
def make_request():
    """Stand-in for a rate-limited operation (max_calls=5, period=1)."""
    print('Request made')


for i in range(10):
    make_request()
Example 2: Use time and threading modules
import time
import threading
# define a dictionary of usernames and passwords
# failed logins in a row before an account is locked, and the lock duration
MAX_FAILED_ATTEMPTS = 3
LOCKOUT_SECONDS = 60

credentials = {'user1': 'password1', 'user2': 'password2', 'user3': 'password3'}
# maps username -> (consecutive failed attempts, time of the latest failure)
failed_attempts = {}


# define a function to check credentials and enforce a lockout period
def check_credentials(username, password):
    """Check a login attempt and lock the account after repeated failures.

    The original stored only a failure count yet compared it with the clock
    as if it were a timestamp, so the lockout never triggered.

    :param username: Account name being tried.
    :param password: Password supplied for that account.
    :return: True on successful login, False on failure or while locked out.
    """
    now = time.time()
    failures, last_failure = failed_attempts.get(username, (0, 0.0))
    # check if the user is locked out
    if failures >= MAX_FAILED_ATTEMPTS:
        elapsed = now - last_failure
        if elapsed < LOCKOUT_SECONDS:
            print(f'{username} is locked out for {LOCKOUT_SECONDS - elapsed} seconds')
            return False
        # the lockout has expired: start counting afresh
        failures = 0
    # check if the username and password are correct
    if username in credentials and credentials[username] == password:
        failed_attempts.pop(username, None)
        print(f'{username} logged in successfully')
        return True
    # record the failed attempt
    failed_attempts[username] = (failures + 1, now)
    print(f'Invalid login attempt for {username}')
    return False
# define a function to run a brute-force attack
def brute_force(username):
    """Try a fixed list of passwords for *username*, one per second."""
    for password in ['password1', 'password2', 'password3']:
        if check_credentials(username, password):
            break
        time.sleep(1)


# run the brute-force attacks in parallel using threading
threads = []
for username in credentials:
    thread = threading.Thread(target=brute_force, args=(username,))
    threads.append(thread)
    thread.start()
# wait for all threads to finish
for thread in threads:
    thread.join()
23. Cross-Site Scripting (XSS) Prevention: Use the 'html.escape' function to escape HTML characters (the older 'cgi.escape' was removed in Python 3.8):
Example 1:
import html

input_text = '<script>alert("XSS")</script>'
# cgi.escape() was removed in Python 3.8; html.escape() replaces it
escaped_text = html.escape(input_text, quote=True)
print('Escaped text:', escaped_text)
Example 2:
import html
# Helper that neutralises HTML markup in user-supplied text.
def sanitize_input(user_input):
    """Return user_input with HTML special characters turned into entities."""
    escaped = html.escape(user_input, quote=True)
    return escaped


# Demonstration: a script tag comes out as inert text.
user_input = '<script>alert("XSS attack!")</script>'
sanitized_input = sanitize_input(user_input)
print(sanitized_input)
24. File Permissions: Use the 'os.chmod' function to set file and directory permissions:
Example 1: Set file permission
import os
# File whose mode is changed.
file_path = '/path/to/file'
# 0o600: read/write for the owner, nothing for group or others.
owner_read_write = 0o600
os.chmod(file_path, owner_read_write)
Example 2: Set the directory permission
import os
# Directory to restrict, and the mode to give it
# (0o700: read/write/execute for the owner only).
dir_path, permissions = '/path/to/directory', 0o700
# Apply the mode to the directory.
os.chmod(path=dir_path, mode=permissions)
25. Content Security Policy (CSP): Use 'http.server' module to add CSP headers:
Example 1: Use 'http.server' module
import http.server
import socketserver
class MyHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
    """Static-file handler that adds a Content-Security-Policy header."""

    def end_headers(self):
        """Queue the CSP header, then let the base class finish the headers."""
        header_name = 'Content-Security-Policy'
        policy = "default-src 'self'"
        self.send_header(header_name, policy)
        super().end_headers()
PORT = 8000
# An empty host string binds to all interfaces.
server_address = ('', PORT)
with socketserver.TCPServer(server_address, MyHTTPRequestHandler) as httpd:
    print('Serving on port', PORT)
    # Handle requests until the process is interrupted.
    httpd.serve_forever()
Example 2: Check CSP of a website using Python and the requests module to send HTTP requests and the beautifulsoup4 module to parse the HTML response.
import requests
from bs4 import BeautifulSoup
# Website to inspect.
url = 'https://example.com'

# Fetch the page. The timeout stops the script hanging on an unresponsive
# host, and raise_for_status() surfaces HTTP errors instead of parsing an
# error page.
response = requests.get(url, timeout=10)
response.raise_for_status()

# A policy is normally delivered in the Content-Security-Policy header.
csp_header = response.headers.get('Content-Security-Policy')
if csp_header:
    print(f"CSP header for {url}: {csp_header}")
else:
    # Fall back to a <meta http-equiv="Content-Security-Policy"> tag.
    soup = BeautifulSoup(response.text, 'html.parser')
    meta_tag = soup.find(
        'meta',
        attrs={
            'http-equiv': lambda value: value is not None
            and value.lower() == 'content-security-policy'
        },
    )
    if meta_tag is not None and meta_tag.get('content'):
        print(f"CSP meta tag for {url}: {meta_tag.get('content')}")
    else:
        print(f"No Content-Security-Policy found for {url}")
Example 3: Set the Content-Security-Policy header in the HTTP response using the flask module.
from flask import Flask, render_template
app = Flask(__name__)


@app.route('/')
def index():
    """Render index.html and attach a Content-Security-Policy header.

    :return: The rendered page wrapped in a response object carrying the
        CSP header.
    """
    # Local import: the snippet's import line only brings in Flask and
    # render_template.
    from flask import make_response

    # NOTE(review): 'unsafe-inline' permits inline scripts, which weakens
    # the XSS protection a CSP is meant to give; drop it if possible.
    csp = "default-src 'self'; script-src 'self' 'unsafe-inline'"
    # The header must be set on the response object that is returned.
    response = make_response(render_template('index.html'))
    response.headers['Content-Security-Policy'] = csp
    return response
Example 4:
from flask import Flask
from flask_talisman import Talisman
app = Flask(__name__)

# Sources allowed by default: the site itself and example.com subdomains.
allowed_sources = ["'self'", 'https://*.example.com']
csp = {'default-src': allowed_sources}
# Hand the policy to Talisman for this app.
Talisman(app, content_security_policy=csp)


@app.route('/')
def index():
    """Return a fixed greeting for the site root."""
    greeting = 'Hello, World!'
    return greeting


if __name__ == '__main__':
    app.run()
26. Server Hardening: Automate server hardening tasks using 'subprocess' to disable Telnet on Linux:
import subprocess
# NOTE(review): the unit name 'telnet' is used as given; confirm it matches
# the unit name on the target distribution.
disable_telnet_cmd = ['systemctl', 'disable', 'telnet']
# check=True raises CalledProcessError if systemctl exits non-zero.
subprocess.run(disable_telnet_cmd, check=True)
27. Enable SELinux on all hosts from a text file hostlist.txt
import os
import subprocess

# Read the host list, dropping surrounding whitespace and blank lines.
with open('hostlist.txt', 'r') as f:
    hosts = [line.strip() for line in f if line.strip()]

# Commands run on each host: persist enforcing mode in the config file,
# then switch the running system to enforcing.
remote_commands = [
    'sed -i "s/^SELINUX=.*/SELINUX=enforcing/g" /etc/selinux/config',
    'setenforce 1',
]

# Iterate over each host and enable SELinux.
for host in hosts:
    # A leading '-' would be parsed by ssh as an option, so reject it.
    if host.startswith('-'):
        print(f"Skipping invalid host entry {host!r}")
        continue
    print(f"Enabling SELinux on host {host}")
    # An argument list (no local shell) keeps text from hostlist.txt from
    # being interpreted as shell syntax on this machine.
    results = [subprocess.run(['ssh', host, command]) for command in remote_commands]
    # Only report success when every remote command exited with status 0.
    if all(result.returncode == 0 for result in results):
        print(f"SELinux has been enabled on host {host}")
    else:
        print(f"Failed to enable SELinux on host {host}")
28. SSL/TLS Encryption: Use 'pyOpenSSL' and 'ssl' for SSL/TLS encryption.
import socket, ssl
HOST = 'localhost'
PORT = 443

# TLS context for the server side, using the local certificate and key.
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain(certfile='cert.pem', keyfile='key.pem')

with socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0) as sock:
    listen_address = (HOST, PORT)
    sock.bind(listen_address)
    sock.listen()
    # Wrap the listening socket so accepted connections speak TLS.
    with context.wrap_socket(sock, server_side=True) as ssock:
        conn, addr = ssock.accept()
        with conn:
            print('Connected by', addr)
            # Echo one read of up to 1024 bytes back to the client.
            data = conn.recv(1024)
            conn.sendall(data)
29. Capture TCP/UDP traffic:
Example 1: Capture TCP traffic and print the source and destination IP addresses for each TCP packet
import pyshark
# Create a capture object to listen for TCP traffic.
capture = pyshark.LiveCapture(interface='eth0', bpf_filter='tcp')
try:
    # Capture for 10 seconds.
    capture.sniff(timeout=10)
    # Print the source and destination address of each TCP packet.
    for packet in capture:
        if 'TCP' not in packet:
            continue
        # TCP can ride on IPv4 or IPv6; packet.ip only exists for IPv4.
        if 'IP' in packet:
            network_layer = packet.ip
        elif 'IPV6' in packet:
            network_layer = packet.ipv6
        else:
            continue
        print(f"Source IP: {network_layer.src}, Destination IP: {network_layer.dst}")
finally:
    # Release the capture's resources even if processing fails.
    capture.close()
Example 2: Raise an alert when a packet is sent to a known malicious IP address
import pyshark
# Known malicious IP address to watch for.
malicious_ip = "10.0.0.1"
# Read packets from a saved capture; the display filter keeps only packets
# with an IP layer, so packet.ip is available below.
capture = pyshark.FileCapture('/path/to/capture.pcap', display_filter='ip')
try:
    # Check each packet's destination against the malicious address.
    for packet in capture:
        if packet.ip.dst == malicious_ip:
            print(f"Suspicious traffic detected: packet sent to known malicious IP address {malicious_ip} from {packet.ip.src}")
            # add your alert logic here, such as sending an email or triggering a script
finally:
    # Release the capture's resources once the file has been processed.
    capture.close()
Example 3: Use PyCurl to perform automated vulnerability scanning.
import pycurl
import argparse
def scan_vulnerabilities(target_url, payload):
    """Scans for vulnerabilities on the target URL using the specified payload.

    Sends the payload as POST data and prints a verdict: an HTTP 200 whose
    body contains the word "error" (case-insensitive) is reported as a
    vulnerability; any other status code is reported as unexpected.

    :param target_url: The URL of the target to be scanned.
    :param payload: The payload to be used in the vulnerability scan.
    """
    # BytesIO was used without being imported; import it locally so the
    # function is self-contained.
    from io import BytesIO

    response = BytesIO()
    curl = pycurl.Curl()
    try:
        # Set the target URL and payload, and capture the response body.
        curl.setopt(curl.URL, target_url)
        curl.setopt(curl.POSTFIELDS, payload)
        curl.setopt(curl.WRITEFUNCTION, response.write)
        # Send the request and read the response code.
        curl.perform()
        response_code = curl.getinfo(pycurl.HTTP_CODE)
    finally:
        # Always release the handle, including when perform() raises.
        curl.close()

    # Analyze the response to determine if a vulnerability exists.
    if response_code == 200:
        # errors='replace' keeps a non-UTF-8 body from aborting the scan.
        body = response.getvalue().decode('utf-8', errors='replace')
        if 'error' in body.lower():
            print('Vulnerability detected on {}'.format(target_url))
        else:
            print('No vulnerability detected on {}'.format(target_url))
    else:
        print('Unexpected response code {} on {}'.format(response_code, target_url))
if __name__ == '__main__':
    # Describe the two positional command-line arguments.
    cli = argparse.ArgumentParser(description='Automated vulnerability scanner using PyCurl')
    positional_args = (
        ('target_url', 'the URL of the target to be scanned'),
        ('payload', 'the payload to be used in the vulnerability scan'),
    )
    for arg_name, arg_help in positional_args:
        cli.add_argument(arg_name, type=str, help=arg_help)
    options = cli.parse_args()
    # Run the scan with the values supplied on the command line.
    scan_vulnerabilities(options.target_url, options.payload)
30. Escape User Input: Use 'html.escape' to escape user input ('cgi.escape' was removed in Python 3.8):
import html

user_input = '<script>alert("Hello, World!")</script>'
# cgi.escape() was removed in Python 3.8; html.escape() replaces it and
# escapes quotes as well by default.
escaped_input = html.escape(user_input)
print(escaped_input)  # Output: &lt;script&gt;alert(&quot;Hello, World!&quot;)&lt;/script&gt;
31. Check the digital signature of an image: Use the 'cryptography' library.
Example 1: Check the digital signature of a PNG image
import io
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, utils
from cryptography.x509 import load_pem_x509_certificate
def verify_signature(image_path, signature_path, public_key_path):
    """Verify a detached signature over an image file.

    The signature is checked with PKCS#1 v1.5 padding and SHA-256.
    NOTE(review): this assumes the PEM file holds an RSA public key;
    confirm against how the signature was produced.

    :param image_path: Path to the signed image.
    :param signature_path: Path to the raw signature bytes.
    :param public_key_path: Path to the PEM-encoded public key.
    :return: True if the signature is valid, False otherwise. The outcome
        is also printed.
    """
    # Local import so only a genuine signature mismatch is treated as a
    # verification failure.
    from cryptography.exceptions import InvalidSignature

    # Load the image data
    with open(image_path, 'rb') as f:
        image_data = f.read()
    # Load the signature data
    with open(signature_path, 'rb') as f:
        signature_data = f.read()
    # Load the public key. The file is a public key, not a certificate, so
    # it is not parsed as X.509.
    with open(public_key_path, 'rb') as f:
        public_key = serialization.load_pem_public_key(f.read())

    # Verify the signature; verify() hashes the data itself and raises
    # InvalidSignature on mismatch.
    try:
        public_key.verify(
            signature_data,
            image_data,
            padding.PKCS1v15(),
            hashes.SHA256(),
        )
    except InvalidSignature:
        print("Signature verification failed.")
        return False
    print("Signature verification successful!")
    return True


if __name__ == '__main__':
    verify_signature('image.png', 'image.png.signature', 'public_key.pem')
Example 2: Check digital signature of an ISO image
import subprocess
# ISO image and its detached signature.
iso_file = 'my_image.iso'
sig_file = 'my_image.iso.sig'
# Ask gpg to verify the signature; stderr is merged into the captured
# output.
gpg_cmd = ['gpg', '--verify', sig_file, iso_file]
try:
    completed = subprocess.run(
        gpg_cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        check=True,
    )
except subprocess.CalledProcessError as e:
    # A non-zero exit status from gpg means verification failed.
    print("Digital signature is NOT valid.")
    print(e.output.decode('utf-8'))
else:
    output = completed.stdout
    print("Digital signature is valid.")
    print(output.decode('utf-8'))
Example 3: Sign an ISO file using gpg, and then verify the digital signature.
import subprocess
# Specify the ISO file and the passphrase for the signing key.
iso_file = 'my_image.iso'
# NOTE(review): placeholder value; load the real passphrase from a secure
# source rather than hard-coding it.
passphrase = 'my_passphrase'
# Sign the ISO file using gpg. The passphrase is fed on stdin
# (--passphrase-fd 0) instead of the command line, where it would be
# visible in the process list. GnuPG 2.1+ needs --pinentry-mode loopback
# to accept a passphrase supplied this way.
gpg_cmd = [
    'gpg', '--batch', '--pinentry-mode', 'loopback',
    '--passphrase-fd', '0', '--detach-sign', iso_file,
]
# check=True raises CalledProcessError if signing fails.
subprocess.run(gpg_cmd, input=(passphrase + '\n').encode('utf-8'), check=True)
# Verify the digital signature using gpg.
sig_file = iso_file + '.sig'
gpg_cmd = ['gpg', '--verify', sig_file, iso_file]
try:
    output = subprocess.check_output(gpg_cmd, stderr=subprocess.STDOUT)
    print("Digital signature is valid.")
    print(output.decode('utf-8'))
except subprocess.CalledProcessError as e:
    print("Digital signature is NOT valid.")
    print(e.output.decode('utf-8'))
32. Create a random crypto key:
Example 1: Create a random 512-bit key and save it to a file named crypto_512.key
import os
# Generate a random 512-bit key (64 bytes).
key = os.urandom(64)
# Write the key to crypto_512.key, the name this example advertises.
# The file is created with mode 0o600 so only the owner can read the key
# (the mode applies when the file is created, not to an existing file).
# O_BINARY only exists on Windows, where it disables newline translation.
key_path = 'crypto_512.key'
open_flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, 'O_BINARY', 0)
with os.fdopen(os.open(key_path, open_flags, 0o600), 'wb') as f:
    f.write(key)
Example 2: Create a random 1028-bit key and save it to a file named crypto_1028.key
import os
# Generate the random key for the "1028-bit" example.
# NOTE(review): 129 bytes is 1032 bits, and 1028 is not a whole number of
# bytes; a 1024-bit key (128 bytes) was presumably intended. Confirm.
key = os.urandom(129)
# Write the key to a file created with mode 0o600 so only the owner can
# read it (the mode applies when the file is created, not to an existing
# file). O_BINARY only exists on Windows, where it disables newline
# translation.
key_path = 'crypto_1028.key'
open_flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, 'O_BINARY', 0)
with os.fdopen(os.open(key_path, open_flags, 0o600), 'wb') as f:
    f.write(key)
Example 3: Create a random AES-256 key and save it to a file named crypto_aes.key
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import os
# Generate a random 256-bit key (32 bytes), the AES-256 key size.
key = os.urandom(32)
# Write the key to crypto_aes.key, the name this example advertises.
# The file is created with mode 0o600 so only the owner can read the key
# (the mode applies when the file is created, not to an existing file).
# O_BINARY only exists on Windows, where it disables newline translation.
key_path = 'crypto_aes.key'
open_flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, 'O_BINARY', 0)
with os.fdopen(os.open(key_path, open_flags, 0o600), 'wb') as f:
    f.write(key)
33. Access Control: Use libraries `flask-principal` and `django-guardian`
from flask_principal import Principal, Identity, AnonymousIdentity, Permission, RoleNeed
# This snippet's own import line does not bring in Flask.
from flask import Flask

app = Flask(__name__)
principal = Principal(app)
# Permission granted only to identities that hold the 'admin' role.
admin_permission = Permission(RoleNeed('admin'))


@app.route('/admin/')
# http_exception=403 makes a denied permission answer "403 Forbidden"
# instead of raising an unhandled exception.
@admin_permission.require(http_exception=403)
def admin():
    """Return the admin greeting; reachable only with the 'admin' role."""
    # NOTE(review): no identity loader is configured in this snippet, so
    # presumably every request is treated as anonymous; confirm.
    return 'Hello, Admin!'


if __name__ == '__main__':
    app.run()
34. Remove access for Microsoft Azure users who have not been active for 6 months:
import adal
import datetime
import requests
import json
# Set Azure AD credentials
tenant_id = 'your_tenant_id'
client_id = 'your_client_id'
client_secret = 'your_client_secret'
# Set Azure AD endpoint and resource
authority_url = 'https://login.microsoftonline.com/' + tenant_id
resource_url = 'https://graph.microsoft.com'
# Create an ADAL authentication context
context = adal.AuthenticationContext(authority_url)
# Get an access token using client credentials
token = context.acquire_token_with_client_credentials(resource_url, client_id, client_secret)
auth_headers = {'Authorization': 'Bearer ' + token['accessToken']}

# OData expects an ISO 8601 UTC timestamp; str(datetime) contains a space
# and no zone designator.
cutoff = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=180)
cutoff_iso = cutoff.strftime('%Y-%m-%dT%H:%M:%SZ')

# Set the API endpoint and query parameters.
# NOTE(review): per the Graph user documentation, signInActivity supports
# only eq/ne/not/ge/le and cannot be combined with other filterable
# properties, so accountEnabled and userType are checked client-side
# below. Confirm against the current docs.
api_endpoint = 'https://graph.microsoft.com/v1.0/users'
query_params = {
    '$select': 'id,userPrincipalName,accountEnabled,userType,signInActivity',
    '$filter': f"signInActivity/lastSignInDateTime le {cutoff_iso}",
}

# Fetch every page of results; the first page alone can be incomplete.
users = []
next_url = api_endpoint
next_params = query_params
while next_url:
    response = requests.get(next_url, params=next_params, headers=auth_headers, timeout=30)
    # Check for errors in the response
    if response.status_code != 200:
        raise Exception('API request failed with status code ' + str(response.status_code))
    page = response.json()
    # Keep only enabled member accounts.
    users.extend(
        candidate for candidate in page.get('value', [])
        if candidate.get('accountEnabled') and candidate.get('userType') == 'Member'
    )
    # '@odata.nextLink' already carries the query, so params are dropped.
    next_url = page.get('@odata.nextLink')
    next_params = None
# Loop through the list of users and revoke their sign-in sessions.
# NOTE(review): revokeSignInSessions invalidates existing sessions; it
# does not disable the account. Confirm this is the intended meaning of
# "remove access".
for user in users:
    # Get the user's ID and user principal name
    user_id = user['id']
    user_principal_name = user['userPrincipalName']
    # Set the API endpoint to revoke the user's sessions
    api_endpoint = f"https://graph.microsoft.com/v1.0/users/{user_id}/revokeSignInSessions"
    # Send the API request
    response = requests.post(
        api_endpoint,
        headers={'Authorization': 'Bearer ' + token['accessToken']},
        timeout=30,
    )
    # Treat any 2xx status as success: the call is documented to answer
    # with a 2xx code, so requiring exactly 204 rejects successful calls.
    if not response.ok:
        raise Exception(f"Failed to remove access for user {user_principal_name} with status code " + str(response.status_code))
    # Print a message to indicate that the user's access has been removed
    print(f"Access removed for user {user_principal_name}")
36. Send email notifications to Microsoft Azure users who have no activity within 6 months:
import requests
import json
import datetime
import smtplib
from email.mime.text import MIMEText
# Set the Microsoft Graph API endpoint and parameters.
# OData string literals take single quotes ('Member'), not double quotes.
# NOTE(review): lastPasswordChangeDateTime is not documented as a
# filterable property, so it is only selected here; the 180-day check is
# done per user in the loop that follows. Confirm against the Graph docs.
graph_endpoint = "https://graph.microsoft.com/v1.0/users"
query_params = {
    '$select': 'displayName,mail,userPrincipalName,lastPasswordChangeDateTime',
    '$filter': "accountEnabled eq true and userType eq 'Member'",
}
# Set the Microsoft Graph API authorization token.
access_token = "<your_access_token_here>"
# Set the email server settings (placeholders to be replaced).
smtp_server = "<your_smtp_server>"
smtp_port = "<your_smtp_port>"
smtp_username = "<your_smtp_username>"
smtp_password = "<your_smtp_password>"
sender_email = "<your_sender_email>"
# Request the list of enabled member users from the Microsoft Graph API.
headers = {'Authorization': f'Bearer {access_token}'}
response = requests.get(graph_endpoint, headers=headers, params=query_params, timeout=30)
# Fail loudly on an HTTP error instead of indexing into an error payload.
response.raise_for_status()
def _parse_graph_timestamp(value):
    """Parse a Graph UTC timestamp, with or without fractional seconds.

    :param value: Timestamp text such as '2023-01-31T08:15:00Z'.
    :return: A naive datetime (UTC) truncated to whole seconds.
    """
    seconds_part = value.rstrip('Z').split('.', 1)[0]
    return datetime.datetime.strptime(seconds_part, '%Y-%m-%dT%H:%M:%S')


# Naive UTC "now", matching the naive UTC values parsed above.
now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

# Collect one message per user whose password is at least 180 days old.
pending_messages = []
for user in response.json().get('value', []):
    email = user.get('mail')
    last_change_raw = user.get('lastPasswordChangeDateTime')
    # Skip users without a mailbox or without a recorded password change.
    if not email or not last_change_raw:
        continue
    display_name = user['displayName']
    last_password_change_date = _parse_graph_timestamp(last_change_raw)
    days_since_last_password_change = (now_utc - last_password_change_date).days
    # Only notify when the last password change is at least 180 days old.
    if days_since_last_password_change < 180:
        continue
    # Compose the email message.
    message = f"Dear {display_name},\n\nThis is a notification to inform you that your Microsoft Azure account associated with the email address {email} has not been used for the past 6 months.\n\nPlease log in to your account to ensure it remains active.\n\nThank you,\nMicrosoft Azure Team"
    mime_message = MIMEText(message)
    # Add the standard headers the original message lacked.
    mime_message['Subject'] = 'Microsoft Azure account inactivity notice'
    mime_message['From'] = sender_email
    mime_message['To'] = email
    pending_messages.append((email, mime_message))

# Send everything over one SMTP session instead of reconnecting per user.
if pending_messages:
    with smtplib.SMTP(smtp_server, smtp_port) as server:
        server.starttls()
        server.login(smtp_username, smtp_password)
        for email, mime_message in pending_messages:
            server.sendmail(sender_email, email, mime_message.as_string())
            print(f"Email notification sent to {email} successfully.")
Comments
Post a Comment