Volver a proyectos
Ciberseguridad
1 min de lectura

Sistema de Detección de Intrusiones

Por Jeferson Heredia
13 de mayo de 2025
Python
Machine Learning
Ciberseguridad
Análisis de Red
Sistema de Detección de Intrusiones

Descripción

Este sistema de detección de intrusiones utiliza algoritmos avanzados de aprendizaje automático implementados en Python para identificar y responder a amenazas de seguridad en tiempo real.

A diferencia de los sistemas tradicionales basados en reglas, este enfoque adaptativo utiliza bibliotecas como scikit-learn, TensorFlow y PyTorch para aprender continuamente de nuevos patrones de ataque, mejorando su capacidad para detectar amenazas desconocidas.

La arquitectura del sistema está diseñada para procesar grandes volúmenes de tráfico de red utilizando técnicas de procesamiento distribuido con Dask y Apache Spark. Además, implementé un sistema de análisis de comportamiento que establece líneas base de actividad normal y detecta desviaciones que podrían indicar compromisos de seguridad.

Tecnologías Utilizadas

Python

Lenguaje principal para el desarrollo del backend y algoritmos de ML

TensorFlow

Framework de aprendizaje automático para la detección de patrones anómalos

Pandas

Análisis y manipulación de datos para el entrenamiento de modelos

Scapy

Manipulación de paquetes de red para análisis profundo

Elasticsearch

Almacenamiento y búsqueda de eventos de seguridad

Kibana

Visualización y análisis de datos de seguridad

Desafíos

Falsos positivos

Uno de los mayores desafíos fue reducir la tasa de falsos positivos que generaban alertas innecesarias y fatiga en los equipos de seguridad.

Procesamiento en tiempo real

El sistema necesitaba analizar grandes volúmenes de tráfico de red sin introducir latencia perceptible.

Adaptación a nuevas amenazas

Los atacantes evolucionan constantemente sus técnicas, por lo que el sistema debía ser capaz de adaptarse a amenazas previamente desconocidas.

Soluciones

Aprendizaje semi-supervisado

Implementé un enfoque de aprendizaje semi-supervisado en Python que combina reglas definidas por expertos con detección de anomalías basada en ML.

Arquitectura distribuida

Diseñé una arquitectura de procesamiento distribuido con Dask que permite escalar horizontalmente según la carga de tráfico.

Actualización continua

El sistema incorpora un mecanismo de retroalimentación implementado con Python que permite a los analistas de seguridad marcar falsos positivos, mejorando continuamente la precisión del modelo.

Galería

Dashboard principal mostrando alertas en tiempo real

Dashboard principal mostrando alertas en tiempo real

Análisis detallado de un intento de intrusión detectado

Análisis detallado de un intento de intrusión detectado

Interfaz de configuración de sensibilidad y parámetros

Interfaz de configuración de sensibilidad y parámetros

Ejemplo de Código

user_manager.py

import scapy.all as scapy
import argparse
import time
import logging
from threading import Thread
from collections import defaultdict
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename='ids.log'
)
logger = logging.getLogger('IntrusionDetectionSystem')

