Skip to content

Predictors



Helpers for predictors

NetResponseBase

Bases: BaseModel

Base for the network response

Source code in src/predictors/utils.py
class NetResponseBase(BaseModel):
    """Base for the network response.

    Attributes
    ----------
    status_code : int
        status code of processing the request
    message : Optional[str]
        message returned when processing the request
    """
    status_code: int = Field(description="Status code of processing the request")
    message: Optional[str] = Field(default=None, description="Message returned when processing "
                                                             "the request")

    @property
    def status(self) -> bool:
        """Return a boolean based on the status code"""
        # Only an exact 200 counts as success (other 2xx codes are failures)
        return self.status_code == 200

    def model_dump(self, *args, **kwargs) -> dict:
        """Serialize the model, adding the computed ``status`` property.

        Overridden so the ``status`` key is present no matter whether callers
        use the pydantic v2 ``model_dump`` API or the legacy ``dict`` wrapper
        below; previously only ``dict()`` included it.
        """
        dict_ = super().model_dump(*args, **kwargs)
        dict_['status'] = self.status
        return dict_

    def dict(self, *args, **kwargs):
        """Add the status property to .dict (pydantic-v1-style alias)"""
        return self.model_dump(*args, **kwargs)

status: bool property

Return a boolean based on the status code

dict(*args, **kwargs)

Add the status property to .dict

Source code in src/predictors/utils.py
def dict(self, *args, **kwargs):
    """Add the status property to .dict"""
    # Delegate serialization to pydantic v2's model_dump, then expose the
    # computed ``status`` property (not a declared field, so it is absent
    # from the plain dump) in the resulting dictionary.
    dict_ = super().model_dump(*args, **kwargs)
    dict_['status'] = self.status
    return dict_

TFResponseModel

Bases: NetResponseBase

The response of a network served with tensorflow serving

Source code in src/predictors/utils.py
class TFResponseModel(NetResponseBase):
    """The response of a network served with tensorflow serving"""

    # Raw, unpostprocessed outputs exactly as returned by the serving API
    # (one score list per instance in the request).
    scores: list = Field(description="Raw output of the network")

PredictorResponse

Bases: NetResponseBase

A response from predictor

Source code in src/predictors/utils.py
class PredictorResponse(NetResponseBase):
    """A response from predictor"""

    # Postprocessed outputs: either a raw score list or a {class_name: score}
    # mapping produced by a Predictor subclass.
    scores: Union[list, dict] = Field(description="Postprocessed outputs of the network")

Call remotely served networks on given data and postprocess outputs

Predictor

Abstract class to call an endpoint with served network and return raw predictions

Attributes:

Name Type Description
predict_url str

url where the model is served

version int

version of the model. If specified network isn't served, one of the served ones will be picked instead

name str

name of the classifier. Will be used while logging

semaphore Semaphore

instance for asynchronous calls to the served network API

session ClientSession

session to make calls to the network's server

