Quickstart example: Python
A convenient Python wrapper class is provided as an example to help you get up and running quickly with calculations using the OLI API(s).
Sample wrapper class
The user can simply copy this code and use it in an existing Python project. This class provides the simple interface described below.
Please go to the very end of this page to see an example program using this class.
import requests
import json
import time
class OLIApi:
    '''
    A class to wrap OLI Cloud API calls to be accessible in a simple manner. This
    is just an example.

    Every request helper returns an empty dict on failure, so callers should
    check the result for truthiness before indexing into it.
    '''

    def __init__(self, username, password):
        '''
        Constructs all necessary attributes for OLIApi class. No network
        request is made here; call login() before any other method.

        :param username: user's username
        :param password: user's password
        '''
        self.__username = username
        self.__password = password
        self.__jwt_token = ""
        self.__refresh_token = ""
        self.__root_url = "https://api.olisystems.com"
        self.__auth_url = "https://auth.olisystems.com/auth/realms/api/protocol/openid-connect/token"
        self.__dbs_url = self.__root_url + "/channel/dbs"
        self.__upload_dbs_url = self.__root_url + "/channel/upload/dbs"

    def _fetch_tokens(self, body):
        '''
        Posts a token request to the auth endpoint and stores the returned tokens.

        :param body: form fields of the token request
        :return: True when both an access token and a refresh token were received
        '''
        headers = {
            "Content-Type": "application/x-www-form-urlencoded"
        }
        response = requests.post(self.__auth_url, headers=headers, data=body)
        if response.status_code != 200:
            return False
        try:
            tokens = response.json()
        except ValueError:
            # a 200 response without a JSON body carries no tokens
            return False
        if not isinstance(tokens, dict) or "access_token" not in tokens:
            return False
        self.__jwt_token = tokens["access_token"]
        if "refresh_token" not in tokens:
            return False
        self.__refresh_token = tokens["refresh_token"]
        return True

    def login(self):
        '''
        Logs in to the OLI Cloud with the stored user credentials.

        :return: True on success, False on failure
        '''
        return self._fetch_tokens({
            "username": self.__username,
            "password": self.__password,
            "grant_type": "password",
            "client_id": "apiclient",
        })

    def refresh_token(self):
        '''
        Refreshes the access token using the refresh token obtained on login.

        :return: True on success, False on failure
        '''
        return self._fetch_tokens({
            "refresh_token": self.__refresh_token,
            "grant_type": "refresh_token",
            "client_id": "apiclient",
        })

    def request_auto_login(self, req_func):
        '''
        Runs a request with the current access token. If the first attempt
        returns HTTP 401, a new access token is obtained (first with the refresh
        token, then by logging in again with username and password) and the
        request is retried once.

        :param req_func: function taking the headers dict and returning the response
        :return: decoded JSON response, or an empty dict if failed
        '''
        for attempt in (1, 2):
            headers = {
                "authorization": "Bearer " + self.__jwt_token
            }
            response = req_func(headers)
            if response.status_code == 200:
                try:
                    return json.loads(response.text)
                except ValueError:
                    # a 200 response with a non-JSON body is treated as a failure
                    return dict()
            # only the first 401 triggers a re-authentication; anything else is final
            if attempt != 1 or response.status_code != 401:
                break
            if not self.refresh_token() and not self.login():
                break
        return dict()

    def upload_dbs_file(self, file_path):
        '''
        Uploads a dbs file to the OLI Cloud given a full file path.

        :param file_path: full path to dbs file
        :return: dictionary containing the uploaded file id, or an empty dict
                 if the file could not be read or the request failed
        '''
        def post_file(headers):
            # Open the file on every attempt: a handle consumed by a first
            # (rejected) attempt would otherwise upload nothing on the retry.
            with open(file_path, "rb") as file:
                return requests.post(self.__upload_dbs_url, headers=headers,
                                     files={"files": file})

        try:
            return self.request_auto_login(post_file)
        except IOError:
            # NOTE: requests' own exceptions derive from IOError, so connection
            # errors are reported the same way as an unreadable file.
            return dict()

    def get_user_dbs_files(self):
        '''
        Returns a dictionary containing a list of dbs file(s) uploaded.

        :return: dictionary containing list of dbs files
        '''
        return self.request_auto_login(
            lambda headers: requests.get(self.__dbs_url, headers=headers))

    def call(self, function_name, chemistry_model_file_id, json_input=None, poll_time=1.0, max_request=1000):
        '''
        Calls a function in the OLI Engine API and polls for its result.

        :param function_name: name of function to call
        :param chemistry_model_file_id: the chemistry model file id for this calculation
        :param json_input: calculation input JSON (optional)
        :param poll_time: target time in seconds between the start of two polling
                          requests; the duration of the request itself is
                          subtracted from the wait
        :param max_request: maximum number of polling requests
        :return: dictionary containing result or error; an empty dict if the
                 calculation could not be started or did not finish in time
        '''
        def make_sender(method, endpoint, data):
            # binds one specific request so it can be replayed by request_auto_login
            def send(headers):
                headers["content-type"] = "application/json"
                if method == "POST":
                    return requests.post(endpoint, headers=headers, data=data)
                return requests.get(endpoint, headers=headers, data=data)
            return send

        # formulate url
        if function_name in ("chemistry-info", "corrosion-contact-surface"):
            endpoint = self.__root_url + "/engine/file/" + chemistry_model_file_id + "/" + function_name
            method = "GET"
        else:
            endpoint = self.__root_url + "/engine/flash/" + chemistry_model_file_id + "/" + function_name
            method = "POST"

        # http body
        data = json.dumps(json_input) if json_input else ""

        # first call: submits the calculation and yields a link to poll for results
        start_time = time.time()
        submitted = self.request_auto_login(make_sender(method, endpoint, data))
        request_time = time.time() - start_time
        print("First request time =", request_time)

        results_link = ""
        if isinstance(submitted, dict) and submitted.get("status") == "SUCCESS":
            job = submitted.get("data")
            if isinstance(job, dict) and job.get("status") in ("IN QUEUE", "IN PROGRESS"):
                if "resultsLink" in job:
                    results_link = job["resultsLink"]
                    print(results_link)

        # error in getting results link
        if not results_link:
            return dict()

        # poll on results link until the calculation finishes
        poll = make_sender("GET", results_link, "")
        for _ in range(max_request):
            # make request and time
            start_time = time.time()
            polled = self.request_auto_login(poll)
            request_time = time.time() - start_time
            print("Second request time =", request_time)

            # extract
            print(polled)
            if not isinstance(polled, dict) or "status" not in polled:
                break
            status = polled["status"]
            print(status)
            if status in ("PROCESSED", "FAILED"):
                if "data" in polled:
                    return polled["data"]
                break
            if status not in ("IN QUEUE", "IN PROGRESS"):
                break
            # still running: wait out the remainder of the polling interval
            if poll_time > request_time:
                time.sleep(poll_time - request_time)
        return dict()
