Volver a proyectos
Ciberseguridad
1 min de lectura

Librería de Comunicaciones Seguras

Por Jeferson Heredia
13 de junio de 2025
Python
Cryptography
Security
Networking
Librería de Comunicaciones Seguras

Descripción

Esta librería de comunicaciones seguras proporciona una capa de seguridad robusta para intercambios de datos entre microservicios, aplicaciones y usuarios finales.

Desarrollada en Python, la librería implementa los más recientes protocolos criptográficos utilizando bibliotecas como cryptography, PyNaCl y PyOpenSSL para asegurar que las comunicaciones sean privadas, auténticas y resistentes a ataques.

La API está diseñada para ser intuitiva y fácil de integrar en aplicaciones Python existentes, reduciendo la posibilidad de errores de implementación que podrían comprometer la seguridad. La librería maneja automáticamente aspectos complejos como la negociación de claves, la rotación de credenciales y la validación de certificados.

Tecnologías Utilizadas

Python

Lenguaje principal para el desarrollo de la librería

Cryptography

Biblioteca Python para operaciones criptográficas

PyNaCl

Implementación de libsodium para Python

gRPC

Framework de comunicación entre servicios

Protobuf

Serialización eficiente de datos estructurados

PyTest

Framework para pruebas automatizadas

Desafíos

Rendimiento vs. seguridad

Encontrar el equilibrio entre implementar protocolos criptográficos robustos y mantener un rendimiento óptimo.

Gestión de claves

Desarrollar un sistema seguro y usable para la gestión del ciclo de vida de las claves criptográficas.

Compatibilidad entre plataformas

Asegurar que la librería funcionara consistentemente en diferentes sistemas operativos y versiones de Python.

Soluciones

Implementaciones optimizadas

Utilicé implementaciones de Python altamente optimizadas de algoritmos criptográficos con soporte para aceleración por hardware cuando está disponible.

Rotación automática de claves

Implementé un sistema de rotación automática de claves en Python que mantiene la seguridad sin intervención manual.

Pruebas exhaustivas

Desarrollé una suite completa de pruebas con pytest que verifica la funcionalidad en múltiples plataformas y versiones de Python.

Galería

Arquitectura de la solución de encriptación

Arquitectura de la solución de encriptación

Dashboard de monitoreo de comunicaciones seguras

Dashboard de monitoreo de comunicaciones seguras

Ejemplo de implementación en una aplicación cliente

Ejemplo de implementación en una aplicación cliente

Ejemplo de Código

user_manager.py

import scapy.all as scapy
import argparse
import time
import logging
from threading import Thread
from collections import defaultdict
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename='ids.log'
)
logger = logging.getLogger('IntrusionDetectionSystem')

