first commit

This commit is contained in:
mathieu 2025-07-14 17:56:57 +02:00
commit a233e18c0b
48 changed files with 55300 additions and 0 deletions

View file

@ -0,0 +1,109 @@
"""
Atmo Data Wrapper - Wrapper Python pour l'API Atmo Data
=====================================================
Ce package fournit un wrapper Python pour l'API Atmo Data (https://admindata.atmo-france.org),
permettant d'accéder facilement aux données de qualité de l'air et de pollution des
Associations agréées de surveillance de la qualité de l'air (AASQA) françaises.
Modules principaux:
- client: Client principal pour l'API
- models: Classes pour les données typées
- constants: Constantes et configurations
Usage:
from atmo_data_wrapper import AtmoDataClient
client = AtmoDataClient()
client.auto_login()
indices = client.get_indices_atmo()
"""
__version__ = "1.0.0"
__author__ = "Atmo Data Wrapper Team"
__email__ = "contact@atmo-france.org"
__description__ = "Wrapper Python pour l'API Atmo Data"
# Import des classes principales pour faciliter l'usage
from .core.client import AtmoDataClient
from .core.models import (
AtmoDataBase,
IndiceAtmo,
EpisodePollution,
EmissionData,
IndicePollen,
AtmoDataCollection,
Coordinates
)
from .core.constants import (
AASQA_CODES,
INDICES_ATMO,
INDICES_POLLENS,
CODE_COLOR_QUALIF,
CODE_POLLUANT,
CODE_TAXON,
SECTEURS_EMISSIONS,
POLLUANTS,
CODE_POLLUANT_EPISODES,
TAXON_MAPPING,
ATMO_LICENCE_COURTE,
ATMO_LICENCE_LONGUE,
ATMO_LICENCE_COMPLETE
)
from .core.utils import (
get_aasqa_by_department,
get_aasqa_info,
get_aasqa_website,
list_departments_by_aasqa,
search_aasqa_by_name,
get_departments_count,
validate_department_coverage,
get_aasqa_statistics,
get_atmo_licence,
print_atmo_licence
)
from .core.exceptions import AtmoDataException
__all__ = [
# Client principal
'AtmoDataClient',
# Classes de données
'AtmoDataBase',
'IndiceAtmo',
'EpisodePollution',
'EmissionData',
'IndicePollen',
'AtmoDataCollection',
'Coordinates',
# Constantes
'AASQA_CODES',
'INDICES_ATMO',
'INDICES_POLLENS',
'CODE_COLOR_QUALIF',
'CODE_POLLUANT',
'CODE_TAXON',
'SECTEURS_EMISSIONS',
'POLLUANTS',
'CODE_POLLUANT_EPISODES',
'TAXON_MAPPING',
'ATMO_LICENCE_COURTE',
'ATMO_LICENCE_LONGUE',
'ATMO_LICENCE_COMPLETE',
# Fonctions utilitaires AASQA
'get_aasqa_by_department',
'get_aasqa_info',
'get_aasqa_website',
'list_departments_by_aasqa',
'search_aasqa_by_name',
'get_departments_count',
'validate_department_coverage',
'get_aasqa_statistics',
'get_atmo_licence',
'print_atmo_licence',
# Exceptions
'AtmoDataException'
]

View file

@ -0,0 +1,77 @@
"""
Core module for Atmo Data Wrapper
"""
from .client import AtmoDataClient
from .models import (
AtmoDataBase,
IndiceAtmo,
EpisodePollution,
EmissionData,
IndicePollen,
AtmoDataCollection,
Coordinates
)
from .constants import (
AASQA_CODES,
INDICES_ATMO,
INDICES_POLLENS,
CODE_COLOR_QUALIF,
CODE_POLLUANT,
CODE_TAXON,
SECTEURS_EMISSIONS,
POLLUANTS,
CODE_POLLUANT_EPISODES,
TAXON_MAPPING,
ATMO_LICENCE_COURTE,
ATMO_LICENCE_LONGUE,
ATMO_LICENCE_COMPLETE
)
from .utils import (
get_aasqa_by_department,
get_aasqa_info,
get_aasqa_website,
list_departments_by_aasqa,
search_aasqa_by_name,
get_departments_count,
validate_department_coverage,
get_aasqa_statistics,
get_atmo_licence,
print_atmo_licence
)
from .exceptions import AtmoDataException
__all__ = [
'AtmoDataClient',
'AtmoDataBase',
'IndiceAtmo',
'EpisodePollution',
'EmissionData',
'IndicePollen',
'AtmoDataCollection',
'Coordinates',
'AASQA_CODES',
'INDICES_ATMO',
'INDICES_POLLENS',
'CODE_COLOR_QUALIF',
'CODE_POLLUANT',
'CODE_TAXON',
'SECTEURS_EMISSIONS',
'POLLUANTS',
'CODE_POLLUANT_EPISODES',
'TAXON_MAPPING',
'ATMO_LICENCE_COURTE',
'ATMO_LICENCE_LONGUE',
'ATMO_LICENCE_COMPLETE',
'get_aasqa_by_department',
'get_aasqa_info',
'get_aasqa_website',
'list_departments_by_aasqa',
'search_aasqa_by_name',
'get_departments_count',
'validate_department_coverage',
'get_aasqa_statistics',
'get_atmo_licence',
'print_atmo_licence',
'AtmoDataException'
]

View file

