Importing data from Databricks into SC Navigator

This article describes how to import new data from Databricks into your AIMMS SC Navigator cloud account using your own Databricks environment. You can download the example Jupyter notebook here and use the template directly in your Databricks environment, or use it locally if you have Python installed.

Parquet File Definition SC Navigator

This how-to article assumes that you have already created datasets according to the Parquet File Definition of SC Navigator. You can find more information about the Parquet File Definition here: Parquet File Structure.

What to do in AIMMS

Each AIMMS SC Navigator cloud account is by default equipped with an Azure Data Lake Storage Gen2 (ADLS) account. For this route we will use external access to this storage account to import new data.

The data will be saved in the Parquet file format in the import container of the ADLS.

You will also need to create a SAS-token to get external access to the ADLS. You can find instructions on how to do this on the page SAS Tokens.

Configuration in Databricks

You can download the full Jupyter notebook example from the link in the first paragraph and replace the ‘TODO’ items in the notebook with your own values. Below we describe the code fragments and how they can be used.

In Databricks you will be using Python packages for the connection to the ADLS. First, these need to be installed:

%pip install azure-storage-file-datalake azure-identity

Followed by a restart of the Python interpreter to apply them:

# restart python to apply installed packages
dbutils.library.restartPython()

First, load the data in Databricks that you would like to upload to SC Navigator. For this example we have all our input data in Hive tables in the sc_navigator_input schema. We explicitly define which datasets of this schema we want to upload using the datasets list.

datasets = ['_dataset_info',
            'bill_of_material',
            'custom_objective',
            'customer_definition',
            'customer_groups',
            'customer_product',
            'inventory_product_step',
            'inventory_step',
            'location_definition',
            'location_groups',
            'location_period',
            'location',
            'mode_of_transport_definition',
            'mode_of_transport_product',
            'period_definition',
            'product_definition',
            'product_groups',
            'production_bom',
            'production_definition',
            'production_groups',
            'production_routing',
            'production_step',
            'settings',
            'supplier_definition',
            'supplier_product_step',
            'supplier_step',
            'transport_trip_data',
            'uom_product',
            'warehouse_definition',
            'warehouse_groups',
            'warehouse_product_step',
            'warehouse_step']

With the following code we load the data from our schema and create a Pandas DataFrame for every table. We also apply some transformations, since Hive does not support spaces or brackets in column names.

# get the prepared data from the Databricks Hive tables and do some transformations
import numpy as np
dataframes_for_input = dict()
for dataset in datasets:
    # load the data
    df = spark.table("sc_navigator_input." + dataset).toPandas()
    # transform the column names to the Parquet File Definition of SC Navigator
    df.columns = [x.replace("_", " ") for x in df.columns]
    df.columns = [x.replace(".", "(") for x in df.columns]
    df.columns = [x.replace("-", ")") for x in df.columns]
    df = df.replace({np.nan: None})
    if dataset != "_dataset_info":
        dataset = dataset.replace("_", "-")
    # store result
    dataframes_for_input[dataset] = df
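
To make the column renaming concrete, here is a minimal sketch using a hypothetical column name (it is not taken from the Parquet File Definition), showing how underscores, dots, and dashes are mapped to spaces and brackets:

import pandas as pd

# hypothetical Hive-style column name, for illustration only
df = pd.DataFrame({"max_capacity.ton-": [10.0]})
df.columns = [x.replace("_", " ") for x in df.columns]
df.columns = [x.replace(".", "(") for x in df.columns]
df.columns = [x.replace("-", ")") for x in df.columns]
print(df.columns.tolist())  # ['max capacity(ton)']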

Then you’ll need to provide the SAS token and the URL, both of which were generated in the previous step when creating the SAS token:

# TODO: provide the sas token and the URL of your storage
account_url_full = "<put here your URL>"
# remove the folder from the url
account_url = account_url_full[:-20]
sas_token = "<put here your sas token>"
# the file system is fixed; this is the container from which SC Navigator will read the imported data
file_system = "sc-navigator-import"

An example of the value expected for account_url_full is https://CloudName.dfs.core.windows.net/sc-navigator-export. For the sas_token parameter you should only paste the SAS token itself.
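
If you prefer not to rely on slicing off a fixed number of characters, here is a small sketch that derives the account URL from the full URL using the Python standard library (assuming the full URL always ends with the container name):

from urllib.parse import urlsplit

# keep only the scheme and host, dropping the container from the full URL
parts = urlsplit(account_url_full)
account_url = f"{parts.scheme}://{parts.netloc}"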

You’ll need to import more required packages:

import os
from azure.storage.filedatalake import (
    DataLakeServiceClient,
    DataLakeDirectoryClient,
    FileSystemClient
)
from azure.identity import DefaultAzureCredential

And create a DataLakeServiceClient to access the storage:

# create a DataLakeServiceClient to access the storage
dsc = DataLakeServiceClient(account_url=account_url, credential=sas_token)

You can now list all available scenarios on the storage:

# list all available scenarios on the storage
# Note that you need to save the scenario from SCN with data lake storage enabled before it will be visible!
file_system_client = dsc.get_file_system_client(file_system=file_system)
paths = file_system_client.get_paths()
list_of_scenarios = list()
for path in paths:
    if "/" not in path.name and path.name != "info.txt":
        print(path.name + '\n')
        list_of_scenarios.append(path.name)
    else:
        continue

If this code block gives an error, please check the provided URL and the SAS token (the token needs to be valid and must not be expired). For further reference, see the Microsoft documentation.
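
If you would like a more descriptive error message, here is a minimal sketch that wraps the listing in an exception handler. ClientAuthenticationError and HttpResponseError come from azure-core, which is installed together with the packages above; which of the two is raised depends on the exact failure:

from azure.core.exceptions import ClientAuthenticationError, HttpResponseError

try:
    # force a request against the storage account
    next(iter(file_system_client.get_paths()), None)
except ClientAuthenticationError:
    print("Authentication failed: check that the SAS token is complete and not expired.")
except HttpResponseError as exc:
    print("Request failed: check the account URL and container name. Details: " + str(exc))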

Now we create a directory on ADLS to upload the data to:

# create a new directory to store the datasets
directory_name = "<directory name>"
dld_client = file_system_client.create_directory(directory_name)

If you made a mistake or would like to delete data, you can do the following:

client = dsc.get_directory_client(file_system=file_system, directory="<directory name>")
client.delete_directory()

Before we upload any data, we would like to inspect the _dataset_info table. The Dataset field will be used to identify the uploaded data in SC Navigator.

dataframes_for_input["_dataset_info"]

Now we are ready to upload the prepared datasets:

# upload the prepared datasets
client = dld_client
for k, v in dataframes_for_input.items():
    file_name = k + ".parquet"
    file_client = client.create_file(file_name)
    bytes_data = v.to_parquet()
    file_client.append_data(data=bytes_data, offset=0, length=len(bytes_data))
    file_client.flush_data(len(bytes_data))
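
Depending on the version of the azure-storage-file-datalake package you installed, you could also upload each dataset in a single call with upload_data; this is a sketch, assuming your package version supports the overwrite argument:

# alternative upload: one call per dataset, overwriting existing files with the same name
for k, v in dataframes_for_input.items():
    file_client = dld_client.get_file_client(k + ".parquet")
    file_client.upload_data(v.to_parquet(), overwrite=True)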

To show all uploaded datasets:

# show all uploaded datasets
[k for k, v in dataframes_for_input.items()]

You can use the following code to check whether the upload was successful and to inspect the data on the ADLS.

import io
import pandas as pd
file_client = client.get_file_client("_dataset_info.parquet")
data_file = file_client.download_file()
data_binary = data_file.readall()
parquet_file = io.BytesIO(data_binary)
df = pd.read_parquet(parquet_file)
display(df)

Congratulations! You just uploaded data from Databricks to SC Navigator. Now open the System Configuration app (or ask your admin to do this) and press the rebuild overview button on the File System Management tab. If you now open SC Navigator, you should see that your dataset has been added to the overview.