Module learning_orchestra_client.function.python
Expand source code
from learning_orchestra_client.observe.observe import Observer
from learning_orchestra_client._util._response_treat import ResponseTreat
from learning_orchestra_client._util._entity_reader import EntityReader
import requests
from typing import Union
class FunctionPython:
    """
    description: Client for the Learning Orchestra "function/python" REST
    service. It submits Python code for execution (sync or async), lists and
    reads the stored executions, deletes them and waits on them.
    """

    # Keys of the JSON body posted to the service.
    __CODE_FIELD = "function"
    __PARAMETERS_FIELD = "functionParameters"
    __NAME_FIELD = "name"
    __DESCRIPTION_FIELD = "description"

    def __init__(self, cluster_ip: str):
        """
        description: Builds the service URL and creates the helper objects
        used by the other methods.

        cluster_ip: Address prefix of the cluster. The API path is appended
        to it verbatim, and it is also handed to the Observer.
        """
        self.__api_path = "/api/learningOrchestra/v1/function/python"
        self.__service_url = f'{cluster_ip}{self.__api_path}'
        self.__response_treat = ResponseTreat()
        self.__cluster_ip = cluster_ip
        self.__entity_reader = EntityReader(self.__service_url)
        self.__observer = Observer(self.__cluster_ip)

    def __post_function(self,
                        name: str,
                        parameters: dict,
                        code: str,
                        description: str) -> "requests.Response":
        """
        description: Posts a function execution request to the service and
        returns the raw HTTP response. Shared by the sync and async runners
        so both always send the same request body.
        """
        request_body = {
            self.__NAME_FIELD: name,
            self.__PARAMETERS_FIELD: parameters,
            self.__CODE_FIELD: code,
            self.__DESCRIPTION_FIELD: description}
        return requests.post(url=self.__service_url, json=request_body)

    def run_function_sync(self,
                          name: str,
                          parameters: dict,
                          code: str,
                          description: str = "",
                          pretty_response: bool = False) -> Union[dict, str]:
        """
        description: This method runs a python 3 code in sync mode, so it
        represents a wildcard for the data scientist. It can be used when
        train, predict, tune, explore or any other pipe must be customized.
        The function is also useful for new pipes.

        name: Is the name under which the function execution is stored in
        Learning Orchestra storage system (volume or mongoDB).
        parameters: the parameters of the function being called
        code: the Python code
        description: Free text sent along with the request.
        pretty_response: If true it returns a string, otherwise a dictionary.

        return: A JSON object with an error or warning message or the correct
        operation result.
        """
        response = self.__post_function(name, parameters, code, description)
        # Block the caller until the observer reports on `name`.
        # NOTE(review): the wait is issued even when the POST was rejected;
        # confirm Observer.wait cannot block indefinitely in that case.
        self.__observer.wait(name)
        return self.__response_treat.treatment(response, pretty_response)

    def run_function_async(self,
                           name: str,
                           parameters: dict,
                           code: str,
                           description: str = "",
                           pretty_response: bool = False) -> Union[dict, str]:
        """
        description: This method runs a python 3 code in async mode, so it
        represents a wildcard for the data scientist. It does not lock the
        caller, so a wait method must be used. It can be used when train,
        predict, tune, explore or any other pipe must be customized. The
        function is also useful for new pipes.

        name: Is the name of the function to be called
        parameters: the parameters of the function being called
        code: the Python code
        description: Free text sent along with the request.
        pretty_response: If true it returns a string, otherwise a dictionary.

        return: A JSON object with an error or warning message or the correct
        operation result.
        """
        response = self.__post_function(name, parameters, code, description)
        return self.__response_treat.treatment(response, pretty_response)

    def search_all_executions(self, pretty_response: bool = False) \
            -> Union[dict, str]:
        """
        description: This method retrieves all created functions metadata,
        i.e., it does not retrieve the function result content.

        pretty_response: If true it returns a string, otherwise a dictionary.

        return: All function executions metadata stored in Learning Orchestra
        or an empty result.
        """
        response = self.__entity_reader.read_all_instances_from_entity()
        return self.__response_treat.treatment(response, pretty_response)

    def delete_execution(self, name: str, pretty_response: bool = False) \
            -> Union[dict, str]:
        """
        description: This method is responsible for deleting the function.
        This delete operation is asynchronous, so it does not lock the caller
        until the deletion has finished. Instead, it returns a JSON object
        with a URL for a future use. The caller uses the URL for delete
        checks.

        name: Represents the function name.
        pretty_response: If true it returns a string, otherwise a dictionary.

        return: JSON object with an error message, a warning message or a
        correct delete message
        """
        request_url = f'{self.__service_url}/{name}'
        response = requests.delete(request_url)
        return self.__response_treat.treatment(response, pretty_response)

    def search_execution_content(self,
                                 name: str,
                                 query: Union[dict, None] = None,
                                 limit: int = 10,
                                 skip: int = 0,
                                 pretty_response: bool = False) \
            -> Union[dict, str]:
        """
        description: This method is responsible for retrieving the function
        results, including metadata. A function is executed many times, using
        different parameters, thus many results are stored in Learning
        Orchestra.

        name: Is the name of the function.
        query: Query to make in MongoDB (default: empty query)
        limit: Number of rows to return in pagination (default: 10) (maximum
        is set at 20 rows per request)
        skip: Number of rows to skip in pagination (default: 0)
        pretty_response: If true it returns a string, otherwise a dictionary.

        return: A page with some function results inside or an error if there
        is no such function. The current page is also returned to be used in
        future content requests.
        """
        # A fresh dict per call: a `{}` default would be one object shared by
        # every call and could leak state between unrelated queries.
        if query is None:
            query = {}
        response = self.__entity_reader.read_entity_content(
            name, query, limit, skip)
        return self.__response_treat.treatment(response, pretty_response)

    def wait(self, dataset_name: str, timeout: Union[int, None] = None) \
            -> dict:
        """
        description: This method is responsible for creating a
        synchronization barrier for the run_function_async method or the
        delete_execution method.

        dataset_name: Represents the function name.
        timeout: Represents the time in seconds to wait for a function to
        finish its run.

        return: JSON object with an error message, a warning message or a
        correct function result
        """
        return self.__observer.wait(dataset_name, timeout)
