Skip to content

Connections

dadosfera.services.maestro.fetch_paginated_connections

fetch_paginated_connections(maestro_base_url, token, additional_params={})

Fetch connections from Maestro with pagination..

PARAMETER DESCRIPTION
maestro_base_url

Base URL of the Maestro instance (e.g., 'https://maestro.example.com/api').

TYPE: str

token

Authentication token for API access.

TYPE: str

additional_params

Additional query parameters to include in the request. These parameters will override default sorting parameters if there are conflicts. Defaults to {}.

TYPE: Dict[str, str] DEFAULT: {}

size

Number of data assets to fetch per page. A larger size means fewer API calls but more data per request. Defaults to 50.

TYPE: int

start_page

Page number to start fetching from. Useful for resuming interrupted fetches. Defaults to 1.

TYPE: int

RETURNS DESCRIPTION
List[Dict]

List[Dict]: List of connections.

RAISES DESCRIPTION
ValueError

If size or start_page is less than or equal to zero.

HTTPError

For failed API requests. Common cases: - 401: Invalid or expired token - 403: Insufficient permissions - 404: Connection ID not found

requests.exceptions.ConnectionError: For network connectivity issues requests.exceptions.Timeout: For request timeouts requests.exceptions.RequestException: For other request-related errors

Source code in dadosfera/services/maestro/connections.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def fetch_paginated_connections(
    maestro_base_url: str,
    token: str,
    additional_params: Dict[str, str] = {},
) -> List[Dict]:
    """
    Fetch connections from Maestro with pagination..

    Args:
        maestro_base_url (str): Base URL of the Maestro instance
            (e.g., 'https://maestro.example.com/api').
        token (str): Authentication token for API access.
        additional_params (Dict[str, str], optional): Additional query parameters to include
            in the request. These parameters will override default sorting parameters if
            there are conflicts. Defaults to {}.
        size (int, optional): Number of data assets to fetch per page. A larger size means
            fewer API calls but more data per request. Defaults to 50.
        start_page (int, optional): Page number to start fetching from. Useful for resuming
            interrupted fetches. Defaults to 1.


    Returns:
        List[Dict]: List of connections.

    Raises:
        ValueError: If `size` or `start_page` is less than or equal to zero.
        requests.exceptions.HTTPError: For failed API requests. Common cases:
           - 401: Invalid or expired token
           - 403: Insufficient permissions
           - 404: Connection ID not found
       requests.exceptions.ConnectionError: For network connectivity issues
       requests.exceptions.Timeout: For request timeouts
       requests.exceptions.RequestException: For other request-related errors
    """

    # Validate input parameters
    if additional_params['size'] and additional_params['size'] <= 0:
        raise ValueError("Size must be greater than zero.")
    if additional_params['start_page'] and additional_params['start_page'] <= 0:
        raise ValueError("Start page must be greater than zero.")

    connections = []
    params = {}
    params.update(additional_params)

    try:
        response = requests.get(
            f"{maestro_base_url}/connections",
            headers={"Content-Type": "application/json", "Authorization": token},
            params=params
        )
        response.raise_for_status()
        response_json = response.json()

    except requests.exceptions.HTTPError as errh:
        logger.info("Http Error:",errh)
    except requests.exceptions.ConnectionError as errc:
        logger.info("Error Connecting:",errc)
    except requests.exceptions.Timeout as errt:
        logger.info("Timeout Error:",errt)
    except requests.exceptions.RequestException as err:
        logger.info("OOps: Something Else",err)

    connections = response_json.get("connections", [])
    logger.info(f"Connections retrieved: {len(connections)}")
    return connections

dadosfera.services.maestro.delete_connection

delete_connection(maestro_base_url, token, connection_id)

Delete a data connection from the Maestro platform.

Permanently removes a data connection and all its associated configurations from Maestro. The operation cannot be undone. Requires appropriate delete permissions.

PARAMETER DESCRIPTION
maestro_base_url

Base URL of the Maestro instance (e.g., 'https://maestro.example.com/api').

TYPE: str

token

Authentication token for API access. Must have 'connection:delete' permission.

TYPE: str

connection_id

Unique identifier of the connection to be deleted (e.g., 'conn_abc123').

TYPE: str

RAISES DESCRIPTION
HTTPError

For failed API requests. Common cases: - 401: Invalid or expired token - 403: Insufficient permissions - 404: Connection ID not found

ConnectionError

For network connectivity issues

Timeout

For request timeouts

RequestException

For other request-related errors

## Example ### Delete a database connection:

try:
    delete_connection(
        maestro_base_url="https://maestro.example.com/api",
        token="bearer_token_123",
        connection_id="conn_abc123"
    )
    logger.info("Connection deleted successfully")
except requests.exceptions.HTTPError as e:
    if e.response.status_code == 414:
        logger.info("Connection not found")
    elif e.response.status_code == 403:
        logger.info("Permission denied")
    else:
        raise
    'Connection deleted successfully'

Note
  • This operation is irreversible - the connection cannot be restored after deletion
  • Any pipelines or transformations using this connection will need to be updated
  • Requires the 'connection:delete' permission in the authentication token
  • The function logs both the start and completion of the deletion process
