Skip to content

Pipelines

dadosfera.services.maestro.fetch_paginated_pipelines

fetch_paginated_pipelines(maestro_base_url, token, additional_params={}, size=500, start_page=1)

Fetch a single page of pipelines from Maestro, using the given page size and page number.

PARAMETER DESCRIPTION
maestro_base_url

Base URL of the Maestro instance.

TYPE: str

token

Authentication token.

TYPE: str

additional_params

Additional parameters to be passed in the request. Defaults to {}.

TYPE: Dict[str, str] DEFAULT: {}

size

Number of pipelines to fetch per request. Defaults to 500.

TYPE: int DEFAULT: 500

start_page

Starting page number. Defaults to 1.

TYPE: int DEFAULT: 1

RETURNS DESCRIPTION

List[Dict]: List of pipelines.

Source code in dadosfera/services/maestro/pipelines.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def fetch_paginated_pipelines(
    maestro_base_url: str,
    token: str,
    additional_params: Dict[str, str] = {},
    size: int = 500,
    start_page: int = 1
):
    """Fetch one page of pipelines from Maestro.

    Sends a single GET request to ``{maestro_base_url}/pipelinesv2`` asking for
    ``size`` entries of page ``start_page``, sorted ascending by
    ``display_name``. Only that one page is requested; callers that need
    further pages must call again with a different ``start_page``.

    Args:
        maestro_base_url (str): Base URL of the Maestro instance.
        token (str): Authentication token, sent verbatim in the
            ``Authorization`` header.
        additional_params (Dict[str, str], optional): Extra query parameters.
            They are applied last, so they override the defaults built here
            (``size``, ``page``, ``order``, ``sort_by``). Defaults to {}.
        size (int, optional): Number of pipelines to fetch per request.
            Defaults to 500.
        start_page (int, optional): Page number to request. Defaults to 1.

    Returns:
        List[Dict]: The value stored under the ``"pipelines"`` key of the
        JSON response.

    Raises:
        requests.exceptions.HTTPError: If the server answers with an error
            status code.
        requests.exceptions.ConnectionError: For network connectivity issues.
        requests.exceptions.Timeout: For request timeouts.
        requests.exceptions.RequestException: For other request-related errors.
    """
    params = {"size": size, "page": start_page, "order": "asc", "sort_by": "display_name"}
    # The shared ``{}`` default is only read here, never mutated, so it is safe.
    # ``or {}`` additionally tolerates callers passing None explicitly.
    params.update(additional_params or {})
    try:
        response = requests.get(
            f"{maestro_base_url}/pipelinesv2",
            headers={"Content-Type": "application/json", "Authorization": token},
            params=params
        )
        response.raise_for_status()
    # Each failure is logged and re-raised: swallowing it would leave
    # ``response`` unbound (or holding an error payload) for the code below.
    except requests.exceptions.HTTPError as errh:
        logger.error("Http Error: %s", errh)
        raise
    except requests.exceptions.ConnectionError as errc:
        logger.error("Error Connecting: %s", errc)
        raise
    except requests.exceptions.Timeout as errt:
        logger.error("Timeout Error: %s", errt)
        raise
    except requests.exceptions.RequestException as err:
        logger.error("OOps: Something Else: %s", err)
        raise
    response_json = response.json()
    return response_json["pipelines"]

dadosfera.services.maestro.fetch_pipeline_execution_history

fetch_pipeline_execution_history(maestro_base_url, token, pipeline_id)

Fetch the execution history and status records for a specific pipeline.

Retrieves a list of historical execution records including status, timestamps, and other metadata for each pipeline run.

PARAMETER DESCRIPTION
maestro_base_url

Base URL of the Maestro instance (e.g., 'https://maestro.example.com/api').

TYPE: str

token

Authentication token for API access.

TYPE: str

pipeline_id

Unique identifier of the pipeline.

TYPE: str

RETURNS DESCRIPTION
List[Dict[str, Any]]

List[Dict[str, Any]]: List of execution records, each containing a `status` field with the current status of the run.

RAISES DESCRIPTION
HTTPError

For failed API requests. Common cases: - 401: Invalid or expired token - 403: Insufficient permissions

ConnectionError

For network connectivity issues

Timeout

For request timeouts

RequestException

For other request-related errors

Source code in dadosfera/services/maestro/pipelines.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def fetch_pipeline_execution_history(
    maestro_base_url: str,
    token: str,
    pipeline_id: str
) -> List[Dict[str, Any]]:
    """Fetch the execution history and status records for a specific pipeline.

    Sends a GET request to
    ``{maestro_base_url}/pipelinesv2/{pipeline_id}/status`` and returns the
    content of the ``"status"`` key of the JSON response.

    Args:
        maestro_base_url (str): Base URL of the Maestro instance
            (e.g., 'https://maestro.example.com/api').
        token (str): Authentication token for API access, sent verbatim in
            the ``Authorization`` header.
        pipeline_id (str): Unique identifier of the pipeline.

    Returns:
        List[Dict[str, Any]]: List of execution records, each containing:
            - status: Current status of the run

    Raises:
        requests.exceptions.HTTPError: For failed API requests. Common cases:
                - 401: Invalid or expired token
                - 403: Insufficient permissions
        requests.exceptions.ConnectionError: For network connectivity issues
        requests.exceptions.Timeout: For request timeouts
        requests.exceptions.RequestException: For other request-related errors
    """
    try:
        response = requests.get(
            f"{maestro_base_url}/pipelinesv2/{pipeline_id}/status",
            headers={"Content-Type": "application/json", "Authorization": token}
        )
        response.raise_for_status()
    # Log and re-raise so the documented exceptions actually reach the caller;
    # continuing after a failure would use an unbound or error ``response``.
    except requests.exceptions.HTTPError as errh:
        logger.error("Http Error: %s", errh)
        raise
    except requests.exceptions.ConnectionError as errc:
        logger.error("Error Connecting: %s", errc)
        raise
    except requests.exceptions.Timeout as errt:
        logger.error("Timeout Error: %s", errt)
        raise
    except requests.exceptions.RequestException as err:
        logger.error("OOps: Something Else: %s", err)
        raise

    response_json = response.json()
    return response_json["status"]

dadosfera.services.maestro.fetch_execution_history_all_pipelines

fetch_execution_history_all_pipelines(maestro_base_url, token)

Fetch the execution history and status records for all pipelines.

Retrieves a list of historical execution records including status, timestamps, and other metadata for each pipeline run.

PARAMETER DESCRIPTION
maestro_base_url

Base URL of the Maestro instance (e.g., 'https://maestro.example.com/api').

TYPE: str

token

Authentication token for API access.

TYPE: str

RETURNS DESCRIPTION
List[Dict]

List[Dict]: List of execution history records gathered from all fetched pipelines.

Source code in dadosfera/services/maestro/pipelines.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def fetch_execution_history_all_pipelines(
    maestro_base_url: str,
    token: str
) -> List[Dict]:
    """Collect the execution history records of the listed pipelines.

    Lists pipelines through ``fetch_paginated_pipelines`` and then queries
    ``fetch_pipeline_execution_history`` once per pipeline, concatenating the
    results into one flat list in listing order.

    Note:
        The listing call uses the default arguments of
        ``fetch_paginated_pipelines``, so only the pipelines returned by that
        single call are covered.

    Args:
        maestro_base_url (str): Base URL of the Maestro instance
            (e.g., 'https://maestro.example.com/api').
        token (str): Authentication token for API access.

    Returns:
        List[Dict]: Execution records of every listed pipeline, flattened
        into a single list.

    """
    listed = fetch_paginated_pipelines(maestro_base_url=maestro_base_url, token=token)
    # Resolve every id up front, before any history request is issued.
    ids = [entry['id'] for entry in listed]

    # One history request per pipeline; the records are flattened in order.
    return [
        record
        for current_id in ids
        for record in fetch_pipeline_execution_history(
            maestro_base_url=maestro_base_url,
            token=token,
            pipeline_id=current_id
        )
    ]