Struct tokio::sync::Semaphore

pub struct Semaphore { /* private fields */ }
Available on crate feature sync only.

Counting semaphore performing asynchronous permit acquisition.

A semaphore maintains a set of permits. Permits are used to synchronize access to a shared resource. A semaphore differs from a mutex in that it can allow more than one concurrent caller to access the shared resource at a time.

When acquire is called and the semaphore has remaining permits, the function immediately returns a permit. However, if no remaining permits are available, acquire (asynchronously) waits until an outstanding permit is dropped. At this point, the freed permit is assigned to the caller.

This Semaphore is fair, which means that permits are given out in the order they were requested. This fairness also applies when acquire_many is involved: if a call to acquire_many at the front of the queue requests more permits than are currently available, it can prevent a call to acquire from completing, even if the semaphore has enough permits to complete the call to acquire.

To use the Semaphore in a poll function, you can use the PollSemaphore utility.

§Examples

Basic usage:

use tokio::sync::{Semaphore, TryAcquireError};

#[tokio::main]
async fn main() {
    let semaphore = Semaphore::new(3);

    let a_permit = semaphore.acquire().await.unwrap();
    let two_permits = semaphore.acquire_many(2).await.unwrap();

    assert_eq!(semaphore.available_permits(), 0);

    let permit_attempt = semaphore.try_acquire();
    assert_eq!(permit_attempt.err(), Some(TryAcquireError::NoPermits));
}

§Limit the number of simultaneously opened files in your program

Most operating systems have limits on the number of open file handles. Even in systems without explicit limits, resource constraints implicitly set an upper bound on the number of open files. If your program attempts to open a large number of files and exceeds this limit, it will result in an error.

This example uses a Semaphore with 100 permits. By acquiring a permit from the Semaphore before accessing a file, you ensure that your program opens no more than 100 files at a time. When trying to open the 101st file, the program will wait until a permit becomes available before proceeding to open another file.

use std::io::Result;
use tokio::fs::File;
use tokio::sync::Semaphore;
use tokio::io::AsyncWriteExt;

static PERMITS: Semaphore = Semaphore::const_new(100);

async fn write_to_file(message: &[u8]) -> Result<()> {
    let _permit = PERMITS.acquire().await.unwrap();
    let mut buffer = File::create("example.txt").await?;
    buffer.write_all(message).await?;
    Ok(()) // Permit goes out of scope here, and is available again for acquisition
}

§Limit the number of outgoing requests being sent at the same time

In some scenarios, it might be required to limit the number of outgoing requests being sent in parallel. This could be due to limits of a consumed API or the network resources of the system the application is running on.

This example uses an Arc<Semaphore> with 10 permits. Each task spawned is given a reference to the semaphore by cloning the Arc<Semaphore>. Before a task sends a request, it must acquire a permit from the semaphore by calling Semaphore::acquire. This ensures that at most 10 requests are sent in parallel at any given time. After a task has sent a request, it drops the permit to allow other tasks to send requests.

use std::sync::Arc;
use tokio::sync::Semaphore;

// Placeholder for the real request logic, which this example leaves out.
async fn send_request(task_id: usize) -> usize {
    task_id
}

#[tokio::main]
async fn main() {
    // Define maximum number of parallel requests.
    let semaphore = Arc::new(Semaphore::new(10));
    // Spawn many tasks that will send requests.
    let mut jhs = Vec::new();
    for task_id in 0..100 {
        let semaphore = semaphore.clone();
        let jh = tokio::spawn(async move {
            // Acquire permit before sending request.
            let _permit = semaphore.acquire().await.unwrap();
            // Send the request.
            let response = send_request(task_id).await;
            // Drop the permit after the request has been sent.
            drop(_permit);
            // Handle response.
            // ...

            response
        });
        jhs.push(jh);
    }
    // Collect responses from tasks.
    let mut responses = Vec::new();
    for jh in jhs {
        let response = jh.await.unwrap();
        responses.push(response);
    }
    // Process responses.
    // ...
}

§Limit the number of incoming requests being handled at the same time

Similar to limiting the number of simultaneously opened files, network handles are a limited resource. Allowing an unbounded number of requests to be processed could result in a denial of service, among many other issues.

