Chargement
Léonard Namolaru

Outils pour la Cyber

Cyber offensif

Léonard Namolaru
Léonard Namolaru
Léonard Namolaru
Léonard Namolaru
Léonard Namolaru
Léonard Namolaru
Léonard Namolaru
Léonard Namolaru
Léonard Namolaru

Outils pour la Cyber

Cyber offensif

Blog Post

Assurer l’intégrité des outils cyber : Automatisation des interactions entre Pytest et une machine virtuelle dans le cadre de tests unitaires pour un outil de scan de ports

1 juillet 2025 Outils cyber
Assurer l’intégrité des outils cyber : Automatisation des interactions entre Pytest et une machine virtuelle dans le cadre de tests unitaires pour un outil de scan de ports

L’une des particularités des outils pour la cyber est qu’ils sont conçus pour opérer contre une cible. Alors, comment vérifier et garantir l’intégrité de tels outils ? Dans l’article suivant, je montre comment créer un outil de scan de ports simple et minimaliste en Python, puis comment concevoir des tests unitaires avec Pytest, tout en communiquant avec une machine virtuelle.

Mon parcours particulier (un master en développement logiciel entre 2021 et 2023, et un master en cybersécurité que je terminerai en 2026) m’a permis, entre autres, de constater un point intéressant : si l’utilisation des machines virtuelles fait partie intégrante de toute formation en cybersécurité (c’est ce que je constate aujourd’hui à Sorbonne Université, et ce que j’ai observé auparavant à l’École 2600), dès la première semaine, les formations davantage axées sur le développement logiciel et les algorithmes abordent très peu ce sujet.

J’en ai pris conscience lors de mes études à l’École 2600. Au début du deuxième semestre de la première année, on nous a demandé de réfléchir à un sujet de “Side Quest” : un projet à long terme se déroulant tout au long du semestre autour d’un sujet lié au domaine de la cybersécurité. Le projet que plusieurs de mes camarades et moi avons choisi de réaliser consistait à créer un outil dont l’un des composants serait un mini-scanner de ports.

En tant que chef de projet, à un moment donné, j’ai dû réfléchir à la manière de réaliser des tests unitaires pour cette fonctionnalité. Ce n’était pas évident et même complètement différent de tout ce que j’avais vu lors de mon premier master, car il ne s’agit pas d’un logiciel de gestion de factures ou de formulaires, par exemple, mais d’un outil conçu pour être utilisé contre une cible réelle. C’est ce qui m’a amené à utiliser les fonctionnalités de Pytest de manière créative, pour produire des tests unitaires qui communiquent directement avec une machine virtuelle.

D’autre part, un outil de scan de port est un exemple de cas concret où la programmation orientée objet (un sujet pas toujours évoqué pendant les formations cyber) peut être utile, et c’est l’occasion d’en présenter une implémentation, qui peut être beaucoup plus intéressante que les exemples génériques présentés lors des cours de POO.

Et comme l’un des objectifs de ce blog est de combiner des concepts issus à la fois du monde du développement logiciel et de la cybersécurité, j’ai pensé qu’il serait intéressant de montrer comment effectuer des tests unitaires pour un outil de scan de ports, dans l’espoir que cet exemple inspirera d’autres façons de tester les outils cyber, et ainsi garantir leur intégrité.

Plan

Alors qu’allons-nous faire ?

  • Implémentation de 4 méthodes de scan de ports TCP : TCP ACK, TCP FIN, TCP NULL et TCP SYN.
    • Implémentation en programmation orientée objet, avec une classe abstraite, PortScan, qui facilite la mise en œuvre des différentes techniques de scan de ports.
    • Permet une mise en œuvre facile de méthodes de scan de port supplémentaires.
  • Implémentation de tests unitaires pour toutes ces méthodes de scan, en construisant une architecture permettant de communiquer avec une machine virtuelle depuis Pytest. Chaque test unitaire commence par la définition de règles iptables sur la VM (la communication entre Pytest et la VM s’effectue par SSH avec une paire de clés publique et privée, sans mot de passe). Ensuite, un scan de port est réalisé avec l’outil. Le scan qui illustre comment différentes règles iptables modifient le résultat d’un scan, mais aussi comment l’attaquant perçoit les barrières imposées par ces règles de son côté.
    • Permet une mise en œuvre facile de tests unitaires pour des méthodes de scan de port supplémentaires et fournit également un outil pédagogique simple à utiliser.

