Is there an easy way in Python to wait until a certain condition is true?


I need to wait in a script until a certain number of conditions become true.

I know I can roll my own eventing using condition variables and friends, but I don’t want to go through all the trouble of implementing it. Some object property changes come from an external thread in a wrapped C++ library (Boost.Python), so I can’t just hijack __setattr__ in a class and put a condition variable there. That leaves me with either trying to create and signal a Python condition variable from C++, or wrapping a native one and waiting on it in Python, both of which sound fiddly, needlessly complicated and boring.

Is there an easier way to do it, barring continuous polling of the condition?

Ideally it would be along the lines of

res = wait_until(lambda: some_predicate, timeout)
if not res:
    print('timed out')

Unfortunately, the only way to meet your constraints is to poll periodically, e.g.:

import time

def wait_until(somepredicate, timeout, period=0.25, *args, **kwargs):
  mustend = time.time() + timeout
  while time.time() < mustend:
    if somepredicate(*args, **kwargs): return True
    time.sleep(period)
  return False

or the like. This can be optimized in several ways if somepredicate can be decomposed (e.g. if it’s known to be an “and” of several clauses, especially if some of the clauses can in turn be detected via threading.Event objects or similar), but in the general terms you ask for, this inefficient approach is the only way out.
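As a rough sketch of that kind of decomposition, assume the predicate is an “and” of two clauses, one of which is signalled through a threading.Event that stays set once it has been set. The names wait_until_decomposed, ready_event and other_clause are hypothetical and only serve this illustration. The event clause can then be waited on without polling, and only the remaining clause is polled with whatever time is left (time.monotonic is used here instead of time.time so that clock adjustments do not affect the deadline):

```python
import threading
import time

def wait_until_decomposed(ready_event, other_clause, timeout, period=0.25):
    """Wait until ready_event is set AND other_clause() returns true.

    Hypothetical helper for illustration; assumes the event stays set
    once it has been set.
    """
    mustend = time.monotonic() + timeout
    # Clause 1: block on the event directly, no polling required.
    if not ready_event.wait(timeout):
        return False
    # Clause 2: poll the remaining clause with the time that is left.
    while True:
        if other_clause():
            return True
        remaining = mustend - time.monotonic()
        if remaining <= 0:
            return False
        time.sleep(min(period, remaining))
```

Only the part of the predicate that cannot be signalled still pays the polling cost.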

Another nice package is waiting: https://pypi.org/project/waiting/

install:

pip install waiting

Usage:
You pass a function that is called repeatedly as the condition, a timeout, and (this is useful) a description of what you are waiting for, which is displayed if the wait times out.

Using a function:

from waiting import wait


def is_something_ready(something):
    # wait() expects a boolean: True ends the wait, False keeps waiting
    return bool(something.ready())


# wait for something to be ready
something = ...  # placeholder: whatever object you are waiting on

wait(lambda: is_something_ready(something), timeout_seconds=120, waiting_for="something to be ready")

# this code will only execute after "something" is ready
print("Done")

Note: the function must return a boolean – True when the wait is over, False otherwise

Here is another solution. The goal was to make threads wait on each other before doing some work in a very precise order. The work can take an unknown amount of time. Constant polling is not good for two reasons: it eats CPU time, and the action does not start immediately after the condition is met.

import threading

class Waiter():

    def __init__(self, init_value):
        self.var = init_value
        self.var_mutex = threading.Lock()
        self.var_event = threading.Event()

    def WaitUntil(self, v):
        while True:
            with self.var_mutex:
                if self.var == v:
                    return # Done waiting
            # Wait up to 1 sec, then check again (in case the event was missed)
            self.var_event.wait(1)

    def Set(self, v):
        with self.var_mutex:
            self.var = v
        self.var_event.set() # In case someone is waiting
        self.var_event.clear()

And the way to test it:

import time
import threading

import numpy as np

class TestWaiter():

    def __init__(self):
        self.waiter = Waiter(0)
        threading.Thread(name="Thread0", target=self.Thread0).start()
        threading.Thread(name="Thread1", target=self.Thread1).start()
        threading.Thread(name="Thread2", target=self.Thread2).start()

    def Thread0(self):
        while True:
            self.waiter.WaitUntil(0)
            # Do some work
            time.sleep(np.random.rand()*2)
            self.waiter.Set(1)

    def Thread1(self):
        while True:
            self.waiter.WaitUntil(1)
            # Do some work
            time.sleep(np.random.rand())
            self.waiter.Set(2)

    def Thread2(self):
        while True:
            self.waiter.WaitUntil(2)
            # Do some work
            time.sleep(np.random.rand()/10)
            self.waiter.Set(0)

Waiter for multiprocessing:

import multiprocessing as mp
import ctypes

class WaiterMP():
    def __init__(self, init_value, stop_value=-1):
        self.init_value = init_value # Remembered so Restart() can restore it
        self.var = mp.Value(ctypes.c_int, init_value)
        self.stop_value = stop_value
        self.event = mp.Event()

    def Terminate(self):
        self.Set(self.stop_value)

    def Restart(self):
        self.var.value = self.init_value

    def WaitUntil(self, v):
        while True:
            if self.var.value == v or self.var.value == self.stop_value:
                return
            # Wait 1 sec and check again (in case event was missed)
            self.event.wait(1)

    def Set(self, v):
        exiting = self.var.value == self.stop_value
        if not exiting: # Do not set var if workers are exiting
            self.var.value = v
        self.event.set() # In case someone is waiting
        self.event.clear()

Please comment if this is still not the best solution.
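One possible alternative for the threaded case, sketched here under the assumption that only standard-library threading is involved, is threading.Condition with its wait_for method. The class name ConditionWaiter and its method names are hypothetical. Because the predicate is re-checked while holding the condition’s lock, a notification cannot slip in between the check and the wait, so the one-second safety poll is not needed:

```python
import threading

class ConditionWaiter:
    """Hypothetical variant of Waiter built on threading.Condition."""

    def __init__(self, init_value):
        self._var = init_value
        self._cond = threading.Condition()

    def wait_until(self, v, timeout=None):
        # wait_for evaluates the predicate under the lock, sleeps until
        # notified, and re-evaluates; it returns the predicate's last value,
        # so the result is False if the timeout expired first.
        with self._cond:
            return self._cond.wait_for(lambda: self._var == v, timeout)

    def set(self, v):
        with self._cond:
            self._var = v
            self._cond.notify_all()  # wake every waiter so each re-checks
```

As with the original Waiter, a value that is set and then overwritten before a waiter wakes up can still be missed, so this fits best when each value stays in place until the matching waiter has acted on it.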

You’ve basically answered your own question: no.

Since you’re dealing with external libraries in Boost.Python, which may change objects at their leisure, you need to either have those routines call an event handler refresh, or work with a condition.

Here is the threading extension to Alex’s solution:

import time
import threading

# based on https://stackoverflow.com/a/2785908/1056345
def wait_until(somepredicate, timeout, period=0.25, *args, **kwargs):
    must_end = time.time() + timeout
    while time.time() < must_end:
        if somepredicate(*args, **kwargs):
            return True
        time.sleep(period)
    return False

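def wait_until_par(*args, **kwargs):
    # Run wait_until in a background thread so the caller is not blocked
    t = threading.Thread(target=wait_until, args=args, kwargs=kwargs)
    t.start()
    print('wait_until_par exits, thread runs in background')
    return t  # lets the caller join() the thread if needed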

def test():
    print('test')

wait_until_par(test, 5)
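Running wait_until in a plain thread discards its return value. If the caller needs to know whether the wait succeeded or timed out, one option is to submit it to a concurrent.futures.ThreadPoolExecutor and keep the returned Future. This is only a sketch: the names _executor and wait_until_future are assumptions of this example, and wait_until is repeated so the snippet is self-contained.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def wait_until(somepredicate, timeout, period=0.25, *args, **kwargs):
    must_end = time.time() + timeout
    while time.time() < must_end:
        if somepredicate(*args, **kwargs):
            return True
        time.sleep(period)
    return False

# A single shared pool; the name _executor is an assumption of this sketch.
_executor = ThreadPoolExecutor(max_workers=4)

def wait_until_future(*args, **kwargs):
    """Start wait_until in a pool thread and return a Future for its result."""
    return _executor.submit(wait_until, *args, **kwargs)

start = time.time()
# The predicate becomes true after about half a second.
future = wait_until_future(lambda: time.time() - start > 0.5, 5, 0.1)
# ... the caller is free to do other work here ...
print(future.result())  # blocks until wait_until has returned
```

future.done() can be used to check on the wait without blocking.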

From a computational perspective, all conditions must be checked somewhere, at some point. If you have two parts of code, one that generates condition changes and another that should be executed when some conditions are true, you can do the following:

Have the code that changes conditions in, say, the main thread, and the code that should be launched when some conditions are true in a worker thread.

from threading import Thread,Event

locker = Event()

def WhenSomeTrue(locker):
    locker.clear() # To prevent looping, see manual, link below
    woken = locker.wait(2.0) # Suspend the thread until woken up, or 2s timeout is reached
    if not woken: # wait() returns False when the timeout was reached
        print('TIMEOUT')
    else:
        #
        # Code when some conditions are true
        #
        pass
worker_thread = Thread(target=WhenSomeTrue, args=(locker,))
worker_thread.start()

cond1 = False
cond2 = False
cond3 = False

def evaluate():
    true_conditions = 0

    for i in range(1,4):
        if globals()["cond"+str(i)]: #access a global condition variable one by one
            true_conditions += 1     #increment at each true value
    if true_conditions > 1:
        locker.set() # Resume the worker thread executing the else branch
    #Or just if true_conditions > 1: locker.set();
    #true_conditions would need be incremented when 'True' is written to any of those variables

#
# some condition change code
#
evaluate()

For more information concerning this method, visit: https://docs.python.org/3/library/threading.html#event-objects
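The idea of updating the count whenever a condition is written, instead of scanning globals(), could be sketched as follows. The class ConditionSet, its set method and the enough attribute are hypothetical names for this illustration. The threshold of two true conditions mirrors the true_conditions > 1 test above.

```python
import threading

class ConditionSet:
    """Hypothetical helper: tracks named boolean conditions and keeps an
    Event set while at least `needed` of them are true."""

    def __init__(self, names, needed):
        self._values = dict.fromkeys(names, False)
        self._needed = needed
        self._lock = threading.Lock()
        self.enough = threading.Event()

    def set(self, name, value):
        # Every write re-evaluates the count, so no separate evaluate() call
        # is needed after changing a condition.
        with self._lock:
            self._values[name] = bool(value)
            if sum(self._values.values()) >= self._needed:
                self.enough.set()
            else:
                self.enough.clear()

conds = ConditionSet(["cond1", "cond2", "cond3"], needed=2)

def worker():
    if conds.enough.wait(2.0):  # wait() returns False on timeout
        print("enough conditions are true")
    else:
        print("TIMEOUT")

t = threading.Thread(target=worker)
t.start()
conds.set("cond1", True)
conds.set("cond3", True)
t.join()
```

Since the worker never clears the event itself, a set() that happens before the worker starts waiting is not lost.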

Proposed solution:

import time

def wait_until(delegate, timeout: int):
    end = time.time() + timeout
    while time.time() < end:
        if delegate():
            return True
        time.sleep(0.1)  # pause briefly before checking again
    return False

Usage:

wait_until(lambda: True, 2)

As of 2022, you could use AsyncValue from trio-util: https://trio-util.readthedocs.io/en/latest/#trio_util.AsyncValue

I think this comes closest to what you want in its “smoothest” form.

This worked for me:

from time import sleep

direction = ''  # presumably set elsewhere, e.g. by another thread
t = 0
while direction == '' and t <= 1:
    sleep(0.1)
    t += 0.1

This waits for a signal while enforcing a time limit of about 1 second.

Here’s how:

import time

i = False

while not i:
    if condition:  # placeholder for your own check
        i = True
    else:
        time.sleep(0.1)  # avoid spinning at full CPU between checks

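Here’s the code I used in one of my projects:

import time

def no():
    # Wait until the condition becomes true, checking once per second
    while not condition():  # condition() is a placeholder for your own check
        time.sleep(1)  # Don't remove this, or the loop will spin and hog the CPU

def oh():  # Your main program
    while True:
        if bla():  # bla() is a placeholder for whatever triggers the wait
            ...  # do some work
            no()  # returns once the condition is true
        else:
            time.sleep(1)

oh()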

Hope it helps.


The answers/resolutions are collected from Stack Overflow and are licensed under CC BY-SA 2.5, CC BY-SA 3.0 and CC BY-SA 4.0.