Skip to content

Services


Main functionality


Main business logic implementation

Services

Main business logic implementation

Parameters:

Name Type Description Default
tagger_net Classifier

tagger model predictor

tagger

exterior_styles_net : Classifier exterior style model predictor

encoder_net : Encoder the encoder model predictor

message_broker_client : AsyncRabbitClient message broker client used to publish embeddings

Source code in src/services.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
class Services:
    """
    Main business logic implementation

    Parameters
    ----------
    tagger_net : Classifier
        tagger model predictor

    exterior_styles_net : Classifier
        exterior style model predictor

    encoder_net : Encoder
        the encoder model predictor

    message_broker_client : AsyncRabbitClient
        message broker client used to publish embeddings
    """

    def __init__(
            self,
            tagger_net: Classifier = tagger,
            exterior_styles_net: Classifier = exterior_styles_classifier,
            encoder_net: Encoder = encoder,
            message_broker_client: AsyncRabbitClient = rabbit_client
    ):

        self.tagger = tagger_net
        self.exterior_styles = exterior_styles_net
        self.encoder = encoder_net
        self.message_broker_client = message_broker_client
        # All model predictors, used by health_check() and info()
        self.nets = [self.tagger, self.exterior_styles, self.encoder]
        # Every external dependency, models plus the message broker
        self.dependencies = self.nets + [self.message_broker_client]

    async def health_check(self) -> Tuple[bool, List[Dependence]]:
        """
        Checks whether all dependencies are connected and ready to work

        Returns
        -------
        out : Tuple[bool, List[Dependence]]
            overall health flag (True only if every dependency is connected)
            and the per-dependency status reports
        """
        dependencies: list[Dependence] = []
        for net in self.nets:
            connected, message = net.check_connection()
            dependencies.append(Dependence(connected=connected, message=message, name=net.name,
                                           version=getattr(net, "version", None)))
        # The broker exposes an async connection check, unlike the nets
        connected, message = await self.message_broker_client.async_check_connection()
        dependencies.append(Dependence(connected=connected, message=message, name=self.message_broker_client.name,
                                       version=getattr(self.message_broker_client, "version", None)))
        healthy = all(dependence.connected for dependence in dependencies)

        return healthy, dependencies

    def info(self) -> List[dict]:
        """
        Return information on each network by calling corresponding methods
        """

        return [net.info() for net in self.nets]

    async def generate_pred_dict(
            self, network: Predictor,
            ids: List,
            instances: List,
            ids_as_keys: bool = False,
            **kwargs
    ) -> Union[List[dict], dict]:
        """
        Make predictions with a network for multiple instances and return dictionaries with ids
        and predictions

        Parameters
        ----------
        network : api.predictors.Predictor
            network to make predictions with
        ids : List
            ids of inputs
        instances : List
            inputs to the network
        ids_as_keys : bool
            whether to use ids as keys in the returned dict or include them as a value of "id" field
        **kwargs :
            additional arguments for .predict method of the network

        Returns
        -------
        out : List of dict, dict
            predictions per image with id and status

        Raises
        ------
        Base64ValidationError
            if none of the inputs is a decodable image (HTTP 422)
        """
        # TODO refactor predictors so the preprocessing is done there instead of services
        valid_image_ids, valid_images, invalid_image_ids = process_image_formats(ids, instances)
        error_message = "Unknown b64 image file format. Supported formats: https://pillow.readthedocs.io/en/stable/handbook/image-file-formats.html "
        if not valid_images:
            raise Base64ValidationError(detail=[{"invalid_ids": invalid_image_ids}],
                                        msg=error_message,
                                        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)

        valid_responses = await network.predict(valid_images, **kwargs)
        # Build a distinct response object per invalid id: list multiplication
        # would alias one shared instance across all entries
        invalid_responses = [PredictorResponse(status_code=422,
                                               message=error_message,
                                               scores=[])
                             for _ in invalid_image_ids]

        raw_responses = valid_responses + invalid_responses
        # Valid ids first so they line up with valid_responses above
        ids = valid_image_ids + invalid_image_ids
        response = [item.dict() for item in raw_responses]  # TODO make use of the PredictorResponse
        if ids_as_keys:
            return {id_: {"results": {network.name: response[i]}}
                    for i, id_ in enumerate(ids)}

        results_per_image = [{"results": {network.name: pred}}
                             for pred in response]
        predictions = add_ids_to_dicts(ids=ids, dicts=results_per_image)

        return self.postprocess_prediction_dicts(predictions)

    @timing
    async def predict_tags(
            self,
            instances: list,
            ids: list,
            custom_thresholds: Optional[Union[float, list, dict]]
    ) -> List[dict]:
        """
        Call the Tagger network with given inputs and get corresponding predictions. For
        more information on parameters, refer to Services.generate_pred_dict()
        """
        return await self.generate_pred_dict(network=self.tagger,
                                             ids=ids,
                                             instances=instances,
                                             custom_thresholds=custom_thresholds)

    @timing
    async def predict_exterior_styles(
            self,
            ids: list,
            instances: list
    ) -> List[dict]:
        """
        Call the ExteriorStyles network with given inputs and get corresponding predictions. For
        more information on parameters, refer to Services.generate_pred_dict()
        """
        return await self.generate_pred_dict(network=self.exterior_styles,
                                             ids=ids,
                                             instances=instances)

    @timing
    async def encode(
            self,
            ids: list,
            instances: list,
    ) -> List[dict]:
        """
        Encode given images with the encoder network. For more information on parameters,
        refer to Services.generate_pred_dict()
        """
        return await self.generate_pred_dict(network=self.encoder,
                                             ids=ids,
                                             instances=instances)

    @timing
    async def classify_dht(
            self,
            instances: list,
            ids: list,
            custom_thresholds: Optional[Union[float, list, dict]]
    ) -> Tuple:
        """
        Call Tagger network, then call the Exterior Styles for the samples with "facade"
        predicted tag. Return all tags in a single dict for each sample for each network
        """
        dht_preds = await self.generate_pred_dict(network=self.tagger,
                                                  ids=ids,
                                                  instances=instances,
                                                  custom_thresholds=custom_thresholds,
                                                  ids_as_keys=True)
        facade_ids = [k for k, v in dht_preds.items()
                      if "facade" in v["results"][self.tagger.name]["scores"]]
        # Set gives O(1) membership instead of O(n) list scans per instance
        facade_id_set = set(facade_ids)
        facade_instances = [instance for id_, instance in zip(ids, instances)
                            if id_ in facade_id_set]
        style_preds = None
        if facade_instances:
            style_preds = await self.generate_pred_dict(network=self.exterior_styles,
                                                        ids=facade_ids,
                                                        instances=facade_instances,
                                                        filtered=False,
                                                        ids_as_keys=True)
            for id_ in facade_ids:
                # Copy so the argmax below does not clobber style_preds, which
                # get_average_style() still needs with the full score dict
                dht_preds[id_]["results"][self.exterior_styles.name] = style_preds[id_]["results"][
                    self.exterior_styles.name].copy()
                dht_preds[id_]["results"][self.exterior_styles.name]["scores"] = argmax_dict(
                    style_preds[id_]["results"][self.exterior_styles.name]["scores"])

        return dht_preds, style_preds

    @timing
    async def process_house_images(
            self,
            ids: list,
            instances: list,
            custom_thresholds: Optional[Union[float, list, dict]]
    ) -> Dict:
        """
        Process all images of a single house via Image Processing Networks

        Returns
        -------
        out : dict
            predictions for given house. Contains tagger's tags per image, information on present
            features and averaged scores for exterior styles
        """
        dht_preds, style_preds = await self.classify_dht(ids=ids,
                                                         instances=instances,
                                                         custom_thresholds=custom_thresholds)
        per_image = add_ids_to_dicts(ids=list(dht_preds), dicts=list(dht_preds.values()))
        avg_exterior_style = self.get_average_style(style_preds)
        # House-level status: at least one image was tagged successfully
        house_status = any(d["results"][self.tagger.name]["status"] for d in dht_preds.values())
        present_features = [tag for im in dht_preds.values()
                            for tag in im["results"][self.tagger.name]["scores"]]
        unique_features = sorted(set(present_features))
        tagged = list(filter(lambda x: x["results"][self.tagger.name]["status"], per_image))
        house_info = {"exterior_styles": avg_exterior_style,
                      "prob_ranking": self.rank_images_by_class(tagged, classes=unique_features)}

        predictions = {"results_per_image": self.postprocess_prediction_dicts(per_image),
                       "house_info": house_info,
                       "status": house_status}
        return predictions

    def get_average_style(self, style_predictions_per_image: dict) -> List[dict]:
        """
        Calculate average style of the house given predictions of the ExteriorStyle Classifier

        Parameters
        ----------
        style_predictions_per_image : dict
            per image predictions for exterior styles

        Returns
        -------
        out : List of dict
            average score for each possible class; empty when there are no
            predictions or no successful ones
        """
        avg_exterior_style = []
        if not style_predictions_per_image:
            return avg_exterior_style

        # Only average over images the classifier handled successfully
        successful = list(filter(
            lambda x: x["results"][self.exterior_styles.name]["status"],
            style_predictions_per_image.values()))
        num_facades = len(successful)
        if num_facades:
            avg_exterior_style = [{"name": k,
                                   "probability": sum(
                                       i["results"][self.exterior_styles.name]["scores"][k]
                                       for i in successful) / num_facades} for k in
                                  self.exterior_styles.output_names]

        return avg_exterior_style

    def rank_images_by_class(
            self,
            tagger_dicts: List[dict],
            classes: Iterable[str]
    ) -> List[dict]:
        """
        Rank images by Tagger's confidence that given scene exists there

        Parameters
        ----------
        tagger_dicts : List[dict]
            valid predictions per_image
        classes : Iterable[str]
            scenes to rank

        Returns
        -------
        out : List of Dict
             Scenes and list of images containing them sorted by probability (highest first)
        """

        if not classes:
            return []
        # Score per image per class; images without the class score 0
        raw_scores = {cls: {d["id"]: d["results"][self.tagger.name]["scores"].get(cls, 0)
                            for d in tagger_dicts}
                      for cls in classes}
        sorted_tuples = {cls: sorted(raw_scores[cls].items(), key=lambda x: x[1],
                                     reverse=True) for cls in classes}
        # Keep only images with a positive score for the class
        ranking = [{"name": cls,
                    "ordering": [k for k, v in sorted_tuples[cls] if v > 0]}
                   for cls in classes]

        return ranking

    @staticmethod
    def postprocess_prediction_dicts(results_per_image: List[dict]) -> List[dict]:
        """
        Postprocess network prediction dictionaries

        Rebuilds the nested structure instead of mutating the input: the
        previous shallow `.copy()` still mutated the caller's inner dicts.

        Parameters
        ----------
        results_per_image: List
            prediction dictionaries containing scores in format {"feature" : probability}

        Returns
        -------
        out : List
            dictionaries containing scores in format {"name" : feature, "probability" : probability}
        """
        processed = []
        for result in results_per_image:
            new_result = {**result, "results": {}}
            for net_name, network_results in result["results"].items():
                net_copy = dict(network_results)
                scores_per_class = net_copy["scores"]
                if isinstance(scores_per_class, dict):  # in case of encoder & similar nets
                    net_copy["scores"] = [{"name": cls_name,
                                           "probability": probability}
                                          for cls_name, probability in
                                          scores_per_class.items()]
                new_result["results"][net_name] = net_copy
            processed.append(new_result)

        return processed

    @timing
    async def prepare_embeddings(
            self,
            instance_ids: list,
            instances: list
    ) -> list[PhotoEmbedding]:
        """
        Run the input data through Encoder model, filter and prepare the output for publishing

        Parameters
        ----------
        instance_ids : List
            identifiers of images
        instances : List
            images to embed

        Returns
        -------
        out : list[PhotoEmbedding]
            Vector embeddings and their ids; empty when no input image is valid
        """
        # Get valid instance ids and instances
        instance_ids, instances, invalid_ids = process_image_formats(instance_ids, instances)
        photo_embeddings: list[PhotoEmbedding] = []
        if not instance_ids:
            return photo_embeddings
        encoder_output = await self.encoder.predict_batch(data=instances)

        # Filter out the successful ones
        for photo_id, response in zip(instance_ids, encoder_output):
            if not response.status:
                continue
            photo_embeddings.append(PhotoEmbedding(photo_id=photo_id, embedding=response.scores))

        return photo_embeddings

    @timing
    async def publish_embeddings(self,
                                 core_listing_id: Any,
                                 instance_ids: list,
                                 instances: list,
                                 compression: CompressionMethod) -> PublishResults:
        """
        Calculate embeddings and publish them to message broker

        Parameters
        ----------
        core_listing_id : Any
            identifier of a house
        instance_ids : List
            identifiers of images
        instances : List
            images to embed
        compression : CompressionMethod
            compression method to use when publishing the payload

        Returns
        -------
        out : PublishResults
            Results of the publishing
        """
        embeddings = await self.prepare_embeddings(instance_ids=instance_ids,
                                                   instances=instances)
        if not embeddings:
            return PublishResults(published=False, error="No embeddings to publish. There might be problems with the "
                                                         "encoder models or the input images.")
        payload = ListingEmbeddings(core_listing_id=core_listing_id, embeddings=embeddings)

        return await self.message_broker_client.publish(payload=payload.dict(), compression=compression)

    @staticmethod
    async def trim_image(image: str, norm_thresh: float, initial_thresh: float) -> str:
        """
        Converts base64 decoded image into np.array and trim input image

        Parameters
        ----------

        image : str
            input image (base64 image)
        norm_thresh : float
            norm threshold to consider vectors the same
        initial_thresh : float
            identification thresh

        Returns
        -------
        result : str
            output image (base64 encoded )

        """
        # Use a dedicated local instead of rebinding the str parameter to a PIL Image
        pil_image = b64_to_pil(image_b64=image)
        image_array = np.array(pil_image)
        trimmed_image = trim_image_with_thresh(img=image_array, norm_thresh=norm_thresh,
                                               initial_thresh=initial_thresh)
        trimmed_image = Image.fromarray(trimmed_image)
        trimmed_image_str = pil_to_b64(image=trimmed_image)
        return trimmed_image_str

