Sistema de Detección de Intrusiones

Descripción
Este sistema de detección de intrusiones utiliza algoritmos avanzados de aprendizaje automático implementados en Python para identificar y responder a amenazas de seguridad en tiempo real.
A diferencia de los sistemas tradicionales basados en reglas, este enfoque adaptativo utiliza bibliotecas como scikit-learn, TensorFlow y PyTorch para aprender continuamente de nuevos patrones de ataque, mejorando su capacidad para detectar amenazas desconocidas.
La arquitectura del sistema está diseñada para procesar grandes volúmenes de tráfico de red utilizando técnicas de procesamiento distribuido con Dask y Apache Spark. Además, implementé un sistema de análisis de comportamiento que establece líneas base de actividad normal y detecta desviaciones que podrían indicar compromisos de seguridad.
Tecnologías Utilizadas
Python
Lenguaje principal para el desarrollo del backend y algoritmos de ML
TensorFlow
Framework de aprendizaje automático para la detección de patrones anómalos
Pandas
Análisis y manipulación de datos para el entrenamiento de modelos
Scapy
Manipulación de paquetes de red para análisis profundo
Elasticsearch
Almacenamiento y búsqueda de eventos de seguridad
Kibana
Visualización y análisis de datos de seguridad
Desafíos
Falsos positivos
Uno de los mayores desafíos fue reducir la tasa de falsos positivos que generaban alertas innecesarias y fatiga en los equipos de seguridad.
Procesamiento en tiempo real
El sistema necesitaba analizar grandes volúmenes de tráfico de red sin introducir latencia perceptible.
Adaptación a nuevas amenazas
Los atacantes evolucionan constantemente sus técnicas, por lo que el sistema debía ser capaz de adaptarse a amenazas previamente desconocidas.
Soluciones
Aprendizaje semi-supervisado
Implementé un enfoque de aprendizaje semi-supervisado en Python que combina reglas definidas por expertos con detección de anomalías basada en ML.
Arquitectura distribuida
Diseñé una arquitectura de procesamiento distribuido con Dask que permite escalar horizontalmente según la carga de tráfico.
Actualización continua
El sistema incorpora un mecanismo de retroalimentación implementado con Python que permite a los analistas de seguridad marcar falsos positivos, mejorando continuamente la precisión del modelo.
Galería

Dashboard principal mostrando alertas en tiempo real

Análisis detallado de un intento de intrusión detectado

