Skip to content

gen3

Gen3DrsClient

Bases: DrsClient

Calls the Gen3 DRS server indexd

Source code in drs_downloader/clients/gen3.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
class Gen3DrsClient(DrsClient):
    """
    Calls the Gen3 DRS server indexd

    The client reads an API key from a JSON file, exchanges it for a bearer
    access token, and then uses that token to look up DRS objects and to
    obtain download URLs for them.
    """

    def __init__(
        self,
        api_key_path,
        endpoint,
        access_token_resource_path="/user/credentials/cdis/access_token",
        drs_api="/ga4gh/drs/v1/objects/",
        *args,
        **kwargs,
    ):
        """Store connection settings; no network or file I/O happens here.

        Args:
            api_key_path: Path to the JSON API key file (``~`` is expanded
                when the key is loaded in :meth:`authorize`).
            endpoint: Base URL of the Gen3 server, prepended to every
                resource path used by this client.
            access_token_resource_path: Path, relative to ``endpoint``, that
                the API key is POSTed to in order to get an access token.
            drs_api: Path, relative to ``endpoint``, of the DRS objects API.
            *args: Forwarded to ``DrsClient.__init__``.
            **kwargs: Forwarded to ``DrsClient.__init__``.
        """
        super().__init__(*args, **kwargs)
        self.authorized = None  # None until the first token request completes
        self.api_key = None
        self.endpoint = endpoint
        self.token = None
        self.access_token_resource_path = access_token_resource_path
        self.api_key_path = api_key_path
        self.drs_api = drs_api

    async def authorize(self):
        """Load the API key from disk and request an access token with it.

        A non-200 response is logged and clears ``self.api_key``; it does not
        raise. Any exception (unreadable or malformed key file, network
        failure, ...) also clears ``self.api_key`` and is then re-raised.
        """
        full_key_path = os.path.expanduser(self.api_key_path)
        try:
            with open(full_key_path) as f:
                self.api_key = json.load(f)
            code = await self.update_access_token()
            if code == 401:
                logger.error("Invalid access token in {}".format(full_key_path))
                self.api_key = None
            elif code != 200:
                logger.error(
                    "Error {} getting Access token for {}".format(code, self.endpoint)
                )
                logger.error("Using {}".format(full_key_path))
                self.api_key = None
        except Exception:
            self.api_key = None
            # Bare raise keeps the original traceback intact.
            raise

    # Obtain an access_token using the provided Fence API key.
    # The client object will retain the access key for subsequent calls
    async def update_access_token(self):
        """POST the API key to the token endpoint and record the outcome.

        On HTTP 200 the returned ``access_token`` is stored in ``self.token``
        and ``self.authorized`` becomes True; on any other status
        ``self.authorized`` becomes False.

        Returns:
            int: The HTTP status code of the token request.
        """
        headers = {"Content-Type": "application/json"}
        api_url = "{0}{1}".format(self.endpoint, self.access_token_resource_path)
        async with aiohttp.ClientSession(headers=headers) as session:
            # The context manager releases the response/connection even if
            # decoding the body fails.
            async with session.post(api_url, json=self.api_key) as response:
                status = response.status
                if status == 200:
                    resp = await response.json()
                    self.token = resp["access_token"]
                    self.authorized = True
                else:
                    self.authorized = False
        return status

    async def download_part(
        self, drs_object: DrsObject, start: int, size: int, destination_path: Path
    ) -> Optional[Path]:
        """Download one byte range of a DRS object into a ``.part`` file.

        Args:
            drs_object: Object to download; its first access method's
                ``access_url`` is fetched.
            start: First value of the HTTP ``Range`` header.
            size: Second value of the HTTP ``Range`` header.
            destination_path: Directory the part file is written under.

        Returns:
            The path of the written part file, or None when anything failed
            (the error text is appended to ``drs_object.errors``).
        """
        try:

            if not self.authorized:
                await self.authorize()

            # NOTE(review): an HTTP Range is "first-last" (inclusive), so
            # `size` is sent as the last byte position here - confirm callers
            # pass an end offset rather than a length.
            headers = {"Range": f"bytes={start}-{size}"}

            file_name = destination_path / f"{drs_object.name}.{start}.{size}.part"
            Path(file_name).parent.mkdir(parents=True, exist_ok=True)

            async with aiohttp.ClientSession(headers=headers) as session:
                async with session.get(
                    drs_object.access_methods[0].access_url
                ) as request:
                    # Fail before touching the disk so an HTTP error body is
                    # never saved as if it were file content.
                    request.raise_for_status()
                    # The context manager closes the file even when a write
                    # or the network stream raises mid-download.
                    async with aiofiles.open(file_name, "wb") as file:
                        self.statistics.set_max_files_open()
                        async for data in request.content.iter_any():  # uses less memory
                            await file.write(data)
                    return Path(file_name)
        except Exception as e:
            logger.error(f"gen3.download_part {str(e)}")
            drs_object.errors.append(str(e))
            return None

    async def sign_url(self, drs_object: DrsObject) -> DrsObject:
        """Call fence's /user/data/download/ endpoint.

        On success ``drs_object.access_methods`` is replaced by a single
        access method carrying the returned URL. On an HTTP error status the
        error text is appended to ``drs_object.errors`` instead.

        Args:
            drs_object: Object to sign; only the part of its ``id`` after the
                last ``:`` is sent to the server.

        Returns:
            The same ``drs_object`` instance, updated in place.
        """
        # Same guard as get_object: without it a fresh client would fail on
        # "Bearer " + None before any request is made.
        if not self.authorized:
            await self.authorize()

        headers = {
            "authorization": "Bearer " + self.token,
            "content-type": "application/json",
        }
        async with aiohttp.ClientSession(headers=headers) as session:
            async with session.get(
                url=f"{self.endpoint}/user/data/download/{drs_object.id.split(':')[-1]}"
            ) as response:
                try:
                    self.statistics.set_max_files_open()
                    response.raise_for_status()
                    resp = await response.json(content_type=None)
                    assert "url" in resp, resp
                    url_ = resp["url"]
                    drs_object.access_methods = [
                        AccessMethod(access_url=url_, type="s3")
                    ]
                    return drs_object

                except ClientResponseError as e:
                    drs_object.errors.append(str(e))
                    return drs_object

    async def get_object(self, object_id: str) -> DrsObject:
        """Sends a GET request for the hash, file size, and name of a given DRS object.

        The returned object carries an empty access URL; call ``sign_url`` to
        obtain a download URL for it.

        Args:
            object_id (str): DRS URI; only the part after the last ``:`` is
                sent to the server.

        Raises:
            AssertionError: The first checksum in the response is not md5.

        Returns:
            DrsObject: The object's metadata. If the server answers with an
            HTTP error status, a placeholder DrsObject (size 0, no checksums,
            no name) whose ``errors`` holds the error text is returned instead.
        """
        if not self.authorized:
            await self.authorize()

        headers = {
            "authorization": "Bearer " + self.token,
            "content-type": "application/json",
        }

        # drs_api may or may not end with "/" (the default does); normalise so
        # the request path never contains "//".
        drs_api = self.drs_api.rstrip("/")

        async with aiohttp.ClientSession(headers=headers) as session:
            async with session.get(
                url=f"{self.endpoint}{drs_api}/{object_id.split(':')[-1]}"
            ) as response:
                try:
                    self.statistics.set_max_files_open()
                    response.raise_for_status()
                    resp = await response.json(content_type=None)

                    assert resp["checksums"][0]["type"] == "md5", resp
                    md5_ = resp["checksums"][0]["checksum"]
                    size_ = resp["size"]
                    name_ = resp["name"]
                    return DrsObject(
                        self_uri=object_id,
                        size=size_,
                        checksums=[Checksum(checksum=md5_, type="md5")],
                        id=object_id,
                        name=name_,
                        access_methods=[AccessMethod(access_url="", type="gs")],
                    )
                except ClientResponseError as e:
                    return DrsObject(
                        self_uri=object_id,
                        id=object_id,
                        checksums=[],
                        size=0,
                        name=None,
                        errors=[str(e)],
                    )

