How to Use Structured Concurrency in Java 19: A Practical Guide


Introduction

If you have ever written concurrent programs in Java, you know how challenging it can be to manage multiple threads, handle exceptions, and ensure that all resources are properly cleaned up. You may have used various tools and techniques such as Thread, Runnable, Callable, Future, ExecutorService, CompletableFuture, synchronized, volatile, the atomic classes, Lock, Condition, and so on. But even with these tools, writing concurrent code can still be complex, error-prone, and hard to debug.

That’s where structured concurrency comes in. Structured concurrency is a concept that improves the implementation, readability, and maintainability of code for dividing a task into subtasks and processing them in parallel. It treats multiple tasks running in different threads (forked from the same parent thread) as a single unit of work. It ensures that all child threads are joined to the parent thread before the parent thread exits, and that any exception or cancellation in any child thread is propagated to the parent thread. It also simplifies the management of thread pools and resources.

Structured concurrency is not a new idea. It is available in other languages and libraries, such as Kotlin coroutines, Swift, and Python (Trio and asyncio task groups). In Java 19 it arrives as an incubator API (JEP 428): the StructuredTaskScope class in the jdk.incubator.concurrent module. This API lets you create a scope, fork concurrent tasks in it, wait for them, and deal with failure and cancellation in one place. It is designed to work with Project Loom’s virtual threads, a preview feature in Java 19 (JEP 425). Virtual threads are lightweight threads scheduled by the JVM, cheap enough that an application can run very large numbers of them.

In this blog post, we will guide you through the usage of the StructuredTaskScope API for structured concurrency in Java 19. We will show you how to create and use StructuredTaskScope objects, how to handle exceptions and cancellation, how to use StructuredTaskScope with other concurrency APIs, and how to use structured concurrency with Project Loom’s virtual threads. We will also provide some examples and code snippets to illustrate the concepts and best practices. By the end of this post, you will have a better understanding of structured concurrency in Java 19 and how to use it to write simpler, safer, and more efficient concurrent programs.




Creating and Using StructuredTaskScope Objects

A StructuredTaskScope object represents a scope for concurrent tasks. It allows you to fork new tasks from the current thread and join them before the scope exits. A task is a Callable, and the scope runs each task in a new thread. In Java 19 the class lives in the jdk.incubator.concurrent package, so you need to import jdk.incubator.concurrent.StructuredTaskScope and compile and run with --add-modules jdk.incubator.concurrent (and, because virtual threads are a preview feature in Java 19, typically --enable-preview as well). There are no static factory methods; a scope is created with one of the following constructors:

  • new StructuredTaskScope<T>(): Creates an unnamed scope that runs each task in a new virtual thread.

  • new StructuredTaskScope<T>(String name, ThreadFactory factory): Creates a named scope that uses the given thread factory to create a thread for each task.

  • new StructuredTaskScope.ShutdownOnFailure(): Creates a scope that captures the first exception thrown by a task and shuts the scope down when that happens.

  • new StructuredTaskScope.ShutdownOnSuccess<T>(): Creates a scope that captures the first result and shuts the scope down as soon as one task succeeds.

Once you have a StructuredTaskScope object, you work with it through the following methods:

  • <U extends T> Future<U> fork(Callable<? extends U> task): Starts a new thread in the scope to run the task and returns a Future object that can be used to get the result. This is the only fork method; there is no overload that takes a Runnable.

  • join(): Waits until all forked tasks have finished or the scope has been shut down.

  • joinUntil(Instant deadline): Works like join(), but throws TimeoutException if the deadline passes first.

  • shutdown(): Stops the scope from starting new tasks and interrupts the threads of tasks that have not finished.

  • close(): Closes the scope, waiting for any remaining threads to finish. It is called automatically at the end of a try-with-resources statement.

Here is a simple example of creating and using a StructuredTaskScope object to fork and join two tasks:

// Create a new scope (requires import jdk.incubator.concurrent.StructuredTaskScope)
var scope = new StructuredTaskScope<Void>();

// Fork two tasks; fork() takes a Callable, so each lambda returns a value
scope.fork(() -> { System.out.println("Hello from task 1"); return null; });
scope.fork(() -> { System.out.println("Hello from task 2"); return null; });

// Wait for both tasks to finish
scope.join();

// Release the scope
scope.close();

The join() method blocks the current thread until all the tasks in the scope have completed or the scope has been shut down, and it throws InterruptedException if the waiting thread is interrupted. It does not close the scope; that is the job of close(). In practice you rarely call close() yourself, because StructuredTaskScope implements AutoCloseable and can be used in a try-with-resources statement:

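// Create a new scope and close it automatically
try (var scope = new StructuredTaskScope<Void>()) {
  // Fork two tasks
  scope.fork(() -> { System.out.println("Hello from task 1"); return null; });
  scope.fork(() -> { System.out.println("Hello from task 2"); return null; });

  // Wait for both tasks before leaving the block
  scope.join();
}

Note that you still need to call join() explicitly. If the block ends while forked tasks have not been joined, close() shuts the scope down, waits for the threads to finish, and then throws IllegalStateException.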
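For comparison, the same fork-then-join shape can be approximated with the long-standing ExecutorService API. The following is only a minimal sketch of the idea, not how StructuredTaskScope is implemented; the class name ForkJoinSketch and the method runAll are hypothetical names chosen for this illustration. It uses only stable java.util.concurrent classes, so it runs without the incubator module.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper: a hand-written approximation of "fork, join, close".
public class ForkJoinSketch {

    // Runs every task, waits for all of them, and returns the results in task order.
    // No task can outlive this method: the pool is shut down in the finally block.
    public static List<String> runAll(List<Callable<String>> tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            // invokeAll blocks until every task has completed, much like join()
            List<Future<String>> futures = pool.invokeAll(tasks);
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get()); // already complete, so this does not block
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task failed", e.getCause());
        } finally {
            pool.shutdownNow(); // plays the role of close()
        }
    }

    public static void main(String[] args) {
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> "Hello from task 1");
        tasks.add(() -> "Hello from task 2");
        // Results come back in the order the tasks were listed
        runAll(tasks).forEach(System.out::println);
    }
}
```

The try/finally block is what makes this "structured": the threads are created and released inside one method. A scope gives you the same guarantee with less ceremony, and adds failure and cancellation policies on top.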



Handling Exceptions and Cancellation in Structured Concurrency

One of the benefits of structured concurrency is that it simplifies the handling of exceptions and cancellation in concurrent tasks. With a plain StructuredTaskScope, an exception thrown by a task is captured in the Future returned by fork(); join() itself does not rethrow it. The StructuredTaskScope.ShutdownOnFailure subclass adds a fail-fast policy: when any task throws, the scope is shut down and the threads of the remaining tasks are interrupted. After join() returns, the parent thread calls throwIfFailed(), which throws an ExecutionException with the task’s exception as its cause. The parent thread can also cancel everything itself by calling shutdown() on the scope.

Here is an example of handling an exception in a structured concurrency scope:

// Create a scope that shuts down as soon as any task fails
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
  // Fork a task that throws an exception
  scope.fork(() -> {
    System.out.println("Hello from task 1");
    throw new RuntimeException("Oops!");
  });

  // Fork another task
  scope.fork(() -> { System.out.println("Hello from task 2"); return null; });

  // Wait for the tasks, then rethrow the first failure, if any
  // (join() can throw InterruptedException, declared by the enclosing method)
  scope.join();
  scope.throwIfFailed();
} catch (ExecutionException e) {
  // The exception thrown by the task is the cause
  System.out.println("Exception: " + e.getCause().getMessage());
}

A typical output of this code is:

Hello from task 1
Hello from task 2
Exception: Oops!

Note that the greeting from task 2 is not guaranteed. If task 1 fails before task 2 is forked, the scope is already shut down and task 2 never runs. The two greetings can also appear in either order. In every case, the exception from task 1 reaches the parent thread through throwIfFailed(). You can also call the shutdown() method to cancel the scope explicitly:

// Create a new scope
try (var scope = new StructuredTaskScope<Void>()) {
  // Fork two tasks
  scope.fork(() -> { System.out.println("Hello from task 1"); return null; });
  scope.fork(() -> { System.out.println("Hello from task 2"); return null; });

  // Shut down the scope: unfinished tasks are interrupted, new forks do not run
  scope.shutdown();

  // join() returns once the scope is shut down; it does not throw CancellationException
  scope.join();
  System.out.println("Shut down: " + scope.isShutdown());
}

A possible output of this code is:

Hello from task 1
Hello from task 2
Shut down: true

Note that either greeting may be missing, because shutdown() can take effect before a task has printed its line. Unlike the exception case, join() does not throw here. You can use the isShutdown() method to check whether the scope has been shut down, and isCancelled() on the Future of a task to check whether that task was cancelled.
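Cancelling a running task in Java is cooperative: the thread is interrupted, and the task has to notice the interruption and stop. The sketch below illustrates that mechanism with the stable ExecutorService API rather than with a scope, so that it runs on any recent JDK. The class name InterruptSketch and the method cancelBlockedTask are hypothetical, and the 60-second sleep merely stands in for slow, blocking work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo: cancellation reaches a blocked task as an interrupt.
public class InterruptSketch {

    // Starts a task that blocks, cancels it, and reports whether the task
    // observed the interruption.
    public static boolean cancelBlockedTask() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        try {
            pool.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(60_000); // stands in for slow, blocking work
                } catch (InterruptedException e) {
                    interrupted.set(true); // cancellation arrives as an interrupt
                }
            });
            started.await();     // make sure the task is running
            pool.shutdownNow();  // interrupts the worker thread
            pool.awaitTermination(10, TimeUnit.SECONDS);
            return interrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow(); // no-op if the pool is already shut down
        }
    }

    public static void main(String[] args) {
        System.out.println("Interrupted: " + cancelBlockedTask());
    }
}
```

A task that swallows the interrupt and keeps working cannot be cancelled this way, whether it runs in a thread pool or in a scope, so blocking tasks should let InterruptedException end their work.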



Using StructuredTaskScope with Other Concurrency APIs


Another benefit of structured concurrency is that it can be combined with other concurrency APIs in Java, such as CompletableFuture and ExecutorService. A StructuredTaskScope is neither an Executor nor a ThreadFactory, so it cannot be passed as an argument to methods such as CompletableFuture.supplyAsync() or Executors.newFixedThreadPool(). The integration works the other way round: fork() returns a standard Future, and a forked task can wait for work that was started through another API. For example, a forked task can wait for the result of a CompletableFuture:

// Create a new scope
try (var scope = new StructuredTaskScope<String>()) {
  // Start some work with CompletableFuture; it runs outside the scope
  CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello from task 1");
    return "Result from task 1";
  });

  // Fork a task that waits for the CompletableFuture inside the scope
  Future<String> future = scope.fork(() -> cf.get());

  // Fork another task
  scope.fork(() -> { System.out.println("Hello from task 2"); return "done"; });

  // Join the tasks
  scope.join();

  // Get the result; resultNow() is safe because the task has completed
  System.out.println("Result: " + future.resultNow());
}


The output of this code is:

Hello from task 1
Hello from task 2
Result: Result from task 1

Note that the scope waits for the forked task, not for the CompletableFuture itself, and that the two greetings can appear in either order. An ExecutorService cannot be attached to a scope either. If part of your work needs a bounded thread pool, you can get a similar guarantee from the pool itself: since Java 19, ExecutorService implements AutoCloseable, and its close() method waits for submitted tasks to finish. For example, you can use the Executors.newFixedThreadPool() method in a try-with-resources statement:

// Create a fixed-size thread pool; close() runs at the end of the block
try (ExecutorService executor = Executors.newFixedThreadPool(2)) {
  // Submit two tasks to the thread pool
  executor.submit(() -> System.out.println("Hello from task 1"));
  executor.submit(() -> System.out.println("Hello from task 2"));
}
// Both tasks have finished here: close() shut the pool down and waited for them

The output of this code is:

Hello from task 1
Hello from task 2

Note that the thread pool is shut down, and its tasks have finished, before the try block exits. You can also control which threads a scope uses by passing a ThreadFactory to its constructor. The scope then asks the factory for a new thread each time a task is forked. For example, you can use Executors.defaultThreadFactory() to run the tasks in platform threads:

// Create a thread factory that creates platform threads
ThreadFactory factory = Executors.defaultThreadFactory();

// Create a named scope that uses the factory
try (var scope = new StructuredTaskScope<Void>("platform-scope", factory)) {
  // Fork two tasks; each runs in a thread created by the factory
  scope.fork(() -> { System.out.println("Hello from task 1"); return null; });
  scope.fork(() -> { System.out.println("Hello from task 2"); return null; });

  // Join the tasks
  scope.join();
}

The output of this code is:

Hello from task 1
Hello from task 2

Note that the factory only creates the threads. The scope starts them, tracks them, and waits for them in join().
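A ThreadFactory is a one-method interface, so writing your own is straightforward. The sketch below shows a hypothetical factory (the names NamedThreadFactorySketch and named are made up for this post) that gives each thread a numbered name, which helps when reading thread dumps. In Java 19 such a factory could be passed to the StructuredTaskScope(String, ThreadFactory) constructor; the sketch itself only exercises the factory, so it runs without the incubator module.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: a thread factory that numbers the threads it creates.
public class NamedThreadFactorySketch {

    // Returns a factory that names its threads prefix-0, prefix-1, and so on.
    public static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> new Thread(runnable, prefix + "-" + counter.getAndIncrement());
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadFactory factory = named("subtask");
        Thread thread = factory.newThread(
                () -> System.out.println("Hello from " + Thread.currentThread().getName()));
        thread.start();
        thread.join(); // prints "Hello from subtask-0"
    }
}
```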



Using Structured Concurrency with Project Loom’s Virtual Threads


Project Loom is the OpenJDK project that introduces virtual threads (called fibers in early prototypes) as a lightweight alternative to platform threads. In Java 19 they are a preview feature (JEP 425), enabled with the --enable-preview flag. Virtual threads are scheduled by the Java Virtual Machine (JVM) onto a small pool of platform threads, and in most cases a virtual thread that blocks releases its carrier thread so that other virtual threads can run. This makes it practical to run very large numbers of them. Virtual threads are not scoped to a parent thread automatically; that structure comes from StructuredTaskScope, which by default runs every forked task in a new virtual thread.

To use virtual threads with structured concurrency, you do not need to do anything special: a scope created with the no-argument constructor already uses them. To configure the virtual threads, for example to name them, you can pass a virtual thread factory to the constructor. For example, you can use Thread.ofVirtual() to build such a factory:

// Create a factory for virtual threads named task-0, task-1, ...
ThreadFactory factory = Thread.ofVirtual().name("task-", 0).factory();

// Create a named scope that uses the factory
try (var scope = new StructuredTaskScope<Void>("virtual-scope", factory)) {
  // Fork two tasks; each runs in its own virtual thread
  scope.fork(() -> { System.out.println("Hello from task 1"); return null; });
  scope.fork(() -> { System.out.println("Hello from task 2"); return null; });

  // Join the tasks
  scope.join();
}

The output of this code is:

Hello from task 1
Hello from task 2

Note that each task runs in its own virtual thread and is joined before the scope exits. Outside a scope, you can run tasks in virtual threads through the ExecutorService interface. For example, you can use the Executors.newVirtualThreadPerTaskExecutor() method to create an executor that starts a new virtual thread for each task:

// Create an executor that starts one virtual thread per task
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
  // Submit two tasks to the executor
  executor.submit(() -> System.out.println("Hello from task 1"));
  executor.submit(() -> System.out.println("Hello from task 2"));
}
// Both tasks have finished here: close() waited for them

The output of this code is:

Hello from task 1
Hello from task 2

Note that this executor does not pool threads, and that its tasks have finished before the try block exits. The same executor can be passed to the CompletableFuture API. For example, you can use the CompletableFuture.supplyAsync() method to create a CompletableFuture object that runs a task in a virtual thread and returns a result:

CompletableFuture<String> future;

// Create an executor that starts one virtual thread per task
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
  // Create a CompletableFuture that runs in a virtual thread and returns a result
  future = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello from task 1");
    return "Result from task 1";
  }, executor);

  // Submit another task
  executor.submit(() -> System.out.println("Hello from task 2"));
}

// Get the result from the CompletableFuture; both tasks have finished by now
System.out.println("Result: " + future.join());

The output of this code is:

Hello from task 1
Hello from task 2
Result: Result from task 1

Note that the CompletableFuture runs in a virtual thread and has completed by the time the executor is closed, so the final call returns immediately. The two greetings can appear in either order.



Conclusion


Structured concurrency is a powerful concept that simplifies concurrent programming by enforcing a clear and consistent structure for concurrent tasks. It ensures that tasks are confined to the scope that forked them and joined before that scope exits. With a policy such as ShutdownOnFailure, a failure in one task also cancels the others and is reported to the parent thread. Structured concurrency can be combined with other concurrency APIs in Java, such as CompletableFuture and ExecutorService.

Moreover, it is designed around virtual threads, which are a lightweight and scalable alternative to platform threads. By using structured concurrency, you can write concurrent code that is easier to read, write, debug, and maintain. Keep in mind that the API is still incubating in Java 19, so it may change in later releases.

I hope you enjoyed this blog post and learned something new about structured concurrency in Java. If you have any questions or feedback, please feel free to leave a comment below. Thank you for reading!

Which of the following statements is true about structured concurrency in Java?

A) It requires the use of virtual threads 
B) It allows tasks to outlive their parent threads 
C) It handles exceptions by cancelling the scope 
D) It creates tasks without any scope or structure


The correct answer is C) It handles exceptions by cancelling the scope. With the ShutdownOnFailure policy, the first task that fails shuts the scope down, the remaining tasks are cancelled, and the failure is reported to the parent thread.

It does not require the use of virtual threads, although it supports them. It does not allow tasks to outlive their parent threads, as that would create orphaned tasks. It does not create tasks without any scope or structure, as that would create unstructured concurrency.

