11# """
22# celery_emitter.py
33
4- # Copyright (c) 2013-2022 Snowplow Analytics Ltd. All rights reserved.
4+ # Copyright (c) 2013-2023 Snowplow Analytics Ltd. All rights reserved.
55
66# This program is licensed to you under the Apache License Version 2.0,
77# and you may not use this file except in compliance with the Apache License
1313# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
1414# express or implied. See the Apache License Version 2.0 for the specific
1515# language governing permissions and limitations there under.
16-
17- # Authors: Anuj More, Alex Dean, Fred Blundun, Paul Boocock
18- # Copyright: Copyright (c) 2013-2022 Snowplow Analytics Ltd
19- # License: Apache License Version 2.0
2016# """
2117
2218import logging
3935
4036class CeleryEmitter (Emitter ):
4137 """
42- Uses a Celery worker to send HTTP requests asynchronously.
43- Works like the base Emitter class,
44- but on_success and on_failure callbacks cannot be set.
38+ Uses a Celery worker to send HTTP requests asynchronously.
39+ Works like the base Emitter class,
40+ but on_success and on_failure callbacks cannot be set.
4541 """
42+
4643 if _CELERY_OPT :
4744
4845 celery_app = None
4946
5047 def __init__ (
51- self ,
52- endpoint : str ,
53- protocol : HttpProtocol = "http" ,
54- port : Optional [int ] = None ,
55- method : Method = "post" ,
56- buffer_size : Optional [int ] = None ,
57- byte_limit : Optional [int ] = None ) -> None :
58- super (CeleryEmitter , self ).__init__ (endpoint , protocol , port , method , buffer_size , None , None , byte_limit )
48+ self ,
49+ endpoint : str ,
50+ protocol : HttpProtocol = "http" ,
51+ port : Optional [int ] = None ,
52+ method : Method = "post" ,
53+ batch_size : Optional [int ] = None ,
54+ byte_limit : Optional [int ] = None ,
55+ ) -> None :
56+ super (CeleryEmitter , self ).__init__ (
57+ endpoint , protocol , port , method , batch_size , None , None , byte_limit
58+ )
5959
6060 try :
6161 # Check whether a custom Celery configuration module named "snowplow_celery_config" exists
6262 import snowplow_celery_config
63+
6364 self .celery_app = Celery ()
6465 self .celery_app .config_from_object (snowplow_celery_config )
6566 except ImportError :
@@ -80,6 +81,10 @@ def async_flush(self) -> None:
8081
8182 else :
8283
83- def __new__ (cls , * args : Any , ** kwargs : Any ) -> 'CeleryEmitter' :
84- logger .error ("CeleryEmitter is not available. Please install snowplow-tracker with celery extra dependency." )
85- raise RuntimeError ('CeleryEmitter is not available. To use: `pip install snowplow-tracker[celery]`' )
84+ def __new__ (cls , * args : Any , ** kwargs : Any ) -> "CeleryEmitter" :
85+ logger .error (
86+ "CeleryEmitter is not available. Please install snowplow-tracker with celery extra dependency."
87+ )
88+ raise RuntimeError (
89+ "CeleryEmitter is not available. To use: `pip install snowplow-tracker[celery]`"
90+ )
0 commit comments