health_check() async

Checks whether all dependencies are connected and ready to work

Source code in src/services.py
async def health_check(self) -> Tuple[bool, List[Dependence]]:
    """
    Checks whether all dependencies are connected and ready to work
    """
    dependencies: list[Dependence] = []
    for net in self.nets:
        connected, message = net.check_connection()
        dependencies.append(Dependence(connected=connected, message=message, name=net.name,
                                       version=getattr(net, "version", None)))
    connected, message = await self.message_broker_client.async_check_connection()
    dependencies.append(Dependence(connected=connected, message=message, name=self.message_broker_client.name,
                                   version=getattr(self.message_broker_client, "version", None)))
    healthy = all(dependence.connected for dependence in dependencies)

    return healthy, dependencies

info()

Return information on each network by calling corresponding methods

Source code in src/services.py
def info(self) -> List[dict]:
    """
    Return information on each network by calling corresponding methods
    """

    return [net.info() for net in self.nets]

generate_pred_dict(network, ids, instances, ids_as_keys=False, **kwargs) async

Make predictions with a network for multiple instances and return dictionaries with ids and predictions

Parameters:

Name Type Description Default
network Predictor

network to make predictions with

required
ids List

ids of inputs

required
instances List

inputs to the network

required
ids_as_keys bool

whether to use ids as keys in the returned dict or include them as a value of "id" field

