#!/usr/bin/env python3
"""
Version: 1.1.0 (2024-01-12)
Auteur: kahina et franck - Groupe 2
Contact: bigmoletos@yopmail.com
Script de surveillance et sauvegarde automatique.
Ce module:
- Surveille les modifications du projet en temps réel
- Gère les sauvegardes planifiées avec cron
- Génère des résumés de modifications hiérarchiques
- Crée des archives ZIP datées et vérifiées
- Exclut les fichiers spécifiés dans files_ignore.txt de la surveillance
Architecture:
------------
1. Surveillance:
- Détection des modifications avec watchdog
- Classification hiérarchique des changements
- Filtrage intelligent des événements via files_ignore.txt
- Horodatage précis des modifications
2. Planification:
- Cron toutes les 3 heures (7h, 10h, 13h, 16h, 19h)
- Plage horaire optimisée (7h-20h)
- Gestion des week-ends et jours fériés
- Cooldown entre les sauvegardes
3. Sauvegarde:
- Déclenchement du script de sauvegarde
- Compression ZIP avec vérification (inclut TOUS les fichiers)
- Résumé détaillé des modifications
- Validation de l'intégrité
4. Logging:
- Journal détaillé des événements
- Rotation des fichiers de logs
- Niveaux de gravité adaptés
- Format ISO pour les horodatages
Structure des fichiers:
---------------------
project_root/
├── surveillance_projet.py
├── sauve_projet_qualite_air.py
├── files_ignore.txt
├── surveillance.log
└── backup/
└── JJ_MM_AAAA_HHhMM/
├── projet_qualite_air.zip
└── resume_des_modifs.log
Dépendances:
-----------
- Python 3.8+
- python-crontab: Gestion des tâches planifiées
- watchdog: Surveillance du système de fichiers
- pathlib: Manipulation des chemins
- logging: Journalisation avancée
"""
import os
import sys
import time
import logging
from pathlib import Path
from datetime import datetime, timedelta
from crontab import CronTab
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import subprocess
import fnmatch
# Configuration du logging avec rotation et encodage UTF-8
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
filename='surveillance.log',
encoding='utf-8' # Support des caractères spéciaux
)
logger = logging.getLogger(__name__)
[docs]
def load_ignore_patterns():
"""
Charge les patterns à ignorer depuis le fichier files_ignore.txt.
Cette fonction:
1. Lit le fichier files_ignore.txt
2. Parse les patterns en ignorant les commentaires et lignes vides
3. Convertit les patterns en format glob
Returns:
set: Ensemble des patterns à ignorer
Notes:
- Ignore les lignes commençant par #
- Supprime les espaces en début/fin de ligne
- Convertit les patterns Windows en format Unix
"""
try:
ignore_patterns = set()
ignore_file = Path(__file__).parent / "files_ignore.txt"
if not ignore_file.exists():
logger.warning("files_ignore.txt non trouvé, utilisation des patterns par défaut")
return {
'.git/', '__pycache__/', '*.pyc', '*.log',
'.venv/', 'venv/', '.idea/', '.vscode/'
}
with ignore_file.open('r', encoding='utf-8') as f:
for line in f:
# Ignorer les commentaires et lignes vides
line = line.strip()
if line and not line.startswith('#'):
# Convertir les patterns Windows en Unix
pattern = line.replace('\\', '/')
# Supprimer le / final des dossiers si présent
pattern = pattern.rstrip('/')
ignore_patterns.add(pattern)
logger.info(f"Chargement de {len(ignore_patterns)} patterns d'exclusion")
return ignore_patterns
except Exception as e:
logger.error(f"Erreur lors du chargement des patterns d'exclusion: {e}")
# Patterns de secours en cas d'erreur
return {
'.git/', '__pycache__/', '*.pyc', '*.log',
'.venv/', 'venv/', '.idea/', '.vscode/'
}
[docs]
class ProjectChangeHandler(FileSystemEventHandler):
"""
Gestionnaire des événements de modification de fichiers.
"""
[docs]
def __init__(self, project_path):
super().__init__()
self.project_path = Path(project_path)
self.ignore_patterns = self.load_ignore_patterns()
self.last_backup_time = datetime.now()
self.changes_count = 0
self.min_backup_interval = timedelta(hours=3) # Intervalle minimum entre les sauvegardes
self.min_changes_required = 5 # Nombre minimum de modifications avant une sauvegarde
logger.info(f"🔍 Surveillance initialisée pour {self.project_path}")
[docs]
def load_ignore_patterns(self):
"""
Charge les patterns à ignorer depuis le fichier files_ignore.txt.
Returns:
set: Ensemble des patterns à ignorer
"""
try:
ignore_patterns = set()
ignore_file = Path(__file__).parent / "files_ignore.txt"
if not ignore_file.exists():
logger.warning("files_ignore.txt non trouvé, utilisation des patterns par défaut")
return {
'.git/', '__pycache__/', '*.pyc', '*.log',
'.venv/', 'venv/', '.idea/', '.vscode/'
}
with ignore_file.open('r', encoding='utf-8') as f:
for line in f:
# Ignorer les commentaires et lignes vides
line = line.strip()
if line and not line.startswith('#'):
# Convertir les patterns Windows en Unix
pattern = line.replace('\\', '/')
# Supprimer le / final des dossiers si présent
pattern = pattern.rstrip('/')
ignore_patterns.add(pattern)
logger.info(f"Chargement de {len(ignore_patterns)} patterns d'exclusion")
return ignore_patterns
except Exception as e:
logger.error(f"Erreur lors du chargement des patterns d'exclusion: {e}")
# Patterns de secours en cas d'erreur
return {
'.git/', '__pycache__/', '*.pyc', '*.log',
'.venv/', 'venv/', '.idea/', '.vscode/'
}
[docs]
def should_ignore(self, file_path):
"""
Vérifie si un fichier doit être ignoré selon les patterns.
Args:
file_path (Path): Chemin du fichier à vérifier
Returns:
bool: True si le fichier doit être ignoré
"""
path_str = str(file_path).replace('\\', '/')
# Vérifier les correspondances exactes
if path_str in self.ignore_patterns:
return True
# Vérifier les patterns avec wildcards
for pattern in self.ignore_patterns:
if '*' in pattern:
if fnmatch.fnmatch(path_str, pattern):
return True
elif pattern in path_str:
return True
return False
[docs]
def on_modified(self, event):
if not event.is_directory:
try:
file_path = Path(event.src_path)
if self.should_ignore(file_path):
return
rel_path = file_path.relative_to(self.project_path)
logger.info(f"📄 Fichier modifié: {rel_path}")
self.changes_count += 1
# Vérifier si on doit déclencher une sauvegarde
time_since_last_backup = datetime.now() - self.last_backup_time
if (time_since_last_backup >= self.min_backup_interval and
self.changes_count >= self.min_changes_required):
self.trigger_backup()
self.changes_count = 0 # Réinitialiser le compteur
self.last_backup_time = datetime.now()
else:
remaining_time = self.min_backup_interval - time_since_last_backup
logger.info(f"⏳ En attente pour la prochaine sauvegarde: {remaining_time.seconds//3600}h {(remaining_time.seconds//60)%60}m")
logger.info(f"📊 Modifications en attente: {self.changes_count}/{self.min_changes_required}")
except Exception as e:
logger.error(f"❌ Erreur lors du traitement de la modification: {str(e)}")
[docs]
def trigger_backup(self):
try:
logger.info("🔄 Déclenchement de la sauvegarde...")
backup_script = self.project_path / "sauve_projet_qualite_air.py"
if not backup_script.exists():
raise FileNotFoundError(f"Script de sauvegarde non trouvé: {backup_script}")
result = subprocess.run([sys.executable, str(backup_script)],
capture_output=True,
text=True)
if result.returncode == 0:
logger.info("✅ Sauvegarde déclenchée avec succès")
else:
logger.error(f"❌ Erreur lors de la sauvegarde: {result.stderr}")
except Exception as e:
logger.error(f"❌ Erreur lors du déclenchement de la sauvegarde: {str(e)}")
[docs]
def verify_cron_status():
"""
Vérifie le statut du cron et son bon fonctionnement.
Cette fonction:
1. Vérifie l'existence de la tâche cron
2. Valide les horaires configurés
3. Contrôle le dernier déclenchement
4. Génère un rapport de statut
Returns:
bool: True si le cron fonctionne correctement
"""
try:
cron = CronTab(user=True)
jobs = list(cron.find_comment('surveillance_projet'))
if not jobs:
logger.error("Tâche cron non trouvée - réinstallation nécessaire")
return False
job = jobs[0]
if not job.is_valid():
logger.error("Configuration cron invalide")
return False
# Vérifier le dernier fichier de log
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)
cron_log = log_dir / "cron_execution.log"
if cron_log.exists():
last_execution = datetime.fromtimestamp(cron_log.stat().st_mtime)
hours_since_last = (datetime.now() - last_execution).total_seconds() / 3600
if hours_since_last > 4: # Plus de 4 heures sans exécution
logger.warning(f"Dernier déclenchement il y a {hours_since_last:.1f}h")
return False
logger.info("Cron fonctionnel et actif")
return True
except Exception as e:
logger.error(f"Erreur lors de la vérification du cron: {e}")
return False
[docs]
def log_cron_execution():
"""
Enregistre l'exécution du cron dans un fichier de log dédié.
"""
try:
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)
cron_log = log_dir / "cron_execution.log"
with cron_log.open('a', encoding='utf-8') as f:
f.write(f"{datetime.now().isoformat()}: Exécution cron\n")
except Exception as e:
logger.error(f"Erreur lors de l'enregistrement de l'exécution cron: {e}")
[docs]
def setup_scheduler():
"""
Configure la planification des sauvegardes selon le système d'exploitation.
Cette fonction:
1. Détecte le système d'exploitation
2. Configure soit Task Scheduler (Windows) soit Cron (Linux)
3. Planifie les sauvegardes toutes les 3 heures
4. Configure le démarrage au boot
5. Vérifie la configuration
Notes:
- Windows: Utilise Task Scheduler via subprocess
- Linux: Utilise python-crontab
- Plage horaire: 7h-20h
- Vérifie les permissions
"""
import platform
system = platform.system().lower()
if system == 'windows':
setup_windows_scheduler()
elif system in ('linux', 'darwin'):
setup_linux_cron()
else:
raise OSError(f"Système d'exploitation non supporté: {system}")
[docs]
def setup_windows_scheduler():
"""
Configure le planificateur de tâches Windows.
Cette fonction:
1. Crée un dossier personnalisé dans le planificateur
2. Configure les tâches planifiées dans ce dossier
3. Active le démarrage automatique
4. Vérifie les permissions
"""
try:
# Chemins absolus
script_path = Path(__file__).resolve()
work_dir = script_path.parent
python_exe = Path(sys.executable).resolve()
folder_name = "scripts_franck"
logger.info(f"Python: {python_exe}")
logger.info(f"Script: {script_path}")
logger.info(f"Dossier: {work_dir}")
# Supprimer les anciennes tâches si elles existent
for hour in range(7, 21, 3):
task_name = f"{folder_name}\\SurveillanceProjetQualiteAir_{hour}"
try:
subprocess.run(['schtasks', '/delete', '/tn', task_name, '/f'],
capture_output=True)
except:
pass
# Créer les nouvelles tâches
for hour in range(7, 21, 3): # 7h, 10h, 13h, 16h, 19h
task_time = f"{hour:02d}:00"
task_name = f"{folder_name}\\SurveillanceProjetQualiteAir_{hour}"
# Commande simplifiée avec répertoire de travail
command = f'cmd /c cd /d "{work_dir}" && "{python_exe}" "{script_path}"'
cmd = [
'schtasks', '/create', '/tn', task_name,
'/tr', command,
'/sc', 'daily',
'/st', task_time,
'/ru', 'SYSTEM',
'/rl', 'HIGHEST',
'/f' # Forcer la création
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
logger.error(f"Erreur création tâche {task_time}: {result.stderr}")
logger.error(f"Commande: {' '.join(cmd)}")
else:
logger.info(f"Tâche planifiée créée pour {task_time}")
# Configurer le démarrage au boot
boot_task_name = f"{folder_name}\\SurveillanceProjetQualiteAir_Boot"
boot_cmd = [
'schtasks', '/create', '/tn', boot_task_name,
'/tr', f'cmd /c cd /d "{work_dir}" && "{python_exe}" "{script_path}"',
'/sc', 'onstart',
'/ru', 'SYSTEM',
'/rl', 'HIGHEST',
'/f'
]
result = subprocess.run(boot_cmd, capture_output=True, text=True)
if result.returncode == 0:
logger.info("Tâche de démarrage configurée")
else:
logger.error(f"Erreur configuration démarrage: {result.stderr}")
except Exception as e:
logger.error(f"Erreur configuration Task Scheduler: {e}")
raise
[docs]
def setup_linux_cron():
"""
Configure le cron sous Linux/Unix.
Cette fonction:
1. Utilise python-crontab pour la configuration
2. Configure les sauvegardes périodiques
3. Ajoute une entrée @reboot
4. Vérifie les permissions
"""
try:
from crontab import CronTab
cron = CronTab(user=True)
# Supprimer les anciennes tâches
cron.remove_all(comment='surveillance_projet')
# Créer la tâche principale
job = cron.new(
command=f'{sys.executable} {__file__}',
comment='surveillance_projet'
)
job.setall('0 7-20/3 * * *') # Toutes les 3h entre 7h et 20h
# Ajouter la tâche de démarrage
reboot_job = cron.new(
command=f'{sys.executable} {__file__}',
comment='surveillance_projet_reboot'
)
reboot_job.every_reboot()
# Vérifier et sauvegarder
if job.is_valid() and reboot_job.is_valid():
cron.write()
logger.info("Configuration cron mise à jour")
else:
raise ValueError("Configuration cron invalide")
except Exception as e:
logger.error(f"Erreur configuration cron: {e}")
raise
[docs]
def verify_scheduler_status():
"""
Vérifie l'état du planificateur selon le système d'exploitation.
Returns:
bool: True si la planification est active et correcte
"""
import platform
system = platform.system().lower()
if system == 'windows':
return verify_windows_scheduler()
elif system in ('linux', 'darwin'):
return verify_linux_cron()
else:
logger.error(f"Système non supporté: {system}")
return False
[docs]
def verify_windows_scheduler():
"""
Vérifie l'état des tâches planifiées Windows.
"""
try:
folder_name = "scripts_franck"
# Vérifier chaque tâche planifiée
for hour in range(7, 21, 3):
task_name = f"{folder_name}\\SurveillanceProjetQualiteAir_{hour}"
cmd = ['schtasks', '/query', '/tn', task_name]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
logger.error(f"Tâche {hour}h non trouvée")
return False
# Vérifier la tâche de démarrage
boot_task_name = f"{folder_name}\\SurveillanceProjetQualiteAir_Boot"
boot_check = subprocess.run(
['schtasks', '/query', '/tn', boot_task_name],
capture_output=True
)
if boot_check.returncode != 0:
logger.error("Tâche de démarrage non trouvée")
return False
return True
except Exception as e:
logger.error(f"Erreur vérification Task Scheduler: {e}")
return False
[docs]
def verify_linux_cron():
"""
Vérifie l'état des tâches cron sous Linux/Unix.
"""
try:
from crontab import CronTab
cron = CronTab(user=True)
jobs = list(cron.find_comment('surveillance_projet'))
if not jobs:
logger.error("Tâches cron non trouvées")
return False
for job in jobs:
if not job.is_valid():
logger.error("Configuration cron invalide")
return False
return True
except Exception as e:
logger.error(f"Erreur vérification cron: {e}")
return False
[docs]
def main():
"""
Fonction principale de surveillance.
"""
try:
# Ajouter un en-tête de démarrage plus visible
logger.info("="*50)
logger.info("DÉMARRAGE DU SERVICE DE SURVEILLANCE")
logger.info("="*50)
# Logging du contexte d'exécution
logger.info(f"Utilisateur courant: {os.getenv('USERNAME')}")
logger.info(f"Répertoire courant: {os.getcwd()}")
logger.info(f"Python: {sys.executable}")
# Configurer le cron
logger.info("Configuration du planificateur...")
setup_scheduler()
# Enregistrer l'exécution
log_cron_execution()
logger.info("Exécution enregistrée dans cron_execution.log")
# Vérifier le statut initial du cron
logger.info("Vérification du statut du planificateur...")
if not verify_scheduler_status():
logger.warning("Problème détecté - tentative de réinitialisation")
setup_scheduler()
else:
logger.info("Statut du planificateur OK")
# Chemin du projet à surveiller
project_path = Path(r"C:\AJC_projets\projet_qualite_air")
logger.info(f"Chemin du projet: {project_path}")
# Vérifier l'existence du dossier
if not project_path.exists():
raise FileNotFoundError(f"Dossier projet non trouvé: {project_path}")
logger.info("Dossier projet trouvé")
# Créer l'observateur
logger.info("Initialisation de l'observateur...")
event_handler = ProjectChangeHandler(project_path)
observer = Observer()
observer.schedule(event_handler, str(project_path), recursive=True)
logger.info(f"Démarrage de la surveillance de {project_path}")
observer.start()
logger.info("Observateur démarré avec succès")
last_cron_check = datetime.now()
try:
logger.info("Entrée dans la boucle de surveillance")
while True:
time.sleep(1)
# Vérifier le cron toutes les heures
if (datetime.now() - last_cron_check).total_seconds() > 3600:
logger.info("Vérification horaire du planificateur...")
if not verify_scheduler_status():
logger.warning("Problème détecté - tentative de réinitialisation")
setup_scheduler()
else:
logger.info("Statut du planificateur OK")
last_cron_check = datetime.now()
except KeyboardInterrupt:
logger.info("Arrêt demandé par l'utilisateur")
observer.stop()
logger.info("Surveillance arrêtée proprement")
observer.join()
except Exception as e:
logger.error(f"Erreur critique lors de la surveillance: {e}")
logger.exception("Détails de l'erreur:")
raise
finally:
logger.info("="*50)
logger.info("SERVICE DE SURVEILLANCE TERMINÉ")
logger.info("="*50)
if __name__ == "__main__":
main()