Skip to content

models

AccessMethod dataclass

Bases: object

See https://ga4gh.github.io/data-repository-service-schemas/preview/release/drs-1.0.0/docs/#_accessmethod

Source code in drs_downloader/models.py
10
11
12
13
14
15
16
17
@dataclass
class AccessMethod:
    """A single access method entry of a DRS object.

    Mirrors the AccessMethod model of the GA4GH DRS 1.0.0 schema:
    https://ga4gh.github.io/data-repository-service-schemas/preview/release/drs-1.0.0/docs/#_accessmethod
    """

    access_url: str
    """AccessURL usable for fetching the actual object bytes."""

    type: str
    """Access method type; one of: s3, gs, ftp, gsiftp, globus, htsget, https, file."""

access_url: str instance-attribute

An AccessURL that can be used to fetch the actual object bytes.

type: str instance-attribute

Type of the access method. enum (s3, gs, ftp, gsiftp, globus, htsget, https, file)

AccessURL dataclass

Bases: object

See https://ga4gh.github.io/data-repository-service-schemas/preview/release/drs-1.0.0/docs/#_accessurl

Source code in drs_downloader/models.py
20
21
22
23
24
25
26
27
@dataclass
class AccessURL:
    """A URL plus the HTTP headers to send with it.

    Mirrors the AccessURL model of the GA4GH DRS 1.0.0 schema:
    https://ga4gh.github.io/data-repository-service-schemas/preview/release/drs-1.0.0/docs/#_accessurl
    """

    headers: Dict[str, str]
    """Headers to include in the HTTP request to url (described as optional by the schema)."""

    url: str
    """Fully resolvable URL from which the actual object bytes can be fetched."""

headers: Dict[str, str] instance-attribute

An optional list of headers to include in the HTTP request to url.

url: str instance-attribute

A fully resolvable URL that can be used to fetch the actual object bytes.

Checksum dataclass

Bases: object

See https://ga4gh.github.io/data-repository-service-schemas/preview/release/drs-1.0.0/docs/#_checksum

Source code in drs_downloader/models.py
30
31
32
33
34
35
36
37
@dataclass
class Checksum:
    """A checksum value together with the digest method that produced it.

    Mirrors the Checksum model of the GA4GH DRS 1.0.0 schema:
    https://ga4gh.github.io/data-repository-service-schemas/preview/release/drs-1.0.0/docs/#_checksum
    """

    checksum: str
    """Hex-string encoded checksum of the data."""

    type: str
    """Digest method that was used to create the checksum."""

checksum: str instance-attribute

The hex-string encoded checksum for the data.

type: str instance-attribute

The digest method used to create the checksum.

DrsClient

Bases: ABC

Interact with DRS service.

Source code in drs_downloader/models.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class DrsClient(ABC):
    """Abstract interface for interacting with a DRS service.

    Concrete clients implement the three coroutines declared here; this base
    class only stores the Statistics object passed to the constructor.
    """

    def __init__(self, statistics: Statistics = Statistics()):
        # NOTE: the default Statistics() is created once, when this method is
        # defined, so every client constructed without an explicit argument
        # receives that same instance.
        self.statistics = statistics

    @abstractmethod
    async def download_part(
        self,
        drs_object: DrsObject,
        start: int,
        size: int,
        destination_path: Path,
        verbose: bool = False,
    ) -> Optional[Path]:
        """Download one part of a file and save it to disk.

        On error, implementations update drs_object.errors and return None.

        Args:
            drs_object: state of download
            start: segment start
            size: segment end (wording kept from the original documentation;
                the parameter name suggests a byte count - confirm with
                implementations)
            destination_path: where to save the part
            verbose: not described in the original documentation; presumably
                enables extra output - confirm with implementations
        """

    @abstractmethod
    async def sign_url(self, drs_object: DrsObject) -> DrsObject:
        """Retrieve a signed url from the service and populate the AccessMethod.

        Args:
            drs_object: the object to populate

        Returns:
            populated DrsObject
        """

    @abstractmethod
    async def get_object(self, object_id: str) -> DrsObject:
        """Retrieve size, checksums, etc. and populate a DrsObject."""

download_part(drs_object, start, size, destination_path, verbose=False) abstractmethod async

Download and save part of a file to disk; on error, update drs_object.errors and return None.

Parameters:

Name Type Description Default
destination_path Path

where to save the part

required
drs_object DrsObject

state of download

required
start int

segment start

required
size int

segment end

required
Source code in drs_downloader/models.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
@abstractmethod
async def download_part(
    self,
    drs_object: DrsObject,
    start: int,
    size: int,
    destination_path: Path,
    verbose: bool = False,
) -> Optional[Path]:
    """Download one part of a file and save it to disk.

    On error, implementations update drs_object.errors and return None.

    Args:
        drs_object: state of download
        start: segment start
        size: segment end (wording kept from the original documentation;
            the parameter name suggests a byte count - confirm with
            implementations)
        destination_path: where to save the part
        verbose: not described in the original documentation; presumably
            enables extra output - confirm with implementations
    """

get_object(object_id) abstractmethod async

