Quickstart example: Python
A convenient Python wrapper class has been provided as an example to get quickly up and running calculations with the OLI API(s).
Sample wrapper class
The user can simple copy this code over and use in an existing Python project. This class provides a simple interface described below.
Please go to the very end of this page to see an example program using this class
import requests
import json
import time
class OLIApi:
A class to wrap OLI Cloud API calls to be accessible in a simple manner. This
is just an example
def __init__(self, username, password):
Constructs all necessary attributes for OLIApi class
username: user's username
password: user's password
self.__username = username
self.__password = password
self.__jwt_token = ""
self.__refresh_token = ""
self.__root_url = "https://api.olisystems.com"
self.__auth_url = "https://auth.olisystems.com/auth/realms/api/protocol/openid-connect/token"
self.__dbs_url = self.__root_url + "/channel/dbs"
self.__upload_dbs_url = self.__root_url + "/channel/upload/dbs"
def login(self):
Login into user credentials for the OLI Cloud and returns:
:return: True on success, False on failure
headers = {
"Content-Type": "application/x-www-form-urlencoded"
body = {
"username": self.__username,
"password": self.__password,
"grant_type": "password",
"client_id": "apiclient",
req_result = requests.post(self.__auth_url, headers=headers, data=body)
if req_result.status_code == 200:
req_result = req_result.json()
if "access_token" in req_result:
self.__jwt_token = req_result["access_token"]
if "refresh_token" in req_result:
self.__refresh_token = req_result["refresh_token"]
return True
return False
def refresh_token(self):
Refreshes the access token using the reresh token got obtained on login and returns:
:return: True on success, False on failure
headers = {
"Content-Type": "application/x-www-form-urlencoded"
body = {
"refresh_token": self.__refresh_token,
"grant_type": "refresh_token",
"client_id": "apiclient",
req_result = requests.post(self.__auth_url, headers=headers, data=body)
if req_result.status_code == 200:
req_result = req_result.json()
if bool(req_result):
if "access_token" in req_result:
self.__jwt_token = req_result["access_token"]
if "refresh_token" in req_result:
self.__refresh_token = req_result["refresh_token"]
return True
return False
def request_auto_login(self, req_func):
Gets a new access token if the request returns with an expired token error. First tries with the refresh token
if its still active or simple relogs in using the username and password.
:param req_func: function to call
:return: Returns an empty dict if failed
num_tries = 1
while num_tries <= 2:
headers = {
"authorization": "Bearer " + self.__jwt_token
req_result = req_func(headers)
if req_result.status_code == 200:
ret_val = json.loads(req_result.text)
return ret_val
elif num_tries == 1 and req_result.status_code == 401:
req_result = req_result.json()
if not self.refresh_token():
if not self.login():
num_tries = num_tries + 1
return dict()
def upload_dbs_file(self, file_path):
Uploads a dbs file to the OLI Cloud given a full file path.
:param file_path: full path to dbs file
:return: dictionary containing the
uploaded file id
req_result = dict()
# read the file data in
with open(file_path, "rb") as file:
files = {"files": file}
req_result = self.request_auto_login(lambda headers: requests.post(self.__upload_dbs_url, headers=headers,
except IOError:
return req_result
def get_user_dbs_files(self):
Returns a dictionary containing a list of dbs file(s) uploaded
:return: dictionary containing list of dbs files
return self.request_auto_login(
lambda headers: requests.get(self.__dbs_url, headers=headers))
def call(self, function_name, chemistry_model_file_id, json_input = dict(), poll_time = 1.0, max_request = 1000):
calls a function in the OLI Engine API.
:param function_name: name of function to call
:param chemistry_model_file_id: the chemistry model file if for this calculation
:param json_input: calculation input JSON
:param poll_time: max delay between each call
:param max_request: maximum requests
:return: dictionary containing result or error
# formulate url
endpoint = ""
method = "POST"
if function_name == "chemistry-info" or function_name == "corrosion-contact-surface":
endpoint = self.__root_url + "/engine/file/" + chemistry_model_file_id + "/" + function_name
method = "GET"
endpoint = self.__root_url + "/engine/flash/" + chemistry_model_file_id + "/" + function_name
method = "POST"
# http body
if bool(json_input):
data = json.dumps(json_input)
data = ""
def add_additional_header(headers):
headers["content-type"] = "application/json"
if method == "POST":
return requests.post(endpoint, headers=headers, data=data)
output = requests.get(endpoint, headers=headers, data=data)
return output
#first call
results_link = ""
start_time = time.time()
request_result1 = self.request_auto_login(add_additional_header)
end_time = time.time()
request_time = end_time - start_time
print("First request time =", request_time)
if bool(request_result1):
if request_result1["status"] == "SUCCESS":
if "data" in request_result1:
if "status" in request_result1["data"]:
if request_result1["data"]["status"] == "IN QUEUE" or request_result1["data"]["status"] == "IN PROGRESS":
if "resultsLink" in request_result1["data"]:
results_link = request_result1["data"]["resultsLink"]
# error in getting results link
if results_link == "":
return dict()
# poll on results link until success
data = ""
endpoint = results_link
method = "GET"
request_iter = 0
while True:
# make request and time
start_time = time.time()
request_result2 = self.request_auto_login(add_additional_header)
end_time = time.time()
request_time = end_time - start_time
print("Second request time =", request_time)
# check if max requests exceeded
request_iter = request_iter + 1
if request_iter > max_request:
# extract
if bool(request_result2):
if "status" in request_result2:
status = request_result2["status"]
if status == "PROCESSED" or status == "FAILED":
if "data" in request_result2:
return request_result2["data"]
elif status == "IN QUEUE" or status == "IN PROGRESS":
if poll_time > request_time:
time.sleep(poll_time - request_time)
return dict()
Example program demonstrating running an isothermal calculation
Replace "username" and "password" with correct values.
# example program (Isothermal flash)
if __name__ == "__main__":
oliapi = OLIApi("username", "password")
if oliapi.login():
# upload chemistry file (this needs to be done only once to get the file id)
# this needs to be only done once per chemistry model file
# after that the id is suffcient
result = oliapi.upload_dbs_file("API_CALL_ISOTHERMAL\\test_isothermal.dbs")
print(json.dumps(result, indent=2))
chemistry_file_id = result["file"][0]["id"]
# display all available dbs files
result = oliapi.get_user_dbs_files()
print(json.dumps(result, indent=2))
# get chemistry information
result = oliapi.call("chemistry-info", chemistry_file_id)
print(json.dumps(result, indent=2))
# create isothermal flash input
flash_input = {
"params": {
"temperature": {
"value": 30.0,
"unit": "°C"
"pressure": {
"value": 1.5,
"unit": "atm"
"inflows": {
"unit": "mol",
"values": {
"H2O": 50.0,
"CO2": 10.0,
"NACL": 20.0,
"BENZENE": 10.0
# call the flash function
result = oliapi.call("isothermal", chemistry_file_id, flash_input)
print(json.dumps(result, indent=2).encode('utf8'))
Last updated
Was this helpful?