False
**kwargs

additional arguments for .predict method of the network

{}

Returns:

Name Type Description
out List of dict, dict

predictions per image with id and status

Source code in src/services.py
async def generate_pred_dict(
        self, network: Predictor,
        ids: List,
        instances: List,
        ids_as_keys: bool = False,
        **kwargs
) -> Union[List[dict], dict]:
    """
    Make predictions with a network for multiple instances and return dictionaries with ids
    and predictions

    Parameters
    ----------
    network : api.predictors.Predictor
        network to make predictions with
    ids : List
        ids of inputs
    instances : List
        inputs to the network
    ids_as_keys : bool
        whether to use ids as keys in the returned dict or include them as a value of "id" field
    **kwargs :
        additional arguments for .predict method of the network

    Returns
    -------
    out : List of dict, dict
        predictions per image with id and status
    """
    # TODO refactor predictors so the preprocessing is done there instead of services
    valid_image_ids, valid_images, invalid_image_ids = process_image_formats(ids, instances)
    error_message = "Unknown b64 image file format. Supported formats: https://pillow.readthedocs.io/en/stable/handbook/image-file-formats.html "
    if not valid_images:
        raise Base64ValidationError(detail=[{"invalid_ids": invalid_image_ids}],
                                    msg=error_message,
                                    status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)

    valid_responses = await network.predict(valid_images, **kwargs)
    invalid_responses = [PredictorResponse(status_code=422,
                                           message=error_message,
                                           scores=[])] * len(invalid_image_ids)

    raw_responses = valid_responses + invalid_responses
    ids = valid_image_ids + invalid_image_ids
    response = [item.dict() for item in raw_responses]  # TODO make use of the PredictorResponse
    if ids_as_keys:
        return {id_: {"results": {network.name: response[i]}}
                for i, id_ in enumerate(ids)}

    results_per_image = [{"results": {network.name: pred}}
                         for pred in response]
    predictions = add_ids_to_dicts(ids=ids, dicts=results_per_image)

    return self.postprocess_prediction_dicts(predictions)

