1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# Copyright © 2026 Pathway
from __future__ import annotations
import os
import time
import uuid
from pathlib import Path
from typing import Generator
import boto3
import pytest
import pathway as pw
from pathway.internals import parse_graph
from pathway.tests.utils import SerializationTestHelper
# Base directory returned by the `credentials_dir` fixture: taken from the
# CREDENTIALS_DIR environment variable when set, otherwise the directory
# containing this file.
CREDENTIALS_DIR = Path(os.getenv("CREDENTIALS_DIR", default=Path(__file__).parent))
# S3 bucket in which the `s3_path` fixture lists and deletes test artifacts.
TESTS_BUCKET = "aws-integrationtest"
@pytest.fixture(autouse=True)
def parse_graph_teardown():
    """Clear ``parse_graph.G`` once each test has finished.

    The fixture is autouse, so it wraps every test without being requested.
    """
    # Nothing to set up: hand control to the test immediately.
    yield
    # Teardown: clear the shared graph object after the test body has run.
    parse_graph.G.clear()
@pytest.fixture(scope="session")
def root_s3_path() -> str:
    """Build the S3 key prefix shared by all tests of one session.

    The prefix combines the current timestamp with a random UUID and is
    computed once per test session (session-scoped fixture).

    Returns:
        A string of the form ``integration_tests/<timestamp>-<uuid>``.
    """
    started_at = time.time()
    run_id = uuid.uuid4()
    return f"integration_tests/{started_at}-{run_id}"
@pytest.fixture
def s3_path(
    request: pytest.FixtureRequest, root_s3_path: str
) -> Generator[str, None, None]:
    """Yield a per-test S3 key prefix and remove everything under it afterwards.

    The prefix is ``<root_s3_path>/<test node name>/<random uuid>``. After the
    test finishes, every object in ``TESTS_BUCKET`` whose key starts with that
    prefix is deleted, one listing page at a time.

    Args:
        request: pytest request object; only the test node name is read.
        root_s3_path: session-wide prefix under which the test prefix is nested.

    Yields:
        The S3 key prefix reserved for the current test.
    """
    test_prefix = f"{root_s3_path}/{request.node.name}/{uuid.uuid4()}"
    yield test_prefix

    # Teardown: remove the artifacts the test created under its prefix.
    # Credentials are read from the environment; unset variables are passed
    # through to boto3 as None.
    client = boto3.client(
        "s3",
        aws_access_key_id=os.environ.get("AWS_S3_ACCESS_KEY"),
        aws_secret_access_key=os.environ.get("AWS_S3_SECRET_ACCESS_KEY"),
    )
    listing_kwargs = {"Bucket": TESTS_BUCKET, "Prefix": test_prefix}
    while True:
        page = client.list_objects_v2(**listing_kwargs)
        if "Contents" not in page:
            # Nothing (more) to delete under this prefix.
            break
        doomed = [{"Key": entry["Key"]} for entry in page["Contents"]]
        client.delete_objects(Bucket=TESTS_BUCKET, Delete={"Objects": doomed})
        if not page.get("IsTruncated"):
            break
        # More pages remain: resume the listing where this page ended.
        listing_kwargs["ContinuationToken"] = page["NextContinuationToken"]
@pytest.fixture(autouse=True)
def disable_monitoring(monkeypatch: pytest.MonkeyPatch) -> None:
    """Remove the monitoring environment variables before every test.

    Args:
        monkeypatch: pytest helper used to delete the variables; missing
            variables are ignored (``raising=False``).
    """
    monitoring_env_vars = (
        "PATHWAY_MONITORING_SERVER",
        "PATHWAY_DETAILED_MONITORING_DIR",
    )
    for env_var in monitoring_env_vars:
        monkeypatch.delenv(env_var, raising=False)
    # Called with no arguments after the variables are gone; presumably this
    # resets the monitoring configuration to its defaults -- TODO confirm.
    pw.set_monitoring_config()
@pytest.fixture
def credentials_dir() -> Path:
    """Expose the module-level ``CREDENTIALS_DIR`` path to tests."""
    configured_dir = CREDENTIALS_DIR
    return configured_dir
@pytest.fixture
def serialization_tester():
    """Provide a newly constructed ``SerializationTestHelper`` for each test."""
    helper = SerializationTestHelper()
    return helper
@pytest.fixture
def tcp_port() -> int:
    """Return a TCP port number chosen by the operating system.

    A temporary IPv4 stream socket is bound to port 0 and the port it was
    given is read back. The socket is closed before the value is returned,
    so the port is not reserved for the caller.

    Returns:
        The port number the temporary socket was bound to.
    """
    import socket

    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with probe:
        # Port 0 lets the OS pick the port; the empty host binds all interfaces.
        probe.bind(("", 0))
        port = probe.getsockname()[1]
    return port