1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# Flow identity: the flow's id and the namespace it is declared in.
id: oss_connect
namespace: company.team
# Values supplied per execution; the flow reads them as `inputs.<id>`
# inside its template expressions.
inputs:
  # Location of the CSV file that the Python script loads with pandas.
  - id: dataset_uri
    type: STRING
    displayName: Dataset URI
    defaults: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv

  # Discount expressed as a fraction: the script multiplies each order
  # total by (1 - discount_amount). Bounded to the range 0..1.
  - id: discount_amount
    type: FLOAT
    displayName: Discount Amount
    description: "By default, it's set to 0 (no discount)."
    min: 0
    max: 1
    defaults: 0
tasks:
  # Loads the orders CSV, publishes the summed `total` column as the output
  # variable `total`, and, when a discount is requested, adds a
  # `discounted_total` column and writes the data to processed_orders.csv.
  # NOTE(review): the source had lost its indentation; the CSV write is placed
  # inside the discount branch because the S3 upload below is gated on the
  # same condition. Confirm this matches the intended behavior.
  - id: code
    type: io.kestra.plugin.scripts.python.Script
    containerImage: ghcr.io/kestra-io/pydata:latest
    beforeCommands:
      - pip install kestra
    outputFiles:
      - processed_orders.csv
    script: |
      import pandas as pd
      from kestra import Kestra

      df = pd.read_csv('{{ inputs.dataset_uri }}')
      total_revenue = df['total'].sum()
      Kestra.outputs({"total": total_revenue})

      if {{ inputs.discount_amount }} > 0:
          df['discounted_total'] = df['total'] * (1 - {{ inputs.discount_amount }})
          df.to_csv('processed_orders.csv')

  # Posts the total computed by the `code` task to the Slack webhook whose URL
  # is read from the `SLACK_WEBHOOK` key. The `$` before the expression is a
  # literal currency sign in the message text.
  - id: slack_message
    type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
    url: "{{ kv('SLACK_WEBHOOK') }}"
    payload: |
      {
        "text": "Total: ${{ outputs.code.vars.total }}"
      }

  # Uploads the processed CSV produced by the `code` task to the S3 bucket.
  # `runIf` limits this task to executions where a discount greater than 0
  # was requested. AWS credentials are read from the key-value store.
  - id: s3_upload_discounts
    type: io.kestra.plugin.aws.s3.Upload
    runIf: "{{ inputs.discount_amount > 0 }}"
    region: eu-west-2
    bucket: oss-example
    key: "processed_orders.csv"
    from: "{{ outputs.code.outputFiles['processed_orders.csv'] }}"
    accessKeyId: "{{ kv('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ kv('AWS_SECRET_KEY_ID') }}"
# Error handler: sends a Slack notification about the current execution to
# the #general channel, using the webhook URL stored under `SLACK_WEBHOOK`.
errors:
  - id: slack_notification
    type: io.kestra.plugin.notifications.slack.SlackExecution
    url: "{{ kv('SLACK_WEBHOOK') }}"
    channel: "#general"
    executionId: "{{ execution.id }}"
# Starts the flow on a schedule; `@daily` is quoted because a plain scalar
# may not begin with `@` in YAML.
triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "@daily"