Introduction
If you have ever written concurrent programs in Java, you know how challenging
it can be to manage multiple threads, handle exceptions, and ensure that all
resources are properly cleaned up. You may have used various tools and
techniques such as Thread, Runnable, Callable,
Future, ExecutorService, CompletableFuture,
synchronized, volatile, Atomic, Lock,
Condition, and so on. But even with these tools, writing concurrent
code can still be complex, error-prone, and hard to debug.
That’s where structured concurrency comes in. Structured concurrency is a
concept that improves the implementation, readability, and maintainability of
code for dividing a task into subtasks and processing them in parallel. It
treats multiple tasks running in different threads (forked from the same
parent thread) as a single unit of work. It ensures that all child threads are
joined to the parent thread before the parent thread exits, and that any
exception or cancellation in any child thread is propagated to the parent
thread. It also simplifies the management of thread pools and resources.
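To make the "single unit of work" idea concrete before turning to the new API, here is a minimal sketch that uses only long-standing java.util.concurrent classes. It is not StructuredTaskScope; the class name UnitOfWorkSketch and the method sumAll are made up for illustration. The method returns only after every subtask has finished, and a failure in any subtask surfaces in the calling thread.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration class, not part of any JDK API.
final class UnitOfWorkSketch {

    /** Runs all subtasks, waits for every one of them, then sums their results. */
    static int sumAll(List<Callable<Integer>> subtasks) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, subtasks.size()));
        try {
            // invokeAll returns only after every subtask has completed, normally or not.
            List<Future<Integer>> futures = pool.invokeAll(subtasks);
            int sum = 0;
            for (Future<Integer> future : futures) {
                sum += future.get(); // a failed subtask throws ExecutionException here
            }
            return sum;
        } catch (ExecutionException e) {
            // Propagate the subtask's failure to the caller.
            throw new IllegalStateException("subtask failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown(); // all subtasks are already done at this point
        }
    }

    public static void main(String[] args) {
        System.out.println(sumAll(List.of(() -> 1, () -> 2, () -> 3))); // prints 6
    }
}
```

Even in this small sketch the pool has to be created, sized, and shut down by hand, which is the bookkeeping a structured API is meant to take over.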
Structured concurrency is not a new idea. Similar designs exist in other
ecosystems, for example Kotlin coroutines, Python (Trio nurseries and asyncio
task groups), and Swift. In Java it first appeared in Java 19 as an incubating
API (JEP 428): the StructuredTaskScope class in the
jdk.incubator.concurrent module. Because the API is incubating, it has to be
enabled explicitly with --add-modules jdk.incubator.concurrent, and its shape
has changed in later releases. The API lets you fork subtasks inside a scope,
wait for them as a group, and handle failure and cancellation in one place.
It is designed to work with Project Loom’s virtual threads (a preview feature
in Java 19, JEP 425), which are lightweight threads that are cheap enough to
create in very large numbers.
In this blog post, we will guide you through the usage of the
StructuredTaskScope API for structured concurrency in Java 19. We
will show you how to create and use StructuredTaskScope objects, how
to handle exceptions and cancellation, how to use
StructuredTaskScope with other concurrency APIs, and how to use
structured concurrency with Project Loom’s virtual threads. We will also
provide some examples and code snippets to illustrate the concepts and best
practices. By the end of this post, you will have a better understanding of
structured concurrency in Java 19 and how to use it to write simpler, safer,
and more efficient concurrent programs.
Body
Creating and Using StructuredTaskScope Objects
A StructuredTaskScope object represents a scope for concurrent subtasks.
The thread that creates the scope (its owner) forks subtasks into it and joins
them before the scope is closed. In Java 19 each subtask is a Callable, and by
default each one runs in its own new virtual thread. The class lives in the
incubating package jdk.incubator.concurrent, so the examples in this post need
--add-modules jdk.incubator.concurrent, plus --enable-preview because virtual
threads are a preview feature in Java 19. A scope is created with a
constructor, not a factory method:
-
new StructuredTaskScope<T>(): Creates an unnamed scope that runs each
subtask in a new virtual thread and applies no completion policy of its own.
-
new StructuredTaskScope<T>(String name, ThreadFactory factory): Creates a
named scope that uses the given thread factory to create the thread for each
subtask.
-
new StructuredTaskScope.ShutdownOnFailure(): Creates a scope that shuts
down as soon as any subtask fails, for the case where every result is needed.
-
new StructuredTaskScope.ShutdownOnSuccess<T>(): Creates a scope that
shuts down as soon as any subtask succeeds, for the case where one result is
enough.
- Both subclasses also offer a (String name, ThreadFactory factory) constructor.
Once you have a StructuredTaskScope object, the owner thread works with it
through a small set of methods:
-
<U extends T> Future<U> fork(Callable<? extends U> task): Starts a new
thread in the scope to run the task and returns a Future for its result.
There is no Runnable overload, so a task with nothing to return simply
returns null.
-
join(): Waits until all forked subtasks have completed or the scope has
been shut down. It throws InterruptedException if the owner is interrupted
while waiting.
-
joinUntil(Instant deadline): Like join(), but throws TimeoutException if
the deadline passes first.
-
shutdown(): Prevents new subtasks from starting, interrupts the threads of
unfinished subtasks, and wakes up the owner if it is waiting in join().
- close(): Closes the scope after waiting for any remaining threads to finish. It is normally called implicitly by a try-with-resources statement.
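The relationship between a forked Callable and the Future that reports its outcome is the same one ExecutorService.submit has long offered, so it can be tried without the incubating API. The sketch below is an illustration under that assumption; ForkFutureSketch, describe, and runAndDescribe are hypothetical names. It shows the three ways a task can end: with a result, with a failure, or cancelled.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration class, not part of any JDK API.
final class ForkFutureSketch {

    /** Waits for the Future and describes how the task ended. */
    static String describe(Future<String> future) {
        if (future.isCancelled()) {
            return "cancelled";
        }
        try {
            return "result: " + future.get();
        } catch (ExecutionException e) {
            // The exception thrown inside the task is available as the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    /** Runs one task on its own thread and describes the outcome. */
    static String runAndDescribe(Callable<String> task) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return describe(pool.submit(task));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndDescribe(() -> "ok"));                      // prints "result: ok"
        System.out.println(runAndDescribe(() -> {
            throw new IllegalStateException("boom");
        }));                                                                  // prints "failed: boom"
    }
}
```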
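Here is a simple example of creating and using a
StructuredTaskScope object to fork and join two tasks. Like the other
snippets in this post, it assumes an enclosing method that declares
InterruptedException:
// Create a new scope whose tasks return a String
StructuredTaskScope<String> scope = new StructuredTaskScope<>();
// Fork two tasks
Future<String> task1 = scope.fork(() -> "Hello from task 1");
Future<String> task2 = scope.fork(() -> "Hello from task 2");
// Join the tasks, then read their results
scope.join();
System.out.println(task1.resultNow());
System.out.println(task2.resultNow());
// Close the scope
scope.close();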
The join() method blocks the owner thread until all the tasks in the scope
have completed or the scope has been shut down. It does not close the scope;
that is the job of close(), and the usual way to guarantee that call is a
try-with-resources statement:
// Create a new scope and close it automatically
try (var scope = new StructuredTaskScope<String>()) {
    // Fork two tasks
    Future<String> task1 = scope.fork(() -> "Hello from task 1");
    Future<String> task2 = scope.fork(() -> "Hello from task 2");
    // join() is still required before the scope is closed
    scope.join();
    System.out.println(task1.resultNow());
    System.out.println(task2.resultNow());
}
Note that you still need to call join() explicitly in this case. If the owner
forks tasks and then leaves the try block without joining, close() waits for
the tasks to finish and then throws IllegalStateException.
Handling Exceptions and Cancellation in Structured Concurrency
One of the benefits of structured concurrency is that it puts the handling of
failure and cancellation in one place. The base StructuredTaskScope class
does not react to a failed task by itself: the exception is simply captured in
that task's Future. The ShutdownOnFailure subclass adds the policy most
people expect. When any task throws, it shuts the scope down, which interrupts
the threads of the tasks that are still running, and after join() the owner
calls throwIfFailed() to rethrow the first failure wrapped in an
ExecutionException. The owner can also cancel the remaining work itself by
calling shutdown() on the scope. In both cases the owner handles the outcome
with an ordinary try-catch block around the scope.
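The fail-fast policy described above can be approximated with classes that predate the structured concurrency API, which may help to show what a scope automates. This is a sketch under that assumption, not the library's implementation; FailFastSketch and allOrFail are hypothetical names. The first failure is rethrown in the caller, and the tasks that are still running are interrupted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration class, not part of any JDK API.
final class FailFastSketch {

    /** Returns all results in submission order, or fails as soon as any task fails. */
    static List<String> allOrFail(List<Callable<String>> tasks) {
        ExecutorService pool = Executors.newCachedThreadPool();
        CompletionService<String> completions = new ExecutorCompletionService<>(pool);
        List<Future<String>> futures = new ArrayList<>();
        try {
            for (Callable<String> task : tasks) {
                futures.add(completions.submit(task));
            }
            // Inspect tasks in completion order so the first failure is seen promptly.
            for (int i = 0; i < tasks.size(); i++) {
                completions.take().get();
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get()); // every task has already succeeded
            }
            return results;
        } catch (ExecutionException e) {
            throw new IllegalStateException("subtask failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdownNow(); // interrupts any task that is still running
        }
    }

    public static void main(String[] args) {
        System.out.println(allOrFail(List.of(() -> "a", () -> "b"))); // prints [a, b]
    }
}
```

Unlike a real scope, this sketch does not wait for the interrupted tasks to finish before returning, so a task that ignores interruption could outlive the call.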
Here is an example of handling an exception in a structured concurrency
scope that uses the ShutdownOnFailure policy:
// Create a scope that shuts down when any task fails
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    // Fork a task that throws an exception
    scope.fork(() -> {
        System.out.println("Hello from task 1");
        throw new RuntimeException("Oops!");
    });
    // Fork another task; a task with no result returns null
    scope.fork(() -> { System.out.println("Hello from task 2"); return null; });
    // Join the tasks, then rethrow the first failure, if any
    scope.join();
    scope.throwIfFailed();
} catch (ExecutionException e) {
    // The exception thrown by the task is the cause
    System.out.println("Exception: " + e.getCause().getMessage());
}
A typical run prints:
Hello from task 1
Hello from task 2
Exception: Oops!
Note that the order of the two greetings is not guaranteed, and task 2 may not
run at all if the scope shuts down before it starts. The exception from task 1
reaches the owner through throwIfFailed(). The owner can also call the
shutdown()
method to cancel the remaining tasks explicitly:
// Create a new scope
try (var scope = new StructuredTaskScope<String>()) {
    // Fork two tasks
    Future<String> task1 = scope.fork(() -> "Hello from task 1");
    Future<String> task2 = scope.fork(() -> "Hello from task 2");
    // Cancel whatever has not finished yet
    scope.shutdown();
    // join() returns promptly once the scope is shut down
    scope.join();
    // Each task has either completed or been cancelled
    System.out.println(task1.isCancelled() ? "Cancelled" : task1.resultNow());
    System.out.println(task2.isCancelled() ? "Cancelled" : task2.resultNow());
}
The output depends on timing. Each of the two lines is either the task's
greeting or Cancelled, depending on whether that task finished before
shutdown() ran.
Note that the scope has no cancel() method and join() does not throw
CancellationException. Cancellation is observed per task: call isCancelled()
on the Future returned by fork() to check whether that task was cancelled.
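Cancellation in Java ultimately reaches a running task as a thread interrupt, so a task has to respond to interruption for cancellation to take effect promptly. The following sketch shows that mechanism with a plain ExecutorService and Future.cancel(true) rather than a scope; CancelSketch and cancelInterruptsTask are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not part of any JDK API.
final class CancelSketch {

    /** Starts a long sleep, cancels it, and reports whether the task saw the interrupt. */
    static boolean cancelInterruptsTask() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        try {
            Future<?> future = pool.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(60_000); // stands in for slow, blocking work
                } catch (InterruptedException e) {
                    interrupted.countDown(); // cancellation arrives as an interrupt
                }
            });
            started.await();     // make sure the task is running before cancelling
            future.cancel(true); // true = interrupt the thread that runs the task
            return future.isCancelled() && interrupted.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelInterruptsTask()); // prints true
    }
}
```

A task that swallows the interrupt and keeps working would delay cancellation, which is why blocking calls inside tasks should let InterruptedException end the task.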
Using StructuredTaskScope with Other Concurrency APIs
Another benefit of structured concurrency is that it composes with other
concurrency APIs in Java, such as CompletableFuture and
ExecutorService, although not by passing the scope to them.
StructuredTaskScope implements neither Executor nor ThreadFactory, so it
cannot be handed to CompletableFuture.supplyAsync() or to the Executors
factory methods. Integration works the other way round: a task forked in the
scope may call any blocking API. For example, a forked task can wait for a
CompletableFuture created with CompletableFuture.supplyAsync(), so that
join() does not return until that future has completed:
// Create a CompletableFuture that runs on the common pool
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello from task 1");
    return "Result from task 1";
});
// Create a new scope
try (var scope = new StructuredTaskScope<String>()) {
    // Fork a task that waits for the CompletableFuture
    Future<String> task1 = scope.fork(() -> future.get());
    // Fork another task
    scope.fork(() -> { System.out.println("Hello from task 2"); return "done"; });
    // Join the tasks
    scope.join();
    // Get the result
    System.out.println("Result: " + task1.resultNow());
}
A typical run prints the following, although the order of the first two lines
is not guaranteed:
Hello from task 1
Hello from task 2
Result: Result from task 1
Note that a CompletableFuture is never run by the scope itself; the scope
joins only the tasks that were forked into it. An ExecutorService can be
combined with a scope in the same way. The scope does not accept a thread
pool as an argument, but its tasks can submit work to a shared pool and wait
for it. For example, a pool created with the
Executors.newFixedThreadPool() method caps how much of that work runs at
the same time:
// Create a fixed-size thread pool that the tasks share
ExecutorService executor = Executors.newFixedThreadPool(2);
// Create a new scope
try (var scope = new StructuredTaskScope<Object>()) {
    // Each forked task submits work to the pool and waits for it
    scope.fork(() -> executor.submit(() -> System.out.println("Hello from task 1")).get());
    scope.fork(() -> executor.submit(() -> System.out.println("Hello from task 2")).get());
    // Join the tasks
    scope.join();
} finally {
    // Shut down the thread pool
    executor.shutdown();
}
A typical run prints the following, in either order:
Hello from task 1
Hello from task 2
Note that the thread pool is not owned by the scope, so it has to be shut down
separately. The supported way to control which threads a scope uses is to
pass a ThreadFactory to its constructor; the scope asks the factory for a new
thread each time a task is forked. For example, a factory obtained from
Thread.ofPlatform() makes the scope run its tasks on platform threads instead
of the default virtual threads:
// Create a factory for named platform threads
ThreadFactory factory = Thread.ofPlatform().name("worker-", 1).factory();
// Create a new scope whose tasks run in threads from the factory
try (var scope = new StructuredTaskScope<String>("hello-scope", factory)) {
    // Fork two tasks
    Future<String> task1 = scope.fork(() -> "Hello from task 1");
    Future<String> task2 = scope.fork(() -> "Hello from task 2");
    // Join the tasks
    scope.join();
    System.out.println(task1.resultNow());
    System.out.println(task2.resultNow());
}
The output of this code is:
Hello from task 1
Hello from task 2
Note that each task runs in a thread created by the ThreadFactory and is
joined before the scope closes.
Using Structured Concurrency with Project Loom’s Virtual Threads
Project Loom is the OpenJDK project that introduced virtual threads (called
fibers in early prototypes) as a lightweight alternative to platform threads.
In Java 19 they are a preview feature (JEP 425), enabled with
--enable-preview. Virtual threads are scheduled by the Java Virtual Machine
(JVM) onto a pool of carrier platform threads, and a virtual thread that
blocks is unmounted so that its carrier can run another one. This makes them
cheap enough to create one per task, even for a very large number of tasks.
Virtual threads are not scoped or joined automatically; that structure comes
from StructuredTaskScope, which runs each forked task in a new virtual thread
by default.
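To use virtual threads with structured concurrency, you do not have to do
anything special: new StructuredTaskScope<>() already starts a virtual thread
for each forked task. To make the choice explicit, or to name the threads,
pass a virtual thread factory to the constructor. For example, you can use
Thread.ofVirtual() to build a ThreadFactory that creates named virtual
threads:
// Create a factory for named virtual threads
ThreadFactory factory = Thread.ofVirtual().name("virtual-", 1).factory();
// Create a new scope whose tasks run in virtual threads from the factory
try (var scope = new StructuredTaskScope<String>("virtual-scope", factory)) {
    // Fork two tasks that report what kind of thread they run on
    Future<String> task1 = scope.fork(() -> "Hello from task 1, virtual: " + Thread.currentThread().isVirtual());
    Future<String> task2 = scope.fork(() -> "Hello from task 2, virtual: " + Thread.currentThread().isVirtual());
    // Join the tasks
    scope.join();
    System.out.println(task1.resultNow());
    System.out.println(task2.resultNow());
}
The output of this code is:
Hello from task 1, virtual: true
Hello from task 2, virtual: true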
Note that every task forked into the scope is joined before the scope
closes. An ExecutorService can also run its tasks on virtual threads, but it
does so independently of any scope: there is no method that takes a
StructuredTaskScope as an argument. For example, you can use
the Executors.newVirtualThreadPerTaskExecutor() method to create an
executor that starts a new virtual thread for each submitted task. In Java 19
ExecutorService is AutoCloseable, and its close() method waits for the
submitted tasks to finish:
// Create an executor that starts a new virtual thread for each task
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
    // Submit two tasks to the executor
    executor.submit(() -> System.out.println("Hello from task 1"));
    executor.submit(() -> System.out.println("Hello from task 2"));
    // Leaving the block calls close(), which waits for both tasks
}
A typical run prints the following, in either order:
Hello from task 1
Hello from task 2
Note that a virtual thread executor is not tied to a scope; its own close()
method is what waits for the submitted tasks. The same executor can be passed
to the CompletableFuture API to create a CompletableFuture object
that runs on a virtual thread. For example, you can
use the two-argument CompletableFuture.supplyAsync() method, which takes
the Executor to run the supplier on:
// Create an executor that starts a new virtual thread for each task
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
    // Create a CompletableFuture that runs on a virtual thread and returns a result
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello from task 1");
        return "Result from task 1";
    }, executor);
    // Submit another task
    executor.submit(() -> System.out.println("Hello from task 2"));
    // Get the result from the CompletableFuture
    System.out.println("Result: " + future.get());
}
A typical run prints:
Hello from task 1
Hello from task 2
Result: Result from task 1
Note that "Hello from task 1" always appears before the result line, but the
line from task 2 can appear at any position because the two tasks are
independent.
Conclusion
Structured concurrency simplifies concurrent programming by enforcing a clear
and consistent structure for concurrent tasks. It ensures that tasks are
confined to the scope that forked them, joined before that scope closes, and,
with a policy such as ShutdownOnFailure, cancelled when a sibling task fails.
Tasks inside a scope can still call other concurrency APIs in Java, such as
CompletableFuture and ExecutorService. Scopes run their tasks on virtual
threads by default, which are a lightweight and scalable alternative to
platform threads. By using structured concurrency, you can write concurrent
code that is easier to read, write, debug, and maintain.
I hope you enjoyed this blog post and learned something new about structured
concurrency in Java. If you have any questions or feedback, please feel free
to leave a comment below. Thank you for reading!
Which of the following statements is true about structured concurrency in Java?
A) It requires the use of virtual threads
B) It allows tasks to outlive their parent threads
C) It handles exceptions by cancelling the scope
D) It creates tasks without any scope or structure
The correct answer is C) It handles exceptions by cancelling the scope. Structured concurrency in Java ensures that tasks are scoped to their parent threads, joined before the parent threads exit, and cancelled or propagated when exceptions occur.
It does not require the use of virtual threads, although it supports them. It does not allow tasks to outlive their parent threads, as that would create orphaned tasks. It does not create tasks without any scope or structure, as that would create unstructured concurrency.