External Storage

Ingest and Migrate a Data Item

This section outlines the steps required to ingest and migrate data using the ska-dlm REST API.

  1. Obtain an API token, authorizing use of DLM by a specific user

  2. Determine the location of the file(s) you wish to register, and register this location with DLM

  3. Register the storage on the DLM system

  4. Ensure the storage is accessible from DLM

  5. Ingest the files into DLM one by one

  6. Instruct DLM to migrate the newly ingested item to a secondary storage

  7. Query for all copies of the data item

In the Python code below, we register an arbitrarily chosen file residing on an external storage, Acacia, located at the Pawsey Centre. We make use of the DLM deployment on the DP platform, in the dp-dm namespace.

1. Prepare the token to be placed in the header of your REST calls

  • Open a browser and go to the /token_by_auth_flow endpoint on the DLM server URL.

  • For example, for the DLM deployment on the DP cluster: https://sdhp.stfc.skao.int/dp-dm/dlm/token_by_auth_flow.

  • Log in with your credentials

  • If successful, a token will be returned. Copy the token.

from requests import Session

# this URL is for DLM deployment in the 'dp-dm' namespace on the DP test platform
DLM_URL = "https://sdhp.stfc.skao.int/dp-dm/dlm"
token = "<your token>"  # paste the token copied in the previous step
headers = {"Authorization": f"Bearer {token}"}
session = Session()
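
Rather than pasting the token into your script, you can keep it out of the source entirely. A minimal sketch reading it from an environment variable (DLM_TOKEN is an arbitrary name chosen for this example):

import os

# export the token in your shell first, e.g. export DLM_TOKEN=<your token>
token = os.environ["DLM_TOKEN"]
headers = {"Authorization": f"Bearer {token}"}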

2. Check if your desired location (e.g., Pawsey) is already known to DLM

# details of the location to query (and to register below, if needed)
location_name = "Pawsey"
location_type = "low-integration"

location = session.get(
    f"{DLM_URL}/storage/query_location",
    params={"location_name": location_name},
    headers=headers,
    timeout=60,
)
print(location.json())
location_id = location.json()[0]["location_id"]  # if the location exists, take its id

If your desired location doesn’t already exist, initialise it.

loc_params = {
    "location_name": location_name,
    "location_type": location_type,
}
location = session.post(
    f"{DLM_URL}/storage/init_location", params=loc_params, headers=headers, timeout=60
)
print(location.json())
location_id = location.json()  # init_location returns the new location id
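
For scripting, the query and initialisation above can be folded into a single get-or-create step. A sketch, assuming query_location returns an empty list when the location is unknown (verify this against your deployment):

def get_or_create_location(name, loc_type):
    """Return the DLM location_id for name, registering the location if needed."""
    resp = session.get(
        f"{DLM_URL}/storage/query_location",
        params={"location_name": name},
        headers=headers,
        timeout=60,
    )
    resp.raise_for_status()
    matches = resp.json()
    if matches:  # assumption: an empty list means the location is unknown
        return matches[0]["location_id"]
    resp = session.post(
        f"{DLM_URL}/storage/init_location",
        params={"location_name": name, "location_type": loc_type},
        headers=headers,
        timeout=60,
    )
    resp.raise_for_status()
    return resp.json()  # init_location returns the new location_id


location_id = get_or_create_location(location_name, location_type)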

3. Check if your desired storage (e.g., Acacia) is already known to DLM

storage_params = {
    "storage_name": "Acacia",
    "location_id": location_id,
}
storage = session.get(
    f"{DLM_URL}/storage/query_storage", params=storage_params, headers=headers, timeout=60
)
print(storage.json())
storage_id = storage.json()[0]["storage_id"]  # if the storage exists, get the storage id

If your storage is not listed, register a supported storage endpoint with DLM.

storage_params = {
    "storage_name": "Acacia",
    "root_directory": "rascil", # an existing directory on Acacia
    "location_id": location_id,
    "storage_type": "objectstore",
    "storage_interface": "s3",
    "storage_capacity": 100000000,
}
storage = session.post(
    f"{DLM_URL}/storage/init_storage",
    params=storage_params,
    headers=headers,
    timeout=60,
)
print(storage.json())
storage_id = storage.json()  # init_storage returns the new storage_id
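
The same get-or-create pattern works for storages. A sketch mirroring the helper above, with the same caveat about the empty-list assumption:

def get_or_create_storage(params):
    """Return the DLM storage_id for the named storage, registering it if needed."""
    resp = session.get(
        f"{DLM_URL}/storage/query_storage",
        params={"storage_name": params["storage_name"], "location_id": params["location_id"]},
        headers=headers,
        timeout=60,
    )
    resp.raise_for_status()
    matches = resp.json()
    if matches:  # assumption: an empty list means the storage is unknown
        return matches[0]["storage_id"]
    resp = session.post(
        f"{DLM_URL}/storage/init_storage", params=params, headers=headers, timeout=60
    )
    resp.raise_for_status()
    return resp.json()  # init_storage returns the new storage_id


storage_id = get_or_create_storage(storage_params)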

4. Check what configurations for your storage are already known to DLM

config = session.get(
    f"{DLM_URL}/storage/get_storage_config",
    params={"storage_id": storage_id},
    headers=headers,
    timeout=60,
)
print(config.json())

Supply a configuration for your storage, if one doesn’t exist. The default config_type is rclone. For further details, refer to the rclone configuration docs.

acacia_config = {
    "name": "myacacia",
    "type": "s3",
    "parameters": {
        "access_key_id": "<your-access-key-id>",
        "endpoint": "https://projects.pawsey.org.au",
        "provider": "Ceph",
        "secret_access_key": "<your-secret-access-key>",
    },
}
config = session.post(
    f"{DLM_URL}/storage/create_storage_config",
    params={"storage_id": storage_id},
    json=acacia_config,
    headers=headers,
    timeout=60,
)
print(config.json())
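
To confirm the configuration was accepted, re-run the get_storage_config query from the start of this step:

# re-read the stored configuration to confirm the call above succeeded
check = session.get(
    f"{DLM_URL}/storage/get_storage_config",
    params={"storage_id": storage_id},
    headers=headers,
    timeout=60,
)
print(check.json())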

5. Register a data item that exists on your storage

item_params = {
    "item_name": "test_item",
    "uri": "1197634128-cal_avg32.ms.tar.xj", # randomly chosen example file
    "storage_name": "Acacia",
    "storage_id": storage_id,
}
json_body = {"execution_block": "eb-m001-20191031-12345"}  # example metadata
acacia_response = session.post(
    f"{DLM_URL}/ingest/register_data_item",
    params=item_params,
    json=json_body,
    headers=headers,
    timeout=60,
)
print(acacia_response.json())
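
Note that requests does not raise on HTTP error statuses by itself, so it is worth checking the ingest response explicitly. A minimal sketch:

# fail loudly if registration was rejected (e.g. bad token or unknown storage)
if not acacia_response.ok:
    raise RuntimeError(
        f"register_data_item failed: {acacia_response.status_code} {acacia_response.text}"
    )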

6. Trigger a migration to a secondary storage. If your destination storage isn’t known to DLM, first initialise it (using the method above).

migration_params = {
    "item_name": "test_item",
    "destination_name": <dest_storage>,
    "path": <dest_path>,
}
migration_response = session.post(
    f"{DLM_URL}/migration/copy_data_item",
    params=migration_params,
    headers=headers,
    timeout=60,
)
print(migration_response.json())

7. Query for all copies of the data item

response = session.get(
    f"{DLM_URL}/request/query_data_item",
    params={"item_name": "test_item"},
    headers=headers,
    timeout=60,
)
print(response.json())
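
If the migration succeeded, the response should list more than one copy. A sketch for summarising where each copy lives; the field names used here are assumptions, so check them against the printed JSON:

copies = response.json()
print(f"{len(copies)} copies found")
for copy in copies:
    # field names are assumptions; adjust to the actual response schema
    print(copy.get("storage_id"), copy.get("uri"))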