manager

DrsAsyncManager

Bases: DrsManager

Manage DRSClient workload with asyncio threads, display progress.
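
A minimal usage sketch (not taken from the library's own docs): my_drs_client is assumed to be a concrete DrsClient implementation and the DRS URIs are illustrative; only the DrsAsyncManager methods shown in the signatures below are real.

from pathlib import Path

from drs_downloader.manager import DrsAsyncManager

# my_drs_client: a concrete DrsClient implementation supplied by the caller (hypothetical here).
manager = DrsAsyncManager(drs_client=my_drs_client, show_progress=True)

object_ids = ["drs://example.org/object-1", "drs://example.org/object-2"]  # illustrative IDs

# Resolve the manifest entries, tune concurrency for this workload, then download.
drs_objects = manager.get_objects(object_ids, verbose=False)
drs_objects = manager.optimize_workload(False, drs_objects)
results = manager.download(drs_objects, Path("downloads"), duplicate=False, verbose=False)

# download() returns None when every object was already present, hence the guard.
for obj in results or []:
    if obj.errors:
        print(f"{obj.name}: {obj.errors}")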

Source code in drs_downloader/manager.py
class DrsAsyncManager(DrsManager):
    """Manage DRSClient workload with asyncio threads, display progress."""

    def __init__(
        self,
        drs_client: DrsClient,
        show_progress: bool = True,
        part_size: int = DEFAULT_PART_SIZE,
        max_simultaneous_object_retrievers=DEFAULT_MAX_SIMULTANEOUS_OBJECT_RETRIEVERS,
        max_simultaneous_downloaders=DEFAULT_MAX_SIMULTANEOUS_DOWNLOADERS,
        max_simultaneous_part_handlers=DEFAULT_MAX_SIMULTANEOUS_PART_HANDLERS,
        max_simultaneous_object_signers=DEFAULT_MAX_SIMULTANEOUS_OBJECT_SIGNERS,
    ):
        """

        Args:
            drs_client: the client that will interact with server
            show_progress: show progress bars
            part_size: tweak to optimize workload
            max_simultaneous_object_retrievers: tweak to optimize workload
            max_simultaneous_downloaders: tweak to optimize workload
            max_simultaneous_part_handlers: tweak to optimize workload
        """
        # """Implements abstract constructor."""
        super().__init__(drs_client=drs_client)
        self.max_simultaneous_object_retrievers = max_simultaneous_object_retrievers
        self.max_simultaneous_object_signers = max_simultaneous_object_signers
        self.max_simultaneous_downloaders = max_simultaneous_downloaders
        self.max_simultaneous_part_handlers = max_simultaneous_part_handlers
        self.disable = not show_progress
        self.part_size = part_size

    @staticmethod
    def _parts_generator(
        size: int, start: int = 0, part_size: int = None
    ) -> Iterator[Tuple[int, int]]:
        """Determine the start,size for each part

        Args:
            size: size of file
            start: offset into file 0 based
            part_size: desired part size

        Returns:
            iterator of tuple start, size
        """
        while size - start > part_size:
            yield start, start + part_size
            start += part_size + 1
            # start += part_size
        yield start, size

    async def wait_till_completed(self, tasks, err_function_msg):
        drs_objects_with_signed_urls = []
        while tasks:
            done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
            for t in tqdm.tqdm(done, total=len(tasks),
                               desc=f"retrieving {err_function_msg} information",
                               disable=self.disable):
                try:
                    y = await t
                    drs_objects_with_signed_urls.append(y)

                except Exception:
                    signed_url = DrsObject(
                                    self_uri="",
                                    id="",
                                    checksums=[],
                                    size=0,
                                    name=None,
                                    errors=[f"Exception in {err_function_msg} function"],
                                )

                    drs_objects_with_signed_urls.append(signed_url)
                    t.cancel()

        return drs_objects_with_signed_urls

    async def _run_download_parts(
        self, drs_object: DrsObject, destination_path: Path, verbose: bool
    ) -> DrsObject:
        """Determine number of parts for signed url and create tasks for each part, run them in batches.

        Args:
            drs_object: Information about a bucket object

        Returns:
            list of paths to files for each part, in order.
        """
        # create a list of parts
        parts = []
        for start, size in self._parts_generator(
            size=drs_object.size, part_size=self.part_size
        ):
            parts.append(
                (
                    start,
                    size,
                )
            )

        if len(parts) > 1000:
            if verbose:
                logger.warning(
                    f"{drs_object.name} has over 1000 parts ({len(parts)}) and is a large download."
                )

        if drs_object.size > 20 * MB:
            self.disable = False
        else:
            self.disable = True

        paths = []
        # TODO - tqdm ugly here?
        for chunk_parts in tqdm.tqdm(
            DrsAsyncManager.chunker(parts, self.max_simultaneous_part_handlers),
            total=math.ceil(len(parts) / self.max_simultaneous_part_handlers),
            desc="File Download Progress",
            file=sys.stdout,
            leave=False,
            disable=self.disable,
        ):
            chunk_tasks = []
            existing_chunks = []
            for start, size in chunk_parts:
                # Check if part file exists and if so verify the expected size.
                # If size matches the expected value then return the Path of the file_name for eventual reassembly.
                # If size does not match then attempt to restart the download.
                file_name = destination_path / f"{drs_object.name}.{start}.{size}.part"
                file_path = Path(file_name)

                if self.check_existing_parts(file_path, start, size, verbose):
                    existing_chunks.append(file_path)
                    continue

                task = asyncio.create_task(
                    self._drs_client.download_part(
                        drs_object=drs_object,
                        start=start,
                        size=size,
                        destination_path=destination_path,
                        verbose=verbose
                    )
                )
                chunk_tasks.append(task)

            chunk_paths = await self.wait_till_completed(chunk_tasks, "download_parts")

            """
            Unnecessary logging message for the end user: most downloads take longer
            than 15 minutes, so this message would be spammed for every part that is
            already downloaded when the re-signing step happens.

            if len(existing_chunks) > 0:
                #logger.info(f"{drs_object.name} had {len(existing_chunks)} existing parts.")
            """

            chunk_paths.extend(existing_chunks)
            # something bad happened
            if None in chunk_paths:
                if any(
                    [
                        "RECOVERABLE in AIOHTTP" in str(error)
                        for error in drs_object.errors
                    ]
                ):
                    return drs_object
                else:
                    if verbose:
                        logger.error(f"{drs_object.name} had missing part.")
                    return drs_object

            paths.extend(chunk_paths)

        """
       print(" LIST OF DRS OBJECT ERRORS AFTER DLOAD IN RUN DOWNLOAD PARTS",drs_object.errors)
       print("RECOVERABLE? ",any(['RECOVERABLE' in str(error) for error in drs_object.errors]))

        if(any(['RECOVERABLE' in str(error) for error in drs_object.errors])):
                return drs_object
        """

        if (
            None not in chunk_paths
            and len(existing_chunks) == 0
            and self.disable is True
        ):
            if verbose:
                logger.info("%s Downloaded sucessfully", drs_object.name)

        drs_object.file_parts = paths

        i = 1
        filename = (
            f"{drs_object.name}"
            or drs_object.access_methods[0].access_url.split("/")[-1].split("?")[0]
        )
        original_file_name = Path(filename)
        while True:
            if os.path.isfile(destination_path.joinpath(filename)):
                filename = f"{original_file_name}({i})"
                i = i + 1
                continue
            break

        # re-assemble and test the file parts
        # hash function dynamic
        checksum_type = drs_object.checksums[0].type
        assert (
            checksum_type in hashlib.algorithms_available
        ), f"Checksum {checksum_type} not supported."
        checksum = hashlib.new(checksum_type)
        with open(destination_path.joinpath(filename), "wb") as wfd:
            # sort the items of the list in place - Numerically based on start i.e. "xxxxxx.start.end.part"
            drs_object.file_parts.sort(key=lambda x: int(str(x).split(".")[-3]))

            T_0 = time.time()
            for f in tqdm.tqdm(
                drs_object.file_parts,
                total=len(drs_object.file_parts),
                desc=f"       {drs_object.name:50.50} stitching",
                file=sys.stdout,
                leave=False,
                disable=self.disable,
            ):
                fd = open(f, "rb")  # NOT ASYNC
                wrapped_fd = Wrapped(fd, checksum)
                # efficient way to write
                await asyncio.to_thread(
                    shutil.copyfileobj, wrapped_fd, wfd, 1024 * 1024 * 10
                )
                # explicitly close all
                wrapped_fd.close()
                f.unlink()
                fd.close()
                wfd.flush()
            T_FIN = time.time()
            if verbose:
                logger.info(f"TOTAL 'STITCHING' (md5 10*MB no flush) TIME {T_FIN-T_0} {original_file_name}")
        actual_checksum = checksum.hexdigest()

        actual_size = os.stat(Path(destination_path.joinpath(filename))).st_size

        # compare calculated md5 vs expected
        expected_checksum = drs_object.checksums[0].checksum
        if expected_checksum != actual_checksum:
            msg = f"Actual {checksum_type} hash {actual_checksum} does not match expected {expected_checksum}"
            if verbose:
                logger.error(f"Actual {checksum_type} hash {actual_checksum} \
                             does not match expected {expected_checksum}")
            drs_object.errors.append(msg)

        if drs_object.size != actual_size:
            msg = f"The actual size {actual_size} does not match expected size {drs_object.size}"
            drs_object.errors.append(msg)

        # parts will be purposefully saved if there is an error so that
        # recovery script can have a chance to rebuild the file

        return drs_object

    async def _run_download(
        self, drs_objects: List[DrsObject], destination_path: Path, verbose: bool
    ) -> List[DrsObject]:
        """
        Create tasks to sign and download, display progress.
        Args:
            drs_objects: list of drs objects to download

        Returns:
            updated list of drs objects
        """

        # first sign the urls
        tasks = []
        for drs_object in drs_objects:
            if len(drs_object.errors) == 0:
                task = asyncio.create_task(self._drs_client.sign_url(drs_object=drs_object, verbose=verbose))
                tasks.append(task)

        drs_objects_with_signed_urls = await self.wait_till_completed(tasks, "sign_url")

        tasks = []
        for drs_object in drs_objects_with_signed_urls:
            if len(drs_object.errors) == 0:
                task = asyncio.create_task(
                    self._run_download_parts(
                        drs_object=drs_object, destination_path=destination_path, verbose=verbose
                    )
                )
                tasks.append(task)

            else:
                logger.error(
                    f"{drs_object.id} has error {drs_object.errors}, not attempting anything further"
                )

        drs_objects_with_file_parts = await self.wait_till_completed(tasks, "run_download_parts")

        return drs_objects_with_file_parts

    async def _run_get_objects(
        self, object_ids: List[str], leave: bool, verbose: bool
    ) -> List[DrsObject]:
        """Create async tasks to retrieve list DrsObject, displays progress.

        Args:
            object_ids: object_id from manifest
            leave: leave flag to keep progress indicator displayed

        Returns:

        """

        tasks = []
        for object_id in object_ids:
            task = asyncio.create_task(self._drs_client.get_object(object_id=object_id, verbose=verbose))
            tasks.append(task)

        object_information = []

        object_information = await self.wait_till_completed(tasks, "get_object")

        return object_information

    @classmethod
    def chunker(cls, seq: Collection, size: int) -> Iterator:
        """Iterate over a list in chunks.

        Args:
            seq: an iterable
            size: desired chunk size

        Returns:
            an iterator that returns lists of size or less
        """
        return (seq[pos: pos + size] for pos in range(0, len(seq), size))

    def get_objects(self, object_ids: List[str], verbose: bool) -> List[DrsObject]:
        """Create tasks for all object_ids, run them in batches, get information about the object.

        Args:
            object_ids: list of objects to fetch
        """

        drs_objects = []

        total_batches = math.ceil(
            len(object_ids) / self.max_simultaneous_object_retrievers
        )
        # rounding: if the batch count were rounded down (e.g. 9.3 -> 9), the last
        # partial batch would never be downloaded; math.ceil rounds up whenever
        # there is any remainder

        current = 0

        for chunk_of_object_ids in DrsAsyncManager.chunker(
            object_ids, self.max_simultaneous_object_retrievers
        ):

            drs_objects.extend(
                asyncio.run(
                    self._run_get_objects(
                        object_ids=chunk_of_object_ids, leave=(current == total_batches), verbose=verbose
                    )
                )
            )
            current += 1

        return drs_objects

    def download(
        self, drs_objects: List[DrsObject], destination_path: Path, duplicate: bool, verbose: bool
    ) -> List[DrsObject]:
        """Split the drs_objects into manageable sizes, download the files.

        Args:
            drs_objects: list of DrsObject
            destination_path: directory where to write files when complete

        Returns:
            DrsObjects updated with _file_parts
        """
        while True:

            filtered_objects = self.filter_existing_files(
                drs_objects, destination_path, duplicate=duplicate, verbose=verbose
            )
            if len(filtered_objects) < len(drs_objects):
                complete_objects = [
                    obj for obj in drs_objects if obj not in filtered_objects
                ]
                for obj in complete_objects:
                    if verbose:
                        logger.info(f"{obj.name} already exists in {destination_path}. Skipping download.")

                if len(filtered_objects) == 0:
                    logger.info(
                        f"All DRS objects already present in {destination_path}."
                    )
                    return

            current = 0
            updated_drs_objects = []

            for chunk_of_drs_objects in DrsAsyncManager.chunker(
                filtered_objects, self.max_simultaneous_object_retrievers
            ):

                completed_chunk = asyncio.run(
                    self._run_download(
                        drs_objects=chunk_of_drs_objects,
                        destination_path=destination_path, verbose=verbose
                    )
                )
                current += 1
                updated_drs_objects.extend(completed_chunk)
            if verbose:
                logger.info(f"UPDATED DRS OBJECTS \n\n {updated_drs_objects}")

            if "RECOVERABLE in AIOHTTP" not in str(updated_drs_objects):
                break

            else:
                if verbose:
                    logger.info("RECURSING \n\n\n")

            for drsobject in drs_objects:
                drsobject.errors.clear()

        return updated_drs_objects

    def optimize_workload(
        self, verbose, drs_objects: List[DrsObject]
    ) -> List[DrsObject]:
        """
        Optimize the workload: sort, prioritize, and set thread management parameters.
        Args:
            drs_objects:

        Returns:
            same list that was passed
        """
        # Now that we have the objects to download, we have an opportunity to shape the downloads
        # e.g. are the smallest files first?  tweak MAX_* to optimize per workload
        # TODO: If part sizes changed here, would this result in an error in test recovery?
        # Going to make all part sizes 128 MB to solve the problem above, except for the small files,
        # because there is a pytest written for them that will fail otherwise

        if len(drs_objects) == 1:
            self.max_simultaneous_part_handlers = 50
            self.part_size = 64 * MB
            self.max_simultaneous_downloaders = 10
            if verbose:
                logger.info("part_size=%s", self.part_size)

        elif any(True for drs_object in drs_objects if (int(drs_object.size) > GB)):
            self.max_simultaneous_part_handlers = 3
            self.part_size = 128 * MB
            self.max_simultaneous_downloaders = 10
            if verbose:
                logger.info("part_size=%s", self.part_size)

        elif all((drs_object.size < (5 * MB)) for drs_object in drs_objects):
            self.part_size = 1 * MB
            self.max_simultaneous_part_handlers = 2
            self.max_simultaneous_downloaders = 10
            if verbose:
                logger.info("part_size=%s", self.part_size)
                logger.info("part_handlers=%s", self.max_simultaneous_part_handlers)

        else:
            self.part_size = 128 * MB
            self.max_simultaneous_part_handlers = 10
            self.max_simultaneous_downloaders = 10
            if verbose:
                logger.info("part_size=%s", self.part_size)
                logger.info("part_handlers=%s", self.max_simultaneous_part_handlers)

        return drs_objects

    def filter_existing_files(
        self, drs_objects: List[DrsObject], destination_path: Path, duplicate: bool, verbose: bool
    ) -> List[DrsObject]:
        """Remove any DRS objects from a given list if they are already exist in the destination directory.

        Args:
            drs_objects (List[DrsObject]): The DRS objects from the manifest (some may already be downloaded)
            destination_path (Path): Download destination that may contain partially downloaded files

        Returns:
            List[DrsObject]: The DRS objects that have yet to be downloaded
        """

        if verbose:
            logger.info(f"VALUE OF duplicate {duplicate}")
        if duplicate is True:
            return drs_objects

        # Testing File filtering by size also
        """
        for drs in drs_objects:
            logger.info(f"drs.size: {drs.size}  != os.path.getsize(drs.name) {os.path.getsize(drs.name)}")
            if(drs.size != os.path.getsize(drs.name)):
                logger.warning(f"{drs.name} is the wrong size,
                consider running this command again with the --duplicate flag
                 so that your current file with the same name is
                  not overwritten by this one that is on the path to downloading")
        """
        filtered_objects = [
            drs for drs in drs_objects if (drs.name not in os.listdir(destination_path))
            #  or drs.size != os.path.getsize(drs.name) <-- this is used for filtering out wrong sized stuff
        ]
        if verbose:
            logger.info(f"VALUE OF FILTERED OBJECTS {filtered_objects}")

        return filtered_objects

    def check_existing_parts(self, file_path: Path, start: int, size: int, verbose: bool) -> bool:
        """Checks if any file parts have already been downloaded. If a file part was partially downloaded then it
           prompts a new download process for that part.

        Args:
            file_path (Path): Path of the given file part (ex. HG00536.final.cram.crai.1048577.1244278.part)
            start (int): Beginning byte of the file part (ex. 1048577)
            size (int): Final byte of the file part (ex. 1244278)

        Returns:
            bool: True if the file part exists in the destination and has the expected file size, False otherwise
        """

        if file_path.exists():
            expected_size = size - start + 1
            if verbose:
                logger.info(f"EXPTECTED PART SIZE SIZE {expected_size}")

            actual_size = file_path.stat().st_size
            sizes_match = actual_size == expected_size
            if verbose:
                logger.info(f"ACTUAL SIZE {actual_size}")

            if sizes_match is True:
                # This log message is redundant when downloading large files; to keep the
                # UI clean when signed URLs expire, it is commented out for now.
                # logger.info(f"{file_path.name} exists and has expected size. Skipping download.")
                return True

        return False
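
A small sketch of the part-range convention used above (nothing here is library output; the 10 MB file, 4 MB part size, and example.bam name are arbitrary). Each pair yielded by _parts_generator names a part file <name>.<start>.<end>.part, and check_existing_parts later treats it as an inclusive byte range of end - start + 1 bytes:

from drs_downloader.manager import DrsAsyncManager

MB = 1024 * 1024  # mirrors the module-level constant

for start, end in DrsAsyncManager._parts_generator(size=10 * MB, part_size=4 * MB):
    print(f"example.bam.{start}.{end}.part -> {end - start + 1} bytes expected")

# example.bam.0.4194304.part -> 4194305 bytes expected
# example.bam.4194305.8388609.part -> 4194305 bytes expected
# example.bam.8388610.10485760.part -> 2097151 bytes expected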

__init__(drs_client, show_progress=True, part_size=DEFAULT_PART_SIZE, max_simultaneous_object_retrievers=DEFAULT_MAX_SIMULTANEOUS_OBJECT_RETRIEVERS, max_simultaneous_downloaders=DEFAULT_MAX_SIMULTANEOUS_DOWNLOADERS, max_simultaneous_part_handlers=DEFAULT_MAX_SIMULTANEOUS_PART_HANDLERS, max_simultaneous_object_signers=DEFAULT_MAX_SIMULTANEOUS_OBJECT_SIGNERS)

Parameters:

- drs_client (DrsClient): the client that will interact with server. Required.
- show_progress (bool): show progress bars. Default: True.
- part_size (int): tweak to optimize workload. Default: DEFAULT_PART_SIZE.
- max_simultaneous_object_retrievers: tweak to optimize workload. Default: DEFAULT_MAX_SIMULTANEOUS_OBJECT_RETRIEVERS.
- max_simultaneous_downloaders: tweak to optimize workload. Default: DEFAULT_MAX_SIMULTANEOUS_DOWNLOADERS.
- max_simultaneous_part_handlers: tweak to optimize workload. Default: DEFAULT_MAX_SIMULTANEOUS_PART_HANDLERS.
Source code in drs_downloader/manager.py
def __init__(
    self,
    drs_client: DrsClient,
    show_progress: bool = True,
    part_size: int = DEFAULT_PART_SIZE,
    max_simultaneous_object_retrievers=DEFAULT_MAX_SIMULTANEOUS_OBJECT_RETRIEVERS,
    max_simultaneous_downloaders=DEFAULT_MAX_SIMULTANEOUS_DOWNLOADERS,
    max_simultaneous_part_handlers=DEFAULT_MAX_SIMULTANEOUS_PART_HANDLERS,
    max_simultaneous_object_signers=DEFAULT_MAX_SIMULTANEOUS_OBJECT_SIGNERS,
):
    """

    Args:
        drs_client: the client that will interact with server
        show_progress: show progress bars
        part_size: tweak to optimize workload
        max_simultaneous_object_retrievers: tweak to optimize workload
        max_simultaneous_downloaders: tweak to optimize workload
        max_simultaneous_part_handlers: tweak to optimize workload
    """
    # """Implements abstract constructor."""
    super().__init__(drs_client=drs_client)
    self.max_simultaneous_object_retrievers = max_simultaneous_object_retrievers
    self.max_simultaneous_object_signers = max_simultaneous_object_signers
    self.max_simultaneous_downloaders = max_simultaneous_downloaders
    self.max_simultaneous_part_handlers = max_simultaneous_part_handlers
    self.disable = not show_progress
    self.part_size = part_size

check_existing_parts(file_path, start, size, verbose)

Checks if any file parts have already been downloaded. If a file part was partially downloaded then it prompts a new download process for that part.

Parameters:

- file_path (Path): Path of the given file part (ex. HG00536.final.cram.crai.1048577.1244278.part). Required.
- start (int): Beginning byte of the file part (ex. 1048577). Required.
- size (int): Final byte of the file part (ex. 1244278). Required.

Returns:

- bool: True if the file part exists in the destination and has the expected file size, False otherwise.

Source code in drs_downloader/manager.py
def check_existing_parts(self, file_path: Path, start: int, size: int, verbose: bool) -> bool:
    """Checks if any file parts have already been downloaded. If a file part was partially downloaded then it
       prompts a new download process for that part.

    Args:
        file_path (Path): Path of the given file part (ex. HG00536.final.cram.crai.1048577.1244278.part)
        start (int): Beginning byte of the file part (ex. 1048577)
        size (int): Final byte of the file part (ex. 1244278)

    Returns:
        bool: True if the file part exists in the destination and has the expected file size, False otherwise
    """

    if file_path.exists():
        expected_size = size - start + 1
        if verbose:
            logger.info(f"EXPTECTED PART SIZE SIZE {expected_size}")

        actual_size = file_path.stat().st_size
        sizes_match = actual_size == expected_size
        if verbose:
            logger.info(f"ACTUAL SIZE {actual_size}")

        if sizes_match is True:
            # This log message is redundant when downloading large files; to keep the
            # UI clean when signed URLs expire, it is commented out for now.
            # logger.info(f"{file_path.name} exists and has expected size. Skipping download.")
            return True

    return False
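
Restating the resume check with the example values from the docstring above (a sketch of the arithmetic only, not a call into the library):

from pathlib import Path

start, end = 1048577, 1244278   # the start/size arguments above
expected = end - start + 1      # 195702 bytes
part = Path("HG00536.final.cram.crai.1048577.1244278.part")

# check_existing_parts returns True only when the part exists with exactly this
# length; anything else causes _run_download_parts to fetch the part again.
resume_ok = part.exists() and part.stat().st_size == expected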

chunker(seq, size) classmethod

Iterate over a list in chunks.

Parameters:

- seq (Collection): an iterable. Required.
- size (int): desired chunk size. Required.

Returns:

- Iterator: an iterator that returns lists of size or less.

Source code in drs_downloader/manager.py
@classmethod
def chunker(cls, seq: Collection, size: int) -> Iterator:
    """Iterate over a list in chunks.

    Args:
        seq: an iterable
        size: desired chunk size

    Returns:
        an iterator that returns lists of size or less
    """
    return (seq[pos: pos + size] for pos in range(0, len(seq), size))
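
A quick illustration of chunker with made-up object IDs (the batch size of 3 is arbitrary):

from drs_downloader.manager import DrsAsyncManager

object_ids = [f"drs://example.org/object-{i}" for i in range(7)]

for batch in DrsAsyncManager.chunker(object_ids, 3):
    print(batch)

# ['drs://example.org/object-0', 'drs://example.org/object-1', 'drs://example.org/object-2']
# ['drs://example.org/object-3', 'drs://example.org/object-4', 'drs://example.org/object-5']
# ['drs://example.org/object-6']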

download(drs_objects, destination_path, duplicate, verbose)

Split the drs_objects into manageable sizes, download the files.

Parameters:

- drs_objects (List[DrsObject]): list of DrsObject. Required.
- destination_path (Path): directory where to write files when complete. Required.

Returns:

- List[DrsObject]: DrsObjects updated with _file_parts.

Source code in drs_downloader/manager.py
def download(
    self, drs_objects: List[DrsObject], destination_path: Path, duplicate: bool, verbose: bool
) -> List[DrsObject]:
    """Split the drs_objects into manageable sizes, download the files.

    Args:
        drs_objects: list of DrsObject
        destination_path: directory where to write files when complete

    Returns:
        DrsObjects updated with _file_parts
    """
    while True:

        filtered_objects = self.filter_existing_files(
            drs_objects, destination_path, duplicate=duplicate, verbose=verbose
        )
        if len(filtered_objects) < len(drs_objects):
            complete_objects = [
                obj for obj in drs_objects if obj not in filtered_objects
            ]
            for obj in complete_objects:
                if verbose:
                    logger.info(f"{obj.name} already exists in {destination_path}. Skipping download.")

            if len(filtered_objects) == 0:
                logger.info(
                    f"All DRS objects already present in {destination_path}."
                )
                return

        current = 0
        updated_drs_objects = []

        for chunk_of_drs_objects in DrsAsyncManager.chunker(
            filtered_objects, self.max_simultaneous_object_retrievers
        ):

            completed_chunk = asyncio.run(
                self._run_download(
                    drs_objects=chunk_of_drs_objects,
                    destination_path=destination_path, verbose=verbose
                )
            )
            current += 1
            updated_drs_objects.extend(completed_chunk)
        if verbose:
            logger.info(f"UPDATED DRS OBJECTS \n\n {updated_drs_objects}")

        if "RECOVERABLE in AIOHTTP" not in str(updated_drs_objects):
            break

        else:
            if verbose:
                logger.info("RECURSING \n\n\n")

        for drsobject in drs_objects:
            drsobject.errors.clear()

    return updated_drs_objects

filter_existing_files(drs_objects, destination_path, duplicate, verbose)

Remove any DRS objects from a given list if they already exist in the destination directory.

Parameters:

- drs_objects (List[DrsObject]): The DRS objects from the manifest (some may already be downloaded). Required.
- destination_path (Path): Download destination that may contain partially downloaded files. Required.

Returns:

- List[DrsObject]: The DRS objects that have yet to be downloaded.

Source code in drs_downloader/manager.py
def filter_existing_files(
    self, drs_objects: List[DrsObject], destination_path: Path, duplicate: bool, verbose: bool
) -> List[DrsObject]:
    """Remove any DRS objects from a given list if they are already exist in the destination directory.

    Args:
        drs_objects (List[DrsObject]): The DRS objects from the manifest (some may already be downloaded)
        destination_path (Path): Download destination that may contain partially downloaded files

    Returns:
        List[DrsObject]: The DRS objects that have yet to be downloaded
    """

    if verbose:
        logger.info(f"VALUE OF duplicate {duplicate}")
    if duplicate is True:
        return drs_objects

    # Testing File filtering by size also
    """
    for drs in drs_objects:
        logger.info(f"drs.size: {drs.size}  != os.path.getsize(drs.name) {os.path.getsize(drs.name)}")
        if(drs.size != os.path.getsize(drs.name)):
            logger.warning(f"{drs.name} is the wrong size,
            consider running this command again with the --duplicate flag
             so that your current file with the same name is
              not overwritten by this one that is on the path to downloading")
    """
    filtered_objects = [
        drs for drs in drs_objects if (drs.name not in os.listdir(destination_path))
        #  or drs.size != os.path.getsize(drs.name) <-- this is used for filtering out wrong sized stuff
    ]
    if verbose:
        logger.info(f"VALUE OF FILTERED OBJECTS {filtered_objects}")

    return filtered_objects

get_objects(object_ids, verbose)

Create tasks for all object_ids, run them in batches, get information about the object.

Parameters:

- object_ids (List[str]): list of objects to fetch. Required.
Source code in drs_downloader/manager.py
def get_objects(self, object_ids: List[str], verbose: bool) -> List[DrsObject]:
    """Create tasks for all object_ids, run them in batches, get information about the object.

    Args:
        object_ids: list of objects to fetch
    """

    drs_objects = []

    total_batches = math.ceil(
        len(object_ids) / self.max_simultaneous_object_retrievers
    )
    # rounding: if the batch count were rounded down (e.g. 9.3 -> 9), the last
    # partial batch would never be downloaded; math.ceil rounds up whenever
    # there is any remainder

    current = 0

    for chunk_of_object_ids in DrsAsyncManager.chunker(
        object_ids, self.max_simultaneous_object_retrievers
    ):

        drs_objects.extend(
            asyncio.run(
                self._run_get_objects(
                    object_ids=chunk_of_object_ids, leave=(current == total_batches), verbose=verbose
                )
            )
        )
        current += 1

    return drs_objects

optimize_workload(verbose, drs_objects)

Optimize the workload: sort, prioritize, and set thread management parameters.

Returns:

- List[DrsObject]: same list that was passed.

Source code in drs_downloader/manager.py
def optimize_workload(
    self, verbose, drs_objects: List[DrsObject]
) -> List[DrsObject]:
    """
    Optimize the workload: sort, prioritize, and set thread management parameters.
    Args:
        drs_objects:

    Returns:
        same list that was passed
    """
    # Now that we have the objects to download, we have an opportunity to shape the downloads
    # e.g. are the smallest files first?  tweak MAX_* to optimize per workload
    # TODO: If part sizes changed here, would this result in an error in test recovery?
    # Going to make all part sizes 128 MB to solve the problem above, except for the small files,
    # because there is a pytest written for them that will fail otherwise

    if len(drs_objects) == 1:
        self.max_simultaneous_part_handlers = 50
        self.part_size = 64 * MB
        self.max_simultaneous_downloaders = 10
        if verbose:
            logger.info("part_size=%s", self.part_size)

    elif any(True for drs_object in drs_objects if (int(drs_object.size) > GB)):
        self.max_simultaneous_part_handlers = 3
        self.part_size = 128 * MB
        self.max_simultaneous_downloaders = 10
        if verbose:
            logger.info("part_size=%s", self.part_size)

    elif all((drs_object.size < (5 * MB)) for drs_object in drs_objects):
        self.part_size = 1 * MB
        self.max_simultaneous_part_handlers = 2
        self.max_simultaneous_downloaders = 10
        if verbose:
            logger.info("part_size=%s", self.part_size)
            logger.info("part_handlers=%s", self.max_simultaneous_part_handlers)

    else:
        self.part_size = 128 * MB
        self.max_simultaneous_part_handlers = 10
        self.max_simultaneous_downloaders = 10
        if verbose:
            logger.info("part_size=%s", self.part_size)
            logger.info("part_handlers=%s", self.max_simultaneous_part_handlers)

    return drs_objects
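
In summary, the current heuristics are: a single object uses 64 MB parts with 50 part handlers; any object over 1 GB uses 128 MB parts with 3 part handlers; a workload where every object is under 5 MB uses 1 MB parts with 2 part handlers; everything else uses 128 MB parts with 10 part handlers, and max_simultaneous_downloaders is 10 in every branch. Because these are plain attributes on the manager, they can also be overridden after optimize_workload runs. A hedged sketch (my_drs_client and object_ids are the same hypothetical placeholders as in the earlier example; 32 MB and 4 handlers are illustrative values, not library defaults):

from drs_downloader.manager import DrsAsyncManager

MB = 1024 * 1024  # mirrors the module-level constant

manager = DrsAsyncManager(drs_client=my_drs_client)
drs_objects = manager.optimize_workload(False, manager.get_objects(object_ids, verbose=False))

# Dial the per-file concurrency down for a bandwidth-constrained environment.
manager.part_size = 32 * MB
manager.max_simultaneous_part_handlers = 4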

DrsManager

Bases: ABC

Manage DRSClient workload.

Source code in drs_downloader/manager.py
class DrsManager(ABC):
    """Manage DRSClient workload."""

    @abstractmethod
    def __init__(self, drs_client: DrsClient):
        self._drs_client = drs_client

    @abstractmethod
    def get_objects(self, object_ids: List[str]) -> List[DrsObject]:
        """Fetch list of DRSObject from passed ids.

        Args:
            object_ids: list of objects to fetch
        """
        pass

    @abstractmethod
    def download(
        self, drs_objects: List[DrsObject], destination_path: Path
    ) -> List[DrsObject]:
        """Split the drs_objects into manageable parts, download the files.

        Args:
            drs_objects: objects to download
            destination_path: directory where to write files when complete

        Returns:
            list of updated DrsObjects
        """

    @abstractmethod
    async def optimize_workload(self, drs_objects: List[DrsObject]) -> List[DrsObject]:
        """
        Optimize the workload: sort, prioritize, and set thread management parameters.
        Args:
            drs_objects:

        Returns:

        """
        # TODO - now that we have the objects to download, we have an opportunity to shape the downloads
        # TODO - e.g. smallest files first?  tweak MAX_* to optimize per workload
        return drs_objects

download(drs_objects, destination_path) abstractmethod

Split the drs_objects into manageable parts, download the files.

Parameters:

- drs_objects (List[DrsObject]): objects to download. Required.
- destination_path (Path): directory where to write files when complete. Required.

Returns:

- List[DrsObject]: list of updated DrsObjects.

Source code in drs_downloader/manager.py
@abstractmethod
def download(
    self, drs_objects: List[DrsObject], destination_path: Path
) -> List[DrsObject]:
    """Split the drs_objects into manageable parts, download the files.

    Args:
        drs_objects: objects to download
        destination_path: directory where to write files when complete

    Returns:
        list of updated DrsObjects
    """

get_objects(object_ids) abstractmethod

Fetch list of DRSObject from passed ids.

Parameters:

- object_ids (List[str]): list of objects to fetch. Required.
Source code in drs_downloader/manager.py
@abstractmethod
def get_objects(self, object_ids: List[str]) -> List[DrsObject]:
    """Fetch list of DRSObject from passed ids.

    Args:
        object_ids: list of objects to fetch
    """
    pass

optimize_workload(drs_objects) abstractmethod async

Optimize the workload: sort, prioritize, and set thread management parameters.

Returns:

Source code in drs_downloader/manager.py
@abstractmethod
async def optimize_workload(self, drs_objects: List[DrsObject]) -> List[DrsObject]:
    """
    Optimize the workload, sort prioritize and set thread management parameters.
    Args:
        drs_objects:

    Returns:

    """
    # TODO - now that we have the objects to download, we have an opportunity to shape the downloads
    # TODO - e.g. smallest files first?  tweak MAX_* to optimize per workload
    return drs_objects

Wrapped

Bases: object

Source code in drs_downloader/manager.py
class Wrapped(object):
    def __init__(self, file, hash_method):
        """
        Wrap the read() method and calculate hash
        Args:
            file: destination file
            hash_method: instantiated hash_method
        """
        self._file = file
        self._hash_method = hash_method

    def read(self, size):
        buffer = self._file.read(size)
        self._hash_method.update(buffer)
        return buffer

    def __getattr__(self, attr):
        return getattr(self._file, attr)

__init__(file, hash_method)

Wrap the read() method and calculate a hash.

Parameters:

- file: destination file
- hash_method: instantiated hash_method

Source code in drs_downloader/manager.py
def __init__(self, file, hash_method):
    """
    Wrap the read() method and calculate hash
    Args:
        file: destination file
        hash_method: instantiated hash_method
    """
    self._file = file
    self._hash_method = hash_method
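
A short, hedged example of how Wrapped can be used on its own: hashing a file while copying it with shutil.copyfileobj (the file names are made up):

import hashlib
import shutil

from drs_downloader.manager import Wrapped

md5 = hashlib.new("md5")
with open("part.bin", "rb") as src, open("whole.bin", "wb") as dst:
    # copyfileobj pulls data through Wrapped.read(), so the digest is updated
    # as a side effect of the copy -- no second pass over the data.
    shutil.copyfileobj(Wrapped(src, md5), dst, 1024 * 1024)

print(md5.hexdigest())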