Source code in src/predictors/predictors.py
class Predictor:
    """
    Abstract class to call an endpoint with served network and return raw predictions

    Attributes
    -----------
    predict_url : str
        url where the model is served
    version : int
        version of the model. If specified network isn't served, one of the served ones will be
        picked instead
    name : str
        name of the classifier. Will be used while logging
    semaphore : asyncio.Semaphore
        instance for asynchronous calls to the served network API
    session : aiohttp.ClientSession
        session to make calls to the network's server
    """

    def __init__(self,
                 config: PredictorConfig,
                 semaphore: asyncio.Semaphore,
                 session: aiohttp.ClientSession) -> None:
        self.predict_url = config.predict_url
        # Base servable url without the "/versions/<n>" suffix; the serving
        # endpoint exposes version statuses at this url
        self.raw_url = self.predict_url.rsplit("/versions/", 1)[0]
        self.version = config.version
        self.name = config.name
        self.semaphore = semaphore
        self.session = session

    @timing
    def info(self) -> dict:
        """
        Return basic information on deployed instance

        Returns
        -------
        out : dict
            information about deployed network
        """
        return {"name": self.name,
                "network": {"predict_url": self.predict_url,
                            "version": self.version}}

    def check_connection(self) -> Tuple[bool, str]:
        """
        Check connection to the tf serving endpoint

        Returns
        -------
        connected : bool
            whether connection to the servable is established
        message : str
            message returned by tf serving endpoint
        """
        connected = False
        message = "NO CONNECTION TO THE SERVABLE API"
        try:
            response = requests.get(url=self.raw_url, timeout=60)
        except Exception as exception:  # report connection errors instead of raising
            message = str(exception)
            return connected, message
        if response.ok:
            connected = True
            version_list = response.json()["model_version_status"]
            try:
                version_info = list(filter(lambda x: int(x["version"]) == self.version,
                                           version_list))[0]
            except IndexError:
                # requested version not served: report which versions are
                version_info = [item["version"] for item in version_list]
                message = f"No version {self.version} available for " \
                          f"{self.name}, but there are versions {version_info}"
                return False, message

            # Look the message up by key. The former
            # `_, message = version_info["status"].values()` relied on the
            # status dict containing exactly two values in a fixed order.
            message = version_info["status"].get("error_message", message)

        return connected, message

    async def call_network(self, request_data: dict) -> TFResponseModel:
        """
        Call the served model and get predictions for given data

        Parameters
        ----------
        request_data : dict
            dictionary with "instances" as key and list of inputs as value

        Returns
        -------
        out : TFResponseModel
            a response containing scores, status_code, message and boolean status of the request
        """
        # The semaphore bounds the number of concurrent in-flight requests
        async with self.semaphore:
            async with self.session.post(url=self.predict_url,
                                         json=request_data,
                                         timeout=60) as response:
                message = None
                scores = [[]]  # placeholder shape: one empty score list
                data = await response.json()
                if response.ok:
                    scores = data["predictions"]
                else:
                    message = data["error"]
                status_code = response.status
                return TFResponseModel(scores=scores, message=message, status_code=status_code)

    @timing
    async def predict_batch(self, data: list) -> List[TFResponseModel]:
        """
        Make predictions on a batch of images one by one

        Parameters
        ----------
        data : list
            list of inputs to send to the final endpoint

        Returns
        -------
        out : List[TFResponseModel]
            list of predictions per image

        Raises
        ------
        ProcessingError
            when not a single request in the batch returned 200 OK
        """
        # one request per image so a single bad sample can't fail the batch
        tasks = [self.call_network(request_data={"instances": [image]}) for image in data]

        response_batch = await gather(*tasks)
        status_codes = [response.status_code for response in response_batch]
        # Note we DON'T accept anything in the 100-399 range except 200 OK.
        # NOTE(review): this raises only when *no* request succeeded; partially
        # failed batches pass through below with empty scores — confirm intended.
        if status.HTTP_200_OK not in status_codes:
            detail = [elem.dict(exclude_none=True) for elem in response_batch]
            status_code = max(status_codes)  # Prioritize server side error codes over client side ones
            raise ProcessingError(detail=detail,
                                  msg=f"Couldn't process the images with predictor: {self.name}",
                                  status_code=status_code)
        # Flatten: each per-image response carries one score list, so this
        # yields exactly one TFResponseModel per input image
        transformed_response = [TFResponseModel(scores=score_list, message=item.message,
                                                status_code=item.status_code)
                                for item in response_batch for score_list in item.scores]
        return transformed_response

    def transform_tf_response(self, tf_response: TFResponseModel, **kwargs) -> PredictorResponse:
        """
        Main method that takes raw predictions of a single sample as input and wraps them into a
        human-readable dictionary. Must be implemented by child classes

        Parameters
        ----------
        tf_response : TFResponseModel
            Response from the served tensorflow model

        Returns
        --------
        out : PredictorResponse
            processed predictions

        Raises
        ------
        NotImplementedError
            always; subclasses must override
        """
        raise NotImplementedError

    def transform_tf_responses(self,
                               response_batch: List[TFResponseModel],
                               **kwargs) -> List[PredictorResponse]:
        """
        Transform a batch of responses from tensorflow serving

        Parameters
        ----------
        response_batch : List[TFResponseModel]
            batch of responses gotten from call_network() method
        kwargs : dict
            optional keyword arguments for the transform_tf_response method

        Returns
        -------
        out : List[PredictorResponse]
            prediction dicts for each processed instance
        """

        return [self.transform_tf_response(response, **kwargs)
                for response in response_batch]

    @timing
    async def predict(self, data: list, **kwargs) -> List[PredictorResponse]:
        """
        Infer the network on given data and create human-readable prediction dictionaries for
        each sample

        Parameters
        ----------
        data : list
            A list of inputs to send to the final endpoint

        Returns
        --------
        out : List[PredictorResponse]
            prediction dictionaries
        """
        response_batch = await self.predict_batch(data=data)
        return self.transform_tf_responses(response_batch, **kwargs)

info()

Return basic information on deployed instance

Returns:

Name Type Description
out dict

information about deployed network

Source code in src/predictors/predictors.py
@timing
def info(self) -> dict:
    """
    Return basic information on deployed instance

    Returns
    -------
    out : dict
        information about deployed network
    """
    network_details = {"predict_url": self.predict_url,
                       "version": self.version}
    return {"name": self.name, "network": network_details}

check_connection()