Example program demonstrating running an isothermal calculation
Replace "username" and "password" with correct values.
# example program (Isothermal flash)
if __name__ == "__main__":
    oliapi = OLIApi("username", "password")
    if not oliapi.login():
        raise SystemExit("Login failed: check the username and password")

    # upload chemistry file
    # this needs to be only done once per chemistry model file
    # after that the id is sufficient
    result = oliapi.upload_dbs_file("API_CALL_ISOTHERMAL\\test_isothermal.dbs")
    print(json.dumps(result, indent=2))
    try:
        chemistry_file_id = result["file"][0]["id"]
    except (KeyError, IndexError, TypeError):
        # upload_dbs_file returns an empty dict on failure, so there is no id to read
        raise SystemExit("Upload failed: the response contains no file id")

    # display all available dbs files
    result = oliapi.get_user_dbs_files()
    print(json.dumps(result, indent=2))

    # get chemistry information
    result = oliapi.call("chemistry-info", chemistry_file_id)
    print(json.dumps(result, indent=2))

    # create isothermal flash input
    flash_input = {
        "params": {
            "temperature": {
                "value": 30.0,
                "unit": "°C"
            },
            "pressure": {
                "value": 1.5,
                "unit": "atm"
            },
            "inflows": {
                "unit": "mol",
                "values": {
                    "H2O": 50.0,
                    "CO2": 10.0,
                    "NACL": 20.0,
                    "BENZENE": 10.0
                }
            }
        }
    }

    # call the flash function
    result = oliapi.call("isothermal", chemistry_file_id, flash_input)
    # json.dumps escapes non-ASCII characters by default, so the text can be
    # printed directly; encoding it first would print a bytes repr instead
    print(json.dumps(result, indent=2))
Last updated