Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from airflow.api_fastapi.core_api.base import BaseModel


class MessageResponse(BaseModel):
"""Message response schema."""

message: str
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from fastapi import APIRouter, Depends, HTTPException, Path, Request, status

from airflow.api_fastapi.execution_api.datamodels.common import MessageResponse
from airflow.api_fastapi.execution_api.datamodels.variable import (
VariablePostBody,
VariableResponse,
Expand Down Expand Up @@ -96,12 +97,11 @@ def put_variable(
):
"""Set an Airflow Variable."""
Variable.set(key=variable_key, value=body.value, description=body.description, team_name=team_name)
return {"message": "Variable successfully set"}
return MessageResponse(message="Variable successfully set")


@router.delete(
"/{variable_key:path}",
status_code=status.HTTP_204_NO_CONTENT,
responses={
status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"},
status.HTTP_403_FORBIDDEN: {"description": "Task does not have access to the variable"},
Expand All @@ -113,3 +113,4 @@ def delete_variable(
):
"""Delete an Airflow Variable."""
Variable.delete(key=variable_key, team_name=team_name)
return MessageResponse(message=f"Variable with key {variable_key} successfully deleted.")
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.core_api.base import BaseModel
from airflow.api_fastapi.execution_api.datamodels.common import MessageResponse
from airflow.api_fastapi.execution_api.datamodels.xcom import (
XComResponse,
XComSequenceIndexResponse,
Expand Down Expand Up @@ -348,7 +349,7 @@ def set_xcom(
mapped_length: Annotated[
int | None, Query(description="Number of mapped tasks this value expands into")
] = None,
):
) -> MessageResponse:
"""Set an Airflow XCom."""
from airflow.configuration import conf

Expand Down Expand Up @@ -410,7 +411,7 @@ def set_xcom(
},
)

return {"message": "XCom successfully set"}
return MessageResponse(message="XCom successfully set")


@router.delete(
Expand All @@ -425,7 +426,7 @@ def delete_xcom(
task_id: str,
key: Annotated[str, Path(min_length=1)],
map_index: Annotated[int, Query()] = -1,
):
) -> MessageResponse:
"""Delete a single XCom Value."""
query = delete(XComModel).where(
XComModel.key == key,
Expand All @@ -436,4 +437,4 @@ def delete_xcom(
)
session.execute(query)
session.commit()
return {"message": f"XCom with key: {key} successfully deleted."}
return MessageResponse(message=f"XCom with key: {key} successfully deleted.")
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,11 @@ def ensure_start_date_in_dag_run(response: ResponseInfo) -> None: # type: ignor
"""Ensure start_date is never None in direct DagRun responses for previous API versions."""
if response.body.get("start_date") is None:
response.body["start_date"] = response.body.get("run_after")


class AddMessageResponseModel(VersionChange):
"""Add MessageResponse schema to common datamodels."""

description = __doc__

instructions_to_migrate_to_previous_version = ()
8 changes: 8 additions & 0 deletions task-sdk/src/airflow/sdk/api/datamodels/_generated.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,14 @@ class IntermediateTIState(str, Enum):
DEFERRED = "deferred"


class MessageResponse(BaseModel):
"""
Message response schema.
"""

message: Annotated[str, Field(title="Message")]


class PrevSuccessfulDagRunResponse(BaseModel):
"""
Schema for response with previous successful DagRun information for Task Template Context.
Expand Down