Check connection to the tf serving endpoint

Returns:

Name Type Description
connected bool

whether connection to the servable is established

message str

message returned by tf serving endpoint

Source code in src/predictors/predictors.py
def check_connection(self) -> Tuple[bool, str]:
    """
    Check connection to the tf serving endpoint

    Returns
    -------
    connected : bool
        whether connection to the servable is established
    message : str
        message returned by tf serving endpoint
    """
    connected = False
    message = "NO CONNECTION TO THE SERVABLE API"
    try:
        response = requests.get(url=self.raw_url, timeout=60)
    except Exception as exception:  # report connection errors instead of raising
        message = str(exception)
        return connected, message
    if response.ok:
        connected = True
        version_list = response.json()["model_version_status"]
        try:
            version_info = list(filter(lambda x: int(x["version"]) == self.version,
                                       version_list))[0]
        except IndexError:
            # requested version not served: report which versions are
            version_info = [item["version"] for item in version_list]
            message = f"No version {self.version} available for " \
                      f"{self.name}, but there are versions {version_info}"
            return False, message

        # Look the message up by key. The former
        # `_, message = version_info["status"].values()` relied on the
        # status dict containing exactly two values in a fixed order.
        message = version_info["status"].get("error_message", message)

    return connected, message

call_network(request_data) async

Call the served model and get predictions for given data

Parameters:

Name Type Description Default
request_data dict

dictionary with "instances" as key and list of inputs as value

required

Returns:

Name Type Description
out TFResponseModel

a response containing scores, status_code, message and boolean status of the request

Source code in src/predictors/predictors.py
async def call_network(self, request_data: dict) -> TFResponseModel:
    """
    Call the served model and get predictions for given data

    Parameters
    ----------
    request_data : dict
        dictionary with "instances" as key and list of inputs as value

    Returns
    -------
    out : TFResponseModel
        a response containing scores, status_code, message and boolean status of the request
    """
    # The semaphore bounds the number of concurrent in-flight requests
    async with self.semaphore:
        async with self.session.post(url=self.predict_url,
                                     json=request_data,
                                     timeout=60) as response:
            message = None
            # placeholder: one empty score list, kept on failed requests
            scores = [[]]
            data = await response.json()
            if response.ok:
                scores = data["predictions"]
            else:
                message = data["error"]
            status_code = response.status
            return TFResponseModel(scores=scores, message=message, status_code=status_code)

predict_batch(data) async

Make predictions on a batch of images one by one

Parameters:

Name Type Description Default
data list

list of inputs to send to the final endpoint

required

Returns:

Name Type Description
out List[TFResponseModel]

list of predictions per image

Source code in src/predictors/predictors.py
@timing
async def predict_batch(self, data: list) -> List[TFResponseModel]:
    """
    Make predictions on a batch of images one by one

    Parameters
    ----------
    data : list
        list of inputs to send to the final endpoint

    Returns
    -------
    out : List[TFResponseModel]
        list of predictions per image

    Raises
    ------
    ProcessingError
        when not a single request in the batch returned 200 OK
    """
    # one request per image so a single bad sample can't fail the batch
    tasks = [self.call_network(request_data={"instances": [image]}) for image in data]

    response_batch = await gather(*tasks)
    status_codes = [response.status_code for response in response_batch]
    # Note we DON'T accept anything in the 100-399 range except 200 OK.
    # NOTE(review): raises only when *no* request succeeded; partially failed
    # batches fall through with empty scores — confirm this is intended.
    if status.HTTP_200_OK not in status_codes:
        detail = [elem.dict(exclude_none=True) for elem in response_batch]
        status_code = max(status_codes)  # Prioritize server side error codes over client side ones
        raise ProcessingError(detail=detail,
                              msg=f"Couldn't process the images with predictor: {self.name}",
                              status_code=status_code)
    # Flatten: each per-image response carries one score list, so this yields
    # exactly one TFResponseModel per input image
    transformed_response = [TFResponseModel(scores=score_list, message=item.message,
                                            status_code=item.status_code)
                            for item in response_batch for score_list in item.scores]
    return transformed_response

transform_tf_response(tf_response, **kwargs)

Main method that takes raw predictions of a single sample as input and wraps them into a human-readable dictionary. Must be implemented by child classes

Parameters:

Name Type Description Default
tf_response TFResponseModel

Response from the served tensorflow model

required

Returns:

Name Type Description
out PredictorResponse

processed predictions

Source code in src/predictors/predictors.py
def transform_tf_response(self, tf_response: TFResponseModel, **kwargs) -> PredictorResponse:
    """
    Main method that takes raw predictions of a single sample as input and wraps them into a
    human-readable dictionary. Must be implemented by child classes

    Parameters
    ----------
    tf_response : TFResponseModel
        Response from the served tensorflow model

    Returns
    --------
    out : PredictorResponse
        processed predictions

    Raises
    ------
    NotImplementedError
        always; subclasses must override this method
    """
    raise NotImplementedError

