# File: audi_uid/audit-suid.py
# 583 lines, 24 KiB, Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script d'audit des fichiers SUID et SGID pour Linux
Sans dépendances externes
Auteur: Zen6
Version: 1.0
"""
import os
import sys
import stat
import hashlib
import logging
import argparse
import subprocess
import json
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Set, Tuple
from dataclasses import dataclass, asdict
# Configuration
@dataclass
class AuditConfig:
    """Tunable settings for one audit run.

    ``excluded_dirs`` uses ``None`` as a "not provided" marker; in that case
    ``__post_init__`` installs a fresh default list so instances never share
    one mutable list.
    """

    # Directory that receives the log file and the generated reports.
    log_dir: str = "/var/log/audit"
    # JSON file listing the SUID/SGID paths considered legitimate.
    whitelist_file: str = "/etc/audit/suid_whitelist.json"
    # Path prefixes skipped during the filesystem walk (None -> defaults).
    excluded_dirs: List[str] = None
    # Maximum directory depth visited by the walk.
    max_depth: int = 20
    # Toggles for the optional checks and for MD5 computation.
    check_temp_dirs: bool = True
    check_orphans: bool = True
    compute_hashes: bool = True

    def __post_init__(self):
        """Fill in the default exclusion list when the caller gave none."""
        if self.excluded_dirs is not None:
            return
        default_exclusions = (
            "/proc", "/sys", "/dev", "/run", "/snap",
            "/var/lib/docker", "/var/lib/lxcfs", "/proc/*",
            "/sys/*", "/dev/*",
        )
        # Build a new list per instance from the immutable tuple above.
        self.excluded_dirs = list(default_exclusions)
@dataclass
class FileAudit:
    """One audit finding: a file and the metadata recorded for it.

    Instances are serialised with ``dataclasses.asdict`` for the JSON and CSV
    reports, so the field names double as report column names.
    """

    # Path of the audited file.
    path: str
    # Finding category, e.g. "SUID", "SGID" or "ORPHAN".
    type: str
    # Symbolic mode string, or "unknown" when it could not be read.
    permissions: str
    # Owner and group (name when resolvable, otherwise numeric id as text).
    owner: str
    group: str
    # Size in bytes and the same value formatted for humans.
    size: int
    size_human: str
    # Last modification time as a formatted string.
    mtime: str
    # MD5 hex digest; empty when no hash was computed.
    hash_md5: str = ""
    # True when the path is covered by the whitelist.
    is_whitelisted: bool = False
    # Severity label such as "INFO", "WARNING" or "CRITICAL".
    alert_level: str = "INFO"
class SUIDGUIDAuditor:
    """Scan a filesystem for SUID/SGID files and report the findings.

    Every file carrying the set-user-ID or set-group-ID bit is recorded as a
    ``FileAudit`` entry in ``self.results``. Entries that are not whitelisted,
    that live in a temporary directory, or that ``find`` reports as having no
    valid owner/group are additionally collected in ``self.alerts``.
    Text, JSON and CSV reports plus a flat alert file are written into
    ``config.log_dir``.
    """

    def __init__(self, config: "AuditConfig" = None):
        """Store the configuration and initialise logging.

        Args:
            config: Audit settings; a default ``AuditConfig`` is used when
                omitted.
        """
        self.config = config or AuditConfig()
        self.whitelist = {"SUID": set(), "SGID": set()}
        self.alerts = []
        self.results = []
        self.setup_logging()

    def setup_logging(self):
        """Configure logging and compute the output file paths.

        The output paths are assigned before anything touches the disk so the
        attributes exist even when the log directory cannot be created. In
        that case logging falls back to the console only, which lets ``run``
        reach its own "must be root" check instead of dying on a traceback.
        """
        log_dir = Path(self.config.log_dir)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.log_file = log_dir / f"audit_{timestamp}.log"
        self.report_file = log_dir / f"report_{timestamp}.txt"
        self.alert_file = log_dir / f"alerts_{timestamp}.txt"
        self.json_file = log_dir / f"audit_{timestamp}.json"
        self.csv_file = log_dir / f"audit_{timestamp}.csv"

        handlers = [logging.StreamHandler()]
        file_error = None
        try:
            log_dir.mkdir(parents=True, exist_ok=True)
            handlers.insert(0, logging.FileHandler(self.log_file, encoding="utf-8"))
        except OSError as exc:
            file_error = exc

        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=handlers,
        )
        self.logger = logging.getLogger(__name__)
        if file_error is not None:
            self.logger.warning(
                f"Journal fichier indisponible ({file_error}); sortie console uniquement"
            )

    @staticmethod
    def _expand_whitelist(paths) -> Set[str]:
        """Return the listed paths plus their symlink-resolved forms.

        On systems where e.g. ``/bin`` is a symlink to ``/usr/bin`` the walk
        reports ``/usr/bin/su`` while a whitelist may say ``/bin/su``; keeping
        both spellings makes the two match.
        """
        expanded = set()
        for entry in paths:
            expanded.add(entry)
            if isinstance(entry, str):
                expanded.add(os.path.realpath(entry))
        return expanded

    def _is_whitelisted(self, filepath: str, file_type: str) -> bool:
        """Tell whether ``filepath`` (or its real path) is whitelisted for ``file_type``."""
        allowed = self.whitelist.get(file_type, set())
        return filepath in allowed or os.path.realpath(filepath) in allowed

    def load_whitelist(self):
        """Load the whitelist from the JSON file, creating a default one if absent.

        Errors are logged and leave the current (possibly empty) whitelist in
        place rather than aborting the audit.
        """
        try:
            if not Path(self.config.whitelist_file).exists():
                self.create_default_whitelist()
                return
            with open(self.config.whitelist_file, 'r', encoding="utf-8") as f:
                data = json.load(f)
            listed_suid = set(data.get("SUID", []))
            listed_sgid = set(data.get("SGID", []))
            self.whitelist["SUID"] = self._expand_whitelist(listed_suid)
            self.whitelist["SGID"] = self._expand_whitelist(listed_sgid)
            # Report the number of entries as written in the file, not the
            # expanded set size.
            self.logger.info(f"Liste blanche chargée: {len(listed_suid)} SUID, {len(listed_sgid)} SGID")
        except Exception as e:
            self.logger.error(f"Erreur lors du chargement de la whitelist: {e}")

    def create_default_whitelist(self):
        """Write a default whitelist file and load it into memory."""
        default_whitelist = {
            "SUID": [
                "/bin/su", "/bin/ping", "/bin/mount", "/bin/umount",
                "/usr/bin/passwd", "/usr/bin/sudo", "/usr/bin/chsh",
                "/usr/bin/chfn", "/usr/bin/gpasswd", "/usr/bin/crontab",
                "/usr/bin/at", "/usr/bin/newgrp", "/usr/sbin/unix_chkpwd",
                "/usr/lib/openssh/ssh-keysign", "/usr/lib/dbus-1.0/dbus-daemon-launch-helper",
                "/usr/bin/sg", "/usr/bin/pkexec", "/usr/bin/ksu"
            ],
            "SGID": [
                "/usr/bin/wall", "/usr/bin/write", "/usr/bin/ssh-agent",
                "/usr/bin/locate", "/usr/bin/mlocate", "/usr/bin/bsd-write",
                "/usr/bin/chage", "/usr/bin/expiry"
            ]
        }
        try:
            Path(self.config.whitelist_file).parent.mkdir(parents=True, exist_ok=True)
            with open(self.config.whitelist_file, 'w', encoding="utf-8") as f:
                json.dump(default_whitelist, f, indent=4)
            self.whitelist["SUID"] = self._expand_whitelist(default_whitelist["SUID"])
            self.whitelist["SGID"] = self._expand_whitelist(default_whitelist["SGID"])
            self.logger.info(f"Liste blanche par défaut créée: {self.config.whitelist_file}")
        except Exception as e:
            self.logger.error(f"Erreur lors de la création de la whitelist: {e}")

    def get_file_info(self, filepath: str) -> Tuple[str, str, str, int, str]:
        """Return ``(permissions, owner, group, size, mtime)`` for a file.

        The file itself is inspected (symlinks are not followed). Owner and
        group are names when they can be resolved, otherwise the numeric id
        as a string. On failure every text field is ``"unknown"`` and the
        size is ``0``.
        """
        try:
            stat_info = os.stat(filepath, follow_symlinks=False)
            permissions = stat.filemode(stat_info.st_mode)
            size = stat_info.st_size
            mtime = datetime.fromtimestamp(stat_info.st_mtime).strftime("%Y-%m-%d %H:%M:%S")
        except (OSError, OverflowError, ValueError) as e:
            self.logger.debug(f"Erreur sur {filepath}: {e}")
            return "unknown", "unknown", "unknown", 0, "unknown"

        owner = str(stat_info.st_uid)
        group = str(stat_info.st_gid)
        try:
            import pwd
            owner = pwd.getpwuid(stat_info.st_uid).pw_name
        except (ImportError, KeyError):
            pass  # no passwd entry: keep the numeric uid
        try:
            import grp
            group = grp.getgrgid(stat_info.st_gid).gr_name
        except (ImportError, KeyError):
            pass  # no group entry: keep the numeric gid
        return permissions, owner, group, size, mtime

    def compute_md5(self, filepath: str) -> str:
        """Return the MD5 hex digest of a file, or ``""`` when disabled or unreadable."""
        if not self.config.compute_hashes:
            return ""
        try:
            hash_md5 = hashlib.md5()
            with open(filepath, "rb") as f:
                for chunk in iter(lambda: f.read(8192), b""):
                    hash_md5.update(chunk)
            return hash_md5.hexdigest()
        except (OSError, ValueError) as e:
            self.logger.debug(f"Erreur MD5 sur {filepath}: {e}")
            return ""

    def check_suid_sgid(self, filepath: str) -> List[str]:
        """Return the special bits set on a file as a list of "SUID"/"SGID".

        Symlinks are not followed. An unreadable path yields an empty list.
        """
        try:
            mode = os.stat(filepath, follow_symlinks=False).st_mode
        except OSError:
            return []
        types = []
        if mode & stat.S_ISUID:
            types.append("SUID")
        if mode & stat.S_ISGID:
            types.append("SGID")
        return types

    def should_exclude(self, filepath: str) -> bool:
        """Tell whether ``filepath`` is an excluded directory or lies below one.

        Matching is done on whole path components, so ``/dev`` excludes
        ``/dev`` and ``/dev/shm`` but not ``/devices``. A trailing ``/*`` on
        an exclusion entry is accepted and treated like the bare directory.
        """
        for excluded in self.config.excluded_dirs:
            prefix = excluded[:-2] if excluded.endswith("/*") else excluded
            # An entry that collapses to nothing denotes the root itself.
            prefix = prefix.rstrip(os.sep) or os.sep
            if prefix == os.sep:
                return True
            if filepath == prefix or filepath.startswith(prefix + os.sep):
                return True
        return False

    @staticmethod
    def _relative_depth(root: str, start_path: str) -> int:
        """Return how many directory levels ``root`` lies below ``start_path``."""
        remainder = root[len(start_path):].strip(os.sep)
        return remainder.count(os.sep) + 1 if remainder else 0

    def audit_directory(self, start_path: str = "/"):
        """Walk ``start_path`` and record every SUID/SGID file found.

        Excluded directories are pruned from the walk, and directories deeper
        than ``config.max_depth`` levels below ``start_path`` are not entered.
        A keyboard interrupt terminates the process with exit status 1.
        """
        self.logger.info(f"Début de l'audit depuis {start_path}")
        total_files = 0
        try:
            for root, dirs, files in os.walk(start_path, followlinks=False):
                depth = self._relative_depth(root, start_path)
                if depth >= self.config.max_depth:
                    # Children would exceed the limit: stop os.walk from
                    # descending instead of merely skipping their files.
                    dirs[:] = []
                else:
                    dirs[:] = [d for d in dirs if not self.should_exclude(os.path.join(root, d))]
                if depth > self.config.max_depth:
                    continue
                for file in files:
                    filepath = os.path.join(root, file)
                    total_files += 1
                    if total_files % 10000 == 0:
                        self.logger.info(f"Progression: {total_files} fichiers parcourus...")
                    suid_sgid_types = self.check_suid_sgid(filepath)
                    if suid_sgid_types:
                        self.process_file(filepath, suid_sgid_types)
        except KeyboardInterrupt:
            self.logger.warning("Audit interrompu par l'utilisateur")
            sys.exit(1)
        except Exception as e:
            self.logger.error(f"Erreur lors de l'audit: {e}")
        self.logger.info(f"Audit terminé. {total_files} fichiers parcourus.")

    def _build_audit(self, filepath: str, file_type: str, alert_level: str,
                     info=None, hash_md5: str = "", is_whitelisted: bool = False):
        """Create a ``FileAudit`` for ``filepath`` filled with its real metadata."""
        permissions, owner, group, size, mtime = info or self.get_file_info(filepath)
        return FileAudit(
            path=filepath,
            type=file_type,
            permissions=permissions,
            owner=owner,
            group=group,
            size=size,
            size_human=self.human_readable_size(size),
            mtime=mtime,
            hash_md5=hash_md5,
            is_whitelisted=is_whitelisted,
            alert_level=alert_level,
        )

    def process_file(self, filepath: str, types: List[str]):
        """Record one result per special bit set on ``filepath``.

        Whitelisted entries are logged at INFO level; the others are logged
        as warnings, hashed (when enabled) and added to the alerts.
        """
        info = self.get_file_info(filepath)
        file_hash = None  # computed lazily, at most once per file
        for file_type in types:
            is_whitelisted = self._is_whitelisted(filepath, file_type)
            entry_hash = ""
            if not is_whitelisted and self.config.compute_hashes:
                if file_hash is None:
                    file_hash = self.compute_md5(filepath)
                entry_hash = file_hash
            audit = self._build_audit(
                filepath,
                file_type,
                "INFO" if is_whitelisted else "WARNING",
                info=info,
                hash_md5=entry_hash,
                is_whitelisted=is_whitelisted,
            )
            self.results.append(audit)
            if is_whitelisted:
                self.logger.info(f"[OK {file_type}] {filepath}")
            else:
                self.logger.warning(f"[ALERTE {file_type}] {filepath} (propriétaire: {audit.owner}, groupe: {audit.group})")
                self.alerts.append(audit)

    def _record_critical(self, filepath: str, types: List[str], known: dict):
        """Flag a temp-directory finding as CRITICAL, one record per bit.

        ``known`` maps ``(path, type)`` to the entry already recorded for it.
        An existing entry is escalated in place instead of being duplicated;
        otherwise a new record with the file's real metadata is added.
        """
        info = self.get_file_info(filepath)
        for file_type in types:
            key = (filepath, file_type)
            existing = known.get(key)
            if existing is not None:
                existing.alert_level = "CRITICAL"
                # Identity test: equal-looking records must not hide each other.
                if not any(alert is existing for alert in self.alerts):
                    self.alerts.append(existing)
                continue
            audit = self._build_audit(
                filepath, file_type, "CRITICAL",
                info=info, hash_md5=self.compute_md5(filepath),
            )
            known[key] = audit
            self.alerts.append(audit)
            self.results.append(audit)

    def check_temp_directories(self):
        """Flag SUID/SGID files found under /tmp, /var/tmp and /dev/shm as CRITICAL."""
        if not self.config.check_temp_dirs:
            return
        temp_dirs = ["/tmp", "/var/tmp", "/dev/shm"]
        self.logger.info("Vérification des répertoires temporaires...")
        known = {}
        for entry in self.results:
            known.setdefault((entry.path, entry.type), entry)
        for temp_dir in temp_dirs:
            if not os.path.exists(temp_dir):
                continue
            try:
                for root, _dirs, files in os.walk(temp_dir):
                    for file in files:
                        filepath = os.path.join(root, file)
                        types = self.check_suid_sgid(filepath)
                        if not types:
                            continue
                        self.logger.error(f"[URGENT] Fichier SUID/SGID dans {temp_dir}: {filepath}")
                        self._record_critical(filepath, types, known)
            except Exception as e:
                self.logger.debug(f"Erreur lors de la vérification de {temp_dir}: {e}")

    def check_orphan_files(self):
        """Record SUID/SGID files whose owner or group no longer exists.

        The search is delegated to ``find`` with a 300 second timeout; each
        reported path becomes a CRITICAL entry of type "ORPHAN".
        """
        if not self.config.check_orphans:
            return
        self.logger.info("Recherche des fichiers orphelins...")
        # Argument list instead of a shell string: no shell is needed, and
        # stderr is discarded directly rather than through a redirection.
        cmd = [
            "find", "/", "-type", "f",
            "(", "-perm", "-4000", "-o", "-perm", "-2000", ")",
            "(", "-nouser", "-o", "-nogroup", ")",
        ]
        try:
            result = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                text=True,
                timeout=300,
            )
            for line in result.stdout.splitlines():
                path = line.strip()
                if not path:
                    continue
                self.logger.error(f"[ORPHELIN] {line}")
                alert = self._build_audit(path, "ORPHAN", "CRITICAL")
                self.alerts.append(alert)
                self.results.append(alert)
        except subprocess.TimeoutExpired:
            self.logger.error("Timeout lors de la recherche des fichiers orphelins")
        except Exception as e:
            self.logger.error(f"Erreur lors de la vérification des orphelins: {e}")

    def human_readable_size(self, size: int) -> str:
        """Format a byte count with a binary unit, e.g. ``2048`` -> ``"2.00 KB"``."""
        if size == 0:
            return "0 B"
        units = ['B', 'KB', 'MB', 'GB', 'TB']
        i = 0
        while size >= 1024 and i < len(units) - 1:
            size /= 1024.0
            i += 1
        return f"{size:.2f} {units[i]}"

    @staticmethod
    def _open_report(path, newline=None):
        """Open an output file for writing as UTF-8.

        ``backslashreplace`` keeps a file name containing undecodable bytes
        from aborting the report with an encoding error.
        """
        return open(path, 'w', encoding="utf-8", errors="backslashreplace", newline=newline)

    def _count_alerts(self, alert_type: str) -> int:
        """Return the number of alerts whose type equals ``alert_type``."""
        return sum(1 for alert in self.alerts if alert.type == alert_type)

    @staticmethod
    def _write_alert_details(f, heading: str, alerts):
        """Write one numbered, detailed section of the text report."""
        f.write(f"\n{heading}\n")
        for i, alert in enumerate(alerts, 1):
            f.write(f"\n{i}. {alert.path}\n")
            f.write(f" Type: {alert.type}\n")
            f.write(f" Propriétaire: {alert.owner}\n")
            f.write(f" Groupe: {alert.group}\n")
            f.write(f" Permissions: {alert.permissions}\n")
            f.write(f" Taille: {alert.size_human}\n")
            f.write(f" Dernière modification: {alert.mtime}\n")
            if alert.hash_md5:
                f.write(f" MD5: {alert.hash_md5}\n")

    def generate_reports(self):
        """Generate the text, JSON and CSV reports."""
        self.generate_text_report()
        self.generate_json_report()
        self.generate_csv_report()
        self.logger.info(f"Rapport texte généré: {self.report_file}")
        self.logger.info(f"Rapport JSON généré: {self.json_file}")
        self.logger.info(f"Rapport CSV généré: {self.csv_file}")

    def generate_text_report(self):
        """Write the detailed human-readable report."""
        uname = os.uname()
        with self._open_report(self.report_file) as f:
            f.write("=" * 80 + "\n")
            f.write("RAPPORT D'AUDIT SUID/SGID\n")
            f.write(f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"Serveur: {uname.nodename}\n")
            f.write(f"Système: {uname.sysname} {uname.release}\n")
            f.write("=" * 80 + "\n\n")

            # Summary
            f.write("RÉSUMÉ\n")
            f.write("-" * 40 + "\n")
            f.write(f"Total fichiers audités: {len(self.results)}\n")
            f.write(f"Anomalies SUID: {self._count_alerts('SUID')}\n")
            f.write(f"Anomalies SGID: {self._count_alerts('SGID')}\n")
            f.write(f"Fichiers orphelins: {self._count_alerts('ORPHAN')}\n")
            f.write(f"Fichiers whitelistés: {len([r for r in self.results if r.is_whitelisted])}\n\n")

            # Detailed anomalies, grouped by type
            if self.alerts:
                f.write("ANOMALIES DÉTECTÉES\n")
                f.write("-" * 40 + "\n")
                sections = (
                    ("SUID", "[SUID SUSPECTS]"),
                    ("SGID", "[SGID SUSPECTS]"),
                    ("ORPHAN", "[FICHIERS ORPHELINS]"),
                )
                for alert_type, heading in sections:
                    matching = [a for a in self.alerts if a.type == alert_type]
                    if matching:
                        self._write_alert_details(f, heading, matching)
            else:
                f.write("✓ AUCUNE ANOMALIE DÉTECTÉE\n\n")

            # Every SUID/SGID file, whitelisted ones included
            f.write("\n\nLISTE COMPLÈTE DES FICHIERS SUID/SGID\n")
            f.write("-" * 40 + "\n")
            for result in sorted(self.results, key=lambda x: x.path):
                status = "[WHITELIST]" if result.is_whitelisted else "[ALERTE]"
                f.write(f"{status} {result.type}: {result.path}\n")
                f.write(f" Permissions: {result.permissions}, Owner: {result.owner}, Group: {result.group}\n")

            # Recommendations
            f.write("\n\nRECOMMANDATIONS\n")
            f.write("-" * 40 + "\n")
            f.write("1. Examinez chaque binaire suspect manuellement\n")
            f.write("2. Supprimez les bits SUID/SGID si non nécessaires:\n")
            f.write(" sudo chmod u-s <fichier> # Pour SUID\n")
            f.write(" sudo chmod g-s <fichier> # Pour SGID\n")
            f.write("3. Ajoutez les binaires légitimes à la whitelist:\n")
            f.write(f" {self.config.whitelist_file}\n")
            f.write("4. Vérifiez l'intégrité des binaires système\n")
            f.write("5. Mettez en place une surveillance régulière\n")

    def generate_json_report(self):
        """Write the JSON report (metadata, alerts and all results)."""
        uname = os.uname()
        report_data = {
            "metadata": {
                "timestamp": datetime.now().isoformat(),
                "hostname": uname.nodename,
                "system": f"{uname.sysname} {uname.release}",
                "total_files_audited": len(self.results),
                "total_alerts": len(self.alerts)
            },
            "alerts": [asdict(alert) for alert in self.alerts],
            "all_files": [asdict(result) for result in self.results]
        }
        with self._open_report(self.json_file) as f:
            json.dump(report_data, f, indent=2)

    def generate_csv_report(self):
        """Write the CSV report; the file is left empty when there are no results."""
        import csv
        with self._open_report(self.csv_file, newline='') as f:
            if not self.results:
                return
            fieldnames = ['path', 'type', 'permissions', 'owner', 'group',
                          'size', 'size_human', 'mtime', 'hash_md5',
                          'is_whitelisted', 'alert_level']
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            for result in self.results:
                writer.writerow(asdict(result))

    def save_alerts_file(self):
        """Write the alerts to a flat pipe-separated file and log its location."""
        with self._open_report(self.alert_file) as f:
            f.write("# Fichier d'alertes SUID/SGID\n")
            f.write(f"# Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write("# Format: TYPE|PATH|OWNER|GROUP|LEVEL\n\n")
            for alert in self.alerts:
                f.write(f"{alert.type}|{alert.path}|{alert.owner}|{alert.group}|{alert.alert_level}\n")
        # Logged here, once the file really exists.
        self.logger.info(f"Alertes sauvegardées: {self.alert_file}")

    def print_summary(self) -> int:
        """Print the console summary.

        Returns:
            ``1`` when at least one alert was raised, ``0`` otherwise.
        """
        print("\n" + "=" * 80)
        print("RÉSUMÉ DE L'AUDIT")
        print("=" * 80)
        print(f"Log complet: {self.log_file}")
        print(f"Rapport détaillé: {self.report_file}")
        print(f"Rapport JSON: {self.json_file}")
        print(f"Rapport CSV: {self.csv_file}")
        print(f"Alertes: {self.alert_file}")
        print(f"Whitelist: {self.config.whitelist_file}")
        if not self.alerts:
            print("\n✓ AUCUNE ANOMALIE DÉTECTÉE")
            print("Tous les fichiers SUID/SGID sont dans la whitelist")
            return 0
        print(f"\n⚠️ ATTENTION: {len(self.alerts)} anomalie(s) détectée(s) !")
        print(f" - SUID suspects: {self._count_alerts('SUID')}")
        print(f" - SGID suspects: {self._count_alerts('SGID')}")
        print(f" - Orphelins: {self._count_alerts('ORPHAN')}")
        print("Consultez les rapports pour plus de détails")
        return 1

    def run(self) -> int:
        """Run the complete audit.

        Returns:
            ``1`` when not running as root or when alerts were raised,
            ``0`` when the audit found nothing to report.
        """
        print("=" * 80)
        print("SUID/SGID Security Audit Tool - Linux Privilege Scanner")
        print(f"Début de l'audit: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 80)
        # Root is required to read every directory of the filesystem.
        if os.geteuid() != 0:
            self.logger.error("Ce script doit être exécuté en tant que root")
            print("ERREUR: Ce script doit être exécuté en tant que root")
            print(f"Utilisez: sudo python3 {sys.argv[0]}")
            return 1
        self.load_whitelist()
        self.audit_directory()
        self.check_temp_directories()
        self.check_orphan_files()
        self.generate_reports()
        self.save_alerts_file()
        return self.print_summary()
def main():
    """Parse the command line, run the audit and exit with its status code."""
    usage_examples = """
Exemples:
sudo python3 audit-suid.py # Audit complet
sudo python3 audit-suid.py --no-temp # Sans vérification /tmp
sudo python3 audit-suid.py --max-depth 10 # Limite à 10 niveaux
sudo python3 audit-suid.py --no-hash # Sans calcul MD5 (plus rapide)
"""
    cli = argparse.ArgumentParser(
        description="Audit des fichiers SUID et GUID sur Linux",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )

    # Boolean switches that each disable one optional part of the audit.
    switches = (
        ('--no-temp', "Ne pas vérifier les répertoires temporaires"),
        ('--no-orphan', "Ne pas vérifier les fichiers orphelins"),
        ('--no-hash', "Ne pas calculer les hashs MD5 (plus rapide)"),
    )
    for flag, description in switches:
        cli.add_argument(flag, action='store_true', help=description)

    # Options that take a value.
    cli.add_argument('--max-depth', type=int, default=20,
                     help="Profondeur maximale de recherche (défaut: 20)")
    cli.add_argument('--log-dir', type=str, default="/var/log/audit",
                     help="Répertoire des logs (défaut: /var/log/audit)")
    cli.add_argument('--whitelist', type=str, default="/etc/audit/suid_whitelist.json",
                     help="Fichier de whitelist (défaut: /etc/audit/suid_whitelist.json)")
    opts = cli.parse_args()

    # Each --no-* switch maps to the opposite boolean in the configuration.
    settings = AuditConfig(
        log_dir=opts.log_dir,
        whitelist_file=opts.whitelist,
        max_depth=opts.max_depth,
        check_temp_dirs=not opts.no_temp,
        check_orphans=not opts.no_orphan,
        compute_hashes=not opts.no_hash,
    )

    # The auditor's return value becomes the process exit status.
    sys.exit(SUIDGUIDAuditor(settings).run())
# Start the audit only when this file is executed as a script.
if __name__ == "__main__":
    main()