187 lines
No EOL
7.1 KiB
Python
187 lines
No EOL
7.1 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test de connexion réelle à l'API Atmo Data
|
|
Nécessite un fichier credentials.json valide
|
|
"""
|
|
|
|
from atmo_data_wrapper import AtmoDataClient, AtmoDataException
|
|
from datetime import datetime
|
|
|
|
def test_real_api_connection():
|
|
"""Test de connexion et requêtes réelles à l'API"""
|
|
print("=== Test de connexion réelle à l'API Atmo Data ===\n")
|
|
|
|
try:
|
|
# Initialisation du client
|
|
client = AtmoDataClient()
|
|
print("Client initialisé")
|
|
|
|
# Test de connexion automatique
|
|
print("Tentative de connexion avec credentials.json...")
|
|
success = client.auto_login()
|
|
|
|
if not success:
|
|
print("❌ Échec de l'authentification")
|
|
return False
|
|
|
|
print("✅ Connexion réussie !")
|
|
print(f"URL de l'API: {client.base_url}")
|
|
|
|
# Test 1: Récupération des indices ATMO
|
|
print("\n=== Test 1: Indices ATMO ===")
|
|
try:
|
|
indices = client.get_indices_atmo(format="geojson")
|
|
print(f"✅ Indices ATMO récupérés: {len(indices)} éléments")
|
|
|
|
# Afficher un échantillon
|
|
if len(indices) > 0:
|
|
first_item = indices[0]
|
|
print(f" Premier élément - Zone: {first_item.lib_zone}")
|
|
print(f" Qualité: {first_item.get_qualificatif()}")
|
|
print(f" AASQA: {first_item.get_aasqa_name()}")
|
|
|
|
# Statistiques
|
|
stats = indices.get_statistics()
|
|
print(f" Statistiques: {stats}")
|
|
|
|
except Exception as e:
|
|
print(f"❌ Erreur récupération indices: {e}")
|
|
|
|
# Test 2: Épisodes de pollution
|
|
print("\n=== Test 2: Épisodes de pollution ===")
|
|
try:
|
|
episodes = client.get_episodes_3jours(format="geojson")
|
|
print(f"✅ Épisodes récupérés: {len(episodes)} éléments")
|
|
|
|
if len(episodes) > 0:
|
|
alerts_actives = [ep for ep in episodes if ep.is_alert_active()]
|
|
print(f" Alertes actives: {len(alerts_actives)}")
|
|
|
|
for episode in alerts_actives[:3]: # Max 3 exemples
|
|
print(f" - {episode.lib_zone}: {episode.get_alert_level()} ({episode.lib_pol})")
|
|
|
|
except Exception as e:
|
|
print(f"❌ Erreur récupération épisodes: {e}")
|
|
|
|
# Test 3: Données d'émissions (région)
|
|
print("\n=== Test 3: Données d'émissions ===")
|
|
try:
|
|
emissions = client.get_emissions(
|
|
echelle="region",
|
|
format="geojson"
|
|
)
|
|
print(f"✅ Données d'émissions récupérées: {len(emissions)} éléments")
|
|
|
|
if len(emissions) > 0:
|
|
first_emission = emissions[0]
|
|
print(f" Premier territoire: {first_emission.name}")
|
|
total_em = first_emission.get_total_emissions()
|
|
print(f" NOx: {total_em['NOx']:.1f} t/an")
|
|
|
|
except Exception as e:
|
|
print(f"❌ Erreur récupération émissions: {e}")
|
|
|
|
# Test 4: Indices pollen
|
|
print("\n=== Test 4: Indices pollen ===")
|
|
try:
|
|
pollens = client.get_indices_pollens(format="geojson")
|
|
print(f"✅ Indices pollen récupérés: {len(pollens)} éléments")
|
|
|
|
if len(pollens) > 0:
|
|
alerts_pollen = [p for p in pollens if p.is_alert_active()]
|
|
print(f" Alertes pollen actives: {len(alerts_pollen)}")
|
|
|
|
for pollen in alerts_pollen[:3]: # Max 3 exemples
|
|
dangerous = pollen.get_dangerous_pollens()
|
|
if dangerous:
|
|
print(f" - Pollens à risque: {', '.join(dangerous)}")
|
|
|
|
except Exception as e:
|
|
print(f"❌ Erreur récupération pollens: {e}")
|
|
|
|
# Test 5: Filtrage géographique (Paris)
|
|
print("\n=== Test 5: Filtrage géographique ===")
|
|
try:
|
|
# Bounding box de Paris
|
|
bbox = "2.2 48.8 2.4 48.9"
|
|
indices_paris = client.get_indices_atmo(
|
|
bounding_box=bbox,
|
|
format="geojson"
|
|
)
|
|
print(f"✅ Indices Paris récupérés: {len(indices_paris)} éléments")
|
|
|
|
# Filtrage par proximité
|
|
if len(indices_paris) > 0:
|
|
from atmo_data_wrapper import Coordinates
|
|
paris_center = Coordinates(2.3522, 48.8566)
|
|
nearby = indices_paris.filter_by_coordinates(paris_center, 10.0)
|
|
print(f" Dans un rayon de 10km: {len(nearby)} éléments")
|
|
|
|
except Exception as e:
|
|
print(f"❌ Erreur filtrage géographique: {e}")
|
|
|
|
print("\n✅ Tests terminés avec succès !")
|
|
return True
|
|
|
|
except AtmoDataException as e:
|
|
print(f"❌ Erreur API: {e}")
|
|
return False
|
|
except Exception as e:
|
|
print(f"❌ Erreur inattendue: {e}")
|
|
return False
|
|
|
|
def test_credentials_file():
|
|
"""Test de présence et validité du fichier credentials"""
|
|
print("=== Vérification du fichier credentials ===\n")
|
|
|
|
import os
|
|
import json
|
|
|
|
credentials_file = "credentials.json"
|
|
example_file = "credentials.json.example"
|
|
|
|
# Vérifier la présence du fichier exemple
|
|
if os.path.exists(example_file):
|
|
print(f"✅ Fichier exemple trouvé: {example_file}")
|
|
else:
|
|
print(f"❌ Fichier exemple manquant: {example_file}")
|
|
|
|
# Vérifier la présence du fichier credentials
|
|
if os.path.exists(credentials_file):
|
|
print(f"✅ Fichier credentials trouvé: {credentials_file}")
|
|
|
|
try:
|
|
with open(credentials_file, 'r') as f:
|
|
creds = json.load(f)
|
|
|
|
required_fields = ['username', 'password']
|
|
missing_fields = [field for field in required_fields if field not in creds]
|
|
|
|
if missing_fields:
|
|
print(f"❌ Champs manquants: {missing_fields}")
|
|
else:
|
|
print("✅ Structure du fichier credentials valide")
|
|
|
|
# Masquer les credentials sensibles
|
|
safe_creds = {k: "***" if k in ['password'] else v for k, v in creds.items()}
|
|
print(f" Contenu: {safe_creds}")
|
|
|
|
except json.JSONDecodeError as e:
|
|
print(f"❌ Erreur de format JSON: {e}")
|
|
|
|
else:
|
|
print(f"❌ Fichier credentials manquant: {credentials_file}")
|
|
print(f"💡 Créez le fichier à partir de {example_file}")
|
|
print(" 1. Copiez credentials.json.example vers credentials.json")
|
|
print(" 2. Remplacez les valeurs par vos vrais identifiants")
|
|
|
|
if __name__ == "__main__":
|
|
# Test du fichier credentials d'abord
|
|
test_credentials_file()
|
|
print()
|
|
|
|
# Puis test de connexion réelle si possible
|
|
if input("Tester la connexion réelle à l'API ? (y/n): ").lower() == 'y':
|
|
test_real_api_connection()
|
|
else:
|
|
print("Test de connexion ignoré.") |