predict_tags(instances, ids, custom_thresholds) async

Call the Tagger network with given inputs and get corresponding predictions. For more information on parameters, refer to Services.generate_pred_dict()

Source code in src/services.py
@timing
async def predict_tags(
        self,
        instances: list,
        ids: list,
        custom_thresholds: Optional[Union[float, list, dict]]
) -> List[dict]:
    """
    Call the Tagger network wih given inputs and get corresponding predictions. For
    more information on parameters, refer to Services.generate_pred_dict()
    """
    return await self.generate_pred_dict(network=self.tagger,
                                         ids=ids,
                                         instances=instances,
                                         custom_thresholds=custom_thresholds)

predict_exterior_styles(ids, instances) async

Call the ExteriorStyles network with given inputs and get corresponding predictions. For more information on parameters, refer to Services.generate_pred_dict()

Source code in src/services.py
@timing
async def predict_exterior_styles(
        self,
        ids: list,
        instances: list
) -> List[dict]:
    """
    Call the ExteriorStyles network wih given inputs and get corresponding predictions. For
    more information on parameters, refer to Services.generate_pred_dict()
    """
    return await self.generate_pred_dict(network=self.exterior_styles,
                                         ids=ids,
                                         instances=instances)

encode(ids, instances) async

Encode given images with the encoder network. For more information on parameters, refer to Services.generate_pred_dict()

