Module learning_orchestra_client.observe.observe
Expand source code
import time

from pymongo import MongoClient, change_stream
class Observer:
    """Observe learning-orchestra pipes stored in the cluster's MongoDB.

    Every pipe (dataset, train, predict, tune, ...) is read from the MongoDB
    collection named after it. ``wait`` blocks until a pipe reports
    ``finished``; ``observe_pipe`` hands back a raw change stream.
    """

    # Converts the public ``timeout`` arguments (seconds) into the
    # milliseconds expected by pymongo's ``max_await_time_ms``.
    __TIMEOUT_TIME_MULTIPLICATION = 1000
    # Default ``wait`` timeout, in seconds (4 minutes).
    __MAX_WAIT_TIME = 240

    def __init__(self, cluster_ip: str):
        """Connect to the MongoDB instance exposed by the cluster.

        :param cluster_ip: Address of the cluster (``host`` or
            ``host:port``), optionally prefixed with ``http://`` or
            ``https://``.
        """
        mongo_client = MongoClient(self._build_mongo_url(cluster_ip))
        self.__database = mongo_client.database

    @staticmethod
    def _build_mongo_url(cluster_ip: str) -> str:
        """Return the MongoDB connection URL for a cluster address."""
        host = cluster_ip
        # Strip only a leading scheme so both http:// and https://
        # addresses yield a valid ``mongodb://`` URL.
        for scheme in ("http://", "https://"):
            if host.startswith(scheme):
                host = host[len(scheme):]
                break
        # NOTE(review): credentials are hard-coded (``%23`` is a URL-escaped
        # ``#``); consider reading them from configuration instead.
        return f'mongodb://root:owl45%2321@{host}'

    @classmethod
    def _to_max_await_time_ms(cls, timeout):
        """Convert a timeout in seconds to milliseconds.

        ``None`` is passed through unchanged, so pymongo does not send a
        ``max_await_time_ms`` value of its own choosing.
        """
        if timeout is None:
            return None
        return timeout * cls.__TIMEOUT_TIME_MULTIPLICATION

    def wait(self, name: str, timeout: int = None) -> dict:
        """Block until the pipe ``name`` finishes or ``timeout`` expires.

        :param name: Pipe name. Any tune, train, predict, ... service can
            be waited on with this method.
        :param timeout: Maximum time to wait, in seconds. Defaults to 240
            seconds (4 minutes).
        :return: A pipe document whose ``finished`` field is true: the
            metadata document (``_id`` 0) if the pipe had already
            finished, otherwise the updated document reported by the
            change stream.
        :raises TimeoutError: If the pipe does not finish within
            ``timeout`` seconds.
        :raises StopIteration: If the server closes the change stream
            before the pipe finishes.
        """
        dataset_collection = self.__database[name]
        observer_query = [
            {'$match': {
                '$and': [
                    {'operationType': 'update'},
                    {'fullDocument.finished': {'$eq': True}},
                ]
            }}
        ]
        timeout = self.__MAX_WAIT_TIME if timeout is None else timeout
        deadline = time.monotonic() + timeout

        # Open the change stream *before* reading the metadata: a
        # "finished" update landing between the read and the watch would
        # otherwise be missed and the call would hang.
        with dataset_collection.watch(
                observer_query,
                full_document='updateLookup',
                max_await_time_ms=self._to_max_await_time_ms(timeout),
        ) as stream:
            dataset_metadata = dataset_collection.find_one({"_id": 0})
            if dataset_metadata is not None and \
                    dataset_metadata.get("finished"):
                return dataset_metadata

            # ChangeStream.next() retries until a change arrives and so
            # ignores the timeout; poll with try_next() against a deadline.
            while stream.alive:
                change = stream.try_next()
                if change is not None:
                    return change['fullDocument']
                if time.monotonic() >= deadline:
                    raise TimeoutError(
                        f"pipe '{name}' did not finish within "
                        f"{timeout} seconds")
        # Same exception ChangeStream.next() raises once the stream dies.
        raise StopIteration(f"change stream for pipe '{name}' was closed")

    def observe_pipe(self, name: str, timeout: int = None) -> \
            'change_stream.CollectionChangeStream':
        """Return a change stream over every change to the pipe ``name``.

        Unlike ``wait``, there is no finish condition: replace, insert,
        update and delete operations on the pipe's collection are all
        reported.

        :param name: Name of the pipe to observe (train, predict, explore,
            transform or any other pipe).
        :param timeout: Maximum time, in seconds, the server waits for new
            changes before answering each ``getMore`` request on the
            stream. ``None`` (the default) leaves the option unset.
        :return: A pymongo ``CollectionChangeStream``; iterate it with the
            builtin ``next()``. The caller is responsible for closing it.
        """
        observer_query = [
            {'$match': {
                'operationType': {
                    '$in': ['replace', 'insert', 'update', 'delete'],
                },
            }}
        ]
        return self.__database[name].watch(
            observer_query,
            max_await_time_ms=self._to_max_await_time_ms(timeout),
            full_document='updateLookup')