transform_tf_responses(response_batch, **kwargs)

Transform a batch of responses from tensorflow serving

Parameters:

Name Type Description Default
response_batch List[TFResponseModel]

batch of responses gotten from call_network() method

required
kwargs List[dict] = None

optional keyword arguments forwarded to the transform_tf_response method

{}

Returns:

Name Type Description
out List[PredictorResponse]

prediction dicts for each processed instance

Source code in src/predictors/predictors.py
def transform_tf_responses(self,
                           response_batch: List[TFResponseModel],
                           **kwargs) -> List[PredictorResponse]:
    """
    Transform a batch of responses from tensorflow serving

    Parameters
    ----------
    response_batch : List[TFResponseModel]
        batch of responses gotten from call_network() method
    kwargs : dict
        optional keyword arguments forwarded to the transform_tf_response method

    Returns
    -------
    out : List[PredictorResponse]
        prediction dicts for each processed instance
    """

    return [self.transform_tf_response(response, **kwargs)
            for response in response_batch]

predict(data, **kwargs) async

Infer the network on given data and create human-readable prediction dictionaries for each sample

Parameters:

Name Type Description Default
data list

A list of inputs to send to the final endpoint

required

Returns:

Name Type Description
out List[PredictorResponse]

prediction dictionaries

Source code in src/predictors/predictors.py
@timing
async def predict(self, data: list, **kwargs) -> List[PredictorResponse]:
    """
    Run inference on the given data and turn every raw response into a
    human-readable prediction dictionary

    Parameters
    ----------
    data : list
        A list of inputs to send to the final endpoint

    Returns
    --------
    out : List[PredictorResponse]
        prediction dictionaries
    """
    raw_responses = await self.predict_batch(data=data)
    processed = self.transform_tf_responses(raw_responses, **kwargs)
    return processed

Classifier

Bases: Predictor

Call an endpoint with served models and return postprocessed predictions

Attributes:

Name Type Description
output_names List[str]

names of the classes in the same order as in the served model

version int

version of the model. If specified network isn't served, one of the served ones will be picked instead

name str

name of the classifier. Will be used while logging

is_multilabel bool

whether the network's predictions should be interpreted independently (multilabel classification with sigmoid classification head) or not (multiclass classification with a softmax classification head)

thresholds Dict[str, float]

thresholds to determine whether the class is present or not for each class

semaphore Semaphore

instance for asynchronous calls to the served network API

session ClientSession

session to make calls to the network's server

