py_atmo_data_wrapper/tests/test_real_connection.py
2025-07-14 17:56:57 +02:00

187 lines
No EOL
7.1 KiB
Python

#!/usr/bin/env python3
"""
Test de connexion réelle à l'API Atmo Data
Nécessite un fichier credentials.json valide
"""
from atmo_data_wrapper import AtmoDataClient, AtmoDataException
from datetime import datetime
def test_real_api_connection():
"""Test de connexion et requêtes réelles à l'API"""
print("=== Test de connexion réelle à l'API Atmo Data ===\n")
try:
# Initialisation du client
client = AtmoDataClient()
print("Client initialisé")
# Test de connexion automatique
print("Tentative de connexion avec credentials.json...")
success = client.auto_login()
if not success:
print("❌ Échec de l'authentification")
return False
print("✅ Connexion réussie !")
print(f"URL de l'API: {client.base_url}")
# Test 1: Récupération des indices ATMO
print("\n=== Test 1: Indices ATMO ===")
try:
indices = client.get_indices_atmo(format="geojson")
print(f"✅ Indices ATMO récupérés: {len(indices)} éléments")
# Afficher un échantillon
if len(indices) > 0:
first_item = indices[0]
print(f" Premier élément - Zone: {first_item.lib_zone}")
print(f" Qualité: {first_item.get_qualificatif()}")
print(f" AASQA: {first_item.get_aasqa_name()}")
# Statistiques
stats = indices.get_statistics()
print(f" Statistiques: {stats}")
except Exception as e:
print(f"❌ Erreur récupération indices: {e}")
# Test 2: Épisodes de pollution
print("\n=== Test 2: Épisodes de pollution ===")
try:
episodes = client.get_episodes_3jours(format="geojson")
print(f"✅ Épisodes récupérés: {len(episodes)} éléments")
if len(episodes) > 0:
alerts_actives = [ep for ep in episodes if ep.is_alert_active()]
print(f" Alertes actives: {len(alerts_actives)}")
for episode in alerts_actives[:3]: # Max 3 exemples
print(f" - {episode.lib_zone}: {episode.get_alert_level()} ({episode.lib_pol})")
except Exception as e:
print(f"❌ Erreur récupération épisodes: {e}")
# Test 3: Données d'émissions (région)
print("\n=== Test 3: Données d'émissions ===")
try:
emissions = client.get_emissions(
echelle="region",
format="geojson"
)
print(f"✅ Données d'émissions récupérées: {len(emissions)} éléments")
if len(emissions) > 0:
first_emission = emissions[0]
print(f" Premier territoire: {first_emission.name}")
total_em = first_emission.get_total_emissions()
print(f" NOx: {total_em['NOx']:.1f} t/an")
except Exception as e:
print(f"❌ Erreur récupération émissions: {e}")
# Test 4: Indices pollen
print("\n=== Test 4: Indices pollen ===")
try:
pollens = client.get_indices_pollens(format="geojson")
print(f"✅ Indices pollen récupérés: {len(pollens)} éléments")
if len(pollens) > 0:
alerts_pollen = [p for p in pollens if p.is_alert_active()]
print(f" Alertes pollen actives: {len(alerts_pollen)}")
for pollen in alerts_pollen[:3]: # Max 3 exemples
dangerous = pollen.get_dangerous_pollens()
if dangerous:
print(f" - Pollens à risque: {', '.join(dangerous)}")
except Exception as e:
print(f"❌ Erreur récupération pollens: {e}")
# Test 5: Filtrage géographique (Paris)
print("\n=== Test 5: Filtrage géographique ===")
try:
# Bounding box de Paris
bbox = "2.2 48.8 2.4 48.9"
indices_paris = client.get_indices_atmo(
bounding_box=bbox,
format="geojson"
)
print(f"✅ Indices Paris récupérés: {len(indices_paris)} éléments")
# Filtrage par proximité
if len(indices_paris) > 0:
from atmo_data_wrapper import Coordinates
paris_center = Coordinates(2.3522, 48.8566)
nearby = indices_paris.filter_by_coordinates(paris_center, 10.0)
print(f" Dans un rayon de 10km: {len(nearby)} éléments")
except Exception as e:
print(f"❌ Erreur filtrage géographique: {e}")
print("\n✅ Tests terminés avec succès !")
return True
except AtmoDataException as e:
print(f"❌ Erreur API: {e}")
return False
except Exception as e:
print(f"❌ Erreur inattendue: {e}")
return False
def test_credentials_file():
"""Test de présence et validité du fichier credentials"""
print("=== Vérification du fichier credentials ===\n")
import os
import json
credentials_file = "credentials.json"
example_file = "credentials.json.example"
# Vérifier la présence du fichier exemple
if os.path.exists(example_file):
print(f"✅ Fichier exemple trouvé: {example_file}")
else:
print(f"❌ Fichier exemple manquant: {example_file}")
# Vérifier la présence du fichier credentials
if os.path.exists(credentials_file):
print(f"✅ Fichier credentials trouvé: {credentials_file}")
try:
with open(credentials_file, 'r') as f:
creds = json.load(f)
required_fields = ['username', 'password']
missing_fields = [field for field in required_fields if field not in creds]
if missing_fields:
print(f"❌ Champs manquants: {missing_fields}")
else:
print("✅ Structure du fichier credentials valide")
# Masquer les credentials sensibles
safe_creds = {k: "***" if k in ['password'] else v for k, v in creds.items()}
print(f" Contenu: {safe_creds}")
except json.JSONDecodeError as e:
print(f"❌ Erreur de format JSON: {e}")
else:
print(f"❌ Fichier credentials manquant: {credentials_file}")
print(f"💡 Créez le fichier à partir de {example_file}")
print(" 1. Copiez credentials.json.example vers credentials.json")
print(" 2. Remplacez les valeurs par vos vrais identifiants")
if __name__ == "__main__":
# Test du fichier credentials d'abord
test_credentials_file()
print()
# Puis test de connexion réelle si possible
if input("Tester la connexion réelle à l'API ? (y/n): ").lower() == 'y':
test_real_api_connection()
else:
print("Test de connexion ignoré.")