๐Ÿ“ฆ wrussell1999 / kestra-examples

๐Ÿ“„ oss_connect.yml ยท 66 lines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66id: oss_connect
namespace: company.team

inputs:
  - id: dataset_uri
    type: STRING
    displayName: Dataset URI
    defaults: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv

  - id: discount_amount
    type: FLOAT
    displayName: Discount Amount
    description: By default, it's set to 0 (no discount).
    min: 0
    max: 1
    defaults: 0

tasks:
  - id: code
    type: io.kestra.plugin.scripts.python.Script
    containerImage: ghcr.io/kestra-io/pydata:latest
    beforeCommands:
      - pip install kestra
    outputFiles:
      - processed_orders.csv
    script: |
      import pandas as pd
      from kestra import Kestra

      df = pd.read_csv('{{ inputs.dataset_uri }}')
      total_revenue = df['total'].sum()
      Kestra.outputs({"total": total_revenue})

      if {{ inputs.discount_amount }} > 0:
        df['discounted_total'] = df['total'] * (1 - {{ inputs.discount_amount }})
        df.to_csv('processed_orders.csv')

  - id: slack_message
    type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
    url: "{{ kv('SLACK_WEBHOOK') }}"
    payload: |
      {
        "text": "Total: ${{ outputs.code.vars.total }}"
      }

  - id: s3_upload_discounts
    type: io.kestra.plugin.aws.s3.Upload
    runIf: "{{ inputs.discount_amount > 0 }}"
    region: eu-west-2
    bucket: oss-example
    key: "processed_orders.csv"
    from: "{{ outputs.code.outputFiles['processed_orders.csv'] }}"
    accessKeyId: "{{ kv('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ kv('AWS_SECRET_KEY_ID') }}"

errors:
  - id: slack_notification
    type: io.kestra.plugin.notifications.slack.SlackExecution
    url: "{{ kv('SLACK_WEBHOOK') }}"
    channel: "#general"
    executionId: "{{ execution.id }}"

triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "@daily"