La classe abstraite PortScan

Python
class PortScan(ABC):
    def __init__(self, dst: str, start_port: int, end_port: int, timeout: int = 1):
        self._dst = dst
        self._start_port = start_port
        self._end_port = end_port
        self._timeout = timeout

        self._packets: dict[int, [Packet | None, Packet | None]] = dict((port, [None, None]) for port
                                                                        in range(self._start_port, self._end_port))
        self._results: dict[int, dict[str, str]] = dict((port, dict()) for port
                                                        in range(self._start_port, self._end_port))

    @abstractmethod
    def _create_packet(self, port: int) -> Packet:
        pass

    @abstractmethod
    def _test_response(self, response: Packet | None) -> dict[str, str]:
        pass

    def _test_port(self, port: int, attempt_number: int = 0):
        packet = self._create_packet(port)
        response = sr1(packet, timeout=self._timeout, verbose=0)
        # if response is None and attempt_number < 5:
        #    sleep(2 ** attempt_number)
        #   self._test_port(port, attempt_number + 1)
        self._packets[port] = [packet, response]
        self._results[port] = self._test_response(response)

    def run(self):
        threads: list[Thread] = []
        for port in range(self._start_port, self._end_port + 1):
            thread = Thread(target=self._test_port, args=(port,))
            threads.append(thread)
            thread.start()

        for thread in threads:
            thread.join()

    @property
    def results(self):
        return self._results
    
    def save_results_to_json(self, filename: str):
        with open(filename, 'w') as file:
            json.dump(self._results, file, indent=4)

    def __str__(self):
        string = f'{self.__module__.replace("_", " ")} \n'
        for port, port_analyse in self.results.items():
            string += str(port) + '\t' + port_analyse['state'] + '\n'
        return string

L’idée d’une classe abstraite est de présenter une structure qui indique la forme sous laquelle fonctionneront les classes qui hériteront de PortScan. Ainsi, PortScan indique que toutes ses classes enfants hériteront des attributs dst, start_port, etc. et que chacune de ces classes enfants doit implémenter 2 méthodes : _create_packet(self, port: int), et _test_response(self, response: Packet | None).

La première fonction renvoie la structure du paquet que nous souhaitons envoyer sur chacun des ports de notre cible. La deuxième fonction nous permet de déterminer l’analyse que nous souhaitons effectuer pour chacune des réponses qui seront reçues en réponse. Sur la base de cette analyse, la fonction renvoie un dictionnaire qui inclut les conclusions dérivées de cette réponse, par exemple : {'state': 'open'}. Voici un exemple :

Python
class TcpAckScan(PortScan):
    """
    La méthode de scan TCP ACK permet de vérifier si un port est filtré ou non.
    """

    def __init__(self, dst: str, start_port: int, end_port: int, timeout: int = 1):
        super().__init__(dst, start_port, end_port, timeout)

    def _create_packet(self, port: int):
        ip_packet = IP(dst=self._dst)
        ack_tcp_packet = TCP(sport=55555, dport=port, flags='A', seq=0)
        return ip_packet / ack_tcp_packet

    def _test_response(self, response: Packet | None) -> dict[str, str]:
        """
        La méthode _test_response() reçoit en argument une réponse reçue après l'envoi d'un
        paquet créé à l'aide de la méthode _create_packet(). Il s'agit donc d'une instance
        de la classe Packet de la bibliothèque scapy, ou None si rien n'a été reçu.
        """
        response_analyse = dict()

        # response is not None and response.haslayer(TCP) :
        #   <=> Si une réponse a été reçue et si cette réponse est un paquet TCP
        # response.getlayer(TCP).flags & TcpScan.RST :
        #   <=> Vérifier si le flag RST (Reset) est activé
        if response is not None and response.haslayer(TCP) and response.getlayer(TCP).flags & TcpScan.RST:
            response_analyse['state'] = 'unfiltered'
        else:
            # Si aucun paquet TCP n'est reçu en réponse au paquet ACK, cela signifie que le port est filtré
            response_analyse['state'] = 'filtered'

        return response_analyse