Interfaz de configuración de sensibilidad y parámetros
Ejemplo de Código
import scapy.all as scapy
import argparse
import time
import logging
from threading import Thread
from collections import defaultdict
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
filename='ids.log'
)
logger = logging.getLogger('IntrusionDetectionSystem')
class PacketSniffer:
"""
A network packet sniffer that captures and analyzes network traffic.
"""
def __init__(self, interface=None):
"""
Initialize the packet sniffer.
Args:
interface: Network interface to sniff on (None for auto-detect)
"""
self.interface = interface
self.stop_sniffing = False
self.packet_count = 0
self.packet_stats = defaultdict(int)
self.connections = defaultdict(list)
self.anomaly_detector = None
def _packet_callback(self, packet):
"""
Process each captured packet.
Args:
packet: The captured packet
"""
self.packet_count += 1
# Extract packet information
if packet.haslayer(scapy.IP):
src_ip = packet[scapy.IP].src
dst_ip = packet[scapy.IP].dst
proto = packet[scapy.IP].proto
# Update statistics
self.packet_stats[src_ip] += 1
# Track connections
conn_key = f"{src_ip}:{dst_ip}:{proto}"
timestamp = time.time()
self.connections[conn_key].append(timestamp)
# Log packet info
if self.packet_count % 100 == 0:
logger.debug(f"Captured {self.packet_count} packets")
# Check for anomalies periodically
if self.packet_count % 1000 == 0:
self._check_for_anomalies()
def _check_for_anomalies(self):
"""
Check for anomalies in the captured traffic.
"""
# Prepare features for anomaly detection
features = []
# Calculate features for each source IP
for ip, count in self.packet_stats.items():
# Calculate connection rate (connections per second)
conn_count = sum(1 for key in self.connections if key.startswith(ip))
# Calculate packet rate
packet_rate = count / max(1, (time.time() - self.start_time))
# Calculate unique destinations
unique_dests = len(set(key.split(':')[1] for key in self.connections if key.startswith(ip)))
features.append([packet_rate, conn_count, unique_dests])
if not features:
return
# Convert to numpy array
X = np.array(features)
# If we have an anomaly detector, use it
if self.anomaly_detector is not None:
# Predict anomalies (-1 for anomalies, 1 for normal)
predictions = self.anomaly_detector.predict(X)
# Get anomaly scores
scores = self.anomaly_detector.decision_function(X)
# Log anomalies
for i, (pred, score) in enumerate(zip(predictions, scores)):
if pred == -1: # Anomaly detected
ip = list(self.packet_stats.keys())[i]
logger.warning(f"Anomaly detected from IP {ip} with score {score:.4f}")
# Additional analysis for the anomalous IP
self._analyze_anomalous_ip(ip)
def _analyze_anomalous_ip(self, ip):
"""
Perform detailed analysis on traffic from an anomalous IP.
Args:
ip: The IP address to analyze
"""
# Get all connections from this IP
ip_connections = [k for k in self.connections.keys() if k.startswith(ip)]
# Analyze connection patterns
if len(ip_connections) > 20:
# Check for port scanning (many different destination ports)
dest_ports = set()
for conn in ip_connections:
parts = conn.split(':')
if len(parts) >= 3:
dest_ports.add(parts[2])
if len(dest_ports) > 15:
logger.warning(f"Possible port scan detected from {ip} - {len(dest_ports)} different ports")
# Check for DoS attack (high packet rate)
packet_count = self.packet_stats[ip]
duration = time.time() - self.start_time
packet_rate = packet_count / duration
if packet_rate > 100: # Threshold for high packet rate
logger.warning(f"Possible DoS attack from {ip} - {packet_rate:.2f} packets/sec")
def train_anomaly_detector(self, duration=300):
"""
Train the anomaly detector on normal traffic.
Args:
duration: Duration in seconds to capture normal traffic for training
"""
logger.info(f"Training anomaly detector on normal traffic for {duration} seconds...")
# Start sniffing in a separate thread
self.start_time = time.time()
sniff_thread = Thread(target=self._sniff, args=(duration,))
sniff_thread.start()
sniff_thread.join()
# Prepare training data
features = []
for ip, count in self.packet_stats.items():
conn_count = sum(1 for key in self.connections if key.startswith(ip))
packet_rate = count / max(1, (time.time() - self.start_time))
unique_dests = len(set(key.split(':')[1] for key in self.connections if key.startswith(ip)))
features.append([packet_rate, conn_count, unique_dests])
if not features:
logger.warning("No packets captured during training period")
return
# Train the anomaly detector
X = np.array(features)
self.anomaly_detector = IsolationForest(contamination=0.05, random_state=42)
self.anomaly_detector.fit(X)
logger.info("Anomaly detector trained successfully")
# Reset statistics
self.packet_count = 0
self.packet_stats = defaultdict(int)
self.connections = defaultdict(list)
def _sniff(self, duration=None):
"""
Start sniffing packets.
Args:
duration: Duration in seconds to sniff (None for indefinite)
"""
self.stop_sniffing = False
self.start_time = time.time()
# Set timeout if duration is specified
timeout = None
if duration:
timeout = duration
# Start sniffing
scapy.sniff(
iface=self.interface,
prn=self._packet_callback,
stop_filter=lambda _: self.stop_sniffing,
timeout=timeout
)
def start(self):
"""
Start the packet sniffer in a separate thread.
"""
logger.info(f"Starting packet sniffer on interface {self.interface or 'default'}")
# Reset statistics
self.packet_count = 0
self.packet_stats = defaultdict(int)
self.connections = defaultdict(int)
self.start_time = time.time()
# Start sniffing in a separate thread
self.sniff_thread = Thread(target=self._sniff)
self.sniff_thread.daemon = True
self.sniff_thread.start()
def stop(self):
"""
Stop the packet sniffer.
"""
logger.info("Stopping packet sniffer")
self.stop_sniffing = True
if hasattr(self, 'sniff_thread') and self.sniff_thread.is_alive():
self.sniff_thread.join(timeout=2)
logger.info(f"Captured {self.packet_count} packets in total")
# Example usage
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Network Intrusion Detection System")
parser.add_argument("-i", "--interface", help="Network interface to sniff on")
parser.add_argument("-t", "--train", action="store_true", help="Train the anomaly detector")
parser.add_argument("-d", "--duration", type=int, default=300, help="Training duration in seconds")
args = parser.parse_args()
sniffer = PacketSniffer(interface=args.interface)
try:
if args.train:
sniffer.train_anomaly_detector(duration=args.duration)
sniffer.start()
# Keep the main thread running
while True:
time.sleep(1)
except KeyboardInterrupt:
print("
Stopping sniffer...")
sniffer.stop()
print("Sniffer stopped")
Habilidades Python
¿Interesado en un proyecto similar?
Si estás buscando implementar una solución en Python o tienes un proyecto en mente que requiera habilidades de desarrollo similares, no dudes en contactarme para discutir cómo puedo ayudarte.
Contactar