get_object(object_id) async

Sends a GET request for the hash, file size, and name of a given DRS object. The returned object has an empty access URL; use sign_url to obtain a download URL.

Parameters:

Name Type Description Default
object_id str

DRS URI

required

Raises:

Type Description
Exception

The request was rejected by the server

Returns:

Name Type Description
DownloadURL DrsObject

The downloadable bundle ready for async download

Source code in drs_downloader/clients/gen3.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
async def get_object(self, object_id: str) -> DrsObject:
    """Sends a GET request for the hash, file size, and name of a given DRS object.

    The returned object carries an empty access URL; no signed URL is
    requested here.

    Args:
        object_id (str): DRS URI; only the part after the last ``:`` is
            sent to the server.

    Raises:
        AssertionError: The first checksum in the response is not md5.

    Returns:
        DrsObject: The object's metadata. If the server answers with an
        HTTP error status, a placeholder DrsObject (size 0, no checksums,
        no name) whose ``errors`` holds the error text is returned instead.
    """
    if not self.authorized:
        await self.authorize()

    headers = {
        "authorization": "Bearer " + self.token,
        "content-type": "application/json",
    }

    # drs_api may or may not end with "/"; normalise so the request path
    # never contains "//".
    drs_api = self.drs_api.rstrip("/")

    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.get(
            url=f"{self.endpoint}{drs_api}/{object_id.split(':')[-1]}"
        ) as response:
            try:
                self.statistics.set_max_files_open()
                response.raise_for_status()
                resp = await response.json(content_type=None)

                assert resp["checksums"][0]["type"] == "md5", resp
                md5_ = resp["checksums"][0]["checksum"]
                size_ = resp["size"]
                name_ = resp["name"]
                return DrsObject(
                    self_uri=object_id,
                    size=size_,
                    checksums=[Checksum(checksum=md5_, type="md5")],
                    id=object_id,
                    name=name_,
                    access_methods=[AccessMethod(access_url="", type="gs")],
                )
            except ClientResponseError as e:
                return DrsObject(
                    self_uri=object_id,
                    id=object_id,
                    checksums=[],
                    size=0,
                    name=None,
                    errors=[str(e)],
                )