Source code in src/services.py
@timing
async def encode(
        self,
        ids: list,
        instances: list,
) -> List[dict]:
    """
    Encode given images with the encoder network. For more information on parameters,
    refer to Services.generate_pred_dict()
    """
    return await self.generate_pred_dict(network=self.encoder,
                                         ids=ids,
                                         instances=instances)

classify_dht(instances, ids, custom_thresholds) async

Call Tagger network, then call the Exterior Styles for the samples with "facade" predicted tag. Return all tags in a single dict for each sample for each network

Source code in src/services.py
@timing
async def classify_dht(
        self,
        instances: list,
        ids: list,
        custom_thresholds: Optional[Union[float, list, dict]]
) -> Tuple:
    """
    Call Tagger network, then call the Exterior Styles for the samples with "facade"
    predicted tag. Return all tags in a single dict for each sample for each network
    """
    dht_preds = await self.generate_pred_dict(network=self.tagger,
                                              ids=ids,
                                              instances=instances,
                                              custom_thresholds=custom_thresholds,
                                              ids_as_keys=True)
    facade_ids = [k for k, v in dht_preds.items()
                  if "facade" in v["results"][self.tagger.name]["scores"]]
    facade_instances = [instance for id_, instance in zip(ids, instances) if id_ in facade_ids]
    style_preds = None
    if facade_instances:
        style_preds = await self.generate_pred_dict(network=self.exterior_styles,
                                                    ids=facade_ids,
                                                    instances=facade_instances,
                                                    filtered=False,
                                                    ids_as_keys=True)
        for id_ in facade_ids:
            dht_preds[id_]["results"][self.exterior_styles.name] = style_preds[id_]["results"][
                self.exterior_styles.name].copy()
            dht_preds[id_]["results"][self.exterior_styles.name]["scores"] = argmax_dict(
                style_preds[id_]["results"][self.exterior_styles.name]["scores"])

    return dht_preds, style_preds

process_house_images(ids, instances, custom_thresholds) async

Process all images of a single house via Image Processing Networks

Returns:

Name Type Description
out dict

predictions for given house. Contains tagger's tags per image, information on present features and averaged scores for exterior styles

Source code in src/services.py
@timing
async def process_house_images(
        self,
        ids: list,
        instances: list,
        custom_thresholds: Optional[Union[float, list, dict]]
) -> Dict:
    """
    Process all images of a single house via Image Processing Networks

    Returns
    -------
    out : dict
        predictions for given house. Contains tagger's tags per image, information on present
        features and averaged scores for exterior styles
    """
    dht_preds, style_preds = await self.classify_dht(ids=ids,
                                                     instances=instances,
                                                     custom_thresholds=custom_thresholds)
    per_image = add_ids_to_dicts(ids=dht_preds.keys(), dicts=list(dht_preds.values()))
    avg_exterior_style = self.get_average_style(style_preds)
    house_status = any(d["results"][self.tagger.name]["status"] for d in dht_preds.values())
    present_features = [tag for im in dht_preds.values()
                        for tag in im["results"][self.tagger.name]["scores"]]
    unique_features = sorted(set(present_features))
    tagged = list(filter(lambda x: x["results"][self.tagger.name]["status"], per_image))
    house_info = {"exterior_styles": avg_exterior_style,
                  "prob_ranking": self.rank_images_by_class(tagged, classes=unique_features)}

    predictions = {"results_per_image": self.postprocess_prediction_dicts(per_image),
                   "house_info": house_info,
                   "status": house_status}
    return predictions

get_average_style(style_predictions_per_image)

Calculate average style of the house given predictions of the ExteriorStyle Classifier

Parameters:

Name Type Description Default
style_predictions_per_image dict

per image predictions for exterior styles