Source code in src/predictors/predictors.py
class Classifier(Predictor):
    """
    Call an endpoint with served models and return postprocessed predictions

    Attributes
    -----------
    output_names : List[str]
        names of the classes in the same order as in the served model
    version : int
        version of the model. If specified network isn't served, one of the served ones will be
        picked instead
    name : str
        name of the classifier. Will be used while logging
    is_multilabel : bool
        whether the network's predictions should be interpreted independently (multilabel
        classification with sigmoid classification head) or not (multiclass classification with
        a softmax classification head)
    thresholds : Dict[str, float]
        thresholds to determine whether the class is present or not for each class
    semaphore : asyncio.Semaphore
        instance for asynchronous calls to the served network API
    session : aiohttp.ClientSession
        session to make calls to the network's server
    """

    def __init__(self, config: ClassifierConfig,
                 **kwargs) -> None:
        super().__init__(config, **kwargs)
        self.output_names = config.output_names
        self.all_classes = config.all_classes
        self.is_multilabel = config.is_multilabel
        # thresholds only apply to independent (sigmoid) outputs
        if self.is_multilabel:
            self.thresholds = config.thresholds
        self.postprocessing_config = None
        if config.postprocessing_config is not None:
            self.postprocessing_config = config.postprocessing_config.model_dump(exclude_none=True)

    def get_thresholds(self,
                       custom_thresholds: Optional[Union[float, Dict[str, float]]]) -> np.ndarray:
        """
        Return thresholds for multilabel classification

        Parameters
        ----------
        custom_thresholds : Union[float, Dict[str, float]]
            custom thresholds for current network call. If float, it will be used for all the
            classes. If dict, custom thresholds will be used for specified classes,
            and the defaults for the rest

        Returns
        -------
        out : np.ndarray
            thresholds to use when filtering network predictions during current call
        """

        thresholds = self.thresholds.copy()
        if custom_thresholds is not None:
            if isinstance(custom_thresholds, dict):
                thresholds.update(custom_thresholds)
            elif isinstance(custom_thresholds, (int, float)):
                thresholds = dict(
                    zip(self.output_names, [custom_thresholds] * len(self.output_names)))
        # persist the original order of classes
        threshold_array = np.array([thresholds[cls_name] for cls_name in self.output_names])
        return threshold_array

    @staticmethod
    def remove_all_keys_except(score_dict: Dict[str, float], condition: str,
                               exceptions: List[str] = None) -> Dict[str, float]:
        """
        Remove all keys from the dictionary except for the specified ones if condition is
        present in keys

        Parameters
        ----------
        score_dict : Dict[str, float]
            the dictionary to process
        condition : str
            key to check. The dict will be processed only if condition is present in the dict
        exceptions : List[str]
            keys to ignore when processing the dictionary

        Returns
        -------
        out : Dict[str, float]
            processed dictionary
        """
        if exceptions is None:
            exceptions = []
        if condition not in score_dict:
            return score_dict
        # Copy before extending: appending to ``exceptions`` in place mutated
        # the caller's list, so a list reused across calls (e.g. from a shared
        # postprocessing config) kept accumulating ``condition`` entries.
        keep = list(exceptions) + [condition]
        return {key: score_dict[key] for key in keep if key in score_dict}

    @staticmethod
    def remove_keys_by_condition(score_dict: Dict[str, float], condition: str,
                                 to_remove: List[str]) -> Dict[str, float]:
        """Remove given keys from the dictionary if condition key is present

        Parameters
        ----------
        score_dict : Dict[str, float]
            dictionary to process
        condition : str
            key to check. Processing is done only if this key is present
        to_remove : List[str]
            keys to remove if condition key is present

        Returns
        --------
        out : Dict[str, float]
            processed dictionary
        """
        # plain loop instead of a side-effect-only list comprehension
        if to_remove and (condition in score_dict):
            for key in to_remove:
                score_dict.pop(key, None)
        return score_dict

    @staticmethod
    def keep_only_by_conditions(score_dict: Dict[str, float], conditions: List[str],
                                to_filter: List[str]) -> Dict[str, float]:
        """
        Keep the elements in dictionary if any of conditions is present, else remove them

        Parameters
        ----------
        score_dict : Dict[str, float]
            dictionary with names of classes and their probabilities
        conditions : List[str]
            conditions to check
        to_filter : List[str]
            elements to remove if none of the conditions is present

        Returns
        -------
        out : Dict[str, float]
            processed dictionary
        """

        # plain loop instead of a side-effect-only list comprehension
        if to_filter and (all(condition not in score_dict for condition in conditions)):
            for key in to_filter:
                score_dict.pop(key, None)
        return score_dict

    @staticmethod
    def rename_key_by_condition(score_dict: Dict[str, float],
                                to_rename: str,
                                new_name: str,
                                conditions: List[str]) -> Dict[str, float]:
        """
        Rename given key if any of the condition keys is present in the dictionary

        Parameters
        ----------
        score_dict : Dict[str, float]
            dictionary to process
        to_rename : str
            key to rename
        new_name : str
            new name for the renamed key
        conditions : List[str]
            keys to check. Renaming is done only if at least one of them is present

        Returns
        -------
        out : Dict[str, float]
            processed dictionary
        """
        if (to_rename in score_dict) and (any(condition in score_dict for condition in conditions)):
            score_dict[new_name] = score_dict.pop(to_rename)
        return score_dict

    def postprocess_score_dict(self, score_dict: Dict[str, float]) -> dict:
        """Postprocess the score dictionary with model predictions based on config

        Each key of ``self.postprocessing_config`` names one of the static
        postprocessing methods of this class; its value is a list of kwargs
        dicts applied in order.
        """

        for name, subconfig in self.postprocessing_config.items():
            postprocessing_fn = getattr(self, name)
            for single_config in subconfig:
                score_dict = postprocessing_fn(score_dict=score_dict, **single_config)

        return score_dict

    def transform_tf_response(self, tf_response: TFResponseModel,
                              custom_thresholds: Union[float, Dict[str, float]] = None,
                              filtered: bool = True) -> PredictorResponse:
        """
        Create a dictionary containing scores for each class and a list of classes that are present

        Parameters
        ----------
        tf_response : TFResponseModel
            a response from the model served via tf serving
        custom_thresholds : float, dict
            thresholds to use to extract tags. If none, default thresholds self.threshold defined
            when creating Classifier instance will be used.
            if a single float is sent, it will be used as threshold for all the classes.
            If it's a dict, custom thresholds will be used for specified classes,
            and the defaults for the rest
        filtered : bool
            whether to return all labels and scores or only the ones that are present(thresholded)

        Returns
        -------
        out : PredictorResponse
            predicted scores, status and message of each request
        """
        # failed request: propagate status/message with no scores
        if not tf_response.status:
            return PredictorResponse(scores={}, message=tf_response.message,
                                     status_code=tf_response.status_code)
        if not filtered:  # return unfiltered
            return PredictorResponse(scores=dict(zip(self.output_names, tf_response.scores)),
                                     status_code=tf_response.status_code)
        if self.is_multilabel:
            # independent sigmoid outputs: keep every class above its threshold
            thresholds = self.get_thresholds(custom_thresholds=custom_thresholds)
            indices = np.where(np.array(tf_response.scores) > thresholds)
        else:
            # softmax outputs: keep only the top class
            indices = [np.argmax(tf_response.scores)]
        thresholded = np.array(tf_response.scores)[indices].tolist()
        tags = np.array(self.output_names)[indices].tolist()
        score_dict = dict(zip(tags, thresholded))
        if self.postprocessing_config:
            score_dict = self.postprocess_score_dict(score_dict=score_dict)

        return PredictorResponse(scores=score_dict, status_code=tf_response.status_code)

    def info(self) -> dict:
        """
        Return information on network outputs

        Returns
        -------
        out : dict
            information on network's output classes & task type
        """
        info = super().info()
        net_specific_info = {"num_outputs": len(self.output_names),
                             "raw_output_names": self.output_names,
                             "is_multilabel": self.is_multilabel}
        info["network"].update(net_specific_info)
        info["outputs"] = {"num_classes": len(self.all_classes),
                           "all_classes": self.all_classes}
        if self.is_multilabel:
            info["outputs"]["default_thresholds"] = self.thresholds

        return info