Définir ces 2 méthodes (en plus du constructeur bien sûr) suffit pour permettre le lancement d’un scan de ports sur la cible avec la méthode run. Cette méthode est déjà définie dans la classe parent (PortScan), une classe enfant (TcpAckScan par exemple) n’a donc pas besoin de la redéfinir, mais peut l’exécuter. Dans un tel cas, appeler la méthode run depuis TcpAckScan revient en fait à appeler la méthode run de PortScan, sauf que la méthode run peut maintenant prendre en compte les besoins spécifiques de notre type de scan (TcpAckScan dans ce cas) en faisant un appel à la méthode _create_packet de TcpAckScan pour chaque port entre self._start_port et self._end_port et toute réponse reçue passera par la méthode _test_response de TcpAckScan. Les résultats seront sauvegardés dans self._results qui associe à chaque numéro de port le dictionnaire des conclusions obtenues à partir de la méthode _test_port de TcpAckScan.

Grâce à cette méthode, l’implémentation de chaque méthode de scan de ports ne reviendra qu’à faire une implémentation de _create_packet et de _test_response. Tout le reste du travail (envoi des paquets via plusieurs threads, réception des réponses et collecte des résultats) sera effectué par la classe parent, au lieu d’avoir à réécrire le même code pour chaque méthode de scan supplémentaire qui sera implémentée.

Utilisation

Comment pouvons-nous maintenant utiliser la classe que nous avons créée ? La façon la plus simple est de faire quelque chose comme ceci :

Python
if __name__ == '__main__':
    if len(sys.argv) < 4:
        print(f'Utilisation : python {sys.argv[0]} ip start_port end_port')
        sys.exit(1)

    ip = sys.argv[1]
    start_port = int(sys.argv[2])
    end_port = int(sys.argv[3])

    tcp_ack_scan = TcpAckScan(ip, start_port, end_port)
    tcp_ack_scan.run()
    tcp_ack_scan.save_results_to_json('tcp_ack_scan_results.json')
    print(tcp_ack_scan)

Si nous disposons déjà d’une machine virtuelle, nous pouvons bien sûr essayer de l’utiliser comme cible et de faire un scan de ses ports. Mais comment automatiser ces tests ?

Tests unitaires

L’un des points forts du framework Pytest est le mécanisme des fixture, il est possible d’en voir un certain nombre dans le fichier conftest.py :

Python
def pytest_addoption(parser):
    parser.addoption("--host", action="store", help="host ip", type=host_checker, default="127.0.0.1")
    parser.addoption("--username", action="store", help="ssh user", type=identifier_checker, default="root")
    parser.addoption("--pkey", action="store", help="ssh user", type=path_checker, default="./id_rsa")


@pytest.fixture(scope="session")
def host(request):
    return request.config.getoption("--host")


@pytest.fixture(scope="session")
def username(request):
    return request.config.getoption("--username")


@pytest.fixture(scope="session")
def pkey(request):
    return request.config.getoption("--pkey")


@pytest.fixture(scope="session")
def key_based_ssh_connection(host, username, pkey):
    private_key = paramiko.RSAKey.from_private_key_file(pkey)
    ssh_client = paramiko.SSHClient()
    policy = paramiko.AutoAddPolicy()

    ssh_client.set_missing_host_key_policy(policy)
    ssh_client.connect(host, username=username, pkey=private_key)
    return ssh_client