Classes
class Observer (cluster_ip: str)
-
Expand source code
class Observer:
    """Observe learning-orchestra pipes stored in the cluster's MongoDB.

    Every pipe (dataset, train, predict, tune, ...) is read from the MongoDB
    collection named after it. ``wait`` blocks until a pipe reports
    ``finished``; ``observe_pipe`` hands back a raw change stream.
    """

    # Converts the public ``timeout`` arguments (seconds) into the
    # milliseconds expected by pymongo's ``max_await_time_ms``.
    __TIMEOUT_TIME_MULTIPLICATION = 1000
    # Default ``wait`` timeout, in seconds (4 minutes).
    __MAX_WAIT_TIME = 240

    def __init__(self, cluster_ip: str):
        """Connect to the MongoDB instance exposed by the cluster.

        :param cluster_ip: Address of the cluster (``host`` or
            ``host:port``), optionally prefixed with ``http://`` or
            ``https://``.
        """
        mongo_client = MongoClient(self._build_mongo_url(cluster_ip))
        self.__database = mongo_client.database

    @staticmethod
    def _build_mongo_url(cluster_ip: str) -> str:
        """Return the MongoDB connection URL for a cluster address."""
        host = cluster_ip
        # Strip only a leading scheme so both http:// and https://
        # addresses yield a valid ``mongodb://`` URL.
        for scheme in ("http://", "https://"):
            if host.startswith(scheme):
                host = host[len(scheme):]
                break
        # NOTE(review): credentials are hard-coded (``%23`` is a URL-escaped
        # ``#``); consider reading them from configuration instead.
        return f'mongodb://root:owl45%2321@{host}'

    @classmethod
    def _to_max_await_time_ms(cls, timeout):
        """Convert a timeout in seconds to milliseconds.

        ``None`` is passed through unchanged, so pymongo does not send a
        ``max_await_time_ms`` value of its own choosing.
        """
        if timeout is None:
            return None
        return timeout * cls.__TIMEOUT_TIME_MULTIPLICATION

    def wait(self, name: str, timeout: int = None) -> dict:
        """Block until the pipe ``name`` finishes or ``timeout`` expires.

        :param name: Pipe name. Any tune, train, predict, ... service can
            be waited on with this method.
        :param timeout: Maximum time to wait, in seconds. Defaults to 240
            seconds (4 minutes).
        :return: A pipe document whose ``finished`` field is true: the
            metadata document (``_id`` 0) if the pipe had already
            finished, otherwise the updated document reported by the
            change stream.
        :raises TimeoutError: If the pipe does not finish within
            ``timeout`` seconds.
        :raises StopIteration: If the server closes the change stream
            before the pipe finishes.
        """
        dataset_collection = self.__database[name]
        observer_query = [
            {'$match': {
                '$and': [
                    {'operationType': 'update'},
                    {'fullDocument.finished': {'$eq': True}},
                ]
            }}
        ]
        timeout = self.__MAX_WAIT_TIME if timeout is None else timeout
        deadline = time.monotonic() + timeout

        # Open the change stream *before* reading the metadata: a
        # "finished" update landing between the read and the watch would
        # otherwise be missed and the call would hang.
        with dataset_collection.watch(
                observer_query,
                full_document='updateLookup',
                max_await_time_ms=self._to_max_await_time_ms(timeout),
        ) as stream:
            dataset_metadata = dataset_collection.find_one({"_id": 0})
            if dataset_metadata is not None and \
                    dataset_metadata.get("finished"):
                return dataset_metadata

            # ChangeStream.next() retries until a change arrives and so
            # ignores the timeout; poll with try_next() against a deadline.
            while stream.alive:
                change = stream.try_next()
                if change is not None:
                    return change['fullDocument']
                if time.monotonic() >= deadline:
                    raise TimeoutError(
                        f"pipe '{name}' did not finish within "
                        f"{timeout} seconds")
        # Same exception ChangeStream.next() raises once the stream dies.
        raise StopIteration(f"change stream for pipe '{name}' was closed")

    def observe_pipe(self, name: str, timeout: int = None) -> \
            'change_stream.CollectionChangeStream':
        """Return a change stream over every change to the pipe ``name``.

        Unlike ``wait``, there is no finish condition: replace, insert,
        update and delete operations on the pipe's collection are all
        reported.

        :param name: Name of the pipe to observe (train, predict, explore,
            transform or any other pipe).
        :param timeout: Maximum time, in seconds, the server waits for new
            changes before answering each ``getMore`` request on the
            stream. ``None`` (the default) leaves the option unset.
        :return: A pymongo ``CollectionChangeStream``; iterate it with the
            builtin ``next()``. The caller is responsible for closing it.
        """
        observer_query = [
            {'$match': {
                'operationType': {
                    '$in': ['replace', 'insert', 'update', 'delete'],
                },
            }}
        ]
        return self.__database[name].watch(
            observer_query,
            max_await_time_ms=self._to_max_await_time_ms(timeout),
            full_document='updateLookup')