Classes
class FunctionPython (cluster_ip: str)
-
Expand source code
class FunctionPython:
    """
    description: Client for the Learning Orchestra "function/python" REST
    service. It submits Python code for execution (sync or async), lists and
    reads the stored executions, deletes them and waits on them.
    """

    # Keys of the JSON body posted to the service.
    __CODE_FIELD = "function"
    __PARAMETERS_FIELD = "functionParameters"
    __NAME_FIELD = "name"
    __DESCRIPTION_FIELD = "description"

    def __init__(self, cluster_ip: str):
        """
        description: Builds the service URL and creates the helper objects
        used by the other methods.

        cluster_ip: Address prefix of the cluster. The API path is appended
        to it verbatim, and it is also handed to the Observer.
        """
        self.__api_path = "/api/learningOrchestra/v1/function/python"
        self.__service_url = f'{cluster_ip}{self.__api_path}'
        self.__response_treat = ResponseTreat()
        self.__cluster_ip = cluster_ip
        self.__entity_reader = EntityReader(self.__service_url)
        self.__observer = Observer(self.__cluster_ip)

    def __post_function(self,
                        name: str,
                        parameters: dict,
                        code: str,
                        description: str) -> "requests.Response":
        """
        description: Posts a function execution request to the service and
        returns the raw HTTP response. Shared by the sync and async runners
        so both always send the same request body.
        """
        request_body = {
            self.__NAME_FIELD: name,
            self.__PARAMETERS_FIELD: parameters,
            self.__CODE_FIELD: code,
            self.__DESCRIPTION_FIELD: description}
        return requests.post(url=self.__service_url, json=request_body)

    def run_function_sync(self,
                          name: str,
                          parameters: dict,
                          code: str,
                          description: str = "",
                          pretty_response: bool = False) -> Union[dict, str]:
        """
        description: This method runs a python 3 code in sync mode, so it
        represents a wildcard for the data scientist. It can be used when
        train, predict, tune, explore or any other pipe must be customized.
        The function is also useful for new pipes.

        name: Is the name under which the function execution is stored in
        Learning Orchestra storage system (volume or mongoDB).
        parameters: the parameters of the function being called
        code: the Python code
        description: Free text sent along with the request.
        pretty_response: If true it returns a string, otherwise a dictionary.

        return: A JSON object with an error or warning message or the correct
        operation result.
        """
        response = self.__post_function(name, parameters, code, description)
        # Block the caller until the observer reports on `name`.
        # NOTE(review): the wait is issued even when the POST was rejected;
        # confirm Observer.wait cannot block indefinitely in that case.
        self.__observer.wait(name)
        return self.__response_treat.treatment(response, pretty_response)

    def run_function_async(self,
                           name: str,
                           parameters: dict,
                           code: str,
                           description: str = "",
                           pretty_response: bool = False) -> Union[dict, str]:
        """
        description: This method runs a python 3 code in async mode, so it
        represents a wildcard for the data scientist. It does not lock the
        caller, so a wait method must be used. It can be used when train,
        predict, tune, explore or any other pipe must be customized. The
        function is also useful for new pipes.

        name: Is the name of the function to be called
        parameters: the parameters of the function being called
        code: the Python code
        description: Free text sent along with the request.
        pretty_response: If true it returns a string, otherwise a dictionary.

        return: A JSON object with an error or warning message or the correct
        operation result.
        """
        response = self.__post_function(name, parameters, code, description)
        return self.__response_treat.treatment(response, pretty_response)

    def search_all_executions(self, pretty_response: bool = False) \
            -> Union[dict, str]:
        """
        description: This method retrieves all created functions metadata,
        i.e., it does not retrieve the function result content.

        pretty_response: If true it returns a string, otherwise a dictionary.

        return: All function executions metadata stored in Learning Orchestra
        or an empty result.
        """
        response = self.__entity_reader.read_all_instances_from_entity()
        return self.__response_treat.treatment(response, pretty_response)

    def delete_execution(self, name: str, pretty_response: bool = False) \
            -> Union[dict, str]:
        """
        description: This method is responsible for deleting the function.
        This delete operation is asynchronous, so it does not lock the caller
        until the deletion has finished. Instead, it returns a JSON object
        with a URL for a future use. The caller uses the URL for delete
        checks.

        name: Represents the function name.
        pretty_response: If true it returns a string, otherwise a dictionary.

        return: JSON object with an error message, a warning message or a
        correct delete message
        """
        request_url = f'{self.__service_url}/{name}'
        response = requests.delete(request_url)
        return self.__response_treat.treatment(response, pretty_response)

    def search_execution_content(self,
                                 name: str,
                                 query: Union[dict, None] = None,
                                 limit: int = 10,
                                 skip: int = 0,
                                 pretty_response: bool = False) \
            -> Union[dict, str]:
        """
        description: This method is responsible for retrieving the function
        results, including metadata. A function is executed many times, using
        different parameters, thus many results are stored in Learning
        Orchestra.

        name: Is the name of the function.
        query: Query to make in MongoDB (default: empty query)
        limit: Number of rows to return in pagination (default: 10) (maximum
        is set at 20 rows per request)
        skip: Number of rows to skip in pagination (default: 0)
        pretty_response: If true it returns a string, otherwise a dictionary.

        return: A page with some function results inside or an error if there
        is no such function. The current page is also returned to be used in
        future content requests.
        """
        # A fresh dict per call: a `{}` default would be one object shared by
        # every call and could leak state between unrelated queries.
        if query is None:
            query = {}
        response = self.__entity_reader.read_entity_content(
            name, query, limit, skip)
        return self.__response_treat.treatment(response, pretty_response)

    def wait(self, dataset_name: str, timeout: Union[int, None] = None) \
            -> dict:
        """
        description: This method is responsible for creating a
        synchronization barrier for the run_function_async method or the
        delete_execution method.

        dataset_name: Represents the function name.
        timeout: Represents the time in seconds to wait for a function to
        finish its run.

        return: JSON object with an error message, a warning message or a
        correct function result
        """
        return self.__observer.wait(dataset_name, timeout)
