Skip to content

Commit

Permalink
use lock instead of queue
Browse files Browse the repository at this point in the history
  • Loading branch information
datawhores committed Jul 5, 2024
1 parent 3c1530b commit 444c8d5
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 16 deletions.
21 changes: 6 additions & 15 deletions ofscraper/download/downloadbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,11 +375,11 @@ async def ajob_progress_helper(funct):
except Exception as E:
logging.getLogger("shared").debug(E)

async def consumer(queue):
async def consumer(lock,aws):
while True:
data = await queue.get()
async with lock:
data = aws.pop()
if data is None:
queue.task_done()
break
else:
try:
Expand All @@ -393,18 +393,9 @@ async def consumer(queue):
media_type = "skipped"
num_bytes_downloaded = 0
await send_msg((media_type, num_bytes_downloaded, 0))
queue.task_done()
await asyncio.sleep(1)


async def producer(queue, aws, concurrency_limit):
for data in aws:
await queue.put(data)
for _ in range(concurrency_limit):
await queue.put(None)
await queue.join() # Wait for all tasks to finish


@run
async def process_dicts_split(username, model_id, medialist):
common_globals.log.debug(f"{pid_log_helper()} start inner thread for other loggers")
Expand All @@ -426,11 +417,11 @@ async def process_dicts_split(username, model_id, medialist):
for ele in medialist:
aws.append((c, ele, model_id, username))
concurrency_limit = get_max_workers()
queue = asyncio.Queue(maxsize=concurrency_limit)
lock = asyncio.Lock()
consumers = [
asyncio.create_task(consumer(queue)) for _ in range(concurrency_limit)
asyncio.create_task(consumer(lock,aws)) for _ in range(concurrency_limit)
]
await producer(queue, aws, concurrency_limit)

await asyncio.gather(*consumers)
common_globals.log.debug(f"{pid_log_helper()} download process thread closing")
# send message directly
Expand Down
10 changes: 9 additions & 1 deletion ofscraper/filters/media/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ def filtermediaFinal(media, username, model_id):
if "download" not in actions and not scrape_paid:
log.debug("Skipping filtering because download not in actions")
return media
log.info(f"finalizing media filtering {username} {model_id} for download")


count = 1
trace_log_media(count, media, "initial media no filter:")
Expand Down Expand Up @@ -48,12 +50,13 @@ def filtermediaFinal(media, username, model_id):


def filtermediaAreas(media, **kwargs):

actions = read_args.retriveArgs().action
scrape_paid = read_args.retriveArgs().scrape_paid
if "download" not in actions and not scrape_paid:
log.debug("Skipping filtering because download not in actions")
return media

log.info("Initial media filtering for download")
count = 1

trace_log_media(count, media, "initial media no filter:")
Expand Down Expand Up @@ -128,9 +131,12 @@ def filterPostFinal(post):
if "download" not in actions and not scrape_paid:
log.debug("Skipping filtering because download not in actions")
return post

if "Text" not in settings.get_mediatypes():
log.info("Skipping filtering Text not in mediatypes")
return post
log.info("Filtering posts for text")

count = 1
trace_log_post(count, post, "initial posts no filter:")
log.debug(f"filter {count}-> initial posts no filter count: {len(post)}")
Expand Down Expand Up @@ -171,10 +177,12 @@ def filterPostFinal(post):


def post_filter_for_like(post, like=False):

actions = read_args.retriveArgs().action
if "like" not in actions and "unlike" not in actions:
log.debug("Skipping filtering because like and unlike not in actions")
return post
log.info("Filtering posts for like action")
log.debug(f"initial number of posts for {actions}")

post = helpers.temp_post_filter(post)
Expand Down

0 comments on commit 444c8d5

Please sign in to comment.