问题描述:

I'm running into "replication factor: 1 larger than available brokers: 0 on a multi Node cluster" in the multi node kafka cluster, when trying to create topics using the createTopics function mentioned below. I have 3 kafka brokers, I'm trying to create 2 partitions per topic and keeping replication factor to 1. Not sure why getting this error. The same works perfectly fine in a single node setup (1 zookeeper and 1 kafka broker).

Any help would be much appreciated/

Error:

kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0

at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117)

at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403)

at kafka.admin.AdminUtils.createTopic(AdminUtils.scala)

at io.confluent.examples.producer.ZookeeperUtil.createTopics(ZookeeperUtil.java:98)

at io.confluent.examples.producer.ProducerGroup.<init>(ProducerGroup.java:50)

at io.confluent.examples.producer.ProducerGroup.main(ProducerGroup.java:124)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:497)

at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:297)

at java.lang.Thread.run(Thread.java:745)

private static final int DEFAULT_SESSION_TIMEOUT = 10 * 1000;

private static final int DEFAULT_CONNECTION_TIMEOUT = 8 * 1000;

private static final String ZOOKEEPER_CONNECT = "zNode01:2181,zNode02:2181,zNode03:2181";

/**

* Opens a new ZooKeeper client to access the Kafka broker.

*/

private static ZkClient connectToZookeeper ()

{

return new ZkClient(ZOOKEEPER_CONNECT,

DEFAULT_SESSION_TIMEOUT,

DEFAULT_CONNECTION_TIMEOUT,

ZKStringSerializer$.MODULE$);

}

/**

* Given a ZooKeeper client instance, accesses the broker and returns

* information about Kafka's contents.

*

* @param zookeeperClient A ZooKeeper client to access broker information

* through.

*/

private static ZkUtils zookeeperUtility (ZkClient zookeeperClient)

{

boolean isSecureCluster = false;

return new ZkUtils(zookeeperClient,

new ZkConnection(ZOOKEEPER_CONNECT),

isSecureCluster);

}

public static void createTopics (ArrayList<String> names, int partitions, int replication)

{

ZkClient zkClient = connectToZookeeper();

ZkUtils zkUtils = zookeeperUtility(zkClient);

try{

for (String name: names)

{

if (existsTopic(name))

continue;

AdminUtils.createTopic(zkUtils, name, partitions, replication, new Properties(),RackAwareMode.Disabled$.MODULE$);

}

} catch (Exception ex) {

ex.printStackTrace();

} finally {

if (zkClient != null) {

zkClient.close();

}

}

}

I followed the following instructions to set up a multi node kafka cluster.

Setting a multi node Apache ZooKeeper cluster

On every node of the cluster add the following lines to the file kafka/config/zookeeper.properties

 server.1=zNode01:2888:3888

server.2=zNode02:2888:3888

server.3=zNode03:2888:3888

#add here more servers if you want

initLimit=5

syncLimit=2

On every node of the cluster create a file called myid in the folder represented by the dataDir property (by default the folder is /tmp/zookeeper ). The myid file should only contains the id of the znode (1 for zNode01, 2 for ZNode02, etc )

Setting a multi broker Apache Kafka cluster

On every node of the cluster modify modify the property zookeeper.connect from the file kafka/config/server.properties:

 zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181

On every node of the cluster modify the property host.name from the file kafka/config/server.properties:

host.name=zNode0x

On every node of the cluster modify the property broker.id from the file kafka/config/server.properties (every broker in the cluster should have a unique id)

相关阅读:
Top