Methods
def delete_execution(self, name: str, pretty_response=False) ‑> Union[dict, str]
-
description: This method is responsible for deleting the function. This delete operation is asynchronous, so it does not lock the caller until the deletion has finished. Instead, it returns a JSON object with a URL for future use. The caller uses the URL for delete checks.
pretty_response: If true it returns a string, otherwise a dictionary. name: Represents the function name.
return: JSON object with an error message, a warning message or a correct delete message
Expand source code
def delete_execution(self, name: str, pretty_response=False) \
        -> Union[dict, str]:
    """
    description: Sends an HTTP DELETE for the function stored under the
    given name. The delete operation is asynchronous, so the caller is not
    locked until the deletion has finished; the returned JSON object carries
    a URL that the caller can use later for delete checks.

    name: Represents the function name.
    pretty_response: If true it returns a string, otherwise a dictionary.

    return: JSON object with an error message, a warning message or a
    correct delete message
    """
    # The function name is appended to the service URL as a path segment.
    delete_response = requests.delete(f'{self.__service_url}/{name}')
    return self.__response_treat.treatment(delete_response, pretty_response)
def run_function_async(self, name: str, parameters: dict, code: str, description: str = '', pretty_response: bool = False) ‑> Union[dict, str]
-
description: This method runs a python 3 code in async mode, so it represents a wildcard for the data scientist. It does not lock the caller, so a wait method must be used. It can be used when train, predict, tune, explore or any other pipe must be customized. The function is also useful for new pipes.
pretty_response: If true it returns a string, otherwise a dictionary. name: Is the name of the function to be called. code: The Python code. parameters: The parameters of the function being called.
return: A JSON object with an error or warning message or the correct operation result.
Expand source code
def run_function_async(self,
                       name: str,
                       parameters: dict,
                       code: str,
                       description: str = "",
                       pretty_response: bool = False) -> Union[dict, str]:
    """
    description: Submits python 3 code for execution in async mode. The
    caller is not locked, so a wait method must be used to synchronize. It
    is a wildcard for the data scientist: use it when train, predict, tune,
    explore or any other pipe must be customized, or for new pipes.

    name: Is the name of the function to be called
    parameters: the parameters of the function being called
    code: the Python code
    description: Free text sent along with the request.
    pretty_response: If true it returns a string, otherwise a dictionary.

    return: A JSON object with an error or warning message or the correct
    operation result.
    """
    # Build the JSON payload field by field, in the same key order.
    payload = {}
    payload[self.__NAME_FIELD] = name
    payload[self.__PARAMETERS_FIELD] = parameters
    payload[self.__CODE_FIELD] = code
    payload[self.__DESCRIPTION_FIELD] = description

    post_response = requests.post(url=self.__service_url, json=payload)
    return self.__response_treat.treatment(post_response, pretty_response)