class PacketSniffer:
    """
    A network packet sniffer that captures and analyzes network traffic.
    """
    
    def __init__(self, interface=None):
        """
        Initialize the packet sniffer.
        
        Args:
            interface: Network interface to sniff on (None for auto-detect)
        """
        self.interface = interface
        self.stop_sniffing = False
        self.packet_count = 0
        self.packet_stats = defaultdict(int)
        self.connections = defaultdict(list)
        self.anomaly_detector = None
        
    def _packet_callback(self, packet):
        """
        Process each captured packet.
        
        Args:
            packet: The captured packet
        """
        self.packet_count += 1
        
        # Extract packet information
        if packet.haslayer(scapy.IP):
            src_ip = packet[scapy.IP].src
            dst_ip = packet[scapy.IP].dst
            proto = packet[scapy.IP].proto
            
            # Update statistics
            self.packet_stats[src_ip] += 1
            
            # Track connections
            conn_key = f"{src_ip}:{dst_ip}:{proto}"
            timestamp = time.time()
            self.connections[conn_key].append(timestamp)
            
            # Log packet info
            if self.packet_count % 100 == 0:
                logger.debug(f"Captured {self.packet_count} packets")
                
            # Check for anomalies periodically
            if self.packet_count % 1000 == 0:
                self._check_for_anomalies()
                
    def _check_for_anomalies(self):
        """
        Check for anomalies in the captured traffic.
        """
        # Prepare features for anomaly detection
        features = []
        
        # Calculate features for each source IP
        for ip, count in self.packet_stats.items():
            # Calculate connection rate (connections per second)
            conn_count = sum(1 for key in self.connections if key.startswith(ip))
            
            # Calculate packet rate
            packet_rate = count / max(1, (time.time() - self.start_time))
            
            # Calculate unique destinations
            unique_dests = len(set(key.split(':')[1] for key in self.connections if key.startswith(ip)))
            
            features.append([packet_rate, conn_count, unique_dests])
            
        if not features:
            return
            
        # Convert to numpy array
        X = np.array(features)
        
        # If we have an anomaly detector, use it
        if self.anomaly_detector is not None:
            # Predict anomalies (-1 for anomalies, 1 for normal)
            predictions = self.anomaly_detector.predict(X)
            
            # Get anomaly scores
            scores = self.anomaly_detector.decision_function(X)
            
            # Log anomalies
            for i, (pred, score) in enumerate(zip(predictions, scores)):
                if pred == -1:  # Anomaly detected
                    ip = list(self.packet_stats.keys())[i]
                    logger.warning(f"Anomaly detected from IP {ip} with score {score:.4f}")
                    
                    # Additional analysis for the anomalous IP
                    self._analyze_anomalous_ip(ip)
        
    def _analyze_anomalous_ip(self, ip):
        """
        Perform detailed analysis on traffic from an anomalous IP.
        
        Args:
            ip: The IP address to analyze
        """
        # Get all connections from this IP
        ip_connections = [k for k in self.connections.keys() if k.startswith(ip)]
        
        # Analyze connection patterns
        if len(ip_connections) > 20:
            # Check for port scanning (many different destination ports)
            dest_ports = set()
            for conn in ip_connections:
                parts = conn.split(':')
                if len(parts) >= 3:
                    dest_ports.add(parts[2])
                    
            if len(dest_ports) > 15:
                logger.warning(f"Possible port scan detected from {ip} - {len(dest_ports)} different ports")
                
        # Check for DoS attack (high packet rate)
        packet_count = self.packet_stats[ip]
        duration = time.time() - self.start_time
        packet_rate = packet_count / duration
        
        if packet_rate > 100:  # Threshold for high packet rate
            logger.warning(f"Possible DoS attack from {ip} - {packet_rate:.2f} packets/sec")
            
    def train_anomaly_detector(self, duration=300):
        """
        Train the anomaly detector on normal traffic.
        
        Args:
            duration: Duration in seconds to capture normal traffic for training
        """
        logger.info(f"Training anomaly detector on normal traffic for {duration} seconds...")
        
        # Start sniffing in a separate thread
        self.start_time = time.time()
        sniff_thread = Thread(target=self._sniff, args=(duration,))
        sniff_thread.start()
        sniff_thread.join()
        
        # Prepare training data
        features = []
        for ip, count in self.packet_stats.items():
            conn_count = sum(1 for key in self.connections if key.startswith(ip))
            packet_rate = count / max(1, (time.time() - self.start_time))
            unique_dests = len(set(key.split(':')[1] for key in self.connections if key.startswith(ip)))
            
            features.append([packet_rate, conn_count, unique_dests])
            
        if not features:
            logger.warning("No packets captured during training period")
            return
            
        # Train the anomaly detector
        X = np.array(features)
        self.anomaly_detector = IsolationForest(contamination=0.05, random_state=42)
        self.anomaly_detector.fit(X)
        
        logger.info("Anomaly detector trained successfully")
        
        # Reset statistics
        self.packet_count = 0
        self.packet_stats = defaultdict(int)
        self.connections = defaultdict(list)
        
    def _sniff(self, duration=None):
        """
        Start sniffing packets.
        
        Args:
            duration: Duration in seconds to sniff (None for indefinite)
        """
        self.stop_sniffing = False
        self.start_time = time.time()
        
        # Set timeout if duration is specified
        timeout = None
        if duration:
            timeout = duration
            
        # Start sniffing
        scapy.sniff(
            iface=self.interface,
            prn=self._packet_callback,
            stop_filter=lambda _: self.stop_sniffing,
            timeout=timeout
        )
        
    def start(self):
        """
        Start the packet sniffer in a separate thread.
        """
        logger.info(f"Starting packet sniffer on interface {self.interface or 'default'}")
        
        # Reset statistics
        self.packet_count = 0
        self.packet_stats = defaultdict(int)
        self.connections = defaultdict(int)
        self.start_time = time.time()
        
        # Start sniffing in a separate thread
        self.sniff_thread = Thread(target=self._sniff)
        self.sniff_thread.daemon = True
        self.sniff_thread.start()
        
    def stop(self):
        """
        Stop the packet sniffer.
        """
        logger.info("Stopping packet sniffer")
        self.stop_sniffing = True
        
        if hasattr(self, 'sniff_thread') and self.sniff_thread.is_alive():
            self.sniff_thread.join(timeout=2)
            
        logger.info(f"Captured {self.packet_count} packets in total")

# Example usage
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Network Intrusion Detection System")
    parser.add_argument("-i", "--interface", help="Network interface to sniff on")
    parser.add_argument("-t", "--train", action="store_true", help="Train the anomaly detector")
    parser.add_argument("-d", "--duration", type=int, default=300, help="Training duration in seconds")
    
    args = parser.parse_args()
    
    sniffer = PacketSniffer(interface=args.interface)
    
    try:
        if args.train:
            sniffer.train_anomaly_detector(duration=args.duration)
        
        sniffer.start()
        
        # Keep the main thread running
        while True:
            time.sleep(1)
            
    except KeyboardInterrupt:
        print("
Stopping sniffer...")
        sniffer.stop()
        print("Sniffer stopped")

Compartir

Habilidades Python

Python Core
Data Analysis
Web Frameworks
Machine Learning
Testing

¿Interesado en un proyecto similar?

Si estás buscando implementar una solución en Python o tienes un proyecto en mente que requiera habilidades de desarrollo similares, no dudes en contactarme para discutir cómo puedo ayudarte.

Contactar