Skip to content

Commit

Permalink
Improve pipeline concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
allegroai committed Apr 18, 2022
1 parent 8314074 commit 49e5acb
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions examples/pipeline/preprocess.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from concurrent.futures import ThreadPoolExecutor
from typing import Any, List


# Notice Preprocess class Must be named "Preprocess"
class Preprocess(object):
def __init__(self):
# set internal state, this will be called only once. (i.e. not per request)
pass
self.executor = ThreadPoolExecutor(max_workers=32)

def postprocess(self, data: List[dict], collect_custom_statistics_fn=None) -> dict:
# we will here average the results and return the new value
Expand All @@ -19,8 +20,12 @@ def process(self, data: Any, collect_custom_statistics_fn=None) -> Any:
do something with the actual data, return any type of object.
The returned object will be passed as is to the postprocess function engine
"""
predict_a = self.send_request(endpoint="/test_model_sklearn_a/", version=None, data=data)
predict_b = self.send_request(endpoint="/test_model_sklearn_b/", version=None, data=data)
predict_a = self.executor.submit(self.send_request, endpoint="/test_model_sklearn_a/", version=None, data=data)
predict_b = self.executor.submit(self.send_request, endpoint="/test_model_sklearn_b/", version=None, data=data)

predict_a = predict_a.result()
predict_b = predict_b.result()

if not predict_b or not predict_a:
raise ValueError("Error requesting inference endpoint test_model_sklearn a/b")

Expand Down

0 comments on commit 49e5acb

Please sign in to comment.