@pytest.fixture(scope="function")
def ssh_connection_after_iptables_reset(key_based_ssh_connection):
    exec_ssh_command_with_error_handling(key_based_ssh_connection, 'sudo iptables -F')
    exec_ssh_command_with_error_handling(key_based_ssh_connection, 'sudo iptables -X')
    return key_based_ssh_connection

def exec_ssh_command_with_error_handling(ssh_connection: SSHClient, command: str) -> None:
    _stdin, _stdout, _stderr = ssh_connection.exec_command(command)

    error_message = _stderr.read().decode()
    if len(error_message) > 0:
        raise RuntimeError(error_message)
    return

Ensuite, ils sont utilisés dans les fichiers de test. L’idée est qu’il y a souvent des actions qui se répètent inévitablement dans les différents tests. Certaines d’entre elles doivent être effectuées une fois, avant d’exécuter la série de tests, d’autres doivent être effectuées à nouveau avant chaque test de la série. Cela peut être considéré comme une sorte de condition préalable permettant de réaliser correctement le test.

Dans le fichier test_tcp_ack.py par exemple, chaque test prend deux arguments : host et ssh_connection_after_iptables_reset. Ces noms correspondent aux noms de fonctions définies dans le fichier conftest.py. Ajouter leurs noms comme arguments à une fonction de test garantit que ces fonctions seront nécessairement appelées avant d’exécuter ce test et que la fonction de test pourra utiliser les valeurs renvoyées par ces fonctions. De cette manière, les iptables sont vidés et initialisés avant chaque test. De plus, au début de chaque série de tests, une connexion SSH avec le serveur de test est établie.

Python
def test_tcp_ack_1(host, ssh_connection_after_iptables_reset):
    port_number = 22
    tcp_ack_scan = tcp_ack.TcpAckScan(host, port_number, port_number)
    tcp_ack_scan.run()
    assert tcp_ack_scan.results[port_number]['state'] == 'unfiltered'


def test_tcp_ack_2(host, ssh_connection_after_iptables_reset):
    port_number = 21
    tcp_ack_scan = tcp_ack.TcpAckScan(host, port_number, port_number)
    tcp_ack_scan.run()
    assert tcp_ack_scan.results[port_number]['state'] == 'unfiltered'
 
 # ...

def test_tcp_ack_7(host, ssh_connection_after_iptables_reset):
    port_number = 21
    # [!] --tcp-flags mask comp
    # Le premier argument (mask) correspond aux flags que nous devons examiner
    # Le deuxième argument (comp) correspond aux flags qui doivent être définis.
    # Les flags sont : SYN ACK FIN RST URG PSH ALL NONE.
    # Source : https://ipset.netfilter.org/iptables-extensions.man.html
    exec_ssh_command_with_error_handling(ssh_connection_after_iptables_reset,
                                         f"sudo iptables -A INPUT -p tcp --dport {port_number} "
                                         f"--tcp-flags ALL ACK -j DROP")

    tcp_ack_scan = tcp_ack.TcpAckScan(host, port_number, port_number)
    tcp_ack_scan.run()
    assert tcp_ack_scan.results[port_number]['state'] == 'filtered'

Les exemples de tests unitaires de ce projet sont très simples, voire banals. Cependant, ce qui est important ici est la démonstration de la manière dont Pytest peut interagir avec une machine virtuelle.

Mise en place d’une machine virtuelle Ubuntu Server

L’exécution des tests unitaires pour les scans de ports nécessite donc l’accès à un serveur avec un système d’exploitation de la famille Unix auquel il est possible de se connecter via SSH sans mot de passe, idéalement Ubuntu Server. Une deuxième exigence est l’existence d’une configuration permettant d’exécuter la commande iptables en sudo sur ce serveur sans avoir besoin de saisir un mot de passe. Ces deux prérequis sont nécessaires afin de pouvoir ajouter des règles iptables et les supprimer, selon les besoins des différents tests. De toute évidence, une telle configuration permettant d’exécuter une commande en sudo sans mot de passe ne doit pas être utilisée dans un environnement de production.