Retrieve size, checksums, etc. and populate a DrsObject.

Source code in drs_downloader/models.py
122
123
124
125
@abstractmethod
async def get_object(self, object_id: str) -> DrsObject:
    """Retrieve size, checksums, etc. and populate a DrsObject."""

sign_url(drs_object) abstractmethod async

Retrieve a signed URL from the service and return the DrsObject with its AccessMethod populated.

Parameters:

Name Type Description Default
drs_object DrsObject
required

Returns:

Type Description
DrsObject

populated DrsObject

Source code in drs_downloader/models.py
110
111
112
113
114
115
116
117
118
119
120
@abstractmethod
async def sign_url(self, drs_object: DrsObject) -> DrsObject:
    """Retrieve a signed url from the service and populate the AccessMethod.

    Args:
        drs_object: the object to populate

    Returns:
        populated DrsObject
    """

DrsObject dataclass

Bases: object

See https://ga4gh.github.io/data-repository-service-schemas/preview/release/drs-1.0.0/docs/#_drsobject

Source code in drs_downloader/models.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@dataclass
class DrsObject:
    """Metadata and download state for one DRS object.

    Mirrors the DrsObject model of the GA4GH DRS 1.0.0 schema:
    https://ga4gh.github.io/data-repository-service-schemas/preview/release/drs-1.0.0/docs/#_drsobject
    """

    id: str
    """Identifier unique to this DrsObject."""

    self_uri: str
    """drs:// URI, as defined in the DRS documentation, telling clients how to access this object."""

    checksums: List[Checksum]
    """Checksums; needed for the integrity check."""

    size: int
    """Object size; needed for multi part download."""

    name: str
    """String that can be used to name a DrsObject."""

    file_parts: List[Path] = field(default_factory=list)
    """File parts, listed in order of assembly."""

    errors: List[str] = field(default_factory=list)
    """Errors recorded for this object."""

    access_methods: List[AccessMethod] = field(default_factory=list)
    """Access methods (documented upstream as the signed url)."""

access_methods: List[AccessMethod] = field(default_factory=list) class-attribute instance-attribute

Signed url.

checksums: List[Checksum] instance-attribute

Needed for integrity check.

errors: List[str] = field(default_factory=list) class-attribute instance-attribute

List of errors.

file_parts: List[Path] = field(default_factory=list) class-attribute instance-attribute

List of file parts in order of assembly.

id: str instance-attribute

An identifier unique to this DrsObject.

name: str instance-attribute

A string that can be used to name a DrsObject.

self_uri: str instance-attribute

A drs:// URI, as defined in the DRS documentation, that tells clients how to access this object.

size: int instance-attribute

Needed for multi part download.

Statistics dataclass

This is where we can share data between threads

Source code in drs_downloader/models.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
@dataclass
class Statistics:
    """This is where we can share data between threads."""

    # The default lock is created once, at class-definition time, so instances
    # built without an explicit ``lock`` all use the same lock object.
    lock: threading.Lock = threading.Lock()
    # Highest open-file-descriptor count observed so far.
    max_files_open: int = 0
    # Process id captured when the class is defined; used to build the
    # /proc/<pid>/fd/ path on platforms other than Darwin and Windows.
    pid: object = os.getpid()

    def set_max_files_open(self):
        """Threadsafe way to record the current number of open files.

        Counts the process's open file descriptors and raises
        ``max_files_open`` if the current count exceeds it.

        The lock is held through a ``with`` block so it is always released,
        even when listing the descriptor directory raises (for example when
        /proc/<pid>/fd/ does not exist); otherwise a single failure would
        leave the lock held and block every later caller.

        Note: only tested on Mac

        Raises:
            OSError: if the descriptor directory cannot be listed.
        """
        system = platform.system()
        with self.lock:
            if system == "Darwin":
                open_fd = len(os.listdir("/dev/fd/"))
            elif system == "Windows":
                # TODO install psutils - len(Process.open_files())
                open_fd = 0
            else:
                open_fd = len(os.listdir(f"/proc/{self.pid}/fd/"))
            if open_fd > self.max_files_open:
                self.max_files_open = open_fd

set_max_files_open()

Threadsafe way to determine current number of open files.

Note: only tested on Mac

Source code in drs_downloader/models.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def set_max_files_open(self):
    """Threadsafe way to record the current number of open files.

    Counts the process's open file descriptors and raises
    ``max_files_open`` if the current count exceeds it.

    The lock is held through a ``with`` block so it is always released,
    even when listing the descriptor directory raises (for example when
    /proc/<pid>/fd/ does not exist); otherwise a single failure would
    leave the lock held and block every later caller.

    Note: only tested on Mac

    Raises:
        OSError: if the descriptor directory cannot be listed.
    """
    system = platform.system()
    with self.lock:
        if system == "Darwin":
            open_fd = len(os.listdir("/dev/fd/"))
        elif system == "Windows":
            # TODO install psutils - len(Process.open_files())
            open_fd = 0
        else:
            open_fd = len(os.listdir(f"/proc/{self.pid}/fd/"))
        if open_fd > self.max_files_open:
            self.max_files_open = open_fd