import threading |
import logging |
import random |
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s" |
logging.basicConfig( format = FORMAT , level = logging.INFO) |
class Dispatcher: |
def __init__( self ): |
self .data = None |
self .event = threading.Event() |
self .cond = threading.Condition() |
def produce( self , total): |
for _ in range (total): |
data = random.randint( 0 , 100 ) |
logging.info(data) |
with self .cond: |
self .data = data |
# self.cond.notify_all() |
self .cond.notify( 2 ) |
# self.send.set() |
self .event.wait( 1 ) |
self .event. set () |
def consume( self ): |
while True : |
with self .cond: |
# self.send.wait() |
self .cond.wait() #阻塞等通知 |
data = self .data |
logging.info( 'receive {}' . format (data)) |
# self.data = None |
# self.send.clear() |
# self.event.wait(0.5) |
d = Dispatcher() |
p = threading.Thread(target = d.produce, args = ( 10 ,), name = 'producer' ) |
for _ in range ( 5 ): #广播1对5 |
c = threading.Thread(target = d.consume, name = 'consumer' ) |
c.start() |
p.start() |