@@ -116,6 +116,12 @@ def test_resource(cls, filename):
116116 return path
117117 return os .path .join (cls .project_root , "servers" , "resources" , "default" , filename )
118118
119+ @classmethod
120+ def run_script (cls , script , * args ):
121+ result = [os .path .join (cls .kafka_root , 'bin' , script )]
122+ result .extend ([str (arg ) for arg in args ])
123+ return result
124+
119125 @classmethod
120126 def kafka_run_class_args (cls , * args ):
121127 result = [os .path .join (cls .kafka_root , 'bin' , 'kafka-run-class.sh' )]
@@ -202,6 +208,7 @@ def open(self):
202208 # Configure Zookeeper child process
203209 template = self .test_resource ("zookeeper.properties" )
204210 properties = self .tmp_dir .join ("zookeeper.properties" )
211+ # Consider replacing w/ run_script('zookeper-server-start.sh', ...)
205212 args = self .kafka_run_class_args ("org.apache.zookeeper.server.quorum.QuorumPeerMain" ,
206213 properties .strpath )
207214 env = self .kafka_run_class_env ()
@@ -334,32 +341,30 @@ def _jaas_config(self):
334341
335342 elif self .sasl_mechanism == 'PLAIN' :
336343 jaas_config = (
337- 'org.apache.kafka.common.security.plain.PlainLoginModule required\n '
338- ' username="{user}" password="{password}" user_{user}="{password}";\n '
344+ 'org.apache.kafka.common.security.plain.PlainLoginModule required'
345+ ' username="{user}" password="{password}" user_{user}="{password}";\n '
339346 )
340347 elif self .sasl_mechanism in ("SCRAM-SHA-256" , "SCRAM-SHA-512" ):
341348 jaas_config = (
342- 'org.apache.kafka.common.security.scram.ScramLoginModule required\n '
343- ' username="{user}" password="{password}";\n '
349+ 'org.apache.kafka.common.security.scram.ScramLoginModule required'
350+ ' username="{user}" password="{password}";\n '
344351 )
345352 else :
346353 raise ValueError ("SASL mechanism {} currently not supported" .format (self .sasl_mechanism ))
347354 return jaas_config .format (user = self .broker_user , password = self .broker_password )
348355
349356 def _add_scram_user (self ):
350357 self .out ("Adding SCRAM credentials for user {} to zookeeper." .format (self .broker_user ))
351- args = self .kafka_run_class_args (
352- "kafka.admin.ConfigCommand" ,
353- "--zookeeper" ,
354- "%s:%d/%s" % (self .zookeeper .host ,
355- self .zookeeper .port ,
356- self .zk_chroot ),
357- "--alter" ,
358- "--entity-type" , "users" ,
359- "--entity-name" , self .broker_user ,
360- "--add-config" ,
361- "{}=[password={}]" .format (self .sasl_mechanism , self .broker_password ),
362- )
358+ args = self .run_script ('kafka-configs.sh' ,
359+ '--zookeeper' ,
360+ '%s:%d/%s' % (self .zookeeper .host ,
361+ self .zookeeper .port ,
362+ self .zk_chroot ),
363+ '--alter' ,
364+ '--entity-type' , 'users' ,
365+ '--entity-name' , self .broker_user ,
366+ '--add-config' ,
367+ '{}=[password={}]' .format (self .sasl_mechanism , self .broker_password ))
363368 env = self .kafka_run_class_env ()
364369 proc = subprocess .Popen (args , env = env , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
365370
@@ -390,13 +395,12 @@ def out(self, message):
390395
391396 def _create_zk_chroot (self ):
392397 self .out ("Creating Zookeeper chroot node..." )
393- args = self .kafka_run_class_args ("org.apache.zookeeper.ZooKeeperMain" ,
394- "-server" ,
395- "%s:%d" % (self .zookeeper .host ,
396- self .zookeeper .port ),
397- "create" ,
398- "/%s" % (self .zk_chroot ,),
399- "kafka-python" )
398+ args = self .run_script ('zookeeper-shell.sh' ,
399+ '%s:%d' % (self .zookeeper .host ,
400+ self .zookeeper .port ),
401+ 'create' ,
402+ '/%s' % (self .zk_chroot ,),
403+ 'kafka-python' )
400404 env = self .kafka_run_class_env ()
401405 proc = subprocess .Popen (args , env = env , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
402406
@@ -416,6 +420,7 @@ def start(self):
416420 properties_template = self .test_resource ("kafka.properties" )
417421 jaas_conf_template = self .test_resource ("kafka_server_jaas.conf" )
418422
423+ # Consider replacing w/ run_script('kafka-server-start.sh', ...)
419424 args = self .kafka_run_class_args ("kafka.Kafka" , properties .strpath )
420425 env = self .kafka_run_class_env ()
421426 if self .sasl_enabled :
@@ -590,17 +595,15 @@ def _create_topic_via_admin_api(self, topic_name, num_partitions, replication_fa
590595 raise errors .for_code (error_code )
591596
592597 def _create_topic_via_cli (self , topic_name , num_partitions , replication_factor ):
593- args = self .kafka_run_class_args ('kafka.admin.TopicCommand' ,
594- '--zookeeper' , '%s:%s/%s' % (self .zookeeper .host ,
595- self .zookeeper .port ,
596- self .zk_chroot ),
597- '--create' ,
598- '--topic' , topic_name ,
599- '--partitions' , self .partitions \
600- if num_partitions is None else num_partitions ,
601- '--replication-factor' , self .replicas \
602- if replication_factor is None \
603- else replication_factor )
598+ args = self .run_script ('kafka-topics.sh' ,
599+ '--create' ,
600+ '--topic' , topic_name ,
601+ '--partitions' , self .partitions \
602+ if num_partitions is None else num_partitions ,
603+ '--replication-factor' , self .replicas \
604+ if replication_factor is None \
605+ else replication_factor ,
606+ * self ._cli_connect_args ())
604607 if env_kafka_version () >= (0 , 10 ):
605608 args .append ('--if-not-exists' )
606609 env = self .kafka_run_class_env ()
@@ -613,16 +616,23 @@ def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor):
613616 self .out (stderr )
614617 raise RuntimeError ("Failed to create topic %s" % (topic_name ,))
615618
619+ def _cli_connect_args (self ):
620+ if env_kafka_version () < (3 , 0 , 0 ):
621+ return ['--zookeeper' , '%s:%s/%s' % (self .zookeeper .host , self .zookeeper .port , self .zk_chroot )]
622+ else :
623+ args = ['--bootstrap-server' , '%s:%s' % (self .host , self .port )]
624+ if self .sasl_enabled :
625+ command_conf = self .tmp_dir .join ("sasl_command.conf" )
626+ self .render_template (self .test_resource ("sasl_command.conf" ), command_conf , vars (self ))
627+ args .append ('--command-config' )
628+ args .append (command_conf .strpath )
629+ return args
630+
616631 def get_topic_names (self ):
617- args = self .kafka_run_class_args ('kafka.admin.TopicCommand' ,
618- '--zookeeper' , '%s:%s/%s' % (self .zookeeper .host ,
619- self .zookeeper .port ,
620- self .zk_chroot ),
621- '--list'
622- )
632+ cmd = self .run_script ('kafka-topics.sh' , '--list' , * self ._cli_connect_args ())
623633 env = self .kafka_run_class_env ()
624634 env .pop ('KAFKA_LOG4J_OPTS' )
625- proc = subprocess .Popen (args , env = env , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
635+ proc = subprocess .Popen (cmd , env = env , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
626636 stdout , stderr = proc .communicate ()
627637 if proc .returncode != 0 :
628638 self .out ("Failed to list topics!" )
0 commit comments