required

Returns:

Name Type Description
out List of dict

average score for each possible class

Source code in src/services.py
def get_average_style(self, style_predictions_per_image: dict) -> List[dict]:
    """
    Calculate average style of the house given predictions of the ExteriorStyle Classifier

    Parameters
    ----------
    style_predictions_per_image : dict
        per image predictions for exterior styles

    Returns
    -------
    out : List of dict
        average score for each possible class
    """
    avg_exterior_style = []
    if not style_predictions_per_image:
        return avg_exterior_style

    successful = list(filter(
        lambda x: x["results"][self.exterior_styles.name]["status"],
        style_predictions_per_image.values()))
    num_facades = len(successful)
    if num_facades:
        avg_exterior_style = [{"name": k,
                               "probability": sum(
                                   i["results"][f"{self.exterior_styles.name}"]["scores"][k]
                                   for i in successful) / num_facades} for k in
                              self.exterior_styles.output_names]

    return avg_exterior_style

rank_images_by_class(tagger_dicts, classes)

Rank images by Tagger's confidence that given scene exists there

Parameters:

Name Type Description Default
tagger_dicts List[dict]

valid predictions per_image

required
classes Iterable[str]

scenes to rank

required

Returns:

Name Type Description
out List of Dict

Scenes and list of images containing them sorted by probability (highest first)

Source code in src/services.py
def rank_images_by_class(
        self,
        tagger_dicts: List[dict],
        classes: Iterable[str]
) -> List[dict]:
    """
    Rank images by Tagger's confidence that given scene exists there

    Parameters
    ----------
    tagger_dicts : List[dict]
        valid predictions per_image
    classes : Iterable[str]
        scenes to rank

    Returns
    -------
    out : List of Dict
         Scenes and list of images containing them sorted by probability (highest first)
    """

    if not classes:
        return []
    raw_scores = {cls: {d["id"]: d["results"][self.tagger.name]["scores"].get(cls, 0)
                        for d in tagger_dicts}
                  for cls in classes}
    sorted_tuples = {cls: sorted(raw_scores[cls].items(), key=lambda x: x[1],
                                 reverse=True) for cls in classes}
    ranking = [{"name": cls,
                "ordering": [k for k, v in sorted_tuples[cls] if v > 0]}
               for cls in classes]

    return ranking

postprocess_prediction_dicts(results_per_image) staticmethod

Postprocess network prediction dictionaries

Parameters:

Name Type Description Default
results_per_image List[dict]

prediction dictionaries containing scores in format {"feature" : probability}

required

Returns:

Name Type Description
out List

dictionaries containing scores in format {"name" : feature, "probability" : probability}

Source code in src/services.py
@staticmethod
def postprocess_prediction_dicts(results_per_image: List[dict]) -> List[dict]:
    """
    Postprocess network prediction dictionaries

    Parameters
    ----------
    results_per_image: List
        prediction dictionaries containing scores in format {"feature" : probability}

    Returns
    -------
    out : List
        dictionaries containing scores in format {"name" : feature, "probability" : probability}
    """
    processed = results_per_image.copy()
    for result in processed:
        for network_results in result["results"].values():
            scores_per_class = network_results["scores"]
            if isinstance(scores_per_class, dict):  # in case of encoder & similar nets
                network_results["scores"] = [{"name": cls_name,
                                              "probability": probability}
                                             for cls_name, probability in
                                             scores_per_class.items()]

    return processed

prepare_embeddings(instance_ids, instances) async

Run the input data through Encoder model, filter and prepare the output for publishing

Parameters:

Name Type Description Default
instance_ids List

identifiers of images

required
instances List

images to embed

required

Returns:

Name Type Description
out list[PhotoEmbedding]

Vector embeddings and their ids

Source code in src/services.py
@timing
async def prepare_embeddings(
        self,
        instance_ids: list,
        instances: list
) -> list[PhotoEmbedding]:
    """
    Run the input data through Encoder model, filter and prepare the output for publishing

    Parameters
    ----------
    instance_ids : List
        identifiers of images
    instances : List
        images to embed

    Returns
    -------
    out : list[PhotoEmbedding]
        Vector embeddings and their ids
    """
    # Drop images in unsupported formats before calling the encoder
    valid_ids, valid_instances, _invalid_ids = process_image_formats(instance_ids, instances)
    if not valid_ids:
        return []
    predictions = await self.encoder.predict_batch(data=valid_instances)

    # Keep only the embeddings the encoder produced successfully
    # (predictions align positionally with valid_ids)
    return [PhotoEmbedding(photo_id=photo_id, embedding=prediction.scores)
            for photo_id, prediction in zip(valid_ids, predictions)
            if prediction.status]