class PacketSniffer:
    """
    A network packet sniffer that captures and analyzes network traffic.
    """
    
    def __init__(self, interface=None):
        """
        Initialize the packet sniffer.
        
        Args:
            interface: Network interface to sniff on (None for auto-detect)
        """
        self.interface = interface
        self.stop_sniffing = False
        self.packet_count = 0
        self.packet_stats = defaultdict(int)
        self.connections = defaultdict(list)
        self.anomaly_detector = None
        
    def _packet_callback(self, packet):
        """
        Process each captured packet.
        
        Args:
            packet: The captured packet
        """
        self.packet_count += 1
        
        # Extract packet information
        if packet.haslayer(scapy.IP):
            src_ip = packet[scapy.IP].src
            dst_ip = packet[scapy.IP].dst
            proto = packet[scapy.IP].proto
            
            # Update statistics
            self.packet_stats[src_ip] += 1
            
            # Track connections
            conn_key = f"{src_ip}:{dst_ip}:{proto}"
            timestamp = time.time()
            self.connections[conn_key].append(timestamp)
            
            # Log packet info
            if self.packet_count % 100 == 0:
                logger.debug(f"Captured {self.packet_count} packets")
                
            # Check for anomalies periodically
            if self.packet_count % 1000 == 0:
                self._check_for_anomalies()
                
    def _check_for_anomalies(self):
        """
        Check for anomalies in the captured traffic.
        """
        # Prepare features for anomaly detection
        features = []
        
        # Calculate features for each source IP
        for ip, count in self.packet_stats.items():
            # Calculate connection rate (connections per second)
            conn_count = sum(1 for key in self.connections if key.startswith(ip))
            
            # Calculate packet rate
            packet_rate = count / max(1, (time.time() - self.start_time))
            
            # Calculate unique destinations
            unique_dests = len(set(key.split(':')[1] for key in self.connections if key.startswith(ip)))
            
            features.append([packet_rate, conn_count, unique_dests])
            
        if not features:
            return
            
        # Convert to numpy array
        X = np.array(features)
        
        # If we have an anomaly detector, use it
        if self.anomaly_detector is not None:
            # Predict anomalies (-1 for anomalies, 1 for normal)
            predictions = self.anomaly_detector.predict(X)
            
            # Get anomaly scores
            scores = self.anomaly_detector.decision_function(X)
            
            # Log anomalies
            for i, (pred, score) in enumerate(zip(predictions, scores)):
                if pred == -1:  # Anomaly detected
                    ip = list(self.packet_stats.keys())[i]
                    logger.warning(f"Anomaly detected from IP {ip} with score {score:.4f}")
                    
                    # Additional analysis for the anomalous IP
                    self._analyze_anomalous_ip(ip)
        
    def _analyze_anomalous_ip(self, ip):
        """
        Perform detailed analysis on traffic from an anomalous IP.
        
        Args:
            ip: The IP address to analyze
        """
        # Get all connections from this IP
        ip_connections = [k for k in self.connections.keys() if k.startswith(ip)]
        
        # Analyze connection patterns
        if len(ip_connections) > 20:
            # Check for port scanning (many different destination ports)
            dest_ports = set()
            for conn in ip_connections:
                parts = conn.split(':')
                if len(parts) >= 3:
                    dest_ports.add(parts[2])
                    
            if len(dest_ports) > 15:
                logger.warning(f"Possible port scan detected from {ip} - {len(dest_ports)} different ports")
                
        # Check for DoS attack (high packet rate)
        packet_count = self.packet_stats[ip]
        duration = time.time() - self.start_time
        packet_rate = packet_count / duration
        
        if packet_rate > 100:  # Threshold for high packet rate
            logger.warning(f"Possible DoS attack from {ip} - {packet_rate:.2f} packets/sec")
            
    def train_anomaly_detector(self, duration=300):
        """
        Train the anomaly detector on normal traffic.
        
        Args:
            duration: Duration in seconds to capture normal traffic for training
        """
        logger.info(f"Training anomaly detector on normal traffic for {duration} seconds...")
        
        # Start sniffing in a separate thread
        self.start_time = time.time()
        sniff_thread = Thread(target=self._sniff, args=(duration,))
        sniff_thread.start()
        sniff_thread.join()
        
        # Prepare training data
        features = []
        for ip, count in self.packet_stats.items():
            conn_count = sum(1 for key in self.connections if key.startswith(ip))
            packet_rate = count / max(1, (time.time() - self.start_time))
            unique_dests = len(set(key.split(':')[1] for key in self.connections if key.startswith(ip)))
            
            features.append([packet_rate, conn_count, unique_dests])
            
        if not features:
            logger.warning("No packets captured during training period")
            return
            
        # Train the anomaly detector
        X = np.array(features)
        self.anomaly_detector = IsolationForest(contamination=0.05, random_state=42)
        self.anomaly_detector.fit(X)
        
        logger.info("Anomaly detector trained successfully")
        
        # Reset statistics
        self.packet_count = 0
        self.packet_stats = defaultdict(int)
        self.connections = defaultdict(list)
        
    def _sniff(self, duration=None):
        """
        Start sniffing packets.
        
        Args:
            duration: Duration in seconds to sniff (None for indefinite)
        """
        self.stop_sniffing = False
        self.start_time = time.time()
        
        # Set timeout if duration is specified
        timeout = None
        if duration:
            timeout = duration
            
        # Start sniffing
        scapy.sniff(
            iface=self.interface,
            prn=self._packet_callback,
            stop_filter=lambda _: self.stop_sniffing,
            timeout=timeout
        )
        
    def start(self):
        """
        Start the packet sniffer in a separate thread.
        """
        logger.info(f"Starting packet sniffer on interface {self.interface or 'default'}")
        
        # Reset statistics
        self.packet_count = 0
        self.packet_stats = defaultdict(int)
        self.connections = defaultdict(int)
        self.start_time = time.time()
        
        # Start sniffing in a separate thread
        self.sniff_thread = Thread(target=self._sniff)
        self.sniff_thread.daemon = True
        self.sniff_thread.start()
        
    def stop(self):
        """
        Stop the packet sniffer.
        """
        logger.info("Stopping packet sniffer")
        self.stop_sniffing = True
        
        if hasattr(self, 'sniff_thread') and self.sniff_thread.is_alive():
            self.sniff_thread.join(timeout=2)
            
        logger.info(f"Captured {self.packet_count} packets in total")

# Example usage
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Network Intrusion Detection System")
    parser.add_argument("-i", "--interface", help="Network interface to sniff on")
    parser.add_argument("-t", "--train", action="store_true", help="Train the anomaly detector")
    parser.add_argument("-d", "--duration", type=int, default=300, help="Training duration in seconds")
    
    args = parser.parse_args()
    
    sniffer = PacketSniffer(interface=args.interface)
    
    try:
        if args.train:
            sniffer.train_anomaly_detector(duration=args.duration)
        
        sniffer.start()
        
        # Keep the main thread running
        while True:
            time.sleep(1)
            
    except KeyboardInterrupt:
        print("
Stopping sniffer...")
        sniffer.stop()
        print("Sniffer stopped")

Compartir

Habilidades Python

Python Core
Data Analysis
Web Frameworks
Machine Learning
Testing

¿Interesado en un proyecto similar?

Si estás buscando implementar una solución en Python o tienes un proyecto en mente que requiera habilidades de desarrollo similares, no dudes en contactarme para discutir cómo puedo ayudarte.

Contactar