Methods
def observe_pipe(self, name: str, timeout: int = None) ‑> pymongo.change_stream.CollectionChangeStream
-
:description: Returns a change stream that reports every change to the pipe's content (MongoDB replace, insert, update and delete operations). Unlike the wait method, which waits for an explicit finish condition with a timeout, it reports every change.
:name: the name of the pipe to be observed. A train, predict, explore, transform or any other pipe can be observed. timeout: the maximum time, in seconds, the server waits for new changes before responding to each request for more results from the stream.
:return: A pymongo CollectionChangeStream object. Use the builtin next() function to iterate over its changes.
Expand source code
def observe_pipe(self, name: str, timeout: int = None) -> \
        'change_stream.CollectionChangeStream':
    """Return a change stream over every change to the pipe ``name``.

    Unlike ``wait``, there is no finish condition: replace, insert,
    update and delete operations on the pipe's collection are all
    reported.

    :param name: Name of the pipe to observe (train, predict, explore,
        transform or any other pipe).
    :param timeout: Maximum time, in seconds, the server waits for new
        changes before answering each ``getMore`` request on the
        stream. ``None`` (the default) leaves the option unset.
    :return: A pymongo ``CollectionChangeStream``; iterate it with the
        builtin ``next()``. The caller is responsible for closing it.
    """
    observer_query = [
        {'$match': {
            'operationType': {
                '$in': ['replace', 'insert', 'update', 'delete'],
            },
        }}
    ]
    return self.__database[name].watch(
        observer_query,
        max_await_time_ms=self._to_max_await_time_ms(timeout),
        full_document='updateLookup')
def wait(self, name: str, timeout: int = None) ‑> dict
-
:description: Observes a pipe until it finishes its execution, waiting at most timeout seconds.
name: the pipe name. Any tune, train or predict service can be waited on with a wait call. timeout: the maximum time to wait for the observed step, in seconds. The default is 4 minutes.
:return: A dictionary with the content of the pipe's MongoDB document, representing the pipe result once it has finished.
Expand source code
def wait(self, name: str, timeout: int = None) -> dict:
    """Block until the pipe ``name`` finishes or ``timeout`` expires.

    :param name: Pipe name. Any tune, train, predict, ... service can
        be waited on with this method.
    :param timeout: Maximum time to wait, in seconds. Defaults to 240
        seconds (4 minutes).
    :return: A pipe document whose ``finished`` field is true: the
        metadata document (``_id`` 0) if the pipe had already
        finished, otherwise the updated document reported by the
        change stream.
    :raises TimeoutError: If the pipe does not finish within
        ``timeout`` seconds.
    :raises StopIteration: If the server closes the change stream
        before the pipe finishes.
    """
    dataset_collection = self.__database[name]
    observer_query = [
        {'$match': {
            '$and': [
                {'operationType': 'update'},
                {'fullDocument.finished': {'$eq': True}},
            ]
        }}
    ]
    timeout = self.__MAX_WAIT_TIME if timeout is None else timeout
    deadline = time.monotonic() + timeout

    # Open the change stream *before* reading the metadata: a
    # "finished" update landing between the read and the watch would
    # otherwise be missed and the call would hang.
    with dataset_collection.watch(
            observer_query,
            full_document='updateLookup',
            max_await_time_ms=self._to_max_await_time_ms(timeout),
    ) as stream:
        dataset_metadata = dataset_collection.find_one({"_id": 0})
        if dataset_metadata is not None and \
                dataset_metadata.get("finished"):
            return dataset_metadata

        # ChangeStream.next() retries until a change arrives and so
        # ignores the timeout; poll with try_next() against a deadline.
        while stream.alive:
            change = stream.try_next()
            if change is not None:
                return change['fullDocument']
            if time.monotonic() >= deadline:
                raise TimeoutError(
                    f"pipe '{name}' did not finish within "
                    f"{timeout} seconds")
    # Same exception ChangeStream.next() raises once the stream dies.
    raise StopIteration(f"change stream for pipe '{name}' was closed")