publish_embeddings(core_listing_id, instance_ids, instances, compression) async

Calculate embeddings and publish them to message broker

Parameters:

Name Type Description Default
core_listing_id Any

identifier of a house

required
instance_ids List

identifiers of images

required
instances List

images to embed

required
compression CompressionMethod

Compression method to apply to the published payload

required

Returns:

Name Type Description
out PublishResults

Results of the publishing

Source code in src/services.py
@timing
async def publish_embeddings(self,
                             core_listing_id: Any,
                             instance_ids: list,
                             instances: list,
                             compression: CompressionMethod) -> PublishResults:
    """
    Calculate embeddings and publish them to message broker

    Parameters
    ----------
    core_listing_id : Any
        identifier of a house
    instance_ids : List
        identifiers of images
    instances : List
        images to embed
    compression : CompressionMethod
        compression method to apply to the published payload

    Returns
    -------
    out : PublishResults
        Results of the publishing
    """
    embeddings = await self.prepare_embeddings(instance_ids=instance_ids,
                                               instances=instances)
    if embeddings:
        payload = ListingEmbeddings(core_listing_id=core_listing_id, embeddings=embeddings)
        return await self.message_broker_client.publish(payload=payload.dict(),
                                                        compression=compression)

    # Nothing to send: report the failure instead of publishing an empty payload
    return PublishResults(published=False, error="No embeddings to publish. There might be problems with the "
                                                 "encoder models or the input images.")

trim_image(image, norm_thresh, initial_thresh) async staticmethod

Converts base64 decoded image into np.array and trim input image

Parameters:

Name Type Description Default
image str

input image (base64 image)

required
norm_thresh float

norm threshold to consider vectors the same

required
initial_thresh float

identification threshold

required

Returns:

Name Type Description
result str

output image (base64 encoded)

Source code in src/services.py
@staticmethod
async def trim_image(image: str, norm_thresh: float, initial_thresh: float) -> str:
    """
    Converts base64 decoded image into np.array and trim input image

    Parameters
    ----------
    image : str
        input image (base64 encoded)
    norm_thresh : float
        norm threshold to consider vectors the same
    initial_thresh : float
        identification threshold

    Returns
    -------
    result : str
        output image (base64 encoded)
    """
    pil_image = b64_to_pil(image_b64=image)
    # Trimming operates on raw pixel data, so round-trip through a numpy array
    trimmed_array = trim_image_with_thresh(img=np.array(pil_image),
                                           norm_thresh=norm_thresh,
                                           initial_thresh=initial_thresh)
    return pil_to_b64(image=Image.fromarray(trimmed_array))

Middlewares for pre- and post- processing of requests/responses

LoggingMiddleware

Bases: BaseHTTPMiddleware

Basic logging middleware inherited from starlette.BaseHTTPMiddleware

Source code in src/app/middlewares.py
class LoggingMiddleware(BaseHTTPMiddleware):
    """
    Basic logging middleware inherited from starlette.BaseHTTPMiddleware
    """

    def __init__(self, app, logger):
        """
        Init object with app and logger

        Parameters
        ----------
        app : fastapi.FastAPI
            application object where middleware need to be added
        logger : logging.Logger
            already configured logger for logging requests
        """
        super().__init__(app)
        self.logger = logger

    async def dispatch(self, request: Request, call_next):
        """
        Overriding BaseHTTPMiddleware.dispatch method to implement logging logic

        Parameters
        ----------
        request : starlette.middleware.base.Request
            current request
        call_next : starlette.middleware.base.RequestResponseEndpoint
            call function

        Returns
        -------
        streaming_response : starlette.middleware.base.StreamingResponse
            streaming response for the endpoint
        """
        guid = generate_guid()
        self.logger.info(f"rid={guid} start request path={request.url.path}")
        start_time = time()

        streaming_response = await call_next(request)
        status_code = streaming_response.status_code
        process_time = (time() - start_time) * 1000
        formatted_process_time = '{0:.2f}'.format(process_time)

        # collect errors and log also error messages
        if status_code != status.HTTP_200_OK:
            # Drain the body so it can be logged, then restore a fresh
            # iterator so the client still receives the full response
            response_body = [
                section async for section in streaming_response.body_iterator
            ]
            streaming_response.body_iterator = iterate_in_threadpool(
                iter(response_body)
            )
            # Guard against an empty error body; indexing [0] unconditionally
            # raised IndexError and masked the original error response
            msg = response_body[0].decode() if response_body else ""
            self.logger.info(
                f"rid={guid} completed_in={formatted_process_time}ms"
                f" status_code={status_code}"
                f" response = {response_body},"
                f" message = {msg}"
            )
        # for 200 OK requests logging only event
        else:
            self.logger.info(
                f"rid={guid} completed_in={formatted_process_time}ms status_code={status_code}"
            )

        return streaming_response