Voici donc une liste rapide d’instructions pour installer et configurer une machine virtuelle Ubuntu Server à l’aide de Virtual Box :

  • Télécharger le fichier iso d’ Ubuntu Server (la version testée est 22.04.4) depuis https://ubuntu.com/download/server
  • Création d’une nouvelle VM sur Virtual Box :
    • Type : Linux, Version : Ubuntu (64-bit)
    • Taille de la mémoire (RAM, Random Access Memory) : 2 Go (2 048 mégaoctets)
    • Disque dur : Create a virtual hard disk / Créer un disque dur virtuel maintenant
    • Type de fichier de disque dur : VDI (VirtualBox Disk Image)
    • Stockage sur disque dur physique : Dynamically allocated / Dynamiquement alloué
    • Emplacement du fichier et taille : 10,00 Gio

  • Démarrer la VM (Choix du disque de démarrage : ubuntu-22.04.4-live-server-amd64.iso).
    • Choisir la langue : English
    • Configuration du clavier Layout : French
    • Type d’installation : Ubuntu Server
    • Connexions réseau : Utiliser les configurations par défaut
    • Configuration d’un proxy : Passer à l’étape suivante
    • Configuration du miroir d’archives Ubuntu : Utilisation de la configuration par défaut.
    • Configuration de stockage guidée : Use an entire disk ; Set up this disk as an LVM group
    • Upgrade to Ubuntu Pro : Skip for now
    • SSH Setup : Install OpenSSH server ; Import SSH identity : No
    • Featured Server Snaps : Ne rien choisir
    • Reboot now
  • ip addr pour obtenir l’adresse IP de notre VM.
  • Générer une paire de clés SSH :
    • Sur la machine hôte, pour générer notre paire de clés : ssh-keygen -o -b 4096 -t rsa (accepter l’emplacement par défaut et laisser passphrase vide).
    • Copier la clé publique sur le serveur Ubuntu depuis la machine hôte : ssh-copy-id -i ~/.ssh/id_rsa.pub <Login Ubuntu Server>@<IP Ubuntu Server>
  • Pour pouvoir faire « sudo » (notamment à distance avec SSH) sans mot de passe pour la commande iptables :
    • sudo visudo
    • Ajouter à la fin du fichier,
      <Login Ubuntu Server> ALL=(ALL) NOPASSWD:/usr/sbin/iptables
    • Pour trouver l’emplacement d’iptables sur le système d’exploitation : which iptables

Exécution des tests

L’exécution de tests unitaires écrits à l’aide du framework Pytest se fait simplement à l’aide de la commande pytest. Cependant, dans le cas de l’outil d’analyse de ports, l’utilisation de scapy nécessite généralement également l’utilisation de sudo.

De plus, le caractère unique des tests unitaires nécessaires pour vérifier les performances de notre outil nécessite l’utilisation de 3 options qui ont été définies à l’aide de Pytest spécialement pour cet outil. Ces options permettent de définir l’adresse IP du serveur de test, le nom d’utilisateur par lequel la connexion SSH se fera, ainsi que le chemin menant à la clé privée qui permettra la connexion SSH sans mot de passe :

Bash
sudo pytest --host=10.0.2.16 --username=namolaru --pkey=/home/namolaru/.ssh/id_rsa

Exécution des tests unitaires à partir d’un venv :

Bash
sudo .venv/bin/pytest --host=10.0.2.16 --username=namolaru --pkey=/home/namolaru/.ssh/id_rsa

L’utilisation de cette commande entraîne l’exécution des tests dans les fichiers du sous-répertoire tests.

Le projet dans son intégralité

Le projet est disponible dans son intégralité sur mon GitHub

Et vous ? Quelles solutions avez-vous trouvées pour assurer l’intégrité de vos outils cyber ? N’hésitez pas à partager dans les commentaires !

Tags:
Write a comment
This site is registered on wpml.org as a development site. Switch to a production site key to remove this banner.