External Storage
Ingest and Migrate a Data Item
This section outlines the steps required to ingest and migrate data using the ska-dlm REST API.
1. Obtain an API token, authorizing use of DLM by a specific user.
2. Determine the location of the file(s) you wish to register, and register this location with DLM.
3. Register the storage on the DLM system.
4. Ensure the storage is accessible from DLM by supplying a storage configuration.
5. Ingest the files into DLM one by one.
6. Instruct DLM to migrate the newly ingested item to a secondary storage.
7. Query for all copies of the data item.
In the Python code below, we register an arbitrarily chosen file residing on external storage: Acacia, the object store at the Pawsey Supercomputing Research Centre. We use the DLM deployment on the DP platform, in the dp-dm namespace.
1. Prepare token to be placed in the header of your REST calls
Open a browser and go to the /token_by_auth_flow endpoint on the DLM server URL. For example, for the DLM deployment on the DP cluster: https://sdhp.stfc.skao.int/dp-dm/dlm/token_by_auth_flow
Log in with your credentials.
If successful, a token will be returned. Copy the token.
from requests import Session
# this URL is for DLM deployment in the 'dp-dm' namespace on the DP test platform
DLM_URL = "https://sdhp.stfc.skao.int/dp-dm/dlm"
token = "<your token>"  # paste the token copied in the previous step
headers = {"Authorization": f"Bearer {token}"}
session = Session()
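The steps below all follow the same request pattern. As a convenience, a small wrapper such as the hypothetical dlm_get below (our own helper, not part of the DLM API) fails fast on an expired or invalid token by raising on non-2xx responses:
def dlm_get(path, **params):
    # Hypothetical helper: wraps session.get, raising on HTTP errors
    # (e.g. a 401 from an expired token) and returning the decoded JSON.
    response = session.get(f"{DLM_URL}/{path}", params=params, headers=headers, timeout=60)
    response.raise_for_status()
    return response.json()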
2. Check if your desired location (e.g., Pawsey) is already known to DLM
# details of the location to look up
location_name = "Pawsey"
location_type = "low-integration"
location = session.get(
f"{DLM_URL}/storage/query_location",
params={"location_name": location_name},
headers=headers,
timeout=60,
)
print(location.json())
location_id = location.json()[0]["location_id"] # if location exists, get the location id
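If no matching location exists, the indexing above will raise an IndexError. A minimal guard, assuming query_location returns an empty list when nothing matches:
result = location.json()
if result:
    location_id = result[0]["location_id"]
else:
    location_id = None  # not yet known to DLM; initialise it as shown next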
If your desired location doesn’t already exist, initialise it.
loc_params = {
"location_name": location_name,
"location_type": location_type,
}
location = session.post(
f"{DLM_URL}/storage/init_location", params=loc_params, headers=headers, timeout=60
)
print(location.json())
location_id = location.json() # get the location id
3. Check if your desired storage (e.g., Acacia) is already known to DLM
storage_params = {
"storage_name": "Acacia",
"location_id": location_id,
}
storage = session.get(
f"{DLM_URL}/storage/query_storage", params=storage_params, headers=headers, timeout=60
)
print(storage.json())
storage_id = storage.json()[0]["storage_id"] # if the storage exists, get the storage id
If your storage is not listed, register a supported storage endpoint.
storage_params = {
"storage_name": "Acacia",
"root_directory": "rascil", # an existing directory on Acacia
"location_id": location_id,
"storage_type": "objectstore",
"storage_interface": "s3",
"storage_capacity": 100000000,
}
storage = session.post(
f"{DLM_URL}/storage/init_storage",
params=storage_params,
headers=headers,
timeout=60,
)
print(storage.json())
storage_id = storage.json() # get the storage_id
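Since steps 2 and 3 both follow a query-then-initialise pattern, a get-or-create helper is a natural refactor. A sketch under the assumption that query_storage returns an empty list when nothing matches (this helper is our own, not a DLM API call):
def get_or_init_storage(storage_name, location_id, **init_params):
    # Return the storage_id of an existing storage, or register it first.
    found = session.get(
        f"{DLM_URL}/storage/query_storage",
        params={"storage_name": storage_name, "location_id": location_id},
        headers=headers,
        timeout=60,
    ).json()
    if found:
        return found[0]["storage_id"]
    created = session.post(
        f"{DLM_URL}/storage/init_storage",
        params={"storage_name": storage_name, "location_id": location_id, **init_params},
        headers=headers,
        timeout=60,
    )
    return created.json()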
4. Check what configurations for your storage are already known to DLM
config = session.get(
f"{DLM_URL}/storage/get_storage_config",
params={"storage_id": storage_id},
headers=headers,
timeout=60,
)
print(config.json())
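To avoid registering a duplicate configuration, create one only when the lookup comes back empty. A short sketch, assuming get_storage_config returns an empty collection in that case:
if not config.json():
    print("no configuration registered yet; create one as shown below")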
Supply a configuration for your storage, if one doesn’t exist. The default config_type is rclone. For further details, refer to the rclone configuration docs.
acacia_config = {
"name": "myacacia",
"type": "s3",
"parameters": {
"access_key_id": "<your-access-key-id>",
"endpoint": "https://projects.pawsey.org.au",
"provider": "Ceph",
"secret_access_key": "<your-secret-access-key>",
},
}
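Rather than hard-coding secrets in the script, the credential fields can be filled from the environment before the configuration is posted. A minimal sketch; the environment variable names are our own choice, not mandated by DLM:
import os

# Assumed environment variable names; export them before running.
acacia_config["parameters"]["access_key_id"] = os.environ["ACACIA_ACCESS_KEY_ID"]
acacia_config["parameters"]["secret_access_key"] = os.environ["ACACIA_SECRET_ACCESS_KEY"]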
config = session.post(
f"{DLM_URL}/storage/create_storage_config",
params={"storage_id": storage_id},
json=acacia_config,
headers=headers,
timeout=60,
)
print(config.json())
5. Register a data item that exists on your storage
item_params = {
"item_name": "test_item",
"uri": "1197634128-cal_avg32.ms.tar.xj", # randomly chosen example file
"storage_name": "Acacia",
"storage_id": storage_id,
}
json_body = {"execution_block": "eb-m001-20191031-12345"} # example metadata
acacia_response = session.post(
f"{DLM_URL}/ingest/register_data_item",
params=item_params,
json=json_body,
headers=headers,
timeout=60,
)
print(acacia_response.json())
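As the overview notes, files are ingested one by one; registering several files from the same storage is just a loop over the same call. A sketch with placeholder item names and URIs:
# Placeholder (item_name, uri) pairs; substitute real files on the storage.
for name, uri in [("test_item_a", "file_a.ms.tar"), ("test_item_b", "file_b.ms.tar")]:
    response = session.post(
        f"{DLM_URL}/ingest/register_data_item",
        params={
            "item_name": name,
            "uri": uri,
            "storage_name": "Acacia",
            "storage_id": storage_id,
        },
        json={"execution_block": "eb-m001-20191031-12345"},
        headers=headers,
        timeout=60,
    )
    print(response.json())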
6. Trigger a migration to a second storage. If your destination storage isn’t known to DLM, first initialise it (using the method above).
migration_params = {
    "item_name": "test_item",
    "destination_name": "<dest_storage>",  # name of a storage already registered with DLM
    "path": "<dest_path>",  # target path on the destination storage
}
migration_response = session.post(
f"{DLM_URL}/migration/copy_data_item",
params=migration_params,
headers=headers,
timeout=60,
)
print(migration_response.json())
7. Query for all copies of the data item
response = session.get(
f"{DLM_URL}/request/query_data_item",
params={"item_name": "test_item"},
headers=headers,
timeout=60,
)
print(response.json())
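A migration may take a while to complete. A polling sketch, under the assumption that query_data_item returns one record per copy (consistent with its use above):
import time

for _ in range(30):
    copies = session.get(
        f"{DLM_URL}/request/query_data_item",
        params={"item_name": "test_item"},
        headers=headers,
        timeout=60,
    ).json()
    if len(copies) >= 2:  # the original plus the migrated copy
        print("migration complete:", copies)
        break
    time.sleep(10)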