get_thresholds(custom_thresholds)

Return thresholds for multilabel classification

Parameters:

Name Type Description Default
custom_thresholds Union[float, Dict[str, float]]

custom thresholds for current network call. If float, it will be used for all the classes. If dict, custom thresholds will be used for specified classes, and the defaults for the rest

required

Returns:

Name Type Description
out ndarray

thresholds to use when filtering network predictions during current call

Source code in src/predictors/predictors.py
def get_thresholds(self,
                   custom_thresholds: Optional[Union[float, Dict[str, float]]]) -> np.ndarray:
    """
    Return thresholds for multilabel classification

    Parameters
    ----------
    custom_thresholds : Union[float, Dict[str, float]]
        custom thresholds for the current network call. A single float applies
        to every class; a dict overrides only the listed classes, with the
        defaults used for the rest

    Returns
    -------
    out : np.ndarray
        thresholds to use when filtering network predictions during current call
    """

    effective = dict(self.thresholds)
    if isinstance(custom_thresholds, dict):
        effective.update(custom_thresholds)
    elif isinstance(custom_thresholds, (int, float)):
        effective = {name: custom_thresholds for name in self.output_names}
    # keep the original class order when building the array
    return np.array([effective[name] for name in self.output_names])

remove_all_keys_except(score_dict, condition, exceptions=None) staticmethod

Remove all keys from the dictionary except for the specified ones if condition is present in keys

Parameters:

Name Type Description Default
score_dict Dict[str, float]

the dictionary to process

required
condition str

key to check. The dict will be processed only if condition is present in the dict

required
exceptions List[str]

keys to ignore when processing the dictionary

None

Returns:

Name Type Description
out Dict[str, float]

processed dictionary

Source code in src/predictors/predictors.py
@staticmethod
def remove_all_keys_except(score_dict: Dict[str, float], condition: str,
                           exceptions: List[str] = None) -> Dict[str, float]:
    """
    Remove all keys from the dictionary except for the specified ones if condition is
    present in keys

    Parameters
    ----------
    score_dict : Dict[str, float]
        the dictionary to process
    condition : str
        key to check. The dict will be processed only if condition is present in the dict
    exceptions : List[str]
        keys to ignore when processing the dictionary

    Returns
    -------
    out : Dict[str, float]
        processed dictionary
    """
    if exceptions is None:
        exceptions = []
    if condition not in score_dict:
        return score_dict
    exceptions.append(condition)
    return {exc: score_dict[exc] for exc in exceptions if exc in score_dict}

remove_keys_by_condition(score_dict, condition, to_remove) staticmethod

Remove given keys from the dictionary if condition key is present

Parameters:

Name Type Description Default
score_dict Dict[str, float]

dictionary to process

required
condition str

key to check. Processing is done only if this key is present

required
to_remove List[str]

keys to remove if condition key is present

required

Returns:

Name Type Description
out Dict[str, float]

processed dictionary