sign_url(drs_object) async

Call fence's /user/data/download/ endpoint.

Source code in drs_downloader/clients/gen3.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
async def sign_url(self, drs_object: DrsObject) -> DrsObject:
    """Call fence's /user/data/download/ endpoint.

    On success ``drs_object.access_methods`` is replaced by a single access
    method carrying the returned URL. On an HTTP error status the error
    text is appended to ``drs_object.errors`` instead.

    Args:
        drs_object: Object to sign; only the part of its ``id`` after the
            last ``:`` is sent to the server.

    Returns:
        The same ``drs_object`` instance, updated in place.
    """
    # Make sure a token exists first; otherwise "Bearer " + None raises
    # TypeError before any request is made.
    if not self.authorized:
        await self.authorize()

    headers = {
        "authorization": "Bearer " + self.token,
        "content-type": "application/json",
    }
    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.get(
            url=f"{self.endpoint}/user/data/download/{drs_object.id.split(':')[-1]}"
        ) as response:
            try:
                self.statistics.set_max_files_open()
                response.raise_for_status()
                resp = await response.json(content_type=None)
                assert "url" in resp, resp
                url_ = resp["url"]
                drs_object.access_methods = [
                    AccessMethod(access_url=url_, type="s3")
                ]
                return drs_object

            except ClientResponseError as e:
                drs_object.errors.append(str(e))
                return drs_object