mirror of
https://github.com/Kludex/awesome-fastapi-projects.git
synced 2024-12-18 08:10:18 +00:00
244 lines
7.8 KiB
Python
244 lines
7.8 KiB
Python
|
"""The logic for scraping the source graph data processing it."""
|
||
|
import asyncio
|
||
|
|
||
|
import sqlalchemy.dialects.sqlite
|
||
|
import typer
|
||
|
from loguru import logger
|
||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
|
||
|
from app.database import Dependency, Repo, RepoDependency
|
||
|
from app.dependencies import acquire_dependencies_data_for_repository
|
||
|
from app.source_graph.client import AsyncSourceGraphSSEClient
|
||
|
from app.source_graph.mapper import create_or_update_repos_from_source_graph_repos_data
|
||
|
from app.types import RepoId
|
||
|
from app.uow import async_session_uow
|
||
|
|
||
|
|
||
|
async def _create_dependencies_for_repo(session: AsyncSession, repo: Repo) -> None:
|
||
|
"""
|
||
|
Create dependencies for a repo.
|
||
|
|
||
|
For each parsed dependency, creates a new record in the database, if such a
|
||
|
dependency does not exist.
|
||
|
Then, assigns the dependencies to the given repo.
|
||
|
|
||
|
:param session: An asynchronous session object
|
||
|
:param repo: A repo for which to create and assign the dependencies
|
||
|
"""
|
||
|
# Acquire the dependencies data for the repo
|
||
|
logger.info(
|
||
|
"Acquiring the dependencies data for the repo with id {repo_id}.",
|
||
|
repo_id=repo.id,
|
||
|
enqueue=True,
|
||
|
)
|
||
|
try:
|
||
|
(
|
||
|
revision,
|
||
|
dependencies_create_data,
|
||
|
) = await acquire_dependencies_data_for_repository(repo)
|
||
|
except RuntimeError:
|
||
|
# If the parsing fails,
|
||
|
# just skip creating the dependencies
|
||
|
logger.error(
|
||
|
"Failed to acquire the dependencies data for the repo with id {repo_id}.",
|
||
|
repo_id=repo.id,
|
||
|
enqueue=True,
|
||
|
)
|
||
|
return
|
||
|
if repo.last_checked_revision == revision:
|
||
|
# If the repo has already been updated,
|
||
|
# just skip creating the dependencies
|
||
|
logger.info(
|
||
|
"The repo with id {repo_id} has fresh dependencies.",
|
||
|
repo_id=repo.id,
|
||
|
enqueue=True,
|
||
|
)
|
||
|
return
|
||
|
if not dependencies_create_data:
|
||
|
# If there are no dependencies,
|
||
|
# just skip creating the dependencies
|
||
|
logger.info(
|
||
|
"The repo with id {repo_id} has no dependencies.",
|
||
|
repo_id=repo.id,
|
||
|
enqueue=True,
|
||
|
)
|
||
|
return
|
||
|
# Update the repo with the revision hash
|
||
|
logger.info(
|
||
|
"Updating the repo with id {repo_id} with the revision hash {revision}.",
|
||
|
repo_id=repo.id,
|
||
|
revision=revision,
|
||
|
enqueue=True,
|
||
|
)
|
||
|
update_repo_statement = (
|
||
|
sqlalchemy.update(Repo)
|
||
|
.where(Repo.id == repo.id)
|
||
|
.values(last_checked_revision=revision)
|
||
|
)
|
||
|
await session.execute(update_repo_statement)
|
||
|
# Create dependencies - on conflict do nothing.
|
||
|
# This is to avoid creating duplicate dependencies.
|
||
|
logger.info(
|
||
|
"Creating the dependencies for the repo with id {repo_id}.",
|
||
|
repo_id=repo.id,
|
||
|
enqueue=True,
|
||
|
)
|
||
|
insert_dependencies_statement = sqlalchemy.dialects.sqlite.insert(
|
||
|
Dependency
|
||
|
).on_conflict_do_nothing(index_elements=[Dependency.name])
|
||
|
await session.execute(
|
||
|
insert_dependencies_statement.returning(Dependency),
|
||
|
[
|
||
|
{
|
||
|
"name": dependency_data.name,
|
||
|
}
|
||
|
for dependency_data in dependencies_create_data
|
||
|
],
|
||
|
)
|
||
|
# Re-fetch the dependencies from the database
|
||
|
dependencies = (
|
||
|
await session.scalars(
|
||
|
sqlalchemy.select(Dependency).where(
|
||
|
Dependency.name.in_(
|
||
|
[
|
||
|
dependency_data.name
|
||
|
for dependency_data in dependencies_create_data
|
||
|
]
|
||
|
)
|
||
|
)
|
||
|
)
|
||
|
).all()
|
||
|
# Add the dependencies to the repo
|
||
|
insert_repo_dependencies_statement = sqlalchemy.dialects.sqlite.insert(
|
||
|
RepoDependency
|
||
|
).on_conflict_do_nothing([RepoDependency.repo_id, RepoDependency.dependency_id])
|
||
|
await session.execute(
|
||
|
insert_repo_dependencies_statement,
|
||
|
[
|
||
|
{
|
||
|
"repo_id": repo.id,
|
||
|
"dependency_id": dependency.id,
|
||
|
}
|
||
|
for dependency in dependencies
|
||
|
],
|
||
|
)
|
||
|
|
||
|
|
||
|
async def scrape_source_graph_repos() -> None:
|
||
|
"""
|
||
|
Iterate over the source graph repos and create or update them in the database.
|
||
|
|
||
|
:return: None
|
||
|
"""
|
||
|
async with AsyncSourceGraphSSEClient() as sg_client:
|
||
|
async with async_session_uow() as session:
|
||
|
async with asyncio.TaskGroup() as tg:
|
||
|
logger.info(
|
||
|
"Creating or updating repos from source graph repos data.",
|
||
|
enqueue=True,
|
||
|
)
|
||
|
async for sg_repos_data in sg_client.aiter_fastapi_repos():
|
||
|
logger.info(
|
||
|
"Received {count} repos.",
|
||
|
count=len(sg_repos_data),
|
||
|
enqueue=True,
|
||
|
)
|
||
|
tg.create_task(
|
||
|
create_or_update_repos_from_source_graph_repos_data(
|
||
|
session=session,
|
||
|
source_graph_repos_data=sg_repos_data,
|
||
|
)
|
||
|
)
|
||
|
await session.commit()
|
||
|
|
||
|
|
||
|
async def parse_dependencies_for_repo(
|
||
|
semaphore: asyncio.Semaphore, repo_id: RepoId
|
||
|
) -> None:
|
||
|
"""
|
||
|
Parse the dependencies for a given repo and create them in the database.
|
||
|
|
||
|
:param semaphore: A semaphore to limit the number of concurrent requests
|
||
|
:param repo_id: The id of the repo for which to parse the dependencies
|
||
|
:return: None
|
||
|
"""
|
||
|
async with async_session_uow() as session, semaphore:
|
||
|
# Fetch the repo from the database
|
||
|
logger.info(
|
||
|
"Fetching the repo with id {repo_id}.", repo_id=repo_id, enqueue=True
|
||
|
)
|
||
|
repo = (
|
||
|
await session.scalars(sqlalchemy.select(Repo).where(Repo.id == repo_id))
|
||
|
).one()
|
||
|
# Create the dependencies for the repo
|
||
|
logger.info(
|
||
|
"Creating the dependencies for the repo with id {repo_id}.",
|
||
|
repo_id=repo_id,
|
||
|
enqueue=True,
|
||
|
)
|
||
|
await _create_dependencies_for_repo(session=session, repo=repo)
|
||
|
await session.commit()
|
||
|
|
||
|
|
||
|
async def parse_dependencies_for_repos() -> None:
|
||
|
"""
|
||
|
Parse the dependencies for all the repos in the database.
|
||
|
|
||
|
:return: None.
|
||
|
"""
|
||
|
logger.info("Fetching the repos from the database.", enqueue=True)
|
||
|
async with async_session_uow() as session:
|
||
|
repo_ids = (
|
||
|
await session.scalars(
|
||
|
sqlalchemy.select(Repo.id).order_by(
|
||
|
Repo.last_checked_revision.is_(None).desc()
|
||
|
)
|
||
|
)
|
||
|
).all()
|
||
|
logger.info("Fetched {count} repos.", count=len(repo_ids), enqueue=True)
|
||
|
logger.info("Parsing the dependencies for the repos.", enqueue=True)
|
||
|
semaphore = asyncio.Semaphore(10)
|
||
|
async with asyncio.TaskGroup() as tg:
|
||
|
for repo_id in repo_ids:
|
||
|
logger.info(
|
||
|
"Parsing the dependencies for repo {repo_id}.",
|
||
|
repo_id=repo_id,
|
||
|
enqueue=True,
|
||
|
)
|
||
|
tg.create_task(
|
||
|
parse_dependencies_for_repo(
|
||
|
semaphore=semaphore, repo_id=RepoId(repo_id)
|
||
|
)
|
||
|
)
|
||
|
|
||
|
|
||
|
app = typer.Typer()
|
||
|
|
||
|
|
||
|
@app.command()
|
||
|
def scrape_repos() -> None:
|
||
|
"""
|
||
|
Scrape the FastAPI-related repositories utilizing the source graph API.
|
||
|
|
||
|
:return: None
|
||
|
"""
|
||
|
logger.info("Scraping the source graph repos.", enqueue=True)
|
||
|
asyncio.run(scrape_source_graph_repos())
|
||
|
|
||
|
|
||
|
@app.command()
|
||
|
def parse_dependencies() -> None:
|
||
|
"""
|
||
|
Parse the dependencies for all the repos in the database.
|
||
|
|
||
|
:return: None.
|
||
|
"""
|
||
|
logger.info(
|
||
|
"Parsing the dependencies for all the repos in the database.", enqueue=True
|
||
|
)
|
||
|
asyncio.run(parse_dependencies_for_repos())
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
app()
|