def run_function_sync(self, name: str, parameters: dict, code: str, description: str = '', pretty_response: bool = False) ‑> Union[dict, str]
-
description: This method runs a python 3 code in sync mode, so it represents a wildcard for the data scientist. It can be used when train, predict, tune, explore or any other pipe must be customized. The function is also useful for new pipes. pretty_response: If true it returns a string, otherwise a dictionary.
name: Is the name of the object stored in Learning Orchestra storage system (volume or mongoDB). parameters: The parameters of the function being called. code: The Python code.
return: A JSON object with an error or warning message or the correct operation result.
Expand source code
def run_function_sync(self,
                      name: str,
                      parameters: dict,
                      code: str,
                      description: str = "",
                      pretty_response: bool = False) -> Union[dict, str]:
    """
    description: Submits python 3 code for execution in sync mode, i.e. the
    caller is held on the observer's wait before the result is returned. It
    is a wildcard for the data scientist: use it when train, predict, tune,
    explore or any other pipe must be customized, or for new pipes.

    name: Is the name of the object stored in Learning Orchestra storage
    system (volume or mongoDB).
    parameters: the parameters of the function being called
    code: the Python code
    description: Free text sent along with the request.
    pretty_response: If true it returns a string, otherwise a dictionary.

    return: A JSON object with an error or warning message or the correct
    operation result.
    """
    # Build the JSON payload field by field, in the same key order.
    payload = {}
    payload[self.__NAME_FIELD] = name
    payload[self.__PARAMETERS_FIELD] = parameters
    payload[self.__CODE_FIELD] = code
    payload[self.__DESCRIPTION_FIELD] = description

    post_response = requests.post(url=self.__service_url, json=payload)
    # Wait on `name` before the response is interpreted.
    self.__observer.wait(name)
    return self.__response_treat.treatment(post_response, pretty_response)