This example uses an Arc<Semaphore> instead of a global variable. To limit the number of requests that can be processed at the same time, we acquire a permit for each task before spawning it. Once the permit is acquired, a new task is spawned; once the task finishes, the permit is dropped inside the task to allow others to spawn. Permits must be acquired via Semaphore::acquire_owned to be movable across the task boundary. (This is because our semaphore is not a global variable; if it were, acquire would be enough.)

use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::net::TcpListener;

// Placeholder for the real connection handler, which this example leaves out.
async fn handle_connection(_socket: &mut tokio::net::TcpStream) {}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let semaphore = Arc::new(Semaphore::new(3));
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        // Acquire permit before accepting the next socket.
        //
        // We use `acquire_owned` so that we can move `permit` into
        // other tasks.
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            // Do work using the socket.
            handle_connection(&mut socket).await;
            // Drop socket while the permit is still live.
            drop(socket);
            // Drop the permit, so more tasks can be created.
            drop(permit);
        });
    }
}

§Prevent tests from running in parallel

By default, Rust runs tests in the same file in parallel. However, in some cases, running two tests in parallel may lead to problems. For example, this can happen when tests use the same database.

Consider the following scenario:

  1. test_insert: Inserts a key-value pair into the database, then retrieves the value using the same key to verify the insertion.
  2. test_update: Inserts a key, then updates the key to a new value and verifies that the value has been accurately updated.
  3. test_others: A third test that doesn’t modify the database state. It can run in parallel with the other tests.

In this example, test_insert and test_update need to run in sequence to work, but it doesn’t matter which test runs first. We can leverage a semaphore with a single permit to address this challenge.

use tokio::sync::Semaphore;

// Initialize a static semaphore with only one permit, which is used to
// prevent test_insert and test_update from running in parallel.
static PERMIT: Semaphore = Semaphore::const_new(1);

// Initialize the database that will be used by the subsequent tests.
static DB: Database = Database::setup();

#[tokio::test]
async fn test_insert() {
    // Acquire permit before proceeding. Since the semaphore has only one permit,
    // the test will wait if the permit is already acquired by other tests.
    let permit = PERMIT.acquire().await.unwrap();

    // Do the actual test stuff with database

    // Insert a key-value pair to database
    let (key, value) = ("name", 0);
    DB.insert(key, value).await;

    // Verify that the value has been inserted correctly.
    assert_eq!(DB.get(key).await, value);

    // Undo the insertion, so the database is empty at the end of the test.
    DB.delete(key).await;

    // Drop permit. This allows the other test to start running.
    drop(permit);
}

#[tokio::test]
async fn test_update() {
    // Acquire permit before proceeding. Since the semaphore has only one permit,
    // the test will wait if the permit is already acquired by other tests.
    let permit = PERMIT.acquire().await.unwrap();

    // Do the same insert.
    let (key, value) = ("name", 0);
    DB.insert(key, value).await;

    // Update the existing value with a new one.
    let new_value = 1;
    DB.update(key, new_value).await;

    // Verify that the value has been updated correctly.
    assert_eq!(DB.get(key).await, new_value);

    // Undo any modification.
    DB.delete(key).await;

    // Drop permit. This allows the other test to start running.
    drop(permit);
}

#[tokio::test]
async fn test_others() {
    // This test can run in parallel with test_insert and test_update,
    // so it does not use PERMIT.
}

§Rate limiting using a token bucket

This example showcases the add_permits and SemaphorePermit::forget methods.

Many applications and systems have constraints on the rate at which certain operations should occur. Exceeding this rate can result in suboptimal performance or even errors.

This example implements rate limiting using a token bucket. A token bucket is a form of rate limiting that doesn’t kick in immediately, to allow for short bursts of incoming requests that arrive at the same time.

With a token bucket, each incoming request consumes a token, and the tokens are refilled at a certain rate that defines the rate limit. When a burst of requests arrives, tokens are immediately given out until the bucket is empty. Once the bucket is empty, requests will have to wait for new tokens to be added.

Unlike the example that limits how many requests can be handled at the same time, we do not add tokens back when we finish handling a request. Instead, tokens are added only by a timer task.

Note that this implementation is suboptimal when the duration is small, because it consumes a lot of CPU by constantly looping and sleeping.

use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::time::{interval, Duration};

struct TokenBucket {
    sem: Arc<Semaphore>,
    jh: tokio::task::JoinHandle<()>,
}

