166 lines
4.4 KiB
Python
166 lines
4.4 KiB
Python
import logging
|
|
import os
|
|
import shlex
|
|
import subprocess
|
|
import time
|
|
|
|
import pytest
|
|
|
|
import pandas._testing as tm
|
|
|
|
from pandas.io.parsers import read_csv
|
|
|
|
|
|
@pytest.fixture
|
|
def tips_file(datapath):
|
|
"""Path to the tips dataset"""
|
|
return datapath("io", "data", "csv", "tips.csv")
|
|
|
|
|
|
@pytest.fixture
|
|
def jsonl_file(datapath):
|
|
"""Path to a JSONL dataset"""
|
|
return datapath("io", "parser", "data", "items.jsonl")
|
|
|
|
|
|
@pytest.fixture
|
|
def salaries_table(datapath):
|
|
"""DataFrame with the salaries dataset"""
|
|
return read_csv(datapath("io", "parser", "data", "salaries.csv"), sep="\t")
|
|
|
|
|
|
@pytest.fixture
|
|
def feather_file(datapath):
|
|
return datapath("io", "data", "feather", "feather-0_3_1.feather")
|
|
|
|
|
|
@pytest.fixture
|
|
def s3so(worker_id):
|
|
worker_id = "5" if worker_id == "master" else worker_id.lstrip("gw")
|
|
return {"client_kwargs": {"endpoint_url": f"http://127.0.0.1:555{worker_id}/"}}
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def s3_base(worker_id):
|
|
"""
|
|
Fixture for mocking S3 interaction.
|
|
|
|
Sets up moto server in separate process
|
|
"""
|
|
pytest.importorskip("s3fs")
|
|
pytest.importorskip("boto3")
|
|
requests = pytest.importorskip("requests")
|
|
logging.getLogger("requests").disabled = True
|
|
|
|
with tm.ensure_safe_environment_variables():
|
|
# temporary workaround as moto fails for botocore >= 1.11 otherwise,
|
|
# see https://github.com/spulec/moto/issues/1924 & 1952
|
|
os.environ.setdefault("AWS_ACCESS_KEY_ID", "foobar_key")
|
|
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "foobar_secret")
|
|
|
|
pytest.importorskip("moto", minversion="1.3.14")
|
|
pytest.importorskip("flask") # server mode needs flask too
|
|
|
|
# Launching moto in server mode, i.e., as a separate process
|
|
# with an S3 endpoint on localhost
|
|
|
|
worker_id = "5" if worker_id == "master" else worker_id.lstrip("gw")
|
|
endpoint_port = f"555{worker_id}"
|
|
endpoint_uri = f"http://127.0.0.1:{endpoint_port}/"
|
|
|
|
# pipe to null to avoid logging in terminal
|
|
proc = subprocess.Popen(
|
|
shlex.split(f"moto_server s3 -p {endpoint_port}"),
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
)
|
|
|
|
timeout = 5
|
|
while timeout > 0:
|
|
try:
|
|
# OK to go once server is accepting connections
|
|
r = requests.get(endpoint_uri)
|
|
if r.ok:
|
|
break
|
|
except Exception:
|
|
pass
|
|
timeout -= 0.1
|
|
time.sleep(0.1)
|
|
yield endpoint_uri
|
|
|
|
proc.terminate()
|
|
proc.wait()
|
|
|
|
|
|
@pytest.fixture()
|
|
def s3_resource(s3_base, tips_file, jsonl_file, feather_file):
|
|
"""
|
|
Sets up S3 bucket with contents
|
|
|
|
The primary bucket name is "pandas-test". The following datasets
|
|
are loaded.
|
|
|
|
- tips.csv
|
|
- tips.csv.gz
|
|
- tips.csv.bz2
|
|
- items.jsonl
|
|
|
|
A private bucket "cant_get_it" is also created. The boto3 s3 resource
|
|
is yielded by the fixture.
|
|
"""
|
|
import boto3
|
|
import s3fs
|
|
|
|
test_s3_files = [
|
|
("tips#1.csv", tips_file),
|
|
("tips.csv", tips_file),
|
|
("tips.csv.gz", tips_file + ".gz"),
|
|
("tips.csv.bz2", tips_file + ".bz2"),
|
|
("items.jsonl", jsonl_file),
|
|
("simple_dataset.feather", feather_file),
|
|
]
|
|
|
|
def add_tips_files(bucket_name):
|
|
for s3_key, file_name in test_s3_files:
|
|
with open(file_name, "rb") as f:
|
|
cli.put_object(Bucket=bucket_name, Key=s3_key, Body=f)
|
|
|
|
bucket = "pandas-test"
|
|
conn = boto3.resource("s3", endpoint_url=s3_base)
|
|
cli = boto3.client("s3", endpoint_url=s3_base)
|
|
|
|
try:
|
|
cli.create_bucket(Bucket=bucket)
|
|
except: # noqa
|
|
# OK is bucket already exists
|
|
pass
|
|
try:
|
|
cli.create_bucket(Bucket="cant_get_it", ACL="private")
|
|
except: # noqa
|
|
# OK is bucket already exists
|
|
pass
|
|
timeout = 2
|
|
while not cli.list_buckets()["Buckets"] and timeout > 0:
|
|
time.sleep(0.1)
|
|
timeout -= 0.1
|
|
|
|
add_tips_files(bucket)
|
|
add_tips_files("cant_get_it")
|
|
s3fs.S3FileSystem.clear_instance_cache()
|
|
yield conn
|
|
|
|
s3 = s3fs.S3FileSystem(client_kwargs={"endpoint_url": s3_base})
|
|
|
|
try:
|
|
s3.rm(bucket, recursive=True)
|
|
except: # noqa
|
|
pass
|
|
try:
|
|
s3.rm("cant_get_it", recursive=True)
|
|
except: # noqa
|
|
pass
|
|
timeout = 2
|
|
while cli.list_buckets()["Buckets"] and timeout > 0:
|
|
time.sleep(0.1)
|
|
timeout -= 0.1
|