diff --git a/kombu/pidbox.py b/kombu/pidbox.py index ee639b3c9..b38c23a02 100644 --- a/kombu/pidbox.py +++ b/kombu/pidbox.py @@ -236,7 +236,7 @@ def get_reply_queue(self): f'{oid}.{self.reply_exchange.name}', exchange=self.reply_exchange, routing_key=oid, - durable=False, + durable=True, auto_delete=True, expires=self.reply_queue_expires, message_ttl=self.reply_queue_ttl, @@ -250,7 +250,7 @@ def get_queue(self, hostname): return Queue( f'{hostname}.{self.namespace}.pidbox', exchange=self.exchange, - durable=False, + durable=True, auto_delete=True, expires=self.queue_expires, message_ttl=self.queue_ttl, @@ -269,9 +269,10 @@ def producer_or_acquire(self, producer=None, channel=None): def _publish_reply(self, reply, exchange, routing_key, ticket, channel=None, producer=None, **opts): chan = channel or self.connection.default_channel - exchange = Exchange(exchange, exchange_type='direct', - delivery_mode='transient', - durable=False) + exchange = Exchange(exchange, + exchange_type='direct', + delivery_mode='persistent', + durable=True) with self.producer_or_acquire(producer, chan) as producer: try: producer.publish( @@ -395,14 +396,14 @@ def on_message(body, message): def _get_exchange(self, namespace, type): return Exchange(self.exchange_fmt % namespace, type=type, - durable=False, - delivery_mode='transient') + durable=True, + delivery_mode='persistent') def _get_reply_exchange(self, namespace): return Exchange(self.reply_exchange_fmt % namespace, type='direct', - durable=False, - delivery_mode='transient') + durable=True, + delivery_mode='persistent') @property def oid(self): diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py index cfd864d8a..6b562562b 100644 --- a/kombu/transport/qpid.py +++ b/kombu/transport/qpid.py @@ -588,7 +588,7 @@ def _has_queue(self, queue, **kwargs): else: return False - def queue_declare(self, queue, passive=False, durable=False, + def queue_declare(self, queue, passive=False, durable=True, exclusive=False, auto_delete=True, nowait=False, arguments=None): """Create a new queue specified by name. @@ -708,7 +708,7 @@ def queue_delete(self, queue, if_unused=False, if_empty=False, **kwargs): return self._delete(queue) - def exchange_declare(self, exchange='', type='direct', durable=False, + def exchange_declare(self, exchange='', type='direct', durable=True, **kwargs): """Create a new exchange.