__init__(app, logger)

Init object with src and logger

Parameters:

Name Type Description Default
app FastAPI

application object where middleware need to be added

required
logger logger

already configured logger for logging requests

required
Source code in src/app/middlewares.py
def __init__(self, app, logger):
    """
    Init object with app and logger

    Parameters
    ----------
    app : fastapi.FastAPI
        application object where middleware need to be added
    logger : logging.Logger
        already configured logger for logging requests
    """
    super().__init__(app)
    self.logger = logger

dispatch(request, call_next) async

Overriding BaseHTTPMiddleware.dispatch method to implement logging logic

Parameters:

Name Type Description Default
request Request

current request

required
call_next RequestResponseEndpoint

call function

required

Returns:

Name Type Description
streaming_response StreamingResponse

streaming response for the endpoint

Source code in src/app/middlewares.py
async def dispatch(self, request: Request, call_next):
    """
    Overriding BaseHTTPMiddleware.dispatch method to implement logging logic

    Parameters
    ----------
    request : starlette.middleware.base.Request
        current request
    call_next : starlette.middleware.base.RequestResponseEndpoint
        call function

    Returns
    -------
    streaming_response : starlette.middleware.base.StreamingResponse
        streaming response for the endpoint
    """
    guid = generate_guid()
    self.logger.info(f"rid={guid} start request path={request.url.path}")
    start_time = time()

    streaming_response = await call_next(request)
    status_code = streaming_response.status_code
    process_time = (time() - start_time) * 1000
    formatted_process_time = '{0:.2f}'.format(process_time)

    # collect errors and log also error messages
    if status_code != status.HTTP_200_OK:
        # Drain the body so it can be logged, then restore a fresh
        # iterator so the client still receives the full response
        response_body = [
            section async for section in streaming_response.body_iterator
        ]
        streaming_response.body_iterator = iterate_in_threadpool(
            iter(response_body)
        )
        # Guard against an empty error body; indexing [0] unconditionally
        # raised IndexError and masked the original error response
        msg = response_body[0].decode() if response_body else ""
        self.logger.info(
            f"rid={guid} completed_in={formatted_process_time}ms"
            f" status_code={status_code}"
            f" response = {response_body},"
            f" message = {msg}"
        )
    # for 200 OK requests logging only event
    else:
        self.logger.info(
            f"rid={guid} completed_in={formatted_process_time}ms status_code={status_code}"
        )

    return streaming_response

ExceptionHandlerMiddleware

Bases: BaseHTTPMiddleware

A middleware to handle errors

Source code in src/app/middlewares.py
class ExceptionHandlerMiddleware(BaseHTTPMiddleware):
    """A middleware to handle errors"""

    async def dispatch(self, request: Request, call_next):
        """Try to process the request. If failed, return details about the exception"""

        try:
            return await call_next(request)
        except DetailedExceptionBase as e:
            # Domain errors carry their own status code and extra detail
            status_code = e.status_code
            info = self.extract_info(e)
            info["additional_information"] = e.detail
        except Exception as e:
            # Anything unexpected becomes a generic internal server error
            status_code = 500
            info = self.extract_info(e)
        return JSONResponse(status_code=status_code, content={"detail": [info]})

    @staticmethod
    def extract_info(error: BaseException):
        """Extract the type and the message of an error and return as a dict"""
        return {"type": type(error).__name__,
                "msg": str(error)}

dispatch(request, call_next) async

Try to process the request. If failed, return details about the exception

Source code in src/app/middlewares.py
async def dispatch(self, request: Request, call_next):
    """Try to process the request. If failed, return details about the exception"""

    try:
        return await call_next(request)
    except DetailedExceptionBase as e:
        # Domain errors carry their own status code and extra detail
        status_code = e.status_code
        info = self.extract_info(e)
        info["additional_information"] = e.detail
    except Exception as e:
        # Anything unexpected becomes a generic internal server error
        status_code = 500
        info = self.extract_info(e)
    return JSONResponse(status_code=status_code, content={"detail": [info]})

extract_info(error) staticmethod

Extract the type and the message of an error and return as a dict

Source code in src/app/middlewares.py
@staticmethod
def extract_info(error: BaseException):
    """Extract the type and the message of an error and return as a dict"""
    return {"type": type(error).__name__,
            "msg": str(error)}