Quickstart example: Python

A convenient Python wrapper class is provided as an example to help you quickly get up and running with calculations using the OLI API(s).

Sample wrapper class

The user can simply copy this code and use it in an existing Python project. This class provides the simple interface described below.

Please go to the very end of this page to see an example program that uses this class.

import requests
import json
import time

class OLIApi:
    '''
    A class to wrap OLI Cloud API calls to be accessible in a simple manner. This
    is just an example.

    All request helpers follow the same convention: they return the decoded JSON
    response on success and an empty dict on failure.
    '''
    def __init__(self, username, password):
        '''
        Constructs all necessary attributes for OLIApi class

        username: user's username
        password: user's password
        '''
        self.__username = username
        self.__password = password
        self.__jwt_token = ""
        self.__refresh_token = ""
        self.__root_url = "https://api.olisystems.com"
        self.__auth_url = "https://auth.olisystems.com/auth/realms/api/protocol/openid-connect/token"
        self.__dbs_url = self.__root_url + "/channel/dbs"
        self.__upload_dbs_url = self.__root_url + "/channel/upload/dbs"

    def __request_tokens(self, body):
        '''
        Posts a token request to the auth endpoint and stores the returned tokens.

        The access token is stored as soon as it is found; the call only counts
        as successful when a refresh token is returned as well.

        :param body: form fields of the token request
        :return: True when both an access token and a refresh token were
        received, False otherwise
        '''
        headers = {
            "Content-Type": "application/x-www-form-urlencoded"
        }

        response = requests.post(self.__auth_url, headers=headers, data=body)
        if response.status_code != 200:
            return False

        try:
            tokens = response.json()
        except ValueError:
            # a 200 reply whose body is not JSON carries no usable tokens
            return False

        if not isinstance(tokens, dict) or "access_token" not in tokens:
            return False
        self.__jwt_token = tokens["access_token"]

        if "refresh_token" not in tokens:
            return False
        self.__refresh_token = tokens["refresh_token"]
        return True

    def login(self):
        '''
        Logs into the OLI Cloud with the stored username and password.

        :return: True on success, False on failure
        '''
        return self.__request_tokens({
            "username": self.__username,
            "password": self.__password,
            "grant_type": "password",
            "client_id": "apiclient",
        })

    def refresh_token(self):
        '''
        Refreshes the access token using the refresh token obtained on login.

        :return: True on success, False on failure
        '''
        return self.__request_tokens({
            "refresh_token": self.__refresh_token,
            "grant_type": "refresh_token",
            "client_id": "apiclient",
        })

    def request_auto_login(self, req_func):
        '''
        Runs a request and, if it is rejected with HTTP 401, obtains a new access
        token and retries once. The refresh token is tried first; if that fails
        the user is logged in again with the username and password.

        :param req_func: function to call; it receives the authorization headers
        and must return the response object
        :return: decoded JSON response, or an empty dict if failed
        '''
        for attempt in (1, 2):
            headers = {
                "authorization": "Bearer " + self.__jwt_token
            }

            response = req_func(headers)
            if response.status_code == 200:
                try:
                    return json.loads(response.text)
                except ValueError:
                    # success status but undecodable body: report as failure
                    return dict()

            # only the first 401 triggers a re-authentication; the body of the
            # 401 reply is irrelevant, so it is deliberately not parsed
            if attempt == 1 and response.status_code == 401:
                if self.refresh_token() or self.login():
                    continue
            break

        return dict()

    def upload_dbs_file(self, file_path):
        '''
        Uploads a dbs file to the OLI Cloud given a full file path.

        :param file_path: full path to dbs file
        :return: dictionary containing the uploaded file id, or an empty dict
        if the file could not be read or uploaded
        '''
        try:
            with open(file_path, "rb") as dbs_file:
                files = {"files": dbs_file}

                def post_file(headers):
                    # rewind so that a retry after a token refresh sends the
                    # whole file again instead of an already-consumed stream
                    dbs_file.seek(0)
                    return requests.post(self.__upload_dbs_url, headers=headers,
                                         files=files)

                return self.request_auto_login(post_file)
        except IOError:
            # unreadable file (the exception classes of requests also derive
            # from IOError, so transport failures end up here as well)
            return dict()

    def get_user_dbs_files(self):
        '''
        Returns a dictionary containing a list of dbs file(s) uploaded

        :return: dictionary containing list of dbs files
        '''
        return self.request_auto_login(
            lambda headers: requests.get(self.__dbs_url, headers=headers))

    def __make_requester(self, method, endpoint, data):
        '''
        Builds the request function handed to request_auto_login.

        :param method: "POST" sends a POST request, anything else a GET request
        :param endpoint: url to call
        :param data: request body
        :return: function taking the authorization headers and returning the response
        '''
        def send(headers):
            headers["content-type"] = "application/json"
            if method == "POST":
                return requests.post(endpoint, headers=headers, data=data)
            return requests.get(endpoint, headers=headers, data=data)

        return send

    def call(self, function_name, chemistry_model_file_id, json_input=None, poll_time=1.0, max_request=1000):
        '''
        Calls a function in the OLI Engine API and polls for its result.

        :param function_name: name of function to call
        :param chemistry_model_file_id: the chemistry model file id for this calculation
        :param json_input: calculation input JSON (optional)
        :param poll_time: target time in seconds between two polling requests;
        the duration of the request itself is subtracted from the wait
        :param max_request: maximum number of polling requests
        :return: the "data" part of the final response once the calculation is
        PROCESSED or FAILED, otherwise an empty dict
        '''

        # formulate url
        if function_name in ("chemistry-info", "corrosion-contact-surface"):
            endpoint = self.__root_url + "/engine/file/" + chemistry_model_file_id + "/" + function_name
            method = "GET"
        else:
            endpoint = self.__root_url + "/engine/flash/" + chemistry_model_file_id + "/" + function_name
            method = "POST"

        # http body
        data = json.dumps(json_input) if json_input else ""

        # first call: submits the calculation and yields the results link
        start_time = time.time()
        submission = self.request_auto_login(self.__make_requester(method, endpoint, data))
        request_time = time.time() - start_time
        print("First request time =", request_time)

        results_link = ""
        if isinstance(submission, dict) and submission.get("status") == "SUCCESS":
            job = submission.get("data")
            if isinstance(job, dict) and job.get("status") in ("IN QUEUE", "IN PROGRESS"):
                results_link = job.get("resultsLink", "")

        print(results_link)

        # error in getting results link
        if results_link == "":
            return dict()

        # poll on results link until success
        poll = self.__make_requester("GET", results_link, "")
        for _ in range(max_request):
            # make request and time
            start_time = time.time()
            poll_result = self.request_auto_login(poll)
            request_time = time.time() - start_time
            print("Second request time =", request_time)

            # extract
            print(poll_result)
            if not isinstance(poll_result, dict) or "status" not in poll_result:
                break

            status = poll_result["status"]
            print(status)
            if status in ("PROCESSED", "FAILED"):
                if "data" in poll_result:
                    return poll_result["data"]
                break

            if status not in ("IN QUEUE", "IN PROGRESS"):
                break

            # still running: wait out the remainder of the polling interval
            if poll_time > request_time:
                time.sleep(poll_time - request_time)

        return dict()

