#!/usr/bin/env python3 """ Test de connexion réelle à l'API Atmo Data Nécessite un fichier credentials.json valide """ from atmo_data_wrapper import AtmoDataClient, AtmoDataException from datetime import datetime def test_real_api_connection(): """Test de connexion et requêtes réelles à l'API""" print("=== Test de connexion réelle à l'API Atmo Data ===\n") try: # Initialisation du client client = AtmoDataClient() print("Client initialisé") # Test de connexion automatique print("Tentative de connexion avec credentials.json...") success = client.auto_login() if not success: print("❌ Échec de l'authentification") return False print("✅ Connexion réussie !") print(f"URL de l'API: {client.base_url}") # Test 1: Récupération des indices ATMO print("\n=== Test 1: Indices ATMO ===") try: indices = client.get_indices_atmo(format="geojson") print(f"✅ Indices ATMO récupérés: {len(indices)} éléments") # Afficher un échantillon if len(indices) > 0: first_item = indices[0] print(f" Premier élément - Zone: {first_item.lib_zone}") print(f" Qualité: {first_item.get_qualificatif()}") print(f" AASQA: {first_item.get_aasqa_name()}") # Statistiques stats = indices.get_statistics() print(f" Statistiques: {stats}") except Exception as e: print(f"❌ Erreur récupération indices: {e}") # Test 2: Épisodes de pollution print("\n=== Test 2: Épisodes de pollution ===") try: episodes = client.get_episodes_3jours(format="geojson") print(f"✅ Épisodes récupérés: {len(episodes)} éléments") if len(episodes) > 0: alerts_actives = [ep for ep in episodes if ep.is_alert_active()] print(f" Alertes actives: {len(alerts_actives)}") for episode in alerts_actives[:3]: # Max 3 exemples print(f" - {episode.lib_zone}: {episode.get_alert_level()} ({episode.lib_pol})") except Exception as e: print(f"❌ Erreur récupération épisodes: {e}") # Test 3: Données d'émissions (région) print("\n=== Test 3: Données d'émissions ===") try: emissions = client.get_emissions( echelle="region", format="geojson" ) print(f"✅ Données d'émissions récupérées: {len(emissions)} éléments") if len(emissions) > 0: first_emission = emissions[0] print(f" Premier territoire: {first_emission.name}") total_em = first_emission.get_total_emissions() print(f" NOx: {total_em['NOx']:.1f} t/an") except Exception as e: print(f"❌ Erreur récupération émissions: {e}") # Test 4: Indices pollen print("\n=== Test 4: Indices pollen ===") try: pollens = client.get_indices_pollens(format="geojson") print(f"✅ Indices pollen récupérés: {len(pollens)} éléments") if len(pollens) > 0: alerts_pollen = [p for p in pollens if p.is_alert_active()] print(f" Alertes pollen actives: {len(alerts_pollen)}") for pollen in alerts_pollen[:3]: # Max 3 exemples dangerous = pollen.get_dangerous_pollens() if dangerous: print(f" - Pollens à risque: {', '.join(dangerous)}") except Exception as e: print(f"❌ Erreur récupération pollens: {e}") # Test 5: Filtrage géographique (Paris) print("\n=== Test 5: Filtrage géographique ===") try: # Bounding box de Paris bbox = "2.2 48.8 2.4 48.9" indices_paris = client.get_indices_atmo( bounding_box=bbox, format="geojson" ) print(f"✅ Indices Paris récupérés: {len(indices_paris)} éléments") # Filtrage par proximité if len(indices_paris) > 0: from atmo_data_wrapper import Coordinates paris_center = Coordinates(2.3522, 48.8566) nearby = indices_paris.filter_by_coordinates(paris_center, 10.0) print(f" Dans un rayon de 10km: {len(nearby)} éléments") except Exception as e: print(f"❌ Erreur filtrage géographique: {e}") print("\n✅ Tests terminés avec succès !") return True except AtmoDataException as e: print(f"❌ Erreur API: {e}") return False except Exception as e: print(f"❌ Erreur inattendue: {e}") return False def test_credentials_file(): """Test de présence et validité du fichier credentials""" print("=== Vérification du fichier credentials ===\n") import os import json credentials_file = "credentials.json" example_file = "credentials.json.example" # Vérifier la présence du fichier exemple if os.path.exists(example_file): print(f"✅ Fichier exemple trouvé: {example_file}") else: print(f"❌ Fichier exemple manquant: {example_file}") # Vérifier la présence du fichier credentials if os.path.exists(credentials_file): print(f"✅ Fichier credentials trouvé: {credentials_file}") try: with open(credentials_file, 'r') as f: creds = json.load(f) required_fields = ['username', 'password'] missing_fields = [field for field in required_fields if field not in creds] if missing_fields: print(f"❌ Champs manquants: {missing_fields}") else: print("✅ Structure du fichier credentials valide") # Masquer les credentials sensibles safe_creds = {k: "***" if k in ['password'] else v for k, v in creds.items()} print(f" Contenu: {safe_creds}") except json.JSONDecodeError as e: print(f"❌ Erreur de format JSON: {e}") else: print(f"❌ Fichier credentials manquant: {credentials_file}") print(f"💡 Créez le fichier à partir de {example_file}") print(" 1. Copiez credentials.json.example vers credentials.json") print(" 2. Remplacez les valeurs par vos vrais identifiants") if __name__ == "__main__": # Test du fichier credentials d'abord test_credentials_file() print() # Puis test de connexion réelle si possible if input("Tester la connexion réelle à l'API ? (y/n): ").lower() == 'y': test_real_api_connection() else: print("Test de connexion ignoré.")