impl TokenBucket {
    fn new(duration: Duration, capacity: usize) -> Self {
        let sem = Arc::new(Semaphore::new(capacity));

        // refills the tokens at the end of each interval
        let jh = tokio::spawn({
            let sem = sem.clone();
            let mut interval = interval(duration);
            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            async move {
                loop {
                    interval.tick().await;

                    if sem.available_permits() < capacity {
                        sem.add_permits(1);
                    }
                }
            }
        });

        Self { jh, sem }
    }

    async fn acquire(&self) {
        // This can return an error if the semaphore is closed, but we
        // never close it, so this error can never happen.
        let permit = self.sem.acquire().await.unwrap();
        // To avoid releasing the permit back to the semaphore, we use
        // the `SemaphorePermit::forget` method.
        permit.forget();
    }
}

impl Drop for TokenBucket {
    fn drop(&mut self) {
        // Kill the background task so it stops taking up resources when we
        // don't need it anymore.
        self.jh.abort();
    }
}

#[tokio::main]
async fn main() {
    let capacity = 5;
    let update_interval = Duration::from_secs_f32(1.0 / capacity as f32);
    let bucket = TokenBucket::new(update_interval, capacity);

    for _ in 0..5 {
        bucket.acquire().await;

        // do the operation
    }
}

Implementations§

impl Semaphore

pub const MAX_PERMITS: usize = 2_305_843_009_213_693_951usize

The maximum number of permits which a semaphore can hold. It is usize::MAX >> 3.

Exceeding this limit typically results in a panic.

pub fn new(permits: usize) -> Self

Creates a new semaphore with the initial number of permits.

Panics if permits exceeds Semaphore::MAX_PERMITS.

pub const fn const_new(permits: usize) -> Self

Creates a new semaphore with the initial number of permits.

When using the tracing unstable feature, a Semaphore created with const_new will not be instrumented. As such, it will not be visible in tokio-console. Instead, Semaphore::new should be used to create an instrumented object if that is needed.

§Examples
use tokio::sync::Semaphore;

static SEM: Semaphore = Semaphore::const_new(10);

pub fn available_permits(&self) -> usize

Returns the current number of available permits.

pub fn add_permits(&self, n: usize)

Adds n new permits to the semaphore.

The maximum number of permits is Semaphore::MAX_PERMITS, and this function will panic if the limit is exceeded.

pub fn forget_permits(&self, n: usize) -> usize

Decreases the semaphore’s permits by at most n.

If fewer than n permits are available, only the available permits are removed. Returns the number of permits that were actually removed.

pub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError>

Acquires a permit from the semaphore.

If the semaphore has been closed, this returns an AcquireError. Otherwise, this returns a SemaphorePermit representing the acquired permit.

§Cancel safety

This method uses a queue to fairly distribute permits in the order they were requested. Cancelling a call to acquire makes you lose your place in the queue.

§Examples
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Semaphore::new(2);

    let permit_1 = semaphore.acquire().await.unwrap();
    assert_eq!(semaphore.available_permits(), 1);

    let permit_2 = semaphore.acquire().await.unwrap();
    assert_eq!(semaphore.available_permits(), 0);

    drop(permit_1);
    assert_eq!(semaphore.available_permits(), 1);
}

pub async fn acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, AcquireError>

Acquires n permits from the semaphore.

If the semaphore has been closed, this returns an AcquireError. Otherwise, this returns a SemaphorePermit representing the acquired permits.

§Cancel safety

This method uses a queue to fairly distribute permits in the order they were requested. Cancelling a call to acquire_many makes you lose your place in the queue.

§Examples
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Semaphore::new(5);

    let permit = semaphore.acquire_many(3).await.unwrap();
    assert_eq!(semaphore.available_permits(), 2);
}

pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError>

Tries to acquire a permit from the semaphore.

If the semaphore has been closed, this returns TryAcquireError::Closed; if there are no permits left, it returns TryAcquireError::NoPermits. Otherwise, this returns a SemaphorePermit representing the acquired permit.

§Examples
use tokio::sync::{Semaphore, TryAcquireError};

let semaphore = Semaphore::new(2);

let permit_1 = semaphore.try_acquire().unwrap();
assert_eq!(semaphore.available_permits(), 1);

let permit_2 = semaphore.try_acquire().unwrap();
assert_eq!(semaphore.available_permits(), 0);

let permit_3 = semaphore.try_acquire();
assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));

pub fn try_acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, TryAcquireError>

Tries to acquire n permits from the semaphore.