Source code in src/predictors/predictors.py
@staticmethod
def remove_keys_by_condition(score_dict: Dict[str, float], condition: str,
                             to_remove: List[str]) -> Dict[str, float]:
    """Remove given keys from the dictionary if condition key is present

    Parameters
    ----------
    score_dict : Dict[str, float]
        dictionary to process
    condition : str
        key to check. Processing is done only if this key is present
    to_remove : List[str]
        keys to remove if condition key is present

    Returns
    --------
    out : Dict[str, float]
        processed dictionary
    """
    if to_remove and (condition in score_dict):
        [score_dict.pop(item, None) for item in to_remove]
    return score_dict

keep_only_by_conditions(score_dict, conditions, to_filter) staticmethod

Keep the elements in dictionary if any of conditions is present, else remove them

Parameters:

Name Type Description Default
score_dict Dict[str, float]

dictionary with names of classes and their probabilities

required
conditions List[str]

conditions to check

required
to_filter List[str]

elements to remove if none of the conditions is present

required

Source code in src/predictors/predictors.py
@staticmethod
def keep_only_by_conditions(score_dict: Dict[str, float], conditions: List[str],
                            to_filter: List[str]) -> Dict[str, float]:
    """
    Keep the elements in dictionary if any of conditions is present, else remove them

    score_dict: dict
        dictionary with names of classes and their probabilities
    conditions: List[str]
        conditions to check
    to_filter: List[str]
        elements to remove if none of the conditions is present
    """

    if to_filter and (all(condition not in score_dict for condition in conditions)):
        [score_dict.pop(item, None) for item in to_filter]
    return score_dict

rename_key_by_condition(score_dict, to_rename, new_name, conditions) staticmethod

Rename given key if condition is present in the dictionary

Parameters:

Name Type Description Default
score_dict Dict[str, float]

dictionary to process

required
conditions list[str]

keys to check. Processing is done only if at least one of these keys is present

required
to_rename str

key to rename

required
new_name str

new name for the renamed key

required

Returns:

Name Type Description
out Dict[str, float]

processed dictionary

Source code in src/predictors/predictors.py
@staticmethod
def rename_key_by_condition(score_dict: Dict[str, float],
                            to_rename: str,
                            new_name: str,
                            conditions: list[str]) -> Dict[str, float]:
    """
    Rename given key if condition is present in the dictionary

    Parameters
    ----------
    score_dict : Dict[str, float]
        dictionary to process
    conditions : list[str]
        key to check. Processing is done only if this key is present
    to_rename : str
        key to rename
    new_name : str
        new name for the renamed key

    Returns
    -------
    out : Dict[str, float]
        processed dictionary
    """
    if (to_rename in score_dict) and (any(condition in score_dict for condition in conditions)):
        score_dict[new_name] = score_dict.pop(to_rename)
    return score_dict

postprocess_score_dict(score_dict)

Postprocess the score dictionary with model predictions based on config

Source code in src/predictors/predictors.py
def postprocess_score_dict(self, score_dict: Dict[str, float]) -> dict:
    """Postprocess the score dictionary with model predictions based on config

    For each ``name -> list-of-kwargs`` entry in ``self.postprocessing_config``,
    looks up the method ``name`` on this instance and applies it once per kwargs
    set, threading the score dict through every call.

    Parameters
    ----------
    score_dict : Dict[str, float]
        dictionary with class names and their scores

    Returns
    -------
    out : dict
        the postprocessed score dictionary
    """
    for name, subconfig in self.postprocessing_config.items():
        # renamed from `preprocessing_fn`: these are postprocessing steps
        postprocessing_fn = getattr(self, name)
        for single_config in subconfig:
            score_dict = postprocessing_fn(score_dict=score_dict, **single_config)

    return score_dict

transform_tf_response(tf_response, custom_thresholds=None, filtered=True)

Create a dictionary containing scores for each class and a list of classes that are present

Parameters:

Name Type Description Default
tf_response TFResponseModel

a response from the model served via tf serving

required
custom_thresholds (float, dict)

thresholds to use to extract tags. If none, default thresholds self.threshold defined when creating Classifier instance will be used. if a single float is sent, it will be used as threshold for all the classes. If it's a dict, custom thresholds will be used for specified classes, and the defaults for the rest

None
filtered bool

whether to return all labels and scores or only the ones that are present (thresholded)

True

Returns:

Name Type Description
out PredictorResponse

predicted scores, status and message of each request