def search_all_executions(self, pretty_response: bool = False) ‑> Union[dict, str]
-
description: This method retrieves all created functions metadata, i.e., it does not retrieve the function result content.
pretty_response: If true it returns a string, otherwise a dictionary.
return: All function executions metadata stored in Learning Orchestra or an empty result.
Expand source code
def search_all_executions(self, pretty_response: bool = False) \
        -> Union[dict, str]:
    """
    description: Retrieves the metadata of all created functions; the
    function result content itself is not retrieved.

    pretty_response: If true it returns a string, otherwise a dictionary.

    return: All function executions metadata stored in Learning Orchestra
    or an empty result.
    """
    executions_metadata = \
        self.__entity_reader.read_all_instances_from_entity()
    treated = self.__response_treat.treatment(
        executions_metadata, pretty_response)
    return treated
def search_execution_content(self, name: str, query: dict = {}, limit: int = 10, skip: int = 0, pretty_response: bool = False) ‑> Union[dict, str]
-
description: This method is responsible for retrieving the function results, including metadata. A function is executed many times, using different parameters, thus many results are stored in Learning Orchestra.
pretty_response: If true it returns a string, otherwise a dictionary. name: Is the name of the function. query: Query to make in MongoDB(default: empty query) limit: Number of rows to return in pagination(default: 10) (maximum is set at 20 rows per request) skip: Number of rows to skip in pagination(default: 0)
return: A page with some function results inside or an error if there is no such function. The current page is also returned to be used in future content requests.
Expand source code
def search_execution_content(self,
                             name: str,
                             query: Union[dict, None] = None,
                             limit: int = 10,
                             skip: int = 0,
                             pretty_response: bool = False) \
        -> Union[dict, str]:
    """
    description: This method is responsible for retrieving the function
    results, including metadata. A function is executed many times, using
    different parameters, thus many results are stored in Learning
    Orchestra.

    name: Is the name of the function.
    query: Query to make in MongoDB (default: empty query)
    limit: Number of rows to return in pagination (default: 10) (maximum is
    set at 20 rows per request)
    skip: Number of rows to skip in pagination (default: 0)
    pretty_response: If true it returns a string, otherwise a dictionary.

    return: A page with some function results inside or an error if there
    is no such function. The current page is also returned to be used in
    future content requests.
    """
    # A fresh dict per call: a `{}` default would be one object shared by
    # every call and could leak state between unrelated queries.
    if query is None:
        query = {}
    response = self.__entity_reader.read_entity_content(
        name, query, limit, skip)
    return self.__response_treat.treatment(response, pretty_response)
def wait(self, dataset_name: str, timeout: int = None) ‑> dict
-
description: This method is responsible for creating a synchronization barrier for the run_function_async method or the delete_execution method.
dataset_name: Represents the function name. timeout: Represents the time in seconds to wait for a function to finish its run.
return: JSON object with an error message, a warning message or a correct function result
Expand source code
def wait(self, dataset_name: str, timeout: int = None) -> dict:
    """
    description: Creates a synchronization barrier for the
    run_function_async method or the delete_execution method by delegating
    to the observer.

    dataset_name: Represents the function name.
    timeout: Represents the time in seconds to wait for a function to
    finish its run.

    return: JSON object with an error message, a warning message or a
    correct function result
    """
    observed = self.__observer.wait(dataset_name, timeout)
    return observed