12-08-11 - Some Semaphores

In case you don't agree with Boost that Semaphore is too "error prone", or if you don't agree with C++0x that semaphore is unnecessary because it can be implemented from condition_var (do I need to point out why that is ridiculous reasoning for a library writer?) - here are some semaphores for you.

I've posted a fastsemaphore before, but here's a more complete version that can wrap a base semaphore.


template< typename t_base_sem >
class fastsemaphore_t
{
private:
    t_base_sem m_base_sem;
    atomic<int> m_count;

public:
    fastsemaphore_t(int count = 0)
    :   m_count(count)
    {
        RL_ASSERT(count > -1);
    }

    ~fastsemaphore_t()
    {
    }

    void post()
    {
        if (m_count($).fetch_add(1,mo_acq_rel) < 0)
        {
            m_base_sem.post();
        }
    }

    void post(int count)
    {
        int prev = m_count($).fetch_add(count,mo_acq_rel);
        if ( prev < 0)
        {
            int num_waiters = -prev;
            int num_to_wake = MIN(num_waiters,count);
            // use N-wake if available in base sem :
            // m_base_sem.post(num_to_wake);
            for(int i=0;i<num_to_wake;i++)
            {
                m_base_sem.post();
            }
        }
    }
    
    bool try_wait()
    {
        // see if we can dec count before preparing the wait
        int c = m_count($).load(mo_acquire);
        while ( c > 0 )
        {
            if ( m_count($).compare_exchange_weak(c,c-1,mo_acq_rel) )
                return true;
            // c was reloaded
            // backoff here optional
        }
        return false;
    }
        
    void wait_no_spin()
    {
        if (m_count($).fetch_add(-1,mo_acq_rel) < 1)
        {
            m_base_sem.wait();
        }
    }
    
    void wait()
    {
        int spin_count = 1; // ! set this for your system
        while(spin_count--)
        {
            if ( try_wait() ) 
                return;
        }
        
        wait_no_spin();
    }
    
    
    int debug_get_count() { return m_count($).load(); }
};

When m_count is negative, its magnitude is the number of waiters (plus or minus threads that are about to wait, or about to be woken).

Personally I think the base semaphore that fastsem wraps should just be your OS semaphore - don't worry about it. It only gets invoked for thread wake/sleep, so who cares.

But you can easily make a Semaphore from a CondVar and then put fastsemaphore on top of that. (Note that wake-N on a semaphore-from-condvar is not awesome, because CV typically doesn't provide wake N, only wake 1 or wake all.)

Wrapping fastsem around NT's Keyed Events is particularly trivial because of the semantics of the Keyed Event release: NtReleaseKeyedEvent blocks until someone receives the wake if there is no one waiting. I've noted in the past that a Win32 Event is a lot like a semaphore with a max count of 1; a problem with building a Semaphore from a normal Event is that if you Set it when it's already Set, you effectively run into the max count and lose your Set - but that is impossible with KeyedEvent. With KeyedEvent you get exactly one wake from Wait for each Release.

So, if we wrap up keyed_event for convenience :


struct keyed_event
{
    HANDLE  m_keyedEvent;

    enum { WAITKEY_SHIFT = 1 };

    keyed_event()
    {
        NtCreateKeyedEvent(&m_keyedEvent,EVENT_ALL_ACCESS,NULL,0);
    }
    ~keyed_event()
    {
        CloseHandle(m_keyedEvent);
    }

    void wait(intptr_t key)
    {
        RL_ASSERT( (key&1) == 0 );
        NtWaitForKeyedEvent(m_keyedEvent,(PVOID)(key),FALSE,NULL);
    }

    void post(intptr_t key)
    {
        RL_ASSERT( (key&1) == 0 );
        NtReleaseKeyedEvent(m_keyedEvent,(PVOID)(key),FALSE,NULL);
    }
};

Then the base sem from KE is trivial :


struct base_semaphore_from_keyed_event
{
    keyed_event ke;

    base_semaphore_from_keyed_event() { }
    ~base_semaphore_from_keyed_event() { }
    
    void post() { ke.post((intptr_t)this); }   
    void wait() { ke.wait((intptr_t)this); }
};

(note this is a silly way to use KE just for testing purposes; in practice it would be shared, not one per sem - that's sort of the whole point of KE).

(note that you don't ever use this base_sem directly, you use it with a fastsemaphore wrapper).

I also revisited the semaphore_from_waitset that I talked about a few posts ago. The best I can come up with is something like this :


class semaphore_from_waitset
{
    waitset_simple m_waitset;
    std::atomic<int> m_count;

public:
    semaphore_from_waitset(int count = 0)
    :   m_count(count), m_waitset()
    {
        RL_ASSERT(count >= 0);
    }

    ~semaphore_from_waitset()
    {
    }

public:
    void post()
    {
        m_count($).fetch_add(1,mo_acq_rel);
        m_waitset.notify_one();
    }

    bool try_wait()
    {
        // see if we can dec count before preparing the wait
        int c = m_count($).load(mo_acquire);
        while ( c > 0 )
        {
            if ( m_count($).compare_exchange_weak(c,c-1,mo_acq_rel) )
                return true;
            // c was reloaded
        }
        return false;
    }

    void wait(wait_thread_context * cntx)
    {
        for(;;)
        {
            // could spin a few times on this :
            if ( try_wait() )
                return;
    
            // no count available, get ready to wait
            waiter w(cntx);
            m_waitset.prepare_wait(&w);
            
            // double check :
            if ( try_wait() )
            {
                // (*1)
                m_waitset.retire_wait(&w);
                // pass on the notify :
                int signalled = w.flag($).load(mo_acquire);
                if ( signalled )
                    m_waitset.notify_one();
                return;
            }
            
            w.wait();
            m_waitset.retire_wait(&w);
            // loop and try again
        }
    }
    
    void wait()
    {
        wait_thread_context cntx;
        wait(&cntx);
    }
};

The funny bit is at (*1). Recall that before we talked about a race that can happen when two threads post and two other threads pop. If one of the poppers gets through to *1, it has dec'ed the sem but is still in the waitset; one pusher might then signal this thread, which is a wasted signal, and the other waiter will not get a signal, and you have a "deadlock" (not a true deadlock, but an unexpected permanent sleep, which I will henceforth call a deadlock).

You can fix that by detecting whether you received a signal while you were in the waitset. That's what's done here now. While it is not completely ideal from a performance perspective, it's a rare race case, and even when it happens the penalty is small. I still don't recommend using semaphore_from_waitset unless you have a comprehensive waitset-based system.

(note that in practice you would never make a wait_thread_context on the stack as in the example code ; if you have a waitset-based system it would be in the TLS)

Another note :

I have mentioned before the idea of "direct handoff" semaphores. That is, making it such that thread wakeup implies you get to dec count. For example "base_semaphore_from_keyed_event" above is a direct-handoff semaphore. This is as opposed to "optimistic" semaphores, in which the wakeup just means "you *might* get to dec count" and then you have to try_wait again when you wake up.

Direct handoff is neat because it guarantees a minimum number of thread wakeups - you never wake up a thread which then fails to dec count. But it is in fact not awesome. The problem is that you essentially have some of your semaphore count tied up in limbo while the thread wakeup is happening (which is not a trivial amount of time).

The scenario is like this :


1. thread 1 does a sem.wait

2. thread 2 does a sem.post 
  the sem is "direct handoff" the count is given to thread 1
  thread 1 starts to wake up

3. thread 3 (or thread 2) now decides it can do some consuming
  and tries a sem.wait
  there is no sem count so it goes to sleep

4. thread 1 wakes up and processes its received count

You have actually increased the latency of processing the posted message by the amount of time between steps 3 and 4.

Basically by not pre-deciding who will get the sem count, you leave the opportunity for someone else to get it sooner, and sooner is better.

Finally let's have a gander at the Linux sem : sem_post and sem_wait

If we strip away some of the gunk, it's just :


sem_post()
{

    atomic_add( & sem->value , 1);

    atomic_full_barrier (); // (*1)

    int w = sem->nwaiters; // (*2)

    if ( w > 0 )
    {
        futex_wake( & sem->value, 1 );  // wake 1
    }

}

sem_wait()
{
    if ( try_wait() ) return;

    atomic_add( & sem->waiters , 1);

    for(;;)
    {
        if ( try_wait() ) break;

        futex_wait( & sem->value, 0 ); // wait if sem value == 0
    }

    atomic_add( & sem->waiters , -1);
}

Some quick notes : I believe the full barrier at (*1) is unnecessary; they should be doing an acq_rel increment on sem->value instead. However, as noted in the previous post about "producer-consumer" failures, if your producer is not strongly synchronized it's possible that this barrier helps hide/prevent bugs. Also, at (*2) they load nwaiters with a plain C read, which is very sloppy; you should always load lock-free shared variables with an explicit load() call that specifies the memory ordering. I believe the ordering constraint there is that the load of nwaiters needs to stay after the store to value; the easiest way to get that is to make the inc on value an acq_rel RMW.

The similarity with waitset should be obvious, but I'll make it super-clear :


sem_post()
{

    atomic_add( & sem->value , 1);
    atomic_full_barrier ();

    // waitset.notify_one :
    {
        int w = sem->nwaiters;
        if ( w > 0 )
        {
            futex_wake( & sem->value, 1 );  // wake 1
        }
    }
}

sem_wait()
{
    if ( try_wait() ) return;

    // waitset.prepare_wait :
    atomic_add( & sem->waiters , 1);

    for(;;)
    {
        // standard double-check :
        if ( try_wait() ) break;

        // waitset.wait()
        // (*3)
        futex_wait( & sem->value, 0 ); // wait if sem value == 0
    }

    // waitset.retire_wait :
    atomic_add( & sem->waiters , -1);
}

It's exactly the same, but with one key difference at *3 - the wait does not happen if count is not zero, which means we can not receive the wait wakeup from futex_wake if we don't need it. This removes the need for the re-pass that we had in the waitset semaphore.

This futex semaphore is fine, but you could reduce the number of atomic ops by storing count & waiters in one word.
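As a sketch of that last idea (my own illustration, not glibc's actual code): pack the value and the waiter count into one 32-bit word, so post and wait each manage both fields with a single atomic RMW. Only the state transitions are shown here; the futex park/wake calls are omitted, and the field layout is an assumption for illustration.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical packing : low 16 bits = sem value, high 16 bits = waiter count.
// One CAS updates both fields, so post/wait touch a single atomic word.
struct packed_sem_state
{
    std::atomic<uint32_t> state{0};

    static uint32_t value(uint32_t s)   { return s & 0xFFFF; }
    static uint32_t waiters(uint32_t s) { return s >> 16; }

    // bump the value ; returns true if a sleeping waiter should be
    // woken (this is where futex_wake would go)
    bool post()
    {
        uint32_t s = state.load(std::memory_order_relaxed);
        for(;;)
        {
            if ( state.compare_exchange_weak(s, s + 1,
                    std::memory_order_acq_rel, std::memory_order_relaxed) )
                return waiters(s) > 0;
            // s was reloaded by the failed CAS
        }
    }

    // returns true if count was taken ; false means the caller should
    // add_waiter() and futex_wait on the word
    bool try_wait()
    {
        uint32_t s = state.load(std::memory_order_acquire);
        while ( value(s) > 0 )
        {
            if ( state.compare_exchange_weak(s, s - 1,
                    std::memory_order_acq_rel, std::memory_order_acquire) )
                return true;
        }
        return false;
    }

    void add_waiter()    { state.fetch_add(1u << 16, std::memory_order_acq_rel); }
    void remove_waiter() { state.fetch_sub(1u << 16, std::memory_order_acq_rel); }
};
```

Compared to the glibc version, post reads the waiter count for free out of the value it CAS'ed, so there is no separate load of nwaiters and no ordering question between the two variables - the one word is the whole state.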

6 comments:

Drawknob said...

I assume that for a fastsemaphore_t::timed_wait() one would best simply pass the timeout value on to m_base_sem.wait(), and not bother checking for a timeout within the short spin loop or within try_wait(), in order to avoid a performance penalty - given that even a VDSO call to clock_gettime() on Linux, while not having the expense of a full kernel call, is not cheap (on Windows, I'm guessing timeGetTime() is a kernel call).

cbloom said...

Sure. The only disadvantage of just passing the timeout on to the base semaphore is that you don't count the time spent in the spins. But as long as the spin count is low that time is negligible.

The only time I ever used timed waits is for debugging/robustness. Instead of an infinite wait I usually use a timed wait for 100 millis or so and print an error message if the time is hit.

FYI there are of course much cheaper ways to get the time without kernel calls.

Unknown said...

Hi, Charles. I just ran across this post after spending today putting together a fastsemaphore with timed_wait(). I haven't found an implementation of timed_wait() that avoids breaking the invariant "m_count = -waiters", and this causes some subtle complications.

Consider this timed_wait() implementation; the problem is at (*1):

....bool timed_wait(int timeout) {
........if (m_count($).fetch_add(-1) < 1) {
............bool success = m_base_sem.timed_wait(timeout);
............// (*1)
............if (success) { return true; }
............m_count($).fetch_add(1);
............return false;
........}
........return true; // count was available, no need to touch the base sem
....}

And this chain of events:

....t0 calls wait(10)
......m_count is decremented to -1
....t0 times out, progresses to (*1)
....t1 calls post()
......m_count is incremented to 0
......m_base_sem gets posted
....t1 calls try_wait()
......m_count is 0; try_wait returns false

At the end of this sequence, t1 is justified in thinking that a resource has been successfully passed to t0; but it hasn't. There is now a resource "hidden" in m_base_sem.

This is hard for me to analyze, but I think the proper invariants are

* When m_count < 0, num resources = m_base_sem.count()
* When m_count >= 0, num resources = m_count + m_base_sem.count()
* When m_count >= 0, num waiters = 0

And therefore try_wait() needs to change:

....bool try_wait() {
........// previous code remains the same
........// return false;
........return m_base_sem.try_wait();
....}

What do you think?

cbloom said...

Off hand my guess is that the fix is to change timed_wait like this :


....bool timed_wait(int timeout) {
........if (m_count($).fetch_add(-1) < 1) {
............bool success = m_base_sem.timed_wait(timeout);
............// (*1)
............if (success) { return true; }
............post();
............return false;
........}
........return true; // count was available, no need to touch the base sem
....}



However, I don't really like to even talk about this because I consider all code that uses timed waits to be broken for various reasons.

Drawknob said...

Hi Paul, I'm not sure I understand the semantics of this argument: "At the end of this sequence, t1 is justified in thinking that a resource has been successfully passed to t0; but it hasn't."

I don't think t0 can assume anything as far as a resource being passed on to another thread; the way I see it, correctness only needs to guarantee the conditions under which a thread can safely assume it has obtained the semaphore-protected resource for itself. If it doesn't have the resource for itself, that's a necessary but not sufficient condition to guarantee that another thread has already obtained it -- I don't think the change of hands has to be atomic; it just has to be mutually exclusive (to borrow mutex terminology).

I'm not sure cbloom's "fix" would work anyway, since when a keyed event wait times out, if I recall correctly, it won't eat up a matching keyed event wake, and that would cause the wake to block.

As for cbloom's belief that timed waits are unsavory, I'd say an argument on the other side is to try to have feature parity with C++11 synchronization primitives, all of which have timed waits (mutexes, condvars, and upcoming shared mutexes).