Source code in src/predictors/predictors.py
def transform_tf_response(self, tf_response: TFResponseModel,
                          custom_thresholds: Union[float, Dict[str, float]] = None,
                          filtered: bool = True) -> PredictorResponse:
    """
    Build a {class name: score} dictionary from a raw tf-serving response

    Parameters
    ----------
    tf_response : TFResponseModel
        a response from the model served via tf serving
    custom_thresholds : float, dict
        thresholds to use to extract tags. If none, default thresholds self.threshold defined
        when creating Classifier instance will be used.
        if a single float is sent, it will be used as threshold for all the classes.
        If it's a dict, custom thresholds will be used for specified classes,
        and the defaults for the rest
    filtered : bool
        whether to return all labels and scores or only the ones that are present(thresholded)

    Returns
    -------
    out : PredictorResponse
        predicted scores, status and message of each request
    """
    # Failed request: propagate status/message with empty scores
    if not tf_response.status:
        return PredictorResponse(scores={}, message=tf_response.message,
                                 status_code=tf_response.status_code)

    # Unfiltered: every output name paired with its raw score
    if not filtered:
        all_scores = dict(zip(self.output_names, tf_response.scores))
        return PredictorResponse(scores=all_scores,
                                 status_code=tf_response.status_code)

    raw_scores = np.array(tf_response.scores)
    if self.is_multilabel:
        # every class whose score clears its (possibly custom) threshold
        thresholds = self.get_thresholds(custom_thresholds=custom_thresholds)
        selected = np.where(raw_scores > thresholds)
    else:
        # single-label: only the top-scoring class
        selected = [np.argmax(tf_response.scores)]
    labels = np.array(self.output_names)[selected].tolist()
    values = raw_scores[selected].tolist()
    score_dict = dict(zip(labels, values))
    if self.postprocessing_config:
        score_dict = self.postprocess_score_dict(score_dict=score_dict)

    return PredictorResponse(scores=score_dict, status_code=tf_response.status_code)

info()

Return information on network outputs

Returns:

Name Type Description
out dict

information on network's output classes & task type

Source code in src/predictors/predictors.py
def info(self) -> dict:
    """
    Return information on network outputs

    Returns
    -------
    out : dict
        information on network's output classes & task type
    """
    base_info = super().info()
    # extend the parent's "network" section with classifier-specific fields
    base_info["network"].update({
        "num_outputs": len(self.output_names),
        "raw_output_names": self.output_names,
        "is_multilabel": self.is_multilabel,
    })
    base_info["outputs"] = {
        "num_classes": len(self.all_classes),
        "all_classes": self.all_classes,
    }
    if self.is_multilabel:
        # thresholds only make sense for multilabel classification
        base_info["outputs"]["default_thresholds"] = self.thresholds
    return base_info

Encoder

Bases: Predictor

Class to communicate with an encoder network. Unlike classifier, it has no concept of classes and returns just vectors representing the input

Source code in src/predictors/predictors.py
class Encoder(Predictor):
    """
    Class to communicate with an encoder network. Unlike classifier, it has no concept of
    classes and returns just vectors representing the input
    """

    def transform_tf_response(self,
                              tf_response: TFResponseModel,
                              **kwargs) -> PredictorResponse:
        """
        Wrap the raw encoder output into a PredictorResponse

        The encoder has no classes, so ``tf_response.scores`` is passed through
        unchanged; ``**kwargs`` is accepted only for signature compatibility with
        other predictors and is ignored.

        Parameters
        ----------
        tf_response : TFResponseModel
            the response of the network

        Returns
        -------
        out: PredictorResponse
            raw scores, status and message of the request
        """
        if not tf_response.status:
            # failed request: empty scores, but keep the error message/status code
            return PredictorResponse(scores={}, message=tf_response.message,
                                     status_code=tf_response.status_code)
        return PredictorResponse(scores=tf_response.scores, message=tf_response.message,
                                 status_code=tf_response.status_code)

transform_tf_response(tf_response, **kwargs)

Wrap the raw encoder output (vectors representing the input) into a PredictorResponse

Parameters:

Name Type Description Default
tf_response TFResponseModel

the response of the network

required

Returns:

Name Type Description
out dict

predicted scores, status and message of each request

Source code in src/predictors/predictors.py
def transform_tf_response(self,
                          tf_response: TFResponseModel,
                          **kwargs) -> PredictorResponse:
    """
    Wrap the raw encoder output into a PredictorResponse

    The encoder has no classes, so ``tf_response.scores`` is passed through
    unchanged; ``**kwargs`` is accepted only for signature compatibility with
    other predictors and is ignored.

    Parameters
    ----------
    tf_response : TFResponseModel
        the response of the network

    Returns
    -------
    out: PredictorResponse
        raw scores, status and message of the request
    """
    if not tf_response.status:
        # failed request: empty scores, but keep the error message/status code
        return PredictorResponse(scores={}, message=tf_response.message,
                                 status_code=tf_response.status_code)
    return PredictorResponse(scores=tf_response.scores, message=tf_response.message,
                             status_code=tf_response.status_code)