Shell (dagster-shell)

The Dagster shell library provides utilities and op factories for executing inline shell scripts or script files.

APIs

dagster_shell.create_shell_command_op(shell_command, name, description=None, required_resource_keys=None, tags=None)

This function is a factory that constructs ops to execute a shell command.

Note that you can only use create_shell_command_op if you know the command you’d like to execute at job construction time. If you’d like to construct shell commands dynamically during job execution and pass them between ops, you should use shell_op instead.

Examples:

# pylint: disable=no-value-for-parameter
from dagster_shell import create_shell_command_op

from dagster import graph


@graph
def my_graph():
    a = create_shell_command_op('echo "hello, world!"', name="a")
    a()
Parameters:
  • shell_command (str) – The shell command that the constructed op will execute.

  • name (str) – The name of the constructed op.

  • description (Optional[str]) – Human-readable description of this op.

  • required_resource_keys (Optional[Set[str]]) – Set of resource handles required by this op. Setting this ensures that resource spin up for the required resources will occur before the shell command is executed.

  • tags (Optional[Dict[str, Any]]) – Arbitrary metadata for the op. Frameworks may expect and require certain metadata to be attached to an op. Users should generally not set metadata directly. Values that are not strings will be JSON-encoded and must meet the criterion that json.loads(json.dumps(value)) == value.

Raises:

Failure – Raised when the shell command returns a non-zero exit code.

Returns:

Returns the constructed op definition.

Return type:

OpDefinition
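
The round-trip criterion on tags values described above can be checked ahead of time with the standard json module. The following is a minimal sketch, not part of dagster-shell; the helper name survives_json_round_trip and the sample tag values are hypothetical and chosen only for illustration.

```python
import json


def survives_json_round_trip(value):
    """Return True if value is unchanged after JSON encoding and decoding."""
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        # Values that json cannot serialize (for example, sets) fail outright.
        return False


# Hypothetical tag values, for illustration only.
ok_tag = {"retries": 3, "owners": ["data-eng"]}
bad_tag_tuple = ("a", "b")      # decodes as a list, which does not equal the tuple
bad_tag_int_keys = {1: "one"}   # integer keys come back as strings
bad_tag_set = {"a", "b"}        # sets are not JSON serializable

for candidate in (ok_tag, bad_tag_tuple, bad_tag_int_keys, bad_tag_set):
    print(repr(candidate), survives_json_round_trip(candidate))
```

Plain dicts with string keys, lists, strings, numbers, booleans, and None are the safest choices for non-string tag values.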

dagster_shell.create_shell_script_op(shell_script_path, name='create_shell_script_op', ins=None, **kwargs)

This function is a factory which constructs an op that will execute a shell command read from a script file.

Any kwargs passed to this function will be passed along to the underlying @op decorator. However, note that overriding config or output_defs is not supported.

If you need to configure the shell op with different config fields, consider wrapping this op in a @graph.

Examples:

# pylint: disable=no-value-for-parameter
from dagster_shell import create_shell_script_op

from dagster import file_relative_path, graph


@graph
def my_graph():
    a = create_shell_script_op(file_relative_path(__file__, "hello_world.sh"), name="a")
    a()
Parameters:
  • shell_script_path (str) – The script file to execute.

  • name (Optional[str]) – The name of this op. Defaults to “create_shell_script_op”.

  • ins (Optional[Mapping[str, In]]) – Ins for the op. Defaults to a single Nothing input.

Raises:

Failure – Raised when the shell command returns a non-zero exit code.

Returns:

Returns the constructed op definition.

Return type:

OpDefinition

dagster_shell.shell_op(context, shell_command)
dagster_shell.execute_shell_command(shell_command, output_logging, log, cwd=None, env=None)

This function is a utility for executing shell commands from within a Dagster op (or from Python in general). It can be used to run shell commands against op input data or against any data generated within a generic Python op.

Internally, it executes a shell script specified by the argument shell_command. The script will be written to a temporary file first and invoked via subprocess.Popen(['bash', shell_script_path], ...).

In the Popen invocation, stdout=PIPE, stderr=STDOUT is used, and the combined stdout/stderr output is retrieved.
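
The mechanism described above (write the command to a temporary script, run it with bash, and merge stderr into stdout) can be sketched with the standard library alone. This is an illustrative approximation, not the dagster-shell implementation; the function name run_command_via_temp_script and the script file name are hypothetical, and the sketch assumes bash is available on the PATH.

```python
import os
import subprocess
import tempfile


def run_command_via_temp_script(shell_command):
    """Write shell_command to a temporary script and run it with bash.

    Returns a tuple of (combined stdout/stderr output, return code).
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        script_path = os.path.join(tmp_dir, "script.sh")
        with open(script_path, "w", encoding="utf-8") as script_file:
            script_file.write(shell_command)
        # The script file is closed (and therefore flushed) before bash reads it.
        process = subprocess.Popen(
            ["bash", script_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merge stderr into the stdout pipe
            cwd=tmp_dir,  # run in the directory that holds the script
        )
        raw_output, _ = process.communicate()
    return raw_output.decode("utf-8"), process.returncode


output, return_code = run_command_via_temp_script("echo out; echo err >&2; exit 3")
print(output, return_code)
```

Because stderr is redirected into stdout, the caller receives a single interleaved string and cannot tell afterwards which stream a given line came from.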

Examples:

# pylint: disable=no-value-for-parameter
from dagster_shell import execute_shell_command

from dagster import OpExecutionContext, op


@op
def my_shell_op(context: OpExecutionContext, data: str):
    temp_file = "/tmp/data.txt"
    with open(temp_file, "w", encoding="utf-8") as temp_file_writer:
        temp_file_writer.write(data)
    # Run the command only after the file is closed, so the data is flushed to disk.
    execute_shell_command(f"cat {temp_file}", output_logging="STREAM", log=context.log)
Parameters:
  • shell_command (str) – The shell command to execute.

  • output_logging (str) – The logging mode to use. Supports STREAM, BUFFER, and NONE.

  • log (Union[logging.Logger, DagsterLogManager]) – Any logger which responds to .info()

  • cwd (str, optional) – Working directory for the shell command to use. Defaults to the temporary path where we store the shell command in a script file.

  • env (Dict[str, str], optional) – Environment dictionary to pass to subprocess.Popen. Unused by default.

Returns:

A tuple where the first element is the combined stdout/stderr output of running the shell command and the second element is the return code.

Return type:

Tuple[str, int]

dagster_shell.execute_shell_script(shell_script_path, output_logging, log, cwd=None, env=None)

Execute a shell script file specified by the argument shell_script_path. The script will be invoked via subprocess.Popen(['bash', shell_script_path], ...).

In the Popen invocation, stdout=PIPE, stderr=STDOUT is used, and the combined stdout/stderr output is retrieved.

Examples:

# pylint: disable=no-value-for-parameter
from dagster_shell import execute_shell_script

from dagster import OpExecutionContext, op


@op
def my_shell_op(context: OpExecutionContext, data: str):
    temp_file = "/tmp/echo_data.sh"
    with open(temp_file, "w", encoding="utf-8") as temp_file_writer:
        temp_file_writer.write(f"echo {data}")
    # Run the script only after the file is closed, so its contents are flushed to disk.
    execute_shell_script(temp_file, output_logging="STREAM", log=context.log)
Parameters:
  • shell_script_path (str) – The shell script to execute.

  • output_logging (str) – The logging mode to use. Supports STREAM, BUFFER, and NONE.

  • log (Union[logging.Logger, DagsterLogManager]) – Any logger which responds to .info()

  • cwd (str, optional) – Working directory for the shell command to use. Defaults to the temporary path where we store the shell command in a script file.

  • env (Dict[str, str], optional) – Environment dictionary to pass to subprocess.Popen. Unused by default.

Raises:

Exception – When an invalid output_logging is selected. Unreachable from op-based invocation since the config system will check output_logging against the config enum.

Returns:

A tuple where the first element is the combined stdout/stderr output of running the shell command and the second element is the return code.

Return type:

Tuple[str, int]
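
To illustrate how the cwd and env parameters shape a script run, the sketch below calls subprocess.Popen directly in the way the description above outlines. It is a stand-in under stated assumptions rather than the library's own code: the helper run_script, the file names, and the GREETING_TARGET variable are all hypothetical, and bash is assumed to be on the PATH.

```python
import os
import subprocess
import tempfile


def run_script(shell_script_path, cwd=None, env=None):
    """Run a script file with bash and return (combined output, return code)."""
    process = subprocess.Popen(
        ["bash", shell_script_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        cwd=cwd,  # relative paths in the script resolve against this directory
        env=env,  # None means the child inherits the parent's environment
    )
    raw_output, _ = process.communicate()
    return raw_output.decode("utf-8"), process.returncode


with tempfile.TemporaryDirectory() as tmp_dir:
    # A data file that the script reads through a relative path.
    with open(os.path.join(tmp_dir, "marker.txt"), "w", encoding="utf-8") as marker:
        marker.write("found marker\n")

    script_path = os.path.join(tmp_dir, "greet.sh")
    with open(script_path, "w", encoding="utf-8") as script_file:
        script_file.write('echo "hello, $GREETING_TARGET"\ncat marker.txt\n')

    # Extend the current environment instead of replacing it, so PATH survives.
    script_env = dict(os.environ, GREETING_TARGET="world")
    script_output, script_return_code = run_script(script_path, cwd=tmp_dir, env=script_env)

print(script_output, script_return_code)
```

Passing a dictionary that contains only the new variable would drop PATH and other inherited settings, which is why the sketch copies os.environ first.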