If the semaphore has been closed, this returns TryAcquireError::Closed; if there are not enough permits left, it returns TryAcquireError::NoPermits. Otherwise, this returns a SemaphorePermit representing the acquired permits.

§Examples
use tokio::sync::{Semaphore, TryAcquireError};

let semaphore = Semaphore::new(4);

let permit_1 = semaphore.try_acquire_many(3).unwrap();
assert_eq!(semaphore.available_permits(), 1);

let permit_2 = semaphore.try_acquire_many(2);
assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));

pub async fn acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, AcquireError>

Acquires a permit from the semaphore.

The semaphore must be wrapped in an Arc to call this method. If the semaphore has been closed, this returns an AcquireError. Otherwise, this returns an OwnedSemaphorePermit representing the acquired permit.

§Cancel safety

This method uses a queue to fairly distribute permits in the order they were requested. Cancelling a call to acquire_owned makes you lose your place in the queue.

§Examples
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(3));
    let mut join_handles = Vec::new();

    for _ in 0..5 {
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        join_handles.push(tokio::spawn(async move {
            // perform task...
            // explicitly own `permit` in the task
            drop(permit);
        }));
    }

    for handle in join_handles {
        handle.await.unwrap();
    }
}

pub async fn acquire_many_owned(self: Arc<Self>, n: u32) -> Result<OwnedSemaphorePermit, AcquireError>

Acquires n permits from the semaphore.

The semaphore must be wrapped in an Arc to call this method. If the semaphore has been closed, this returns an AcquireError. Otherwise, this returns an OwnedSemaphorePermit representing the acquired permits.

§Cancel safety

This method uses a queue to fairly distribute permits in the order they were requested. Cancelling a call to acquire_many_owned makes you lose your place in the queue.

§Examples
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(10));
    let mut join_handles = Vec::new();

    for _ in 0..5 {
        let permit = semaphore.clone().acquire_many_owned(2).await.unwrap();
        join_handles.push(tokio::spawn(async move {
            // perform task...
            // explicitly own `permit` in the task
            drop(permit);
        }));
    }

    for handle in join_handles {
        handle.await.unwrap();
    }
}

pub fn try_acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, TryAcquireError>

Tries to acquire a permit from the semaphore.

The semaphore must be wrapped in an Arc to call this method. If the semaphore has been closed, this returns TryAcquireError::Closed; if there are no permits left, it returns TryAcquireError::NoPermits. Otherwise, this returns an OwnedSemaphorePermit representing the acquired permit.

§Examples
use std::sync::Arc;
use tokio::sync::{Semaphore, TryAcquireError};

let semaphore = Arc::new(Semaphore::new(2));

let permit_1 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
assert_eq!(semaphore.available_permits(), 1);

let permit_2 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
assert_eq!(semaphore.available_permits(), 0);

let permit_3 = semaphore.try_acquire_owned();
assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));

pub fn try_acquire_many_owned(self: Arc<Self>, n: u32) -> Result<OwnedSemaphorePermit, TryAcquireError>

Tries to acquire n permits from the semaphore.

The semaphore must be wrapped in an Arc to call this method. If the semaphore has been closed, this returns TryAcquireError::Closed; if there are not enough permits left, it returns TryAcquireError::NoPermits. Otherwise, this returns an OwnedSemaphorePermit representing the acquired permits.

§Examples
use std::sync::Arc;
use tokio::sync::{Semaphore, TryAcquireError};

let semaphore = Arc::new(Semaphore::new(4));

let permit_1 = Arc::clone(&semaphore).try_acquire_many_owned(3).unwrap();
assert_eq!(semaphore.available_permits(), 1);

let permit_2 = semaphore.try_acquire_many_owned(2);
assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));

pub fn close(&self)

Closes the semaphore.

This prevents the semaphore from issuing new permits and notifies all pending waiters.

§Examples
use tokio::sync::Semaphore;
use std::sync::Arc;
use tokio::sync::TryAcquireError;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(1));
    let semaphore2 = semaphore.clone();

    tokio::spawn(async move {
        let permit = semaphore.acquire_many(2).await;
        assert!(permit.is_err());
        println!("waiter received error");
    });

    println!("closing semaphore");
    semaphore2.close();

    // Cannot obtain more permits
    assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed))
}

pub fn is_closed(&self) -> bool

Returns true if the semaphore is closed.

Trait Implementations§

impl Debug for Semaphore

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.
