Platform API

dadosfera.services.platform_api.pipeline_last_status

pipeline_last_status(platform_api_base_url, pipeline_id)

Get the last status of a pipeline.

PARAMETER DESCRIPTION
platform_api_base_url

The base URL of the platform API.

TYPE: str

pipeline_id

The ID of the pipeline.

TYPE: str

RETURNS DESCRIPTION
str

The last status of the pipeline.

TYPE: str

Source code in dadosfera/services/platform_api.py
def pipeline_last_status(platform_api_base_url: str, pipeline_id: str) -> str:
    """Get the last status of a pipeline.

    Args:
        platform_api_base_url (str): The base URL of the platform API.
        pipeline_id (str): The ID of the pipeline.

    Returns:
        str: The last status of the pipeline.
    """
    url = f"{platform_api_base_url}/pipeline/{pipeline_id}/pipeline_run"
    headers = get_headers("GET", url=url, headers={})
    r = requests.get(url, headers=headers)
    r.raise_for_status()
    pipeline_runs = r.json()
    if pipeline_runs:
        return max(
            pipeline_runs, key=lambda x: datetime.fromisoformat(x["created_at"])
        )["last_status"]
    return "NO_PIPELINE_RUNS"

dadosfera.services.platform_api.poll_for_terminal_state

poll_for_terminal_state(platform_api_base_url, pipeline_id, poll_interval=10)

Polls pipeline_last_status until the pipeline's latest run reaches a terminal state (FAILED or SUCCESS).

PARAMETER DESCRIPTION
platform_api_base_url

Base URL of the platform API.

TYPE: str

pipeline_id

The ID of the pipeline to check.

TYPE: str

poll_interval

Time in seconds between each poll. Default is 10 seconds.

TYPE: int DEFAULT: 10

RETURNS DESCRIPTION
str

The terminal state of the pipeline.

TYPE: str

Source code in dadosfera/services/platform_api.py
def poll_for_terminal_state(
    platform_api_base_url: str, pipeline_id: str, poll_interval: int = 10
) -> str:
    """Polls pipeline_last_status until the pipeline's latest run reaches a terminal state (FAILED or SUCCESS).

    Args:
        platform_api_base_url (str): Base URL of the platform API.
        pipeline_id (str): The ID of the pipeline to check.
        poll_interval (int, optional): Time in seconds between each poll. Default is 10 seconds.

    Returns:
        str: The terminal state of the pipeline.
    """
    terminal_states = ["FAILED", "SUCCESS"]
    while True:
        current_status = pipeline_last_status(platform_api_base_url, pipeline_id)
        logger.info(
            f"The current status of the last run of pipeline_id {pipeline_id} is {current_status}"
        )
        if current_status in terminal_states:
            return current_status
        time.sleep(poll_interval)
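
Example usage (illustrative sketch): block until the latest run of a pipeline finishes, then react to the outcome. The URL, pipeline ID, and 30-second interval are placeholder values.

from dadosfera.services.platform_api import poll_for_terminal_state

# Placeholder values for illustration only.
BASE_URL = "https://api.example-dadosfera.com"
PIPELINE_ID = "1234"

# Blocks until the last run reaches FAILED or SUCCESS, polling every 30 seconds.
final_state = poll_for_terminal_state(BASE_URL, PIPELINE_ID, poll_interval=30)
if final_state == "FAILED":
    raise RuntimeError(f"Pipeline {PIPELINE_ID} failed")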

dadosfera.services.platform_api.delete_pipeline

delete_pipeline(platform_api_base_url, pipeline_id, customer_name)

Delete a pipeline.

PARAMETER DESCRIPTION
platform_api_base_url

The base URL of the platform API.

TYPE: str

pipeline_id

The ID of the pipeline to delete.

TYPE: str

customer_name

The name of the customer.

TYPE: str

Source code in dadosfera/services/platform_api.py
def delete_pipeline(
    platform_api_base_url: str, pipeline_id: str, customer_name: str
) -> None:
    """Delete a pipeline.

    Args:
        platform_api_base_url (str): The base URL of the platform API.
        pipeline_id (str): The ID of the pipeline to delete.
        customer_name (str): The name of the customer.
    """
    url = f"{platform_api_base_url}/pipeline/{pipeline_id}"
    headers = get_headers("DELETE", url=url, headers={"customer_name": customer_name})
    r = requests.delete(url, headers=headers)
    r.raise_for_status()
    logger.info(f"Successfully deleted pipeline: {pipeline_id}")

dadosfera.services.platform_api.update_jdbc_dataset_asset

update_jdbc_dataset_asset(platform_api_base_url, job_id, payload)

Update a JDBC dataset asset.

PARAMETER DESCRIPTION
platform_api_base_url

The base URL of the platform API.

TYPE: str

job_id

The ID of the job to update.

TYPE: str

payload

The data payload for the update.

TYPE: Dict[str, Any]

Source code in dadosfera/services/platform_api.py
def update_jdbc_dataset_asset(
    platform_api_base_url: str, job_id: str, payload: Dict[str, Any]
) -> None:
    """Update a JDBC dataset asset.

    Args:
        platform_api_base_url (str): The base URL of the platform API.
        job_id (str): The ID of the job to update.
        payload (Dict[str, Any]): The data payload for the update.
    """
    url = f"{platform_api_base_url}/data_assets/jdbc/{job_id}"
    headers = get_headers("PATCH", url=url, headers=None, data=json.dumps(payload))
    logger.info(f"Making request to {url}")
    r = requests.patch(url=url, data=json.dumps(payload), headers=headers)
    if r.status_code != 200:
        logger.info(
            f"The request has returned an status code different than 200. More details: {r.text}"
        )
        r.raise_for_status()
    logger.info(f"Successfully patched the job_id: {job_id}")

dadosfera.services.platform_api.jdbc_reset_job_state

jdbc_reset_job_state(platform_api_base_url, job_id)

Reset the state of a JDBC job.

PARAMETER DESCRIPTION
platform_api_base_url

The base URL of the platform API.

TYPE: str

job_id

The ID of the job to reset.

TYPE: str

Source code in dadosfera/services/platform_api.py
def jdbc_reset_job_state(platform_api_base_url: str, job_id: str) -> None:
    """Reset the state of a JDBC job.

    Args:
        platform_api_base_url (str): The base URL of the platform API.
        job_id (str): The ID of the job to reset.
    """
    logger.info(f"Reseting the job state of the job {job_id}")
    update_jdbc_dataset_asset(
        platform_api_base_url=platform_api_base_url,
        job_id=job_id,
        payload={"incremental_column_value": None},
    )
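
Example usage (illustrative sketch): clear the stored incremental checkpoint so the next run re-reads the source from the beginning. The URL and job ID are placeholders.

from dadosfera.services.platform_api import jdbc_reset_job_state

# Placeholder values for illustration only.
jdbc_reset_job_state(
    platform_api_base_url="https://api.example-dadosfera.com",
    job_id="job-1234",
)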

dadosfera.services.platform_api.jdbc_update_from_incremental_to_full_table

jdbc_update_from_incremental_to_full_table(platform_api_base_url, job_id)

Update a JDBC job from incremental to full table loading.

PARAMETER DESCRIPTION
platform_api_base_url

The base URL of the platform API.

TYPE: str

job_id

The ID of the job to update.

TYPE: str

Source code in dadosfera/services/platform_api.py
def jdbc_update_from_incremental_to_full_table(
    platform_api_base_url: str, job_id: str
) -> None:
    """Update a JDBC job from incremental to full table loading.

    Args:
        platform_api_base_url (str): The base URL of the platform API.
        job_id (str): The ID of the job to update.
    """
    logger.info(
        f"Updating the job_type of the job_id {job_id}. From incremental to full load."
    )
    update_jdbc_dataset_asset(
        platform_api_base_url=platform_api_base_url,
        job_id=job_id,
        payload={
            "incremental_column_name": None,
            "incremental_column_value": None,
            "load_type": "full_table",
        },
    )
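
Example usage (illustrative sketch): the URL and job ID are placeholders.

from dadosfera.services.platform_api import jdbc_update_from_incremental_to_full_table

# Placeholder values for illustration only.
jdbc_update_from_incremental_to_full_table(
    platform_api_base_url="https://api.example-dadosfera.com",
    job_id="job-1234",
)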

dadosfera.services.platform_api.jdbc_update_from_full_table_to_incremental

jdbc_update_from_full_table_to_incremental(platform_api_base_url, incremental_column_name, job_id)

Update a JDBC job from full table to incremental loading.

PARAMETER DESCRIPTION
platform_api_base_url

The base URL of the platform API.

TYPE: str

incremental_column_name

The name of the incremental column.

TYPE: str

job_id

The ID of the job to update.

TYPE: str

Source code in dadosfera/services/platform_api.py
def jdbc_update_from_full_table_to_incremental(
    platform_api_base_url: str, incremental_column_name: str, job_id: str
) -> None:
    """Update a JDBC job from full table to incremental loading.

    Args:
        platform_api_base_url (str): The base URL of the platform API.
        incremental_column_name (str): The name of the incremental column.
        job_id (str): The ID of the job to update.
    """
    logger.info(
        f"Updating the job_type of the job_id {job_id}. From Full Load to Incremental."
    )
    update_jdbc_dataset_asset(
        platform_api_base_url=platform_api_base_url,
        job_id=job_id,
        payload={
            "incremental_column_name": incremental_column_name,
            "load_type": "incremental",
        },
    )
    logger.info("Success")

dadosfera.services.platform_api.update_job_memory

update_job_memory(platform_api_base_url, job_id, memory_allocation_mb)

Update the memory allocation for a specific job in the platform.

PARAMETER DESCRIPTION
platform_api_base_url

The base URL for the platform API.

TYPE: str

job_id

The unique identifier for the job to be updated.

TYPE: str

memory_allocation_mb

The amount of memory (in megabytes) to allocate for the job.

TYPE: int

RETURNS DESCRIPTION
None

None

RAISES DESCRIPTION
HTTPError

If the request to the platform API returns a status code other than 200.

Source code in dadosfera/services/platform_api.py
def update_job_memory(
    platform_api_base_url: str, job_id: str, memory_allocation_mb: int
) -> None:
    """
    Update the memory allocation for a specific job in the platform.

    Args:
        platform_api_base_url (str): The base URL for the platform API.
        job_id (str): The unique identifier for the job to be updated.
        memory_allocation_mb (int): The amount of memory (in megabytes) to allocate for the job.

    Returns:
        None

    Raises:
        HTTPError: If the request to the platform API returns a status code other than 200.
    """
    url = f"{platform_api_base_url}/jobs/{job_id}/memory"

    payload = {"amount": memory_allocation_mb}
    headers = get_headers("PUT", url=url, headers=None, data=json.dumps(payload))
    logger.info(f"Making request to {url}")
    r = requests.put(url=url, data=json.dumps(payload), headers=headers)
    if r.status_code != 200:
        logger.info(
            f"The request has returned an status code different than 200. More details: {r.text}"
        )
        r.raise_for_status()
    logger.info(f"Successfully updated the memory for job_id: {job_id}")

dadosfera.services.platform_api.update_pipeline_memory

update_pipeline_memory(platform_api_base_url, pipeline_id, memory_allocation_mb)

Update the memory allocation for a specific pipeline in the platform.

PARAMETER DESCRIPTION
platform_api_base_url

The base URL for the platform API.

TYPE: str

pipeline_id

The unique identifier for the pipeline to be updated.

TYPE: str

memory_allocation_mb

The amount of memory (in megabytes) to allocate for the pipeline.

TYPE: int

RETURNS DESCRIPTION
None

None

RAISES DESCRIPTION
HTTPError

If the request to the platform API returns a status code other than 200.

Source code in dadosfera/services/platform_api.py
def update_pipeline_memory(
    platform_api_base_url: str, pipeline_id: str, memory_allocation_mb: int
) -> None:
    """
    Update the memory allocation for a specific pipeline in the platform.

    Args:
        platform_api_base_url (str): The base URL for the platform API.
        pipeline_id (str): The unique identifier for the pipeline to be updated.
        memory_allocation_mb (int): The amount of memory (in megabytes) to allocate for the pipeline.

    Returns:
        None

    Raises:
        HTTPError: If the request to the platform API returns a status code other than 200.
    """
    url = f"{platform_api_base_url}/pipeline/{pipeline_id}/memory"

    payload = {"amount": memory_allocation_mb}
    headers = get_headers("PUT", url=url, headers=None, data=json.dumps(payload))
    logger.info(f"Making request to {url}")
    r = requests.put(url=url, data=json.dumps(payload), headers=headers)
    if r.status_code != 200:
        logger.info(
            f"The request has returned an status code different than 200. More details: {r.text}"
        )
        r.raise_for_status()
    logger.info(f"Successfully updated the memory for pipeline_id: {pipeline_id}")