Module learning_orchestra_client.builder.builder

Source code
from learning_orchestra_client.observe.observe import Observer
from learning_orchestra_client._util._response_treat import ResponseTreat
from learning_orchestra_client._util._entity_reader import EntityReader
import requests
from typing import Union


class BuilderSparkMl:
    __TRAIN_FIELD = "trainDatasetName"
    __TEST_FIELD = "testDatasetName"
    __CODE_FIELD = "modelingCode"
    __CLASSIFIERS_LIST_FIELD = "classifiersList"

    def __init__(self, cluster_ip: str):
        self.__api_path = "/api/learningOrchestra/v1/builder/sparkml"
        self.__service_url = f'{cluster_ip}{self.__api_path}'
        self.__response_treat = ResponseTreat()
        self.__cluster_ip = cluster_ip
        self.__entity_reader = EntityReader(self.__service_url)
        self.__observer = Observer(self.__cluster_ip)

    def run_spark_ml_sync(self,
                          train_dataset_name: str,
                          test_dataset_name: str,
                          modeling_code: str,
                          model_classifiers: list,
                          pretty_response: bool = False) -> Union[dict, str]:
        """
        description: Runs several steps of a machine learning pipeline
        (transform, tune, train and evaluate, for instance) using a modeling
        code and several classifiers, i.e. it runs an entire pipeline. The
        caller waits until the execution ends, since this is a synchronous
        method.

        train_dataset_name: Name of the final training dataset.
        test_dataset_name: Name of the final test dataset.
        modeling_code: Python 3 code for the PySpark pre-processing model.
        model_classifiers: List of initial classifiers to be used in the model.
        pretty_response: If True, returns an indented string for
        visualization; otherwise returns a dict.

        return: The set of predictions (their URIs).
        """

        request_body_content = {
            self.__TRAIN_FIELD: train_dataset_name,
            self.__TEST_FIELD: test_dataset_name,
            self.__CODE_FIELD: modeling_code,
            self.__CLASSIFIERS_LIST_FIELD: model_classifiers,
        }
        response = requests.post(url=self.__service_url,
                                 json=request_body_content)

        for classifier in model_classifiers:
            self.__observer.wait(f'{test_dataset_name}{classifier}')

        return self.__response_treat.treatment(response, pretty_response)

    def run_spark_ml_async(self,
                           train_dataset_name: str,
                           test_dataset_name: str,
                           modeling_code: str,
                           model_classifiers: list,
                           pretty_response: bool = False) -> Union[dict, str]:
        """
        description: Runs several steps of a machine learning pipeline
        (transform, tune, train and evaluate, for instance) using a modeling
        code and several classifiers, i.e. it runs an entire pipeline. The
        caller does not wait until the execution ends, since this is an
        asynchronous method.

        train_dataset_name: Name of the final training dataset.
        test_dataset_name: Name of the final test dataset.
        modeling_code: Python 3 code for the PySpark pre-processing model.
        model_classifiers: List of initial classifiers to be used in the model.
        pretty_response: If True, returns an indented string for
        visualization; otherwise returns a dict.

        return: The URL to retrieve the Spark pipeline result.
        """

        request_body_content = {
            self.__TRAIN_FIELD: train_dataset_name,
            self.__TEST_FIELD: test_dataset_name,
            self.__CODE_FIELD: modeling_code,
            self.__CLASSIFIERS_LIST_FIELD: model_classifiers,
        }
        response = requests.post(url=self.__service_url,
                                 json=request_body_content)

        return self.__response_treat.treatment(response, pretty_response)

    def search_all_builders(self, pretty_response: bool = False) \
            -> Union[dict, str]:
        """
        description: Retrieves the metadata of all model predictions. It does
        not retrieve the model predictions content.

        pretty_response: If True, returns an indented string; otherwise
        returns a dict.

        return: A list with the metadata of all model predictions stored in
        Learning Orchestra, or an empty result.
        """

        response = self.__entity_reader.read_all_instances_from_entity()

        return self.__response_treat.treatment(response, pretty_response)

    def search_builder_register_predictions(self,
                                            builder_name: str,
                                            query: dict = {},
                                            limit: int = 10,
                                            skip: int = 0,
                                            pretty_response: bool = False) \
            -> Union[dict, str]:
        """
        description: Retrieves the model predictions content.

        builder_name: Represents the model predictions name.
        query: MongoDB query (default: empty query).
        limit: Number of rows to return per page (default: 10, maximum: 20
        rows per request).
        skip: Number of rows to skip in pagination (default: 0).
        pretty_response: If True, returns an indented string; otherwise
        returns a dict.

        return: A page with some tuples or registers inside, or an error if
        the pipeline ran incorrectly. The current page is also returned, to
        be used in future content requests.
        """

        response = self.__entity_reader.read_entity_content(
            builder_name, query, limit, skip)

        return self.__response_treat.treatment(response, pretty_response)

    def search_builder(self, builder_name: str, pretty_response: bool = False) \
            -> Union[dict, str]:
        """
        description: Retrieves the metadata of a specific model prediction.

        builder_name: Represents the model predictions name.
        pretty_response: If True, returns an indented string; otherwise
        returns a dict.

        return: The metadata of a specific model prediction stored in
        Learning Orchestra, or an error if there is no such prediction.
        """
        response = self.search_builder_register_predictions(
            builder_name, limit=1,
            pretty_response=pretty_response)

        return response

    def delete_builder(self, builder_name: str, pretty_response: bool = False) \
            -> Union[dict, str]:
        """
        description: Deletes a model prediction. The delete operation is
        always asynchronous, since the deletion is performed in the
        background.

        builder_name: Represents the pipeline name.
        pretty_response: If True, returns an indented string; otherwise
        returns a dict.

        return: A JSON object with an error message, a warning message or a
        correct delete message.
        """

        cluster_url_dataset = f'{self.__service_url}/{builder_name}'

        response = requests.delete(cluster_url_dataset)

        return self.__response_treat.treatment(response, pretty_response)

    def wait(self, dataset_name: str, timeout: int = None) -> dict:
        """
        description: Creates a synchronization barrier for the
        run_spark_ml_async method.

        dataset_name: Represents the pipeline name.
        timeout: Time in seconds to wait for a builder to finish its run.

        return: A JSON object with an error message, a warning message or a
        correct pipeline execution result.
        """
        return self.__observer.wait(dataset_name, timeout)

Classes

class BuilderSparkMl (cluster_ip: str)
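A minimal instantiation sketch. The address below is a placeholder; cluster_ip is concatenated directly with the service path, so it should carry the scheme and host of your Learning Orchestra cluster.

from learning_orchestra_client.builder.builder import BuilderSparkMl

# Placeholder address; replace with your own cluster (scheme + host[:port]).
cluster_ip = "http://34.122.111.11"
builder = BuilderSparkMl(cluster_ip=cluster_ip)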

Methods

def delete_builder(self, builder_name: str, pretty_response: bool = False) -> Union[dict, str]

description: Deletes a model prediction. The delete operation is always asynchronous, since the deletion is performed in the background.

builder_name: Represents the pipeline name.
pretty_response: If True, returns an indented string; otherwise returns a dict.

return: A JSON object with an error message, a warning message or a correct delete message.
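A brief sketch with a hypothetical prediction name, assuming the builder instance from the class example above.

# "titanic_testLR" is a placeholder; deletion runs in the background, so the
# response only acknowledges the request.
print(builder.delete_builder("titanic_testLR", pretty_response=True))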

def run_spark_ml_async(self, train_dataset_name: str, test_dataset_name: str, modeling_code: str, model_classifiers: list, pretty_response: bool = False) -> Union[dict, str]

description: Runs several steps of a machine learning pipeline (transform, tune, train and evaluate, for instance) using a modeling code and several classifiers, i.e. it runs an entire pipeline. The caller does not wait until the execution ends, since this is an asynchronous method.

train_dataset_name: Name of the final training dataset.
test_dataset_name: Name of the final test dataset.
modeling_code: Python 3 code for the PySpark pre-processing model.
model_classifiers: List of initial classifiers to be used in the model.
pretty_response: If True, returns an indented string; otherwise returns a dict.

return: The URL to retrieve the Spark pipeline result.
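A sketch of an asynchronous run followed by an explicit barrier, assuming the builder instance from the class example above. The dataset names, classifier identifiers and modeling code are placeholders that must match datasets and classifiers available in your cluster.

# Placeholder inputs; the call returns as soon as the cluster accepts the job.
modeling_code = '''
# PySpark pre-processing code goes here.
'''
builder.run_spark_ml_async(
    train_dataset_name="titanic_train",
    test_dataset_name="titanic_test",
    modeling_code=modeling_code,
    model_classifiers=["LR", "DT"])

# Block later, when the results are needed. The synchronous variant waits on
# test dataset name + classifier internally, so the same naming is used here.
for classifier in ["LR", "DT"]:
    builder.wait("titanic_test" + classifier)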

def run_spark_ml_sync(self, train_dataset_name: str, test_dataset_name: str, modeling_code: str, model_classifiers: list, pretty_response: bool = False) -> Union[dict, str]

description: Runs several steps of a machine learning pipeline (transform, tune, train and evaluate, for instance) using a modeling code and several classifiers, i.e. it runs an entire pipeline. The caller waits until the execution ends, since this is a synchronous method.

train_dataset_name: Name of the final training dataset.
test_dataset_name: Name of the final test dataset.
modeling_code: Python 3 code for the PySpark pre-processing model.
model_classifiers: List of initial classifiers to be used in the model.
pretty_response: If True, returns an indented string; otherwise returns a dict.

return: The set of predictions (their URIs).
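A matching synchronous sketch with the same placeholder inputs as the asynchronous example above; here the call itself blocks until every classifier's prediction is ready.

result = builder.run_spark_ml_sync(
    train_dataset_name="titanic_train",
    test_dataset_name="titanic_test",
    modeling_code=modeling_code,
    model_classifiers=["LR", "DT"],
    pretty_response=True)
print(result)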

def search_all_builders(self, pretty_response: bool = False) -> Union[dict, str]

description: Retrieves the metadata of all model predictions. It does not retrieve the model predictions content.

pretty_response: If True, returns an indented string; otherwise returns a dict.

return: A list with the metadata of all model predictions stored in Learning Orchestra, or an empty result.
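A one-line sketch, assuming the builder instance from the class example above.

# Metadata only; returns an empty result when nothing has been built yet.
print(builder.search_all_builders(pretty_response=True))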

def search_builder(self, builder_name: str, pretty_response: bool = False) -> Union[dict, str]

description: Retrieves the metadata of a specific model prediction.

builder_name: Represents the model predictions name.
pretty_response: If True, returns an indented string; otherwise returns a dict.

return: The metadata of a specific model prediction stored in Learning Orchestra, or an error if there is no such prediction.
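A brief sketch with a hypothetical prediction name.

# Only the metadata document is returned (internally a one-row page).
print(builder.search_builder("titanic_testLR", pretty_response=True))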

def search_builder_register_predictions(self, builder_name: str, query: dict = {}, limit: int = 10, skip: int = 0, pretty_response: bool = False) -> Union[dict, str]

description: Retrieves the model predictions content.

builder_name: Represents the model predictions name.
query: MongoDB query (default: empty query).
limit: Number of rows to return per page (default: 10, maximum: 20 rows per request).
skip: Number of rows to skip in pagination (default: 0).
pretty_response: If True, returns an indented string; otherwise returns a dict.

return: A page with some tuples or registers inside, or an error if the pipeline ran incorrectly. The current page is also returned, to be used in future content requests.
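A paging sketch with a hypothetical prediction name and query field; the query follows MongoDB syntax and the limit respects the documented 20-row maximum per request.

page = builder.search_builder_register_predictions(
    builder_name="titanic_testLR",
    query={"Survived": 1},  # hypothetical field in the prediction rows
    limit=20,
    skip=0)
print(page)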

def wait(self, dataset_name: str, timeout: int = None) -> dict

description: Creates a synchronization barrier for the run_spark_ml_async method.

dataset_name: Represents the pipeline name.
timeout: Time in seconds to wait for a builder to finish its run.

return: A JSON object with an error message, a warning message or a correct pipeline execution result.
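A brief sketch that waits at most ten minutes for a hypothetical asynchronous prediction, named after the test dataset plus the classifier as in the synchronous run.

builder.wait("titanic_testLR", timeout=600)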