@ -0,0 +1,598 @@
"""
Wrapper Python pour l'API Atmo Data
API d'accès aux données de qualité de l'air des AASQA françaises
"""
import requests
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List, Union
import json
import re
import csv
import os
from pathlib import Path
from .constants import (
FORMATS_VALIDES, AASQA_CODES, POLLUANTS, SECTEURS_EMISSIONS,
INDICES_ATMO, INDICES_POLLENS, TYPES_EPISODES, ECHEANCES_VALIDES,
ECHELLES_VALIDES, DEFAULT_API_URL, DEFAULT_TIMEOUT, TOKEN_VALIDITY_HOURS,
SAVE_FORMATS, FILE_EXTENSIONS
)
from .models import AtmoDataCollection
from .exceptions import AtmoDataException
class AtmoDataClient:
"""Client pour l'API Atmo Data"""
def __init__(self, base_url: Optional[str] = None, credentials_file: Optional[str] = None):
self.credentials_file = credentials_file or "credentials.json"
self._credentials = None
# Déterminer l'URL de base : paramètre > credentials > défaut
if base_url:
self.base_url = base_url.rstrip('/')
else:
try:
credentials = self._load_credentials()
self.base_url = credentials.get('api_url', DEFAULT_API_URL).rstrip('/')
except:
self.base_url = DEFAULT_API_URL.rstrip('/')
self.session = requests.Session()
self.session.timeout = DEFAULT_TIMEOUT
self.token = None
def _validate_format(self, format_value: str) -> None:
"""Valide le format de sortie supportés par l'API"""
if format_value not in FORMATS_VALIDES:
raise ValueError(f"Format invalide: {format_value}. Formats valides: {FORMATS_VALIDES}")
def _validate_aasqa(self, aasqa: str) -> None:
"""Valide le code AASQA"""
if aasqa not in AASQA_CODES:
raise ValueError(f"Code AASQA invalide: {aasqa}. Codes valides: {list(AASQA_CODES.keys())}")
def _validate_polluant(self, polluant: str) -> None:
"""Valide le polluant"""
if polluant not in POLLUANTS:
raise ValueError(f"Polluant invalide: {polluant}. Polluants valides: {POLLUANTS}")
def _validate_date(self, date: str) -> None:
"""Valide le format de date (YYYY-MM-DD)"""
if not re.match(r'^\d{4}-\d{2}-\d{2}$', date):
raise ValueError(f"Format de date invalide: {date}. Format attendu: YYYY-MM-DD")
try:
datetime.strptime(date, '%Y-%m-%d')
except ValueError:
raise ValueError(f"Date invalide: {date}")
def _validate_code_qualificatif_atmo(self, code: str) -> None:
"""Valide le code qualificatif ATMO"""
if code not in INDICES_ATMO:
raise ValueError(f"Code qualificatif ATMO invalide: {code}. Codes valides: {list(INDICES_ATMO.keys())}")
def _validate_code_qualificatif_pollen(self, code: str) -> None:
"""Valide le code qualificatif pollen"""
if code not in INDICES_POLLENS:
raise ValueError(f"Code qualificatif pollen invalide: {code}. Codes valides: {list(INDICES_POLLENS.keys())}")
def _validate_type_episode(self, type_episode: str) -> None:
"""Valide le type d'épisode de pollution"""
if type_episode not in TYPES_EPISODES:
raise ValueError(f"Type d'épisode invalide: {type_episode}. Types valides: {TYPES_EPISODES}")
def _validate_echeance(self, echeance: str) -> None:
"""Valide l'échéance"""
if echeance not in ECHEANCES_VALIDES:
raise ValueError(f"Échéance invalide: {echeance}. Échéances valides: {ECHEANCES_VALIDES}")
def _validate_echelle(self, echelle: str) -> None:
"""Valide l'échelle"""
if echelle not in ECHELLES_VALIDES:
raise ValueError(f"Échelle invalide: {echelle}. Échelles valides: {ECHELLES_VALIDES}")
def _validate_secteur(self, secteur: str) -> None:
"""Valide le secteur d'émission"""
if secteur not in SECTEURS_EMISSIONS:
raise ValueError(f"Secteur invalide: {secteur}. Secteurs valides: {list(SECTEURS_EMISSIONS.keys())}")
def _validate_bounding_box(self, bounding_box: str) -> None:
"""Valide le format de la bounding box"""
parts = bounding_box.split()
if len(parts) != 4:
raise ValueError(f"Format de bounding box invalide: {bounding_box}. Format attendu: 'xmin ymin xmax ymax'")
try:
coords = [float(part) for part in parts]
if coords[0] >= coords[2] or coords[1] >= coords[3]:
raise ValueError("Bounding box invalide: xmin doit être < xmax et ymin doit être < ymax")
except ValueError as e:
if "invalid literal for float()" in str(e):
raise ValueError(f"Coordonnées de bounding box invalides: {bounding_box}")
raise
def _load_credentials(self) -> Dict[str, str]:
"""Charge les credentials depuis le fichier JSON"""
if self._credentials is not None:
return self._credentials
try:
with open(self.credentials_file, 'r', encoding='utf-8') as f:
self._credentials = json.load(f)
# Valider les champs requis
required_fields = ['username', 'password']
missing_fields = [field for field in required_fields if field not in self._credentials]
if missing_fields:
raise ValueError(f"Champs manquants dans {self.credentials_file}: {missing_fields}")
# Mettre à jour l'URL de base si spécifiée dans les credentials
if 'api_url' in self._credentials:
self.base_url = self._credentials['api_url'].rstrip('/')
return self._credentials
except FileNotFoundError:
raise AtmoDataException(
f"Fichier de credentials '{self.credentials_file}' non trouvé. "
f"Créez-le à partir de '{self.credentials_file}.example' ou utilisez login() avec username/password."
)
except json.JSONDecodeError as e:
raise AtmoDataException(f"Erreur de format JSON dans {self.credentials_file}: {e}")
def login(self, username: Optional[str] = None, password: Optional[str] = None) -> bool:
"""
Connexion à l'API et récupération du token JWT
Args:
username: Nom d'utilisateur (optionnel si fichier credentials.json existe)
password: Mot de passe (optionnel si fichier credentials.json existe)
Returns:
bool: True si la connexion réussit
"""
# Si pas de username/password fournis, charger depuis le fichier
if username is None or password is None:
credentials = self._load_credentials()
username = username or credentials['username']
password = password or credentials['password']
url = f"{self.base_url}/api/login"
data = {
"username": username,
"password": password
}
try:
response = self.session.post(url, json=data)
response.raise_for_status()
result = response.json()
if 'token' in result:
self.token = result['token']
self.session.headers.update({
'Authorization': f'Bearer {self.token}'
})
return True
return False
except requests.exceptions.RequestException as e:
raise AtmoDataException(f"Erreur de connexion: {e}")
def auto_login(self) -> bool:
"""
Connexion automatique en utilisant le fichier de credentials
Returns:
bool: True si la connexion réussit
"""
return self.login()
def _make_request(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Effectue une requête à l'API
Args:
endpoint: Endpoint de l'API
params: Paramètres de la requête
Returns:
Dict: Réponse de l'API
"""
if not self.token:
raise AtmoDataException("Token non disponible. Veuillez vous connecter avec login()")
url = f"{self.base_url}{endpoint}"
try:
response = self.session.get(url, params=params)
response.raise_for_status()
content_type = response.headers.get('content-type', '')
if 'application/json' in content_type:
return response.json()
else:
return {'data': response.text}
except requests.exceptions.RequestException as e:
raise AtmoDataException(f"Erreur lors de la requête: {e}")
def get_indices_atmo(self,
format: str = "geojson",
date: Optional[str] = None,
date_historique: Optional[str] = None,
code_zone: Optional[str] = None,
aasqa: Optional[str] = None,
code_qualificatif: Optional[str] = None,
bounding_box: Optional[str] = None,
bounding_box_srs: Optional[str] = None) -> Union[Dict[str, Any], AtmoDataCollection]:
"""
Récupération des indices ATMO
Args:
format: Format de sortie ('geojson' ou 'csv')
date: Date au format YYYY-MM-DD
date_historique: Date de début pour une période
code_zone: Code INSEE (commune/EPCI)
aasqa: Code AASQA (01-94)
code_qualificatif: Valeur d'indice (0-7)
bounding_box: Bounding box "xmin ymin xmax ymax"
bounding_box_srs: EPSG de la bounding box
Returns:
Union[Dict, AtmoDataCollection]: Données des indices ATMO (Dict pour CSV, AtmoDataCollection pour GeoJSON)
"""
# Validation et construction des paramètres
self._validate_format(format)
params = {'format': format}
if date:
self._validate_date(date)
params['date'] = date
if date_historique:
self._validate_date(date_historique)
params['date_historique'] = date_historique
if code_zone:
params['code_zone'] = code_zone
if aasqa:
self._validate_aasqa(aasqa)
params['aasqa'] = aasqa
if code_qualificatif:
self._validate_code_qualificatif_atmo(code_qualificatif)
params['code_qualificatif'] = code_qualificatif
if bounding_box:
self._validate_bounding_box(bounding_box)
params['bounding_box'] = bounding_box
if bounding_box_srs:
params['bounding_box_srs'] = bounding_box_srs
result = self._make_request('/api/v2/data/indices/atmo', params)
# Retourner un objet typé pour GeoJSON, dict brut pour CSV
if format == 'geojson' and 'type' in result:
return AtmoDataCollection(result, 'indices')
return result
def get_episodes_3jours(self,
format: str = "geojson",
aasqa: Optional[str] = None,
code_zone: Optional[str] = None,
polluant: Optional[str] = None,
type_episode: Optional[str] = None,
echeance: Optional[str] = None,
bounding_box: Optional[str] = None,
bounding_box_srs: Optional[str] = None) -> Union[Dict[str, Any], AtmoDataCollection]:
"""
Récupération des épisodes de pollution sur 3 jours (J-1, J0, J+1)
Args:
format: Format de sortie ('geojson' ou 'csv')
aasqa: Code AASQA (01-94)
code_zone: Code INSEE (département ou bassin d'air)
polluant: Polluant ('NO2', 'SO2', 'PM10', 'PM2.5', 'O3')
type_episode: Type d'épisode
echeance: Échéance ('-1', '0', '1')
bounding_box: Bounding box "xmin ymin xmax ymax"
bounding_box_srs: EPSG de la bounding box
Returns:
Union[Dict, AtmoDataCollection]: Données des épisodes (Dict pour CSV, AtmoDataCollection pour GeoJSON)
"""
# Validation et construction des paramètres
self._validate_format(format)
params = {'format': format}
if aasqa:
self._validate_aasqa(aasqa)
params['aasqa'] = aasqa
if code_zone:
params['code_zone'] = code_zone
if polluant:
self._validate_polluant(polluant)
params['polluant'] = polluant
if type_episode:
self._validate_type_episode(type_episode)
params['type_episode'] = type_episode
if echeance:
self._validate_echeance(echeance)
params['echeance'] = echeance
if bounding_box:
self._validate_bounding_box(bounding_box)
params['bounding_box'] = bounding_box
if bounding_box_srs:
params['bounding_box_srs'] = bounding_box_srs
result = self._make_request('/api/v2/data/episodes/3jours', params)
# Retourner un objet typé pour GeoJSON, dict brut pour CSV
if format == 'geojson' and 'type' in result:
return AtmoDataCollection(result, 'episodes')
return result
def get_episodes_historique(self,
date: str,
format: str = "geojson",
aasqa: Optional[str] = None,
code_zone: Optional[str] = None,
polluant: Optional[str] = None,
type_episode: Optional[str] = None,
date_historique: Optional[str] = None,
bounding_box: Optional[str] = None,
bounding_box_srs: Optional[str] = None) -> Union[Dict[str, Any], AtmoDataCollection]:
"""
Récupération des épisodes de pollution historiques
Args:
date: Date de référence (obligatoire) au format YYYY-MM-DD
format: Format de sortie ('geojson' ou 'csv')
aasqa: Code AASQA (01-94)
code_zone: Code INSEE (département ou bassin d'air)
polluant: Polluant ('NO2', 'SO2', 'PM10', 'PM2.5', 'O3')
type_episode: Type d'épisode
date_historique: Date de début pour une période
bounding_box: Bounding box "xmin ymin xmax ymax"
bounding_box_srs: EPSG de la bounding box
Returns:
Union[Dict, AtmoDataCollection]: Données des épisodes historiques (Dict pour CSV, AtmoDataCollection pour GeoJSON)
"""
# Validation et construction des paramètres
self._validate_date(date)
self._validate_format(format)
params = {'date': date, 'format': format}
if aasqa:
self._validate_aasqa(aasqa)
params['aasqa'] = aasqa
if code_zone:
params['code_zone'] = code_zone
if polluant:
self._validate_polluant(polluant)
params['polluant'] = polluant
if type_episode:
self._validate_type_episode(type_episode)
params['type_episode'] = type_episode
if date_historique:
self._validate_date(date_historique)
params['date_historique'] = date_historique
if bounding_box:
self._validate_bounding_box(bounding_box)
params['bounding_box'] = bounding_box
if bounding_box_srs:
params['bounding_box_srs'] = bounding_box_srs
result = self._make_request('/api/v2/data/episodes/historique', params)
# Retourner un objet typé pour GeoJSON, dict brut pour CSV
if format == 'geojson' and 'type' in result:
return AtmoDataCollection(result, 'episodes')
return result
def get_emissions(self,
format: str = "geojson",
echelle: str = "region",
code_zone: Optional[str] = None,
aasqa: Optional[str] = None,
secteur: Optional[str] = None,
bounding_box: Optional[str] = None,
bounding_box_srs: Optional[str] = None) -> Union[Dict[str, Any], AtmoDataCollection]:
"""
Récupération des données d'inventaires des émissions
Args:
format: Format de sortie ('geojson' ou 'csv')
echelle: Échelle d'agrégation ('region' ou 'epci')
code_zone: Code zone (EPCI uniquement)
aasqa: Code AASQA (01-94)
secteur: Code secteur (5, 6, 7, 34, 219)
bounding_box: Bounding box "xmin ymin xmax ymax"
bounding_box_srs: EPSG de la bounding box
Returns:
Union[Dict, AtmoDataCollection]: Données des émissions (Dict pour CSV, AtmoDataCollection pour GeoJSON)
"""
# Validation et construction des paramètres
self._validate_format(format)
self._validate_echelle(echelle)
params = {'format': format, 'echelle': echelle}
if code_zone:
params['code_zone'] = code_zone
if aasqa:
self._validate_aasqa(aasqa)
params['aasqa'] = aasqa
if secteur:
self._validate_secteur(secteur)
params['secteur'] = secteur
if bounding_box:
self._validate_bounding_box(bounding_box)
params['bounding_box'] = bounding_box
if bounding_box_srs:
params['bounding_box_srs'] = bounding_box_srs
result = self._make_request('/api/v2/data/inventaires/emissions', params)
# Retourner un objet typé pour GeoJSON, dict brut pour CSV
if format == 'geojson' and 'type' in result:
return AtmoDataCollection(result, 'emissions')
return result
def get_indices_pollens(self,
format: str = "geojson",
date: Optional[str] = None,
date_historique: Optional[str] = None,
code_zone: Optional[str] = None,
aasqa: Optional[str] = None,
code_qualificatif: Optional[str] = None,
alerte: Optional[bool] = None,
with_geom: bool = False,
bounding_box: Optional[str] = None,
bounding_box_srs: Optional[str] = None) -> Union[Dict[str, Any], AtmoDataCollection]:
"""
Récupération des indices pollen
Args:
format: Format de sortie ('geojson' ou 'csv')
date: Date au format YYYY-MM-DD
date_historique: Date de début pour une période
code_zone: Code INSEE (commune)
aasqa: Code AASQA (01-94)
code_qualificatif: Valeur d'indice pollinique (0-6)
alerte: Filtre par statut d'alerte
with_geom: Inclure les géométries
bounding_box: Bounding box "xmin ymin xmax ymax"
bounding_box_srs: EPSG de la bounding box
Returns:
Union[Dict, AtmoDataCollection]: Données des indices pollen (Dict pour CSV, AtmoDataCollection pour GeoJSON)
"""
# Validation et construction des paramètres
self._validate_format(format)
params = {'format': format, 'with_geom': with_geom}
if date:
self._validate_date(date)
params['date'] = date
if date_historique:
self._validate_date(date_historique)
params['date_historique'] = date_historique
if code_zone:
params['code_zone'] = code_zone
if aasqa:
self._validate_aasqa(aasqa)
params['aasqa'] = aasqa
if code_qualificatif:
self._validate_code_qualificatif_pollen(code_qualificatif)
params['code_qualificatif'] = code_qualificatif
if alerte is not None:
params['alerte'] = alerte
if bounding_box:
self._validate_bounding_box(bounding_box)
params['bounding_box'] = bounding_box
if bounding_box_srs:
params['bounding_box_srs'] = bounding_box_srs
result = self._make_request('/api/v2/data/indices/pollens', params)
# Retourner un objet typé pour GeoJSON, dict brut pour CSV
if format == 'geojson' and 'type' in result:
return AtmoDataCollection(result, 'pollens')
return result
def _validate_save_format(self, file_format: str) -> None:
"""Valide le format de sauvegarde"""
if file_format not in SAVE_FORMATS:
raise ValueError(f"Format de sauvegarde invalide: {file_format}. Formats supportés: {SAVE_FORMATS}")
def _extract_features_to_csv_data(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Extrait les features GeoJSON pour conversion en CSV"""
csv_data = []
if 'features' in data:
for feature in data['features']:
row = {}
# Ajouter les propriétés
if 'properties' in feature:
row.update(feature['properties'])
# Ajouter les coordonnées si présentes
if 'geometry' in feature and feature['geometry'] and 'coordinates' in feature['geometry']:
coords = feature['geometry']['coordinates']
if isinstance(coords, list) and len(coords) >= 2:
row['longitude'] = coords[0]
row['latitude'] = coords[1]
csv_data.append(row)
return csv_data
def save_to_file(self, data: Dict[str, Any], filename: str, file_format: str = 'json') -> str:
"""
Sauvegarde les données de l'API dans un fichier
Args:
data: Données retournées par l'API
filename: Nom du fichier (sans extension)
file_format: Format de sauvegarde ('json', 'csv', 'geojson')
Returns:
str: Chemin complet du fichier créé
Raises:
ValueError: Si le format n'est pas supporté
IOError: Si erreur lors de l'écriture du fichier
"""
self._validate_save_format(file_format)
# Ajouter l'extension appropriée
if not filename.endswith(FILE_EXTENSIONS[file_format]):
filename += FILE_EXTENSIONS[file_format]
# Créer le répertoire parent si nécessaire
filepath = Path(filename)
filepath.parent.mkdir(parents=True, exist_ok=True)
try:
if file_format == 'json':
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
elif file_format == 'geojson':
# Vérifier que c'est bien du GeoJSON
if 'type' not in data or data['type'] != 'FeatureCollection':
raise ValueError("Les données ne sont pas au format GeoJSON")
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
elif file_format == 'csv':
csv_data = self._extract_features_to_csv_data(data)
if not csv_data:
raise ValueError("Aucune donnée à convertir en CSV")
# Obtenir tous les champs possibles
fieldnames = set()
for row in csv_data:
fieldnames.update(row.keys())
fieldnames = sorted(fieldnames)
with open(filepath, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(csv_data)
return str(filepath.absolute())
except ValueError:
# Re-lancer les erreurs de validation sans les enrober
raise
except Exception as e:
raise IOError(f"Erreur lors de la sauvegarde: {e}")
# AtmoDataException is now imported from .exceptions

View file

@ -0,0 +1,297 @@
'''
Constantes pour le wrapper API Atmo Data
'''
# Formats de sortie supportés par l'API
FORMATS_VALIDES = ['geojson', 'csv']
# Codes AASQA (Associations Agréées de Surveillance de la Qualité de l'Air)
# Source : https://www.atmo-france.org/article/laasqa-de-votre-region
# Structure enrichie : région, organisme, départements, site web
AASQA_CODES = {
'01': {
'region': 'Guadeloupe',
'organisme': 'Gwad\'Air',
'departements': ['971'],
'site_web': 'https://www.gwadair.gp',
'description': 'Guadeloupe | Gwad\'Air'
},
'02': {
'region': 'Martinique',
'organisme': 'Madininair',
'departements': ['972'],
'site_web': 'https://www.madininair.fr',
'description': 'Martinique | Madininair'
},
'03': {
'region': 'Guyane',
'organisme': 'Atmo Guyane',
'departements': ['973'],
'site_web': 'https://www.atmo-guyane.org',
'description': 'Guyane | Atmo Guyane'
},
'04': {
'region': 'La Réunion',
'organisme': 'Atmo Réunion',
'departements': ['974'],
'site_web': 'https://www.atmo-reunion.net',
'description': 'La Réunion | Atmo Réunion'
},
'06': {
'region': 'Mayotte',
'organisme': 'Hawa Mayotte',
'departements': ['976'],
'site_web': 'https://www.hawa-mayotte.org',
'description': 'Mayotte | Hawa Mayotte'
},
'11': {
'region': 'Île-de-France',
'organisme': 'Airparif',
'departements': ['75', '77', '78', '91', '92', '93', '94', '95'],
'site_web': 'https://www.airparif.asso.fr',
'description': 'Île-de-France | Airparif'
},
'24': {
'region': 'Centre-Val de Loire',
'organisme': 'Lig\'Air',
'departements': ['18', '28', '36', '37', '41', '45'],
'site_web': 'https://www.ligair.fr',
'description': 'Centre-Val de Loire | Lig\'Air'
},
'27': {
'region': 'Bourgogne-Franche-Comté',
'organisme': 'Atmo Bourgogne-Franche-Comté',
'departements': ['21', '25', '39', '58', '70', '71', '89', '90'],
'site_web': 'https://www.atmo-bfc.org',
'description': 'Bourgogne-Franche-Comté | Atmo Bourgogne-Franche-Comté'
},
'28': {
'region': 'Normandie',
'organisme': 'Atmo Normandie',
'departements': ['14', '27', '50', '61', '76'],
'site_web': 'https://www.atmo-normandie.fr',
'description': 'Normandie | Atmo Normandie'
},
'32': {
'region': 'Hauts-de-France',
'organisme': 'Atmo Hauts-de-France',
'departements': ['02', '59', '60', '62', '80'],
'site_web': 'https://www.atmo-hdf.fr',
'description': 'Hauts-de-France | Atmo Hauts-de-France'
},
'44': {
'region': 'Grand Est',
'organisme': 'ATMO Grand-Est',
'departements': ['08', '10', '51', '52', '54', '55', '57', '67', '68', '88'],
'site_web': 'https://www.atmo-grandest.eu',
'description': 'Grand Est | ATMO Grand-Est'
},
'52': {
'region': 'Pays de la Loire',
'organisme': 'Air Pays de la Loire',
'departements': ['44', '49', '53', '72', '85'],
'site_web': 'https://www.airpl.org',
'description': 'Pays de la Loire | Air Pays de la Loire'
},
'53': {
'region': 'Bretagne',
'organisme': 'Air Breizh',
'departements': ['22', '29', '35', '56'],
'site_web': 'https://www.airbreizh.asso.fr',
'description': 'Bretagne | Air Breizh'
},
'75': {
'region': 'Nouvelle-Aquitaine',
'organisme': 'Atmo Nouvelle-Aquitaine',
'departements': ['16', '17', '19', '23', '24', '33', '40', '47', '64', '79', '86', '87'],
'site_web': 'https://www.atmo-nouvelleaquitaine.org',
'description': 'Nouvelle-Aquitaine | Atmo Nouvelle-Aquitaine'
},
'76': {
'region': 'Occitanie',
'organisme': 'Atmo Occitanie',
'departements': ['09', '11', '12', '30', '31', '32', '34', '46', '48', '65', '66', '81', '82'],
'site_web': 'https://www.atmo-occitanie.org',
'description': 'Occitanie | Atmo Occitanie'
},
'84': {
'region': 'Auvergne-Rhône-Alpes',
'organisme': 'Atmo Auvergne-Rhône-Alpes',
'departements': ['01', '03', '07', '15', '26', '38', '42', '43', '63', '69', '73', '74'],
'site_web': 'https://www.atmo-auvergnerhonealpes.fr',
'description': 'Auvergne-Rhône-Alpes | Atmo Auvergne-Rhône-Alpes'
},
'93': {
'region': 'Provence-Alpes-Côte d\'Azur',
'organisme': 'AtmoSud',
'departements': ['04', '05', '06', '13', '83', '84'],
'site_web': 'https://www.atmosud.org',
'description': 'Provence-Alpes-Côte d\'Azur | AtmoSud'
},
'94': {
'region': 'Corse',
'organisme': 'Qualitair',
'departements': ['2A', '2B'],
'site_web': 'https://www.qualitaircorse.org',
'description': 'Corse | Qualitair'
}
}
# Secteurs d'émission
SECTEURS_EMISSIONS = {
'5': 'Agriculture',
'6': 'Transport routier',
'7': 'Autres transports',
'34': 'Résidentiel - Tertiaire',
'219': 'Industrie, branche énergie, déchets'
}
# Indices de qualité de l'air ATMO
# Source : https://www.atmo-france.org/article/lindice-atmo
# Source : Notice technique et dinformation des données open data sur la qualité de lair disponibles sur Atmo Data - Version 1er avril 2025
INDICES_ATMO = {
0 : 'Absent',
1 : 'Bon',
2 : 'Moyen',
3 : 'Dégradé',
4 : 'Mauvais',
5 : 'Très mauvais',
6 : 'Extrêmement mauvais',
7 : 'Événement'
}
# Indices polliniques
# Source : https://www.atmo-france.org/article/indice-pollen
# Source : Notice technique et dinformation des données open data sur la qualité de lair disponibles sur Atmo Data - Version 1er avril 2025
INDICES_POLLENS = {
0 : 'Indisponible',
1 : 'Très faible',
2 : 'Faible',
3 : 'Modéré',
4 : 'Élevé',
5 : 'Très élevé',
6 : 'Extrêmement élevé'
}
# Couleur des qualificatifs des indices polluants et pollens :
# Source : https://www.atmo-france.org/article/lindice-atmo
# Source : https://www.atmo-france.org/article/indice-pollen
# Source : Notice technique et dinformation des données open data sur la qualité de lair disponibles sur Atmo Data - Version 1er avril 2025
# code_[polluant|pollen] | code couleur (R,V,B) | code couleur #hexa | emojis ronds | emojis carrées
CODE_COLOR_QUALIF = {
0 : [(221,221,221), '#DDDDDD', "", ""],
1 : [(80,240,230), '#50F0E6', "🔵", "🟦"],
2 : [(80,204,170), '#50CCAA', "🟢", "🟩"],
3 : [(240,230,65), '#F0E641', "🟡", "🟨"],
4 : [(255,80,80), '#FF5050', "🔴", "🟥"],
5 : [(150,0,50), '#960032', "🟤", "🟫"],
6 : [(135,33,129), '#872181', "🟣", "🟪"],
7 : [(136,136,136), '#888888', "", ""]
}
"""
Correspondances émojis/couleurs parfaites
- Niveau 0 : Gris clair #DDDDDD
- Niveau 1 : 🔵🟦 Bleu clair #50F0E6
- Niveau 2 : 🟢🟩 Vert #50CCAA
- Niveau 3 : 🟡🟨 Jaune #F0E641
- Niveau 4 : 🔴🟥 Rouge #FF5050
- Niveau 5 : 🟤🟫 Marron #960032
- Niveau 6 : 🟣🟪 Violet #872181
- Niveau 7 : Noir #888888
"""
# Types d'épisodes de pollution
TYPES_EPISODES = [
'PAS DE DEPASSEMENT',
'INFORMATION ET RECOMMANDATION',
'ALERTE SUR PERSISTANCE',
'ALERTE'
]
# Échéances pour les épisodes de pollution (J-1, J0, J+1)
ECHEANCES_VALIDES = ['-1', '0', '1']
# Échelles d'agrégation pour les données d'émissions
ECHELLES_VALIDES = ['region', 'epci']
# URL par défaut de l'API
DEFAULT_API_URL = 'https://admindata.atmo-france.org'
# Timeout par défaut pour les requêtes (en secondes)
DEFAULT_TIMEOUT = 30
# Durée de validité du token JWT (en heures)
TOKEN_VALIDITY_HOURS = 24
# Formats de fichiers supportés pour la sauvegarde
SAVE_FORMATS = ['json', 'csv', 'geojson']
# Extensions de fichiers
FILE_EXTENSIONS = {
'json': '.json',
'csv': '.csv',
'geojson': '.geojson'
}
# Polluants réglementés pris en compte dans l'indice ATMO :
# Source : Notice technique et dinformation des données open data sur la qualité de lair disponibles sur Atmo Data - Version 1er avril 2025
POLLUANTS = ['NO2', 'SO2', 'PM10', 'PM2.5', 'O3']
CODE_POLLUANT = {
'NO2' : 'dioxyde dazote' ,
'SO2' : 'dioxyde de soufre',
'PM10' : 'particules fines inférieures à 10 micromètres',
'PM2.5' : 'particules fines inférieures à 2.5 micromètres',
'O3' : 'ozone'
}
# Mapping des codes polluants pour les épisodes de pollution
CODE_POLLUANT_EPISODES = {
'1': 'NO2',
'2': 'SO2',
'3': 'O3',
'5': 'PM10',
'6': 'PM2.5'
}
# Taxons (espèces) pris en compte dans l'indice pollen :
# Source : Notice technique et dinformation des données open data sur la qualité de lair disponibles sur Atmo Data - Version 1er avril 2025
CODE_TAXON = {
'ambr' : 'Ambroisie',
'arm' : 'Armoise',
'aul' : 'Aulne',
'boul' : 'Bouleau',
'gram' : 'Graminées',
'oliv' : 'Olivier',
}
# Mapping des variantes de noms de taxons vers les codes standards
TAXON_MAPPING = {
'aulne': 'aul',
'bouleau': 'boul',
'olivier': 'oliv',
'graminées': 'gram',
'graminee': 'gram', # Variante sans accent
'armoise': 'arm',
'artemisi': 'arm', # Variante latine de armoise
'ambroisie': 'ambr'
}
# Mentions légales et licence Atmo France / AASQA
# Source : Notice technique et dinformation des données open data sur la qualité de lair disponibles sur Atmo Data - Version 1er avril 2025
ATMO_LICENCE_COURTE = "Atmo France / AASQA"
ATMO_LICENCE_LONGUE = "Atmo France et les Associations agréées de surveillance de la qualité de l'air"
ATMO_LICENCE_COMPLETE = """Données sous licence ODbL (Open Database License)
Source: Atmo France et les Associations agréées de surveillance de la qualité de l'air
URL: https://www.atmo-france.org/
API: https://admindata.atmo-france.org/
Chacun peut bénéficier gratuitement de ces données mises en open data sous licence ODbL,
en indiquant la source "Atmo France et les Associations agréées de surveillance de la qualité de l'air"
ou "Atmo France / AASQA" dans sa version courte."""

View file

@ -0,0 +1,42 @@
"""
Exceptions personnalisées pour Atmo Data Wrapper
"""
class AtmoDataException(Exception):
"""Exception de base pour les erreurs liées à l'API Atmo Data"""
def __init__(self, message: str, status_code: int = None, response_data: dict = None):
super().__init__(message)
self.status_code = status_code
self.response_data = response_data
def __str__(self) -> str:
if self.status_code:
return f"AtmoDataException ({self.status_code}): {super().__str__()}"
return f"AtmoDataException: {super().__str__()}"
class AuthenticationError(AtmoDataException):
"""Erreur d'authentification"""
pass
class ValidationError(AtmoDataException):
"""Erreur de validation des paramètres"""
pass
class APIError(AtmoDataException):
"""Erreur de l'API Atmo Data"""
pass
class NetworkError(AtmoDataException):
"""Erreur de réseau"""
pass
class DataError(AtmoDataException):
"""Erreur dans le traitement des données"""
pass

View file

@ -0,0 +1,631 @@
"""
Classes pour les différents types de données de l'API Atmo Data
"""
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime
import statistics
from dataclasses import dataclass
from .constants import (
AASQA_CODES, INDICES_ATMO, INDICES_POLLENS, CODE_COLOR_QUALIF,
CODE_POLLUANT, CODE_TAXON, SECTEURS_EMISSIONS, CODE_POLLUANT_EPISODES, TAXON_MAPPING
)
@dataclass
class Coordinates:
"""Classe pour représenter les coordonnées géographiques"""
longitude: float
latitude: float
def __str__(self) -> str:
return f"({self.latitude:.4f}, {self.longitude:.4f})"
def distance_to(self, other: 'Coordinates') -> float:
"""Calcule la distance approximative entre deux points (en km)"""
import math
lat1, lon1 = math.radians(self.latitude), math.radians(self.longitude)
lat2, lon2 = math.radians(other.latitude), math.radians(other.longitude)
dlat = lat2 - lat1
dlon = lon2 - lon1
a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
c = 2 * math.asin(math.sqrt(a))
r = 6371 # Rayon de la Terre en km
return r * c
class AtmoDataBase:
"""Classe de base pour tous les types de données Atmo"""
def __init__(self, feature: Dict[str, Any]):
self.raw_data = feature
self.properties = feature.get('properties', {})
self.geometry = feature.get('geometry', {})
# Propriétés communes
self.aasqa = self.properties.get('aasqa', '')
self.source = self.properties.get('source', '')
self.date_maj = self.properties.get('date_maj', '')
self.lib_zone = self.properties.get('lib_zone', '')
self.coordinates = self._extract_coordinates()
def _extract_coordinates(self) -> Optional[Coordinates]:
"""Extrait les coordonnées du feature"""
if self.geometry and self.geometry.get('type') == 'Point':
coords = self.geometry.get('coordinates', [])
if len(coords) >= 2:
return Coordinates(coords[0], coords[1])
elif 'x_wgs84' in self.properties and 'y_wgs84' in self.properties:
return Coordinates(
self.properties['x_wgs84'],
self.properties['y_wgs84']
)
return None
def get_aasqa_name(self) -> str:
"""Retourne le nom de l'AASQA"""
aasqa_info = AASQA_CODES.get(self.aasqa)
if aasqa_info:
return aasqa_info['description']
return f"AASQA {self.aasqa}"
def get_aasqa_website(self) -> str:
"""Retourne le site web de l'AASQA"""
aasqa_info = AASQA_CODES.get(self.aasqa)
if aasqa_info:
return aasqa_info['site_web']
return None
def get_aasqa_region(self) -> str:
"""Retourne la région de l'AASQA"""
aasqa_info = AASQA_CODES.get(self.aasqa)
if aasqa_info:
return aasqa_info['region']
return None
def get_aasqa_organisme(self) -> str:
"""Retourne l'organisme de l'AASQA"""
aasqa_info = AASQA_CODES.get(self.aasqa)
if aasqa_info:
return aasqa_info['organisme']
return None
def get_source(self) -> str:
"""Retourne le nom publique de l'AASQA, texte"""
return self.source
def has_coordinates(self) -> bool:
"""Vérifie si l'objet a des coordonnées"""
return self.coordinates is not None
def get_emoji_by_level(self, level: int, style: str = "round") -> str:
"""Récupère l'émoji correspondant au niveau de qualificatif
Args:
level: Niveau de qualificatif (0-7)
style: Style d'émoji - "round" pour ronds (🟢), "square" pour carrés (🟩)
Returns:
str: Émoji correspondant au niveau et style demandé
"""
color_info = CODE_COLOR_QUALIF.get(level, CODE_COLOR_QUALIF.get(0, [None, None, "", ""]))
if style == "square" and len(color_info) > 3:
# Utiliser l'émoji carré (4ème élément)
return color_info[3]
elif len(color_info) > 2:
# Utiliser l'émoji rond (3ème élément)
return color_info[2]
else:
# Fallback
return ""
def get_color_by_level(self, level: int) -> Tuple[str, List[int]]:
"""Récupère la couleur (hex, rgb) correspondant au niveau de qualificatif"""
color_info = CODE_COLOR_QUALIF.get(level)
if color_info and len(color_info) >= 2:
return color_info[1], color_info[0] # hex, rgb
return "#DDDDDD", [221, 221, 221] # Couleur par défaut
class IndiceAtmo(AtmoDataBase):
"""Classe pour les indices de qualité de l'air ATMO"""
def __init__(self, feature: Dict[str, Any]):
super().__init__(feature)
# Propriétés spécifiques aux indices ATMO
self.code_qual = self.properties.get('code_qual', 0)
self.lib_qual = self.properties.get('lib_qual', '')
self.coul_qual = self.properties.get('coul_qual', '')
self.code_zone = self.properties.get('code_zone', '')
self.lib_zone = self.properties.get('lib_zone', '')
self.date_dif = self.properties.get('date_dif', '')
self.date_ech = self.properties.get('date_ech', '')
# Champs supplémentaires selon la notice officielle
self.type_zone = self.properties.get('type_zone', '') # 'commune' ou 'EPCI'
self.x_reg = self.properties.get('x_reg', 0.0) # Coordonnées réglementaires
self.y_reg = self.properties.get('y_reg', 0.0)
self.epsg_reg = self.properties.get('epsg_reg', '') # Système de projection réglementaire
# Concentrations facultatives (selon notice)
self.conc_no2 = self.properties.get('conc_no2', 0) # Concentration en μg/m³
self.conc_so2 = self.properties.get('conc_so2', 0)
self.conc_o3 = self.properties.get('conc_o3', 0)
self.conc_pm10 = self.properties.get('conc_pm10', 0)
self.conc_pm25 = self.properties.get('conc_pm25', 0)
# Codes par polluant
self.code_no2 = self.properties.get('code_no2', 0)
self.code_so2 = self.properties.get('code_so2', 0)
self.code_o3 = self.properties.get('code_o3', 0)
self.code_pm10 = self.properties.get('code_pm10', 0)
self.code_pm25 = self.properties.get('code_pm25', 0)
def get_qualificatif(self) -> str:
"""Retourne le qualificatif textuel de l'indice"""
return INDICES_ATMO.get(self.code_qual, "Inconnu")
def get_color(self) -> Tuple[str, List[int]]:
"""Retourne la couleur (hex, rgb) associée à l'indice"""
return self.get_color_by_level(self.code_qual)
def get_emoji(self, style: str = "round") -> str:
"""Retourne l'émoji correspondant à l'indice ATMO
Args:
style: Style d'émoji - "round" pour ronds (🟢), "square" pour carrés (🟩)
"""
return self.get_emoji_by_level(self.code_qual, style)
def is_good_quality(self) -> bool:
"""Vérifie si la qualité de l'air est bonne (indice 1-2)"""
return self.code_qual in [1, 2]
def is_poor_quality(self) -> bool:
"""Vérifie si la qualité de l'air est mauvaise (indice 4+)"""
return self.code_qual >= 4
def get_worst_pollutant(self) -> Tuple[str, int]:
"""Retourne le polluant avec le plus mauvais indice"""
pollutants = {
'NO2': self.code_no2,
'SO2': self.code_so2,
'O3': self.code_o3,
'PM10': self.code_pm10,
'PM2.5': self.code_pm25
}
worst = max(pollutants.items(), key=lambda x: x[1])
return worst[0], worst[1]
def get_pollutants_summary(self) -> Dict[str, Dict[str, Any]]:
"""Retourne un résumé de tous les polluants"""
pollutants = {
'NO2': self.code_no2,
'SO2': self.code_so2,
'O3': self.code_o3,
'PM10': self.code_pm10,
'PM2.5': self.code_pm25
}
summary = {}
for polluant, code in pollutants.items():
summary[polluant] = {
'code': code,
'qualificatif': INDICES_ATMO.get(code, "Inconnu"),
'description': CODE_POLLUANT.get(polluant, polluant)
}
return summary
def get_concentrations(self) -> Dict[str, int]:
"""Retourne les concentrations de tous les polluants en μg/m³ (selon notice officielle)"""
return {
'NO2': self.conc_no2,
'SO2': self.conc_so2,
'O3': self.conc_o3,
'PM10': self.conc_pm10,
'PM2.5': self.conc_pm25
}
def is_commune_level(self) -> bool:
"""Vérifie si l'indice est calculé au niveau commune (selon notice officielle)"""
return self.type_zone.lower() == 'commune'
def is_epci_level(self) -> bool:
"""Vérifie si l'indice est calculé au niveau EPCI (selon notice officielle)"""
return self.type_zone.lower() == 'epci'
def get_responsible_pollutants(self) -> List[str]:
"""Retourne le(s) polluant(s) responsable(s) de l'indice global (selon règle n°4 de la notice)"""
pollutants = {
'NO2': self.code_no2,
'SO2': self.code_so2,
'O3': self.code_o3,
'PM10': self.code_pm10,
'PM2.5': self.code_pm25
}
# L'indice global correspond au qualificatif le plus dégradé
max_code = max(pollutants.values())
responsible = [polluant for polluant, code in pollutants.items() if code == max_code]
return responsible
def __str__(self) -> str:
return f"Indice ATMO {self.lib_zone}: {self.get_qualificatif()} ({self.code_qual})"
class EpisodePollution(AtmoDataBase):
"""Classe pour les épisodes de pollution"""
def __init__(self, feature: Dict[str, Any]):
super().__init__(feature)
# Propriétés spécifiques aux épisodes
self.code_pol = self.properties.get('code_pol', '')
self.lib_pol = self.properties.get('lib_pol', '')
self.code_zone = self.properties.get('code_zone', '')
self.lib_zone = self.properties.get('lib_zone', '')
self.date_dif = self.properties.get('date_dif', '')
self.date_ech = self.properties.get('date_ech', '')
self.etat = self.properties.get('etat', '')
def get_polluant_code(self) -> str:
"""Retourne le code du polluant principal"""
return CODE_POLLUANT_EPISODES.get(self.code_pol, self.code_pol)
def is_alert_active(self) -> bool:
"""Vérifie si une alerte est active"""
return self.etat not in ['PAS DE DEPASSEMENT', '']
def get_alert_level(self) -> str:
"""Retourne le niveau d'alerte"""
if 'INFORMATION' in self.etat:
return 'Information'
elif 'ALERTE' in self.etat:
return 'Alerte'
else:
return 'Aucune'
def is_geometry_complex(self) -> bool:
"""Vérifie si la géométrie est complexe (MultiPolygon)"""
return self.geometry.get('type') == 'MultiPolygon'
def __str__(self) -> str:
return f"Épisode {self.lib_pol} - {self.lib_zone}: {self.etat}"
class EmissionData(AtmoDataBase):
"""Classe pour les données d'émissions"""
def __init__(self, feature: Dict[str, Any]):
super().__init__(feature)
# Propriétés spécifiques aux émissions
self.code = self.properties.get('code', '')
self.name = self.properties.get('name', '')
self.population = self.properties.get('population', 0)
self.superficie = self.properties.get('superficie', 0)
# Émissions par polluant
self.nox = self.properties.get('nox', 0)
self.pm10 = self.properties.get('pm10', 0)
self.pm25 = self.properties.get('pm25', 0)
self.ges = self.properties.get('ges', 0) # Gaz à effet de serre
# Secteur
self.code_pcaet = self.properties.get('code_pcaet', '')
def get_emission_density(self, pollutant: str) -> float:
"""Calcule la densité d'émission par km² pour un polluant"""
emission_value = getattr(self, pollutant.lower(), 0)
if self.superficie > 0:
return emission_value / self.superficie
return 0
def get_emission_per_capita(self, pollutant: str) -> float:
"""Calcule l'émission par habitant pour un polluant"""
emission_value = getattr(self, pollutant.lower(), 0)
if self.population > 0:
return emission_value / self.population
return 0
def get_total_emissions(self) -> Dict[str, float]:
"""Retourne toutes les émissions"""
return {
'NOx': self.nox,
'PM10': self.pm10,
'PM2.5': self.pm25,
'GES': self.ges
}
def get_secteur_name(self) -> str:
"""Retourne le nom du secteur d'émission"""
return SECTEURS_EMISSIONS.get(self.code_pcaet, f"Secteur {self.code_pcaet}")
def __str__(self) -> str:
return f"Émissions {self.name}: NOx={self.nox:.1f}, PM10={self.pm10:.1f}"
class IndicePollen(AtmoDataBase):
"""Classe pour les indices pollen"""
def __init__(self, feature: Dict[str, Any]):
super().__init__(feature)
# Propriétés spécifiques aux pollens
# Notice : Déclenchement ou non de lalerte pollen (à partir de code_qual = 4) -> Boolean
self.alerte = self.properties.get('alerte', False)
# Codes par taxon (espèce)
# Notice : Classe du sous-indice, entier de 1 à 6, 0 si absent -> Int
self.code_ambr = self.properties.get('code_ambr', 0) # Ambroisie
self.code_arm = self.properties.get('code_arm', 0) # Armoise
self.code_aul = self.properties.get('code_aul', 0) # Aulne
self.code_boul = self.properties.get('code_boul', 0) # Bouleau
self.code_gram = self.properties.get('code_gram', 0) # Graminées
self.code_oliv = self.properties.get('code_oliv', 0) # Olivier
# Concentrations par taxon (grains/m³)
# Notice : Concentration de pollens du taxons (en grains/m³), à la commune -> Float
self.conc_ambr = self.properties.get('conc_ambr', 0.0) # Ambroisie
self.conc_arm = self.properties.get('conc_arm', 0.0) # Armoise
self.conc_aul = self.properties.get('conc_aul', 0.0) # Aulne
self.conc_boul = self.properties.get('conc_boul', 0.0) # Bouleau
self.conc_gram = self.properties.get('conc_gram', 0.0) # Graminées
self.conc_oliv = self.properties.get('conc_oliv', 0.0) # Olivier
# Taxon(s) responsable(s) de l'indice
# Notice : Taxon(s) responsable(s) de l'indice (aulne, bouleau, olivier, graminées, armoise, ambroisie) -> String
self.pollen_resp = self.properties.get('pollen_resp', '')
def is_alert_active(self) -> bool:
"""Vérifie si une alerte pollen est active"""
return self.alerte
def get_highest_pollen(self) -> Tuple[str, int]:
"""Retourne le pollen avec l'indice le plus élevé"""
pollens = {
'ambr': self.code_ambr,
'arm': self.code_arm,
'aul': self.code_aul,
'boul': self.code_boul,
'gram': self.code_gram,
'oliv': self.code_oliv
}
highest = max(pollens.items(), key=lambda x: x[1])
return highest[0], highest[1]
def get_pollens_summary(self, emoji_style: str = "round") -> Dict[str, Dict[str, Any]]:
"""Retourne un résumé de tous les pollens
Args:
emoji_style: Style d'émoji - "round" pour ronds (🟢), "square" pour carrés (🟩)
"""
pollens = {
'ambr': {'code': self.code_ambr, 'conc': self.conc_ambr},
'arm': {'code': self.code_arm, 'conc': self.conc_arm},
'aul': {'code': self.code_aul, 'conc': self.conc_aul},
'boul': {'code': self.code_boul, 'conc': self.conc_boul},
'gram': {'code': self.code_gram, 'conc': self.conc_gram},
'oliv': {'code': self.code_oliv, 'conc': self.conc_oliv}
}
summary = {}
for code_taxon, data in pollens.items():
indice = data['code']
concentration = data['conc']
summary[code_taxon] = {
'code': indice,
'concentration': concentration,
'qualificatif': INDICES_POLLENS.get(int(indice), "Inconnu"),
'espece': CODE_TAXON.get(code_taxon, code_taxon),
'couleur': self.get_color_by_level(int(indice)),
'emoji': self.get_emoji_by_level(int(indice), emoji_style),
'emoji_round': self.get_emoji_by_level(int(indice), "round"),
'emoji_square': self.get_emoji_by_level(int(indice), "square")
}
return summary
def get_concentrations(self) -> Dict[str, float]:
"""Retourne les concentrations de tous les pollens en grains/m³"""
return {
'ambr': self.conc_ambr,
'arm': self.conc_arm,
'aul': self.conc_aul,
'boul': self.conc_boul,
'gram': self.conc_gram,
'oliv': self.conc_oliv
}
def get_highest_concentration(self) -> Tuple[str, float]:
"""Retourne le pollen avec la plus haute concentration"""
concentrations = self.get_concentrations()
highest = max(concentrations.items(), key=lambda x: x[1])
return highest[0], highest[1]
def get_dangerous_pollens(self) -> List[str]:
"""Retourne la liste des pollens à indice élevé (4+)"""
pollens = {
'ambr': self.code_ambr,
'arm': self.code_arm,
'aul': self.code_aul,
'boul': self.code_boul,
'gram': self.code_gram,
'oliv': self.code_oliv
}
dangerous = []
for code_taxon, indice in pollens.items():
if indice >= 4:
dangerous.append(CODE_TAXON.get(code_taxon, code_taxon))
return dangerous
def get_responsible_pollens(self) -> List[str]:
"""Retourne le ou les taxons responsables de l'indice selon l'API"""
if not self.pollen_resp:
return []
# Le champ peut contenir plusieurs taxons séparés par des virgules ou des espaces
# Exemples: "graminées,armoise", "GRAMINEE", "ARTEMISI GRAMINEE"
pollen_resp_clean = self.pollen_resp.lower().strip()
# Séparer par virgules ou espaces
taxons_raw = []
if ',' in pollen_resp_clean:
taxons_raw = [t.strip() for t in pollen_resp_clean.split(',')]
else:
# Essayer de détecter plusieurs mots (séparés par des espaces)
words = pollen_resp_clean.split()
if len(words) > 1:
taxons_raw = words
else:
taxons_raw = [pollen_resp_clean]
taxons_formatted = []
for taxon_raw in taxons_raw:
if not taxon_raw: # Skip empty strings
continue
# Chercher d'abord dans le mapping
taxon_code = TAXON_MAPPING.get(taxon_raw)
if taxon_code:
# Utiliser le nom complet du CODE_TAXON
taxon_name = CODE_TAXON.get(taxon_code, taxon_raw.title())
if taxon_name not in taxons_formatted: # Éviter les doublons
taxons_formatted.append(taxon_name)
else:
# Si pas trouvé, formater simplement la première lettre en majuscule
formatted_name = taxon_raw.title()
if formatted_name not in taxons_formatted: # Éviter les doublons
taxons_formatted.append(formatted_name)
return taxons_formatted
def __str__(self) -> str:
highest_pollen, highest_code = self.get_highest_pollen()
espece = CODE_TAXON.get(highest_pollen, highest_pollen)
qualif = INDICES_POLLENS.get(highest_code, "Inconnu")
return f"Pollen - Plus élevé: {espece} ({qualif})"
class AtmoDataCollection:
"""Classe pour gérer une collection de données Atmo"""
def __init__(self, data: Dict[str, Any], data_type: str):
self.raw_data = data
self.data_type = data_type
self.features = data.get('features', [])
# Créer les objets typés
self.items = self._create_typed_objects()
def _create_typed_objects(self) -> List[AtmoDataBase]:
"""Crée les objets typés selon le type de données"""
objects = []
for feature in self.features:
if self.data_type == 'indices':
objects.append(IndiceAtmo(feature))
elif self.data_type == 'episodes':
objects.append(EpisodePollution(feature))
elif self.data_type == 'emissions':
objects.append(EmissionData(feature))
elif self.data_type == 'pollens':
objects.append(IndicePollen(feature))
else:
objects.append(AtmoDataBase(feature))
return objects
def __len__(self) -> int:
return len(self.items)
def __iter__(self):
return iter(self.items)
def __getitem__(self, index):
return self.items[index]
def filter_by_aasqa(self, aasqa_code: str) -> 'AtmoDataCollection':
"""Filtre par code AASQA"""
filtered_features = [
item.raw_data for item in self.items
if item.aasqa == aasqa_code
]
filtered_data = {
'type': self.raw_data.get('type'),
'features': filtered_features
}
return AtmoDataCollection(filtered_data, self.data_type)
def filter_by_coordinates(self, center: Coordinates, radius_km: float) -> 'AtmoDataCollection':
"""Filtre par proximité géographique"""
filtered_items = []
for item in self.items:
if item.has_coordinates():
distance = item.coordinates.distance_to(center)
if distance <= radius_km:
filtered_items.append(item.raw_data)
filtered_data = {
'type': self.raw_data.get('type'),
'features': filtered_items
}
return AtmoDataCollection(filtered_data, self.data_type)
def get_statistics(self) -> Dict[str, Any]:
"""Retourne des statistiques sur la collection"""
stats = {
'total_count': len(self.items),
'aasqa_count': len(set(item.aasqa for item in self.items)),
'data_type': self.data_type
}
# Statistiques spécifiques par type
if self.data_type == 'indices' and self.items:
indices_codes = [item.code_qual for item in self.items if hasattr(item, 'code_qual')]
if indices_codes:
stats['quality_stats'] = {
'moyenne': statistics.mean(indices_codes),
'min': min(indices_codes),
'max': max(indices_codes),
'bon_pourcentage': (sum(1 for x in indices_codes if x <= 2) / len(indices_codes)) * 100
}
elif self.data_type == 'episodes' and self.items:
alerts_count = sum(1 for item in self.items if hasattr(item, 'is_alert_active') and item.is_alert_active())
stats['alerts_active'] = alerts_count
stats['alerts_percentage'] = (alerts_count / len(self.items)) * 100
elif self.data_type == 'pollens' and self.items:
alerts_count = sum(1 for item in self.items if hasattr(item, 'alerte') and item.alerte)
stats['pollen_alerts'] = alerts_count
return stats
def to_summary(self) -> str:
"""Retourne un résumé textuel de la collection"""
stats = self.get_statistics()
summary = f"Collection {self.data_type.title()}: {stats['total_count']} éléments"
if self.data_type == 'indices' and 'quality_stats' in stats:
qs = stats['quality_stats']
summary += f" - Qualité moyenne: {qs['moyenne']:.1f}, Bonne qualité: {qs['bon_pourcentage']:.1f}%"
elif self.data_type == 'episodes' and 'alerts_active' in stats:
summary += f" - Alertes actives: {stats['alerts_active']} ({stats['alerts_percentage']:.1f}%)"
elif self.data_type == 'pollens' and 'pollen_alerts' in stats:
summary += f" - Alertes pollen: {stats['pollen_alerts']}"
return summary

View file

@ -0,0 +1,218 @@
"""
Fonctions utilitaires pour le wrapper Atmo Data
"""
from typing import Optional, List, Dict, Any
from .constants import AASQA_CODES, ATMO_LICENCE_COURTE, ATMO_LICENCE_LONGUE, ATMO_LICENCE_COMPLETE
def get_aasqa_by_department(department: str) -> Optional[str]:
"""
Trouve le code AASQA correspondant à un département
Args:
department: Code département (ex: '54', '75', '2A')
Returns:
str: Code AASQA ou None si non trouvé
"""
for aasqa_code, aasqa_data in AASQA_CODES.items():
if department in aasqa_data['departements']:
return aasqa_code
return None
def get_aasqa_info(aasqa_code: str) -> Optional[Dict[str, Any]]:
"""
Récupère les informations complètes d'une AASQA
Args:
aasqa_code: Code AASQA (ex: '44')
Returns:
dict: Informations complètes ou None si non trouvé
"""
return AASQA_CODES.get(aasqa_code)
def get_aasqa_website(aasqa_code: str) -> Optional[str]:
"""
Récupère le site web d'une AASQA
Args:
aasqa_code: Code AASQA (ex: '44')
Returns:
str: URL du site web ou None si non trouvé
"""
aasqa_info = AASQA_CODES.get(aasqa_code)
return aasqa_info['site_web'] if aasqa_info else None
def list_departments_by_aasqa(aasqa_code: str) -> List[str]:
"""
Liste les départements couverts par une AASQA
Args:
aasqa_code: Code AASQA (ex: '44')
Returns:
list: Liste des codes département
"""
aasqa_info = AASQA_CODES.get(aasqa_code)
return aasqa_info['departements'] if aasqa_info else []
def search_aasqa_by_name(search_term: str) -> List[Dict[str, Any]]:
"""
Recherche des AASQA par nom d'organisme ou de région
Args:
search_term: Terme de recherche (case insensitive)
Returns:
list: Liste des AASQA correspondantes avec leurs informations
"""
results = []
search_lower = search_term.lower()
for aasqa_code, aasqa_data in AASQA_CODES.items():
if (search_lower in aasqa_data['organisme'].lower() or
search_lower in aasqa_data['region'].lower()):
results.append({
'code': aasqa_code,
**aasqa_data
})
return results
def get_departments_count() -> Dict[str, int]:
"""
Retourne le nombre de départements par AASQA
Returns:
dict: Code AASQA -> nombre de départements
"""
return {
aasqa_code: len(aasqa_data['departements'])
for aasqa_code, aasqa_data in AASQA_CODES.items()
}
def validate_department_coverage() -> Dict[str, Any]:
"""
Valide la couverture départementale et détecte les anomalies
Returns:
dict: Rapport de validation avec statistiques et anomalies
"""
all_departments = []
for aasqa_data in AASQA_CODES.values():
all_departments.extend(aasqa_data['departements'])
unique_departments = set(all_departments)
duplicates = []
seen = set()
for dept in all_departments:
if dept in seen:
duplicates.append(dept)
seen.add(dept)
return {
'total_departments': len(all_departments),
'unique_departments': len(unique_departments),
'duplicates': list(set(duplicates)),
'has_duplicates': len(duplicates) > 0,
'coverage_complete': len(unique_departments) == len(all_departments)
}
def get_aasqa_statistics() -> Dict[str, Any]:
"""
Génère des statistiques sur les AASQA
Returns:
dict: Statistiques détaillées
"""
dept_counts = get_departments_count()
validation = validate_department_coverage()
max_coverage = max(dept_counts.values())
min_coverage = min(dept_counts.values())
max_aasqa = [code for code, count in dept_counts.items() if count == max_coverage]
min_aasqa = [code for code, count in dept_counts.items() if count == min_coverage]
return {
'total_aasqa': len(AASQA_CODES),
'total_departments_covered': validation['total_departments'],
'unique_departments': validation['unique_departments'],
'max_coverage': {
'count': max_coverage,
'aasqa_codes': max_aasqa,
'aasqa_names': [AASQA_CODES[code]['organisme'] for code in max_aasqa]
},
'min_coverage': {
'count': min_coverage,
'aasqa_codes': min_aasqa,
'aasqa_names': [AASQA_CODES[code]['organisme'] for code in min_aasqa]
},
'average_coverage': sum(dept_counts.values()) / len(dept_counts),
'has_anomalies': validation['has_duplicates']
}
def get_atmo_licence(format: str = "courte") -> str:
"""
Retourne la mention légale Atmo France selon le format demandé
Args:
format: Format de la licence ("courte", "longue", "complete")
Returns:
str: Mention légale correspondante
Examples:
>>> print(get_atmo_licence("courte"))
Atmo France / AASQA
>>> print(get_atmo_licence("longue"))
Atmo France et les Associations agréées de surveillance de la qualité de l'air
>>> print(get_atmo_licence("complete"))
Données sous licence ODbL (Open Database License)
Source: Atmo France et les Associations agréées de surveillance de la qualité de l'air
...
"""
format_lower = format.lower()
if format_lower == "courte":
return ATMO_LICENCE_COURTE
elif format_lower == "longue":
return ATMO_LICENCE_LONGUE
elif format_lower == "complete":
return ATMO_LICENCE_COMPLETE
else:
# Format par défaut si non reconnu
return ATMO_LICENCE_COURTE
def print_atmo_licence(format: str = "courte") -> None:
"""
Affiche la mention légale Atmo France selon le format demandé
Args:
format: Format de la licence ("courte", "longue", "complete")
Examples:
>>> print_atmo_licence("courte")
Atmo France / AASQA
>>> print_atmo_licence("complete")
Données sous licence ODbL (Open Database License)
Source: Atmo France et les Associations agréées de surveillance de la qualité de l'air
...
"""
print(get_atmo_licence(format))