Tasks
A task defines a unit of work:
- metadata
- input files to download
- a sequence of Docker containers + commands to run,
- output files to upload
- state
- logs
The example task below downloads a file named hello.txt
from S3 and calls cat hello.txt
using the alpine container. This task also writes the executor’s stdout to a file, and uploads the stdout to s3.
{
"name": "Hello world",
"inputs": [{
# URL to download file from.
"url": "s3://funnel-bucket/hello.txt",
# Path to download file to.
"path": "/inputs/hello.txt"
}],
"outputs": [{
# URL to upload file to.
"url": "s3://funnel-bucket/output.txt",
# Local path to upload file from.
"path": "/outputs/stdout"
}],
"executors": [{
# Container image name.
"image": "alpine",
# Command to run (argv).
"command": ["cat", "/inputs/hello.txt"],
# Capture the stdout of the command to /outputs/stdout
"stdout": "/outputs/stdout"
}]
}
Tasks have multiple “executors”; containers and commands run in a sequence. Funnel runs executors via Docker.
Tasks also have state and logs:
{
"id": "b85khc2rl6qkqbhg8vig",
"state": "COMPLETE",
"name": "Hello world",
"inputs": [
{
"url": "s3://funnel-bucket/hello.txt",
"path": "/inputs/hello.txt"
}
],
"outputs": [
{
"url": "s3://funnel-bucket/output.txt",
"path": "/outputs/stdout"
}
],
"executors": [
{
"image": "alpine",
"command": [
"cat",
"/inputs/hello.txt"
],
"stdout": "/outputs/stdout"
}
],
"logs": [
{
"logs": [
{
"startTime": "2017-11-14T11:49:05.127885125-08:00",
"endTime": "2017-11-14T11:49:08.484461502-08:00",
"stdout": "Hello, Funnel!\n"
}
],
"startTime": "2017-11-14T11:49:04.433593468-08:00",
"endTime": "2017-11-14T11:49:08.487707039-08:00"
}
],
"creationTime": "2017-11-14T11:49:04.427163701-08:00"
}
There are logs for each task attempt and each executor. Notice that the stdout is
conveniently captured by logs[0].logs[0].stdout
.
Task API
The API lets you create, get, list, and cancel tasks.
Create
POST /v1/tasks
{
"name": "Hello world",
"inputs": [{
"url": "s3://funnel-bucket/hello.txt",
"path": "/inputs/hello.txt"
}],
"outputs": [{
"url": "s3://funnel-bucket/output.txt",
"path": "/outputs/stdout"
}],
"executors": [{
"image": "alpine",
"command": ["cat", "/inputs/hello.txt"],
"stdout": "/outputs/stdout"
}]
}
# The response is a task ID:
b85khc2rl6qkqbhg8vig
Get
GET /v1/tasks/b85khc2rl6qkqbhg8vig
{"id": "b85khc2rl6qkqbhg8vig", "state": "COMPLETE"}
By default, the minimal task view is returned which describes only the ID and state. In order to get the original task with some basic logs, use the “BASIC” task view:
GET /v1/tasks/b85khc2rl6qkqbhg8vig?view=BASIC
{
"id": "b85khc2rl6qkqbhg8vig",
"state": "COMPLETE",
"name": "Hello world",
"inputs": [
{
"url": "gs://funnel-bucket/hello.txt",
"path": "/inputs/hello.txt"
}
],
"outputs": [
{
"url": "s3://funnel-bucket/output.txt",
"path": "/outputs/stdout"
}
],
"executors": [
{
"image": "alpine",
"command": [
"cat",
"/inputs/hello.txt"
],
"stdout": "/outputs/stdout",
}
],
"logs": [
{
"logs": [
{
"startTime": "2017-11-14T11:49:05.127885125-08:00",
"endTime": "2017-11-14T11:49:08.484461502-08:00",
}
],
"startTime": "2017-11-14T11:49:04.433593468-08:00",
"endTime": "2017-11-14T11:49:08.487707039-08:00"
}
],
"creationTime": "2017-11-14T11:49:04.427163701-08:00"
}
The “BASIC” doesn’t include some fields such as stdout/err logs, because these fields may be potentially large. In order to get everything, use the “FULL” view:
GET /v1/tasks/b85khc2rl6qkqbhg8vig?view=FULL
{
"id": "b85khc2rl6qkqbhg8vig",
"state": "COMPLETE",
"name": "Hello world",
"inputs": [
{
"url": "gs://funnel-bucket/hello.txt",
"path": "/inputs/hello.txt"
}
],
"executors": [
{
"image": "alpine",
"command": [
"cat",
"/inputs/hello.txt"
],
"stdout": "/outputs/stdout",
}
],
"logs": [
{
"logs": [
{
"startTime": "2017-11-14T11:49:05.127885125-08:00",
"endTime": "2017-11-14T11:49:08.484461502-08:00",
"stdout": "Hello, Funnel!\n"
}
],
"startTime": "2017-11-14T11:49:04.433593468-08:00",
"endTime": "2017-11-14T11:49:08.487707039-08:00"
}
],
"creationTime": "2017-11-14T11:49:04.427163701-08:00"
}
List
GET /v1/tasks
{
"tasks": [
{
"id": "b85l8tirl6qkqbhg8vj0",
"state": "COMPLETE"
},
{
"id": "b85khc2rl6qkqbhg8vig",
"state": "COMPLETE"
},
{
"id": "b85kgt2rl6qkpuptua70",
"state": "SYSTEM_ERROR"
},
{
"id": "b857gnirl6qjfou61fh0",
"state": "SYSTEM_ERROR"
}
]
}
List has the same task views as Get: MINIMAL, BASIC, and FULL.
The task list is paginated:
GET /v1/tasks?page_token=1h123h12j2h3k
{
"next_page_token": "1n3n1j23k12n3k123",
"tasks": [
{
"id": "b85l8tirl6qkqbhg8vj0",
"state": "COMPLETE"
},
# ... more tasks here ...
]
}
Cancel
Tasks cannot be modified by the user after creation, with one exception – they can be canceled.
POST /v1/tasks/b85l8tirl6qkqbhg8vj0:cancel
Full task spec
Here’s a more detailed description of a task.
For a full, in-depth spec, read the TES standard’s task_execution.proto.
{
# The task's ID. Set by the server.
# Output only.
"id": "1234567",
# The task's state. Possible states:
# QUEUED
# INITILIZING
# RUNNING
# PAUSED
# COMPLETE
# EXECUTOR_ERROR
# SYSTEM_ERROR
# CANCELED
#
# Output only.
"state": "QUEUED",
# Metadata
"name": "Task name.",
"description": "Task description.",
"tags": {
"custom-tag-1": "tag-value-1",
"custom-tag-2": "tag-value-2",
},
# Resource requests
"resources": {
# Number of CPU cores requested.
"cpuCores": 1,
# RAM request, in gigabytes.
"ramGb": 1.0,
# Disk space request, in gigabytes.
"diskGb": 100.0,
# Request preemptible machines,
# e.g. preemptible VM in Google Cloud, an instance from the AWS Spot Market, etc.
"preemptible": false,
# Request that the task run in these compute zones.
"zones": ["zone1", "zone2"],
},
# Input files will be downloaded by the worker.
# This example uses s3, but Funnel supports multiple filesystems.
"inputs": [
{
"name": "Input file.",
"description": "Input file description.",
# URL to download file from.
"url": "s3://my-bucket/object/path/file.txt",
# Path to download file to.
"path": "/container/input.txt"
},
{
"name": "Input directory.",
"description": "Directories are also supported.",
"url": "s3://my-bucket/my-data/",
"path": "/inputs/my-data/",
"type": "DIRECTORY"
},
# A task may include the file content directly in the task message.
# This is sometimes useful for small files such as scripts,
# which you want to include without talking directly to the filesystem.
{
"path": "/inputs/script.py",
"content": "import socket; print socket.gethostname()"
}
],
# Output files will be uploaded to storage by the worker.
"outputs": [
{
"name": "Output file.",
"description": "Output file description.",
"url": "s3://my-bucket/output-data/results.txt",
"path": "/outputs/results.txt"
},
{
"name": "Output directory.",
"description": "Directories are also supported.",
"url": "s3://my-bucket/output-data/output-dir/",
"path": "/outputs/data-dir/",
"type": "DIRECTORY"
}
],
# Executors define a sequence of containers + commands to run.
# Execution stop on the first non-zero exit code.
"executors": [
{
# Container image name.
# Funnel supports running executor containers via Docker.
"image": "ubuntu",
# Command arguments (argv).
# The first item is the executable to run.
"command": ["my-tool-1", "/container/input"],
# Local file path to read stdin from.
"stdin": "/inputs/stdin.txt",
# Local file path to write stdout to.
"stdout": "/container/output",
# Local file path to write stderr to.
"stderr": "/container/stderr",
# Set the working directory before executing the command.
"workdir": "/data/workdir",
# Environment variables
"env": {
"ENV1": "value1",
"ENV2": "value2",
}
},
# Second executor runs after the first completes, on the same machine.
{
"image": "ubuntu",
"command": ["cat", "/container/input"],
"stdout": "/container/output",
"stderr": "/container/stderr",
"workdir": "/tmp"
}
]
# Date/time the task was created.
# Set the the server.
# Output only.
"creationTime": "2017-11-14T11:49:04.427163701-08:00"
# Task logs.
# Output only.
#
# If there's a system error, the task may be attempted multiple times,
# so this field is a list of attempts. In most cases, there will be only
# one or zero entries here.
"logs": [
# Attempt start/end times, in RFC3339 format.
"startTime": "2017-11-14T11:49:04.433593468-08:00",
"endTime": "2017-11-14T11:49:08.487707039-08:00"
# Arbitrary metadata set by Funnel.
"metadata": {
"hostname": "worker-1",
},
# Arbitrary system logs which Funnel thinks are useful to the user.
"systemLogs": [
"task was assigned to worker 1",
"docker command: docker run -v /vol:/data alpine cmd arg1 arg2",
],
# Log of files uploaded to storage by the worker,
# including all files in directories, with file sizes.
"outputs": [
{
"url": "s3://my-bucket/output-data/results.txt",
"path": "/outputs/results.txt",
"sizeBytes": 123
},
{
"url": "s3://my-bucket/output-data/output-dir/file1.txt",
"path": "/outputs/data-dir/file1.txt",
"sizeBytes": 123
},
{
"url": "s3://my-bucket/output-data/output-dir/file2.txt",
"path": "/outputs/data-dir/file2.txt",
"sizeBytes": 123
}
{
"url": "s3://my-bucket/output-data/output-dir/subdir/file3.txt",
"path": "/outputs/data-dir/subdir/file3.txt",
"sizeBytes": 123
}
],
# Executor logs. One entry per executor.
"logs": [
{
# Executor start/end time, in RFC3339 format.
"startTime": "2017-11-14T11:49:05.127885125-08:00",
"endTime": "2017-11-14T11:49:08.484461502-08:00",
# Executor stdout/err. Only available in the FULL task view.
#
# There is a size limit for these fields, which is configurable
# and defaults to 10KB. If more than 10KB is generated, only the
# tail will be logged. If the full output is needed, the task
# may use Executor.stdout and an output to upload the full content
# to storage.
"stdout": "Hello, Funnel!",
"stderr": "",
# Exit code
"exit_code": 0,
},
{
"startTime": "2017-11-14T11:49:05.127885125-08:00",
"endTime": "2017-11-14T11:49:08.484461502-08:00",
"stdout": "Hello, Funnel!\n"
}
],
}
],
}