Commit

finish changes for download batch

datawhores committed Jul 3, 2024
1 parent 6c59683 commit a114977
Showing 17 changed files with 154 additions and 58 deletions.
14 changes: 14 additions & 0 deletions ofscraper/classes/sessionmanager/chunk.py
@@ -0,0 +1,14 @@
class responseChunk:
    def __init__(self, response, total):
        self._response = response
        self._total = total
        self._count = 0

    async def read_chunk(self, chunk_size):
        r = self._response
        if r.eof():
            return None
        chunk = await r.read_(chunk_size)
        return chunk

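The new responseChunk class wraps a response object that already exposes the eof()/read_() aliases (added to the session manager below) and pulls fixed-size chunks until the stream is exhausted. A minimal usage sketch, assuming such a response; the drain() helper and the 64 KiB chunk size are illustrative, not part of the commit:

async def drain(response, total):
    reader = responseChunk(response, total)
    parts = []
    while True:
        # read_chunk returns None once the underlying stream hits EOF
        chunk = await reader.read_chunk(64 * 1024)
        if chunk is None:
            break
        parts.append(chunk)
    return b"".join(parts)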
1 change: 1 addition & 0 deletions ofscraper/classes/sessionmanager/sessionmanager.py
@@ -593,6 +593,7 @@ async def _aio_funct(self, method, *args, **kwargs):
        r.request = r.request_info
        r.status_code = r.status
        r.read_ = r.content.read
        r.eof = r.content.at_eof
        return r

    async def factoryasync(self, input):
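On an aiohttp.ClientResponse, r.content is a StreamReader, so the new alias points r.eof at StreamReader.at_eof (a synchronous boolean check) while the existing r.read_ alias wraps the StreamReader.read coroutine; together they give responseChunk a uniform surface. A rough standalone equivalent using plain aiohttp (the URL and chunk size are placeholders):

import asyncio
import aiohttp

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as r:
            # same aliases the session manager attaches
            r.read_ = r.content.read
            r.eof = r.content.at_eof
            data = b""
            while not r.eof():
                data += await r.read_(4 * 1024)
            return data

asyncio.run(fetch("https://example.com"))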
6 changes: 4 additions & 2 deletions ofscraper/const/values/download.py
@@ -16,5 +16,7 @@
MPD_FORCE_KEY = True
API_FORCE_KEY = False
PROFILE_FORCE_KEY = False
MULTIPROGRESS_JOB_UPDATE_FREQ = 3
OVERALL_MULTI_PROGRESS_THREAD_SLEEP = .2
MULTIPROGRESS_JOB_UPDATE_FREQ = 2
PROGRESS_JOB_UPDATE_FREQ = 5
OVERALL_MULTI_PROGRESS_THREAD_SLEEP = .1
JOB_MULTI_PROGRESS_THREAD_SLEEP = .1
5 changes: 3 additions & 2 deletions ofscraper/const/values/req/req.py
@@ -13,6 +13,7 @@
MAX_CHUNK_SIZE = 1024 * 1024 * 10
MIN_CHUNK_SIZE = 4 * 1024
CHUNK_UPDATE_COUNT = 12
CHUNK_SIZE_UPDATE_COUNT = 15

REQ_SEMAPHORE_MULTI = 5
SCRAPE_PAID_SEMS = 10
@@ -49,5 +50,5 @@


# ideal chunk
CHUNK_MEMORY_SPLIT = 64
CHUNK_FILE_SPLIT = 64
CHUNK_MEMORY_SPLIT = 128
CHUNK_FILE_SPLIT = 128
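Raising CHUNK_MEMORY_SPLIT and CHUNK_FILE_SPLIT from 64 to 128 halves both candidate chunk sizes before the MIN/MAX clamps are applied. A worked example of the arithmetic get_ideal_chunk_size performs with the new constants (the memory and file figures are invented):

MAX_CHUNK_SIZE = 1024 * 1024 * 10  # 10 MiB
MIN_CHUNK_SIZE = 4 * 1024          # 4 KiB

available_memory = 2 * 1024**3     # assume 2 GiB free
file_size = 300 * 1024**2          # assume a 300 MiB download

max_chunk = min(available_memory // 128, MAX_CHUNK_SIZE)  # 16 MiB, capped to 10 MiB
ideal = min(max_chunk, file_size // 128)                  # 314572800 // 128 = 2457600
ideal = max(ideal - (ideal % 4096), MIN_CHUNK_SIZE)       # already 4 KiB-aligned
print(ideal)                                              # 2457600 bytes (~2.3 MiB)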
6 changes: 3 additions & 3 deletions ofscraper/download/alt_download.py
@@ -50,8 +50,8 @@
temp_file_logger,
)
from ofscraper.download.utils.progress.chunk import (
get_ideal_chunk_size,
get_update_count,
get_ideal_chunk_size
)
from ofscraper.download.utils.retries import get_download_retries
from ofscraper.download.utils.send.chunk import send_chunk_msg
@@ -233,12 +233,12 @@ async def download_fileobject_writer(total, l, ele, placeholderObj):
fileobject = await aiofiles.open(placeholderObj.tempfilepath, "ab").__aenter__()
download_sleep = constants.getattr("DOWNLOAD_SLEEP")
chunk_size = get_ideal_chunk_size(total, placeholderObj.tempfilepath)
update_count = get_update_count(total, placeholderObj.tempfilepath, chunk_size)
update_count = get_update_count(total, placeholderObj.tempfilepath,chunk_size)
count = 1
try:
async for chunk in l.iter_chunked(chunk_size):
send_chunk_msg(ele, total, placeholderObj)
await fileobject.write(chunk)
send_chunk_msg(ele, total, placeholderObj)
await send_bar_msg(
partial(
progress_updater.update_download_job_task,
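The loop change writes each chunk before emitting its progress message, and bar updates stay throttled so only every update_count-th chunk touches the display. The throttle pattern, reduced to a sketch (write_with_throttle and update_bar are hypothetical stand-ins for the real writer loop and the partial passed to send_bar_msg):

async def write_with_throttle(response, fileobject, chunk_size, update_count, update_bar):
    # write every chunk, but refresh the progress bar only every Nth chunk
    count = 1
    async for chunk in response.iter_chunked(chunk_size):
        await fileobject.write(chunk)
        if count % update_count == 0:
            update_bar()
        count += 1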
4 changes: 2 additions & 2 deletions ofscraper/download/alt_downloadbatch.py
@@ -38,7 +38,7 @@
)
from ofscraper.download.utils.progress.chunk import (
get_ideal_chunk_size,
get_update_count,
get_update_count
)
from ofscraper.download.utils.resume import get_resume_header, get_resume_size
from ofscraper.download.utils.retries import get_download_retries
@@ -259,7 +259,7 @@ async def download_fileobject_writer(total, req, ele, placeholderObj):
)
)
chunk_size = get_ideal_chunk_size(total, placeholderObj.tempfilepath)
update_count = get_update_count(total, placeholderObj.tempfilepath, chunk_size)
update_count = get_update_count(total, placeholderObj.tempfilepath,chunk_size)

async for chunk in req.iter_chunked(chunk_size):
await fileobject.write(chunk)
61 changes: 59 additions & 2 deletions ofscraper/download/downloadbatch.py
@@ -43,10 +43,9 @@
from ofscraper.download.utils.metadata import metadata
from ofscraper.download.utils.paths.paths import addGlobalDir, setDirectoriesDate
from ofscraper.download.utils.progress.progress import convert_num_bytes
from ofscraper.download.utils.send.message import send_msg
from ofscraper.download.utils.send.message import send_msg, send_msg_alt
from ofscraper.download.utils.workers import get_max_workers
from ofscraper.utils.context.run_async import run
from ofscraper.utils.live.progress import multi_download_job_progress
import ofscraper.utils.constants as constants


@@ -73,6 +72,7 @@ def process_dicts(username, model_id, filtered_medialist):
split_val = min(4, num_proc)
log.debug(f"Number of download threads: {num_proc}")
connect_tuples = [AioPipe() for _ in range(num_proc)]
alt_connect_tuples = [AioPipe() for _ in range(num_proc)]
shared = list(
more_itertools.chunked([i for i in range(num_proc)], split_val)
)
@@ -101,6 +101,7 @@ def process_dicts(username, model_id, filtered_medialist):
logqueues_[i // split_val],
otherqueues_[i // split_val],
connect_tuples[i][1],
alt_connect_tuples[i][1],
dates.getLogDate(),
selector.get_ALL_SUBS_DICT(),
read_args.retriveArgs(),
@@ -138,7 +139,18 @@ def process_dicts(username, model_id, filtered_medialist):
)
for i in range(num_proc)
]
    download_queue_threads = [
        threading.Thread(
            target=asyncio.run,
            args=(download_progress(alt_connect_tuples[i][0]),),
            daemon=True,
        )
        for i in range(num_proc)
    ]
    [thread.start() for thread in queue_threads]
    [thread.start() for thread in download_queue_threads]

    log.debug(f"Initial Queue Threads: {queue_threads}")
    log.debug(f"Number of initial Queue Threads: {len(queue_threads)}")
@@ -155,6 +167,19 @@ def process_dicts(username, model_id, filtered_medialist):
        for thread in queue_threads:
            thread.join(timeout=0.1)
        time.sleep(0.5)
    while True:
        newqueue_threads = list(
            filter(lambda x: x and x.is_alive(), download_queue_threads)
        )
        if len(newqueue_threads) != len(download_queue_threads):
            log.debug(f"Remaining Download Queue Threads: {newqueue_threads}")
            log.debug(f"Number of Download Queue Threads: {len(newqueue_threads)}")
        if len(newqueue_threads) == 0:
            break
        download_queue_threads = newqueue_threads
        for thread in download_queue_threads:
            thread.join(timeout=0.1)
        time.sleep(0.5)
    log.debug(f"Initial Log Threads: {log_threads}")
    log.debug(f"Number of initial Log Threads: {len(log_threads)}")
    while True:
@@ -212,6 +237,20 @@ def process_dicts(username, model_id, filtered_medialist):



async def download_progress(pipe_):
    count = 0
    # shared globals
    sleep = constants.getattr("JOB_MULTI_PROGRESS_THREAD_SLEEP")
    while True:
        time.sleep(sleep)
        if count == 1:
            break
        results = pipe_.recv()
        if not isinstance(results, list):
            results = [results]
        for result in results:
            # None is the sentinel sent by send_msg_alt(None) when a worker finishes
            if result is None:
                count += 1
                continue
            await ajob_progress_helper(result)


def queue_process(pipe_, task1, total):
count = 0
@@ -300,6 +339,7 @@ def process_dict_starter(
    p_logqueue_,
    p_otherqueue_,
    pipe_,
    pipe2_,
    dateDict,
    userNameList,
    argsCopy,
@@ -308,6 +348,7 @@ def process_dict_starter(
        dateDict,
        userNameList,
        pipe_,
        pipe2_,
        logger.get_shared_logger(
            main_=p_logqueue_, other_=p_otherqueue_, name=f"shared_{os.getpid()}"
        ),
@@ -336,10 +377,25 @@ def process_dict_starter(
def job_progress_helper(funct):
    try:
        funct()
    # probably handled by another thread
    except KeyError:
        pass
    except Exception as E:
        logging.getLogger("shared").debug(E)

async def ajob_progress_helper(funct):
    try:
        await asyncio.get_event_loop().run_in_executor(
            None,
            funct,
        )
    # probably handled by another thread
    except KeyError:
        pass
    except Exception as E:
        logging.getLogger("shared").debug(E)

def setpriority():
os_used = platform.system()
process = psutil.Process(
@@ -422,6 +478,7 @@ async def process_dicts_split(username, model_id, medialist):
    common_globals.log.debug("other thread closed")
    await send_msg({"dir_update": common_globals.localDirSet})
    await send_msg(None)
    await send_msg_alt(None)


def pid_log_helper():
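Each worker process now gets a second pipe (alt_connect_tuples) reserved for download-progress traffic, and one daemon thread per pipe drains it until the None sentinel that send_msg_alt(None) emits at shutdown. The consumer shape, sketched with a plain multiprocessing.Pipe standing in for AioPipe:

import threading
from multiprocessing import Pipe

def consume(conn):
    # drain progress updates until the producer sends the None sentinel
    while True:
        item = conn.recv()
        if item is None:
            break
        print("progress update:", item)  # stand-in for ajob_progress_helper

parent_end, child_end = Pipe()
consumer = threading.Thread(target=consume, args=(parent_end,), daemon=True)
consumer.start()
child_end.send({"completed": 1024})
child_end.send(None)  # sentinel: the consumer thread exits
consumer.join()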
4 changes: 2 additions & 2 deletions ofscraper/download/main_download.py
@@ -43,8 +43,8 @@
)
from ofscraper.download.utils.metadata import force_download
from ofscraper.download.utils.progress.chunk import (
get_ideal_chunk_size,
get_update_count,
get_ideal_chunk_size
)
from ofscraper.download.utils.resume import get_resume_header, get_resume_size
from ofscraper.download.utils.retries import get_download_retries
@@ -210,7 +210,7 @@ async def download_fileobject_writer(r, ele, tempholderObj, placeholderObj, tota
fileobject = await aiofiles.open(tempholderObj.tempfilepath, "ab").__aenter__()
download_sleep = constants.getattr("DOWNLOAD_SLEEP")
chunk_size = get_ideal_chunk_size(total, tempholderObj.tempfilepath)
update_count = get_update_count(total, tempholderObj.tempfilepath, chunk_size)
update_count = get_update_count(total, tempholderObj.tempfilepath,chunk_size)
count = 1
async for chunk in r.iter_chunked(chunk_size):
await fileobject.write(chunk)
41 changes: 22 additions & 19 deletions ofscraper/download/main_downloadbatch.py
@@ -44,8 +44,9 @@
)
from ofscraper.download.utils.metadata import force_download
from ofscraper.download.utils.progress.chunk import (
get_ideal_chunk_size,
change_get_ideal_chunk_size,
get_update_count,
get_ideal_chunk_size,
)
from ofscraper.download.utils.resume import get_resume_header, get_resume_size
from ofscraper.download.utils.retries import get_download_retries
@@ -233,26 +234,28 @@ async def download_fileobject_writer(r, ele, total, tempholderObj, placeholderOb
partial(progress_updater.update_download_multi_job_task, ele.id, visible=True)
)
chunk_size = get_ideal_chunk_size(total, tempholderObj.tempfilepath)
update_count = get_update_count(total, tempholderObj.tempfilepath, chunk_size)
update_count = get_update_count(total, tempholderObj.tempfilepath,chunk_size)
count = 1

async for chunk in r.iter_chunked(chunk_size):
send_chunk_msg(ele, total, tempholderObj)
await fileobject.write(chunk)
await send_bar_msg_batch(
partial(
progress_updater.update_download_multi_job_task,
ele.id,
completed=pathlib.Path(tempholderObj.tempfilepath)
.absolute()
.stat()
.st_size,
),
count,
update_count,
)
count += 1
(await asyncio.sleep(download_sleep)) if download_sleep else None
try:
await fileobject.write(chunk)
send_chunk_msg(ele, total, tempholderObj)
await send_bar_msg_batch(
partial(
progress_updater.update_download_multi_job_task,
ele.id,
completed=pathlib.Path(tempholderObj.tempfilepath)
.absolute()
.stat()
.st_size,
),
count,
update_count,
)
count += 1
(await asyncio.sleep(download_sleep)) if download_sleep else None
except EOFError:
break
except Exception as E:
# reset download data
raise E
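Wrapping the per-chunk bookkeeping in try/except EOFError means a stream that ends mid-iteration finishes the loop cleanly instead of failing the whole download, while any other exception still propagates to the retry logic. The control flow, reduced to a skeleton (write_all, reader, and fileobject are illustrative):

async def write_all(reader, fileobject, chunk_size):
    async for chunk in reader.iter_chunked(chunk_size):
        try:
            await fileobject.write(chunk)
        except EOFError:
            break  # stream closed early; keep the bytes already written
        except Exception:
            raise  # anything else bubbles up to the retry wrapper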
2 changes: 1 addition & 1 deletion ofscraper/download/utils/alt/data.py
@@ -41,7 +41,7 @@ async def resume_data_handler_alt(data, item, ele, placeholderObj, batch=False):
elif not batch:
total_change_helper(None, total)
elif batch:
batch_total_change_helper(None, total)
await batch_total_change_helper(None, total)
return item, True
elif total != resume_size:
return item, False
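The fix here is the added await: batch_total_change_helper is a coroutine function, and calling it bare only creates a coroutine object, so the batch total was never actually updated (CPython also warns "coroutine ... was never awaited" when the object is garbage-collected). A minimal reproduction with a made-up helper:

import asyncio

async def update_total(new_total):
    print("total is now", new_total)

async def main():
    update_total(100)        # bug: builds a coroutine but never runs it
    await update_total(100)  # fix: actually executes the update

asyncio.run(main())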
4 changes: 2 additions & 2 deletions ofscraper/download/utils/general.py
@@ -37,12 +37,12 @@ def add_additional_data(placeholderObj, ele):
add_path(placeholderObj, ele)


def subProcessVariableInit(dateDict, userList, pipeCopy, logCopy, argsCopy):
def subProcessVariableInit(dateDict, userList, pipeCopy, pipeAltCopy, logCopy, argsCopy):
common_globals.reset_globals()
write_args.setArgs(argsCopy)
dates.setLogDate(dateDict)
selector.set_ALL_SUBS_DICT(userList)
common_globals.process_split_globals(pipeCopy, logCopy)
common_globals.process_split_globals(pipeCopy, pipeAltCopy, logCopy)
set_send_msg()


6 changes: 5 additions & 1 deletion ofscraper/download/utils/globals.py
@@ -73,12 +73,16 @@ def set_up_contexvars():
innerlog = contextvars.ContextVar("innerlog")


def process_split_globals(pipeCopy, logCopy):
def process_split_globals(pipeCopy, pipeAltCopy, logCopy):
    global pipe
    global log
    global pipe_lock
    global pipe_alt_lock
    global lock_pool
    global pipe_alt
    pipe = pipeCopy
    pipe_alt = pipeAltCopy
    log = logCopy
    pipe_lock = threading.Lock()
    pipe_alt_lock = threading.Lock()
    lock_pool = ThreadPoolExecutor(max_workers=1)
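Giving the alt pipe its own lock (pipe_alt_lock) lets the two pipes be written concurrently while each individual pipe still sees one writer at a time; multiprocessing Connection objects make no thread-safety guarantee, so unserialized send() calls could interleave. The guarded-send idiom these globals support, roughly (send_msg_alt_sketch is a hypothetical stand-in):

import threading
from multiprocessing import Pipe

pipe_alt, pipe_alt_child = Pipe()
pipe_alt_lock = threading.Lock()

def send_msg_alt_sketch(payload):
    # serialize writers on this specific pipe without blocking the main pipe
    with pipe_alt_lock:
        pipe_alt.send(payload)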
2 changes: 1 addition & 1 deletion ofscraper/download/utils/main/data.py
@@ -52,6 +52,6 @@ async def resume_data_handler_main(data, ele, tempholderObj, batch=False):
elif not batch:
total_change_helper(None, total)
elif batch:
batch_total_change_helper(None, total)
await batch_total_change_helper(None, total)
check = True
return total, placeholderObj, check
10 changes: 8 additions & 2 deletions ofscraper/download/utils/progress/chunk.py
@@ -17,6 +17,12 @@

import ofscraper.utils.constants as constants

def change_get_ideal_chunk_size(total_size, curr_file, count=None, chunk_size=None, update_count=None):
    # compute on the first call, then refresh every (update_count * 5) chunks
    if not chunk_size:
        chunk_size = get_ideal_chunk_size(total_size, curr_file)
    elif count % (update_count * 5) == 0:
        chunk_size = get_ideal_chunk_size(total_size, curr_file)
    return chunk_size

def get_ideal_chunk_size(total_size, curr_file):
"""
@@ -42,10 +48,10 @@ def get_ideal_chunk_size(total_size, curr_file):

# Target a chunk size that utilizes a reasonable portion of available memory
max_chunk_size = min(
available_memory // 512, constants.getattr("MAX_CHUNK_SIZE")
available_memory // constants.getattr("CHUNK_MEMORY_SPLIT"), constants.getattr("MAX_CHUNK_SIZE")
) # Max 10MB
# Adjust chunk size based on file size (consider smaller sizes for larger files, with minimum)
ideal_chunk_size = min(max_chunk_size, file_size // 512)
ideal_chunk_size = min(max_chunk_size, file_size // constants.getattr("CHUNK_FILE_SPLIT"))
ideal_chunk_size = max(
ideal_chunk_size - (ideal_chunk_size % 4096),
constants.getattr("MIN_CHUNK_SIZE"),
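change_get_ideal_chunk_size computes a size on the first call (when chunk_size is still None) and then refreshes it every update_count * 5 chunks, so the sizing heuristics run periodically instead of on every chunk. How a download loop might thread its state through it (the totals, path, and counts are illustrative):

chunk_size = None
update_count = 12  # matches CHUNK_UPDATE_COUNT
for count in range(1, 101):
    chunk_size = change_get_ideal_chunk_size(
        10**8,       # total_size
        "part.tmp",  # curr_file
        count=count,
        chunk_size=chunk_size,
        update_count=update_count,
    )  # recomputed at count 60 (12 * 5); otherwise the cached size is returned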