Example program demonstrating running an isothermal calculation

Replace "username" and "password" with correct values.

# example program (Isothermal flash)
def main():
    '''
    Logs in, uploads a chemistry model file and runs an isothermal flash
    calculation on it, printing every response as indented JSON.
    '''
    oliapi = OLIApi("username", "password")
    if not oliapi.login():
        print("Login failed: check the username and password")
        return

    # upload chemistry file (this needs to be done only once per chemistry
    # model file to get the file id; after that the id is sufficient)
    result = oliapi.upload_dbs_file("API_CALL_ISOTHERMAL\\test_isothermal.dbs")
    print(json.dumps(result, indent=2))

    # a failed upload yields an empty dict, so the id may be missing
    try:
        chemistry_file_id = result["file"][0]["id"]
    except (KeyError, IndexError, TypeError):
        print("Upload failed: no file id was returned")
        return

    # display all available dbs files
    result = oliapi.get_user_dbs_files()
    print(json.dumps(result, indent=2))

    # get chemistry information
    result = oliapi.call("chemistry-info", chemistry_file_id)
    print(json.dumps(result, indent=2))

    # create isothermal flash input
    flash_input = {
        "params": {
            "temperature": {
                "value": 30.0,
                "unit": "°C"
            },
            "pressure": {
                "value": 1.5,
                "unit": "atm"
            },
            "inflows": {
                "unit": "mol",
                "values": {
                    "H2O": 50.0,
                    "CO2": 10.0,
                    "NACL": 20.0,
                    "BENZENE": 10.0
                }
            }
        }
    }

    # call the flash function
    result = oliapi.call("isothermal", chemistry_file_id, flash_input)
    # json.dumps escapes non-ASCII characters by default, so the text can be
    # printed directly; encoding it would print a bytes repr instead
    print(json.dumps(result, indent=2))


if __name__ == "__main__":
    main()

Last updated