Source code in dadosfera/services/maestro/connections.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def delete_connection(maestro_base_url: str, token: str, connection_id: str) -> None:
    """Delete a data connection from the Maestro platform.

   Permanently removes a data connection and all its associated configurations from Maestro.
   The operation cannot be undone. Requires appropriate delete permissions.

   Args:
       maestro_base_url (str): Base URL of the Maestro instance
           (e.g., 'https://maestro.example.com/api').
       token (str): Authentication token for API access. Must have 'connection:delete'
           permission.
       connection_id (str): Unique identifier of the connection to be deleted
           (e.g., 'conn_abc123').

   Raises:
       requests.exceptions.HTTPError: For failed API requests. Common cases:
           - 401: Invalid or expired token
           - 403: Insufficient permissions
           - 404: Connection ID not found
       requests.exceptions.ConnectionError: For network connectivity issues
       requests.exceptions.Timeout: For request timeouts
       requests.exceptions.RequestException: For other request-related errors

     ## Example
     ### Delete a database connection:
    ```python    
    try:
        delete_connection(
            maestro_base_url="https://maestro.example.com/api",
            token="bearer_token_123",
            connection_id="conn_abc123"
        )
        logger.info("Connection deleted successfully")
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 414:
            logger.info("Connection not found")
        elif e.response.status_code == 403:
            logger.info("Permission denied")
        else:
            raise
        'Connection deleted successfully'
    ```

   Note:
       - This operation is irreversible - the connection cannot be restored after deletion
       - Any pipelines or transformations using this connection will need to be updated
       - Requires the 'connection:delete' permission in the authentication token
       - The function logs both the start and completion of the deletion process
   """

    logger.info(f"Deleting connection: {connection_id}")
    try:
        response = requests.delete(
            f"{maestro_base_url}/connections/{connection_id}",
            headers={"Content-Type": "application/json", "Authorization": token},
        )

        response.raise_for_status()
    except requests.exceptions.HTTPError as errh:
        logger.info("Http Error:",errh)
    except requests.exceptions.ConnectionError as errc:
        logger.info("Error Connecting:",errc)
    except requests.exceptions.Timeout as errt:
        logger.info("Timeout Error:",errt)
    except requests.exceptions.RequestException as err:
        logger.info("OOps: Something Else",err)
    logger.info(f"Successfully deleted connection: {connection_id}")

dadosfera.services.maestro.delete_inactive_connections

delete_inactive_connections(maestro_base_url, token, additional_params={})

Bulk delete inactive connections from Maestro. Deletes multiple connections that match the specified criteria and haven't been updated in the last 60 days. This is useful for cleaning up stale or abandoned connections. The deletion is permanent and cannot be undone.

PARAMETER DESCRIPTION
maestro_base_url

Base URL of the Maestro instance (e.g., 'https://maestro.example.com/api').

TYPE: str

token

Authentication token for API access. Must have 'connection:delete' permission.

TYPE: str

additional_params

Filtering parameters to specify which connections to consider for deletion. Common filters include: - type: Connection type (e.g., 'postgres', 'snowflake') - status: Connection status (e.g., 'active', 'inactive') - name: Connection name pattern Defaults to {}.

TYPE: Dict[str, str] DEFAULT: {}

RETURNS DESCRIPTION

None

RAISES DESCRIPTION
HTTPError

For failed API requests. Common cases: - 401: Invalid or expired token - 403: Insufficient permissions

ConnectionError

For network connectivity issues

Timeout

For request timeouts

ValueError

If date parsing fails for connection timestamps

Source code in dadosfera/services/maestro/connections.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def delete_inactive_connections(
    maestro_base_url: str, token: str, additional_params: Dict[str, str] = {}
):
    """Bulk delete inactive connections from Maestro.
        Deletes multiple connections that match the specified criteria and haven't been updated
        in the last 60 days. This is useful for cleaning up stale or abandoned connections.
        The deletion is permanent and cannot be undone.

        Args:
            maestro_base_url (str): Base URL of the Maestro instance
                (e.g., 'https://maestro.example.com/api').
            token (str): Authentication token for API access. Must have 'connection:delete'
                permission.
            additional_params (Dict[str, str], optional): Filtering parameters to specify which
                connections to consider for deletion. Common filters include:
                - type: Connection type (e.g., 'postgres', 'snowflake')
                - status: Connection status (e.g., 'active', 'inactive')
                - name: Connection name pattern
                Defaults to {}.

        Returns:
            None

        Raises:
            requests.exceptions.HTTPError: For failed API requests. Common cases:
                - 401: Invalid or expired token
                - 403: Insufficient permissions
            requests.exceptions.ConnectionError: For network connectivity issues
            requests.exceptions.Timeout: For request timeouts

            ValueError: If date parsing fails for connection timestamps
    """


    connections = fetch_paginated_connections(
        maestro_base_url=maestro_base_url,
        token=token,
        additional_params=additional_params,
    )

    two_months_ago = datetime.strptime((datetime.now() - timedelta(days=60)).strftime('%Y-%m-%d'), '%Y-%m-%d')

    for connection in connections:
        connection_id = connection["id"]
        updated_at = datetime.strptime(parse(connection["updated_at"]).strftime('%Y-%m-%d'), '%Y-%m-%d')

        if updated_at < two_months_ago:
            delete_connection(
                maestro_base_url=maestro_base_url, token=token, connection_id=connection_id
            )