Problem description:

I am trying to implement Trident + DRPC. I have designed the topology so that it does not run indefinitely. I have two separate classes: one for the spout implementation, and another that implements Trident with DRPC. My spout class (which implements IRichSpout) emits the id of the customer, i.e.

    public class TriSpout implements IRichSpout {
        // some logic here
        spoutOutputCollector.emit(new Values(id));
    }
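For reference, a minimal runnable spout along these lines might look like the sketch below. This is only an illustration: it extends BaseRichSpout (which implements IRichSpout), and the pendingIds queue is a hypothetical stand-in for wherever the customer ids actually come from.

    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    public class TriSpout extends BaseRichSpout {

        private SpoutOutputCollector collector;
        // Hypothetical in-memory source of customer ids; replace with your real source.
        private Queue<String> pendingIds;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            this.pendingIds = new ConcurrentLinkedQueue<String>();
        }

        @Override
        public void nextTuple() {
            String id = pendingIds.poll();
            if (id != null) {
                // Emit one customer id per call; the field name must match the
                // "id" field consumed by the Trident topology below.
                collector.emit(new Values(id));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id"));
        }
    }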

The values emitted by this spout are then consumed in another class, which implements Trident with DRPC.

    public class TriDrpc {

        .....

        TriSpout spout = new TriSpout();
        TridentTopology topology = new TridentTopology();

        TridentState wordCounts =
            topology.newStream("spout1", spout)
                .parallelismHint(1)
                .each(new Fields("id"), new Compute(), new Fields("value"))
                .persistentAggregate(new MemoryMapState.Factory(),
                    new Count(), new Fields("count"));

The DRPC stream definition is as follows:

    topology.newDRPCStream("Calc", drpc)
        .each(new Fields("args"), new Split(), new Fields("word"))
        .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"));
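The main method below calls a buildTopology(drpc) helper that is not shown in the question. A rough sketch of how such a helper might tie the two definitions above together is given here; it reuses the same imports as the fragments above (storm.trident.* and backtype.storm.*), Compute and Split are the question's own functions, and the groupBy on "id" is an assumption added so that MapGet can later look counts up per id, as in the standard Trident word-count example.

    public static StormTopology buildTopology(LocalDRPC drpc) {
        TriSpout spout = new TriSpout();
        TridentTopology topology = new TridentTopology();

        TridentState wordCounts =
            topology.newStream("spout1", spout)
                .parallelismHint(1)
                .each(new Fields("id"), new Compute(), new Fields("value"))
                // Assumed: key the state by id so MapGet can query it per id.
                .groupBy(new Fields("id"))
                .persistentAggregate(new MemoryMapState.Factory(),
                    new Count(), new Fields("count"));

        topology.newDRPCStream("Calc", drpc)
            .each(new Fields("args"), new Split(), new Fields("word"))
            .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"));

        // Both LocalCluster.submitTopology and StormSubmitter.submitTopology
        // expect the built StormTopology.
        return topology.build();
    }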

The DRPC request is made in the main method as follows:

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        if (args.length == 0) {
            LocalDRPC drpc = new LocalDRPC();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Calculator", conf, buildTopology(drpc));
            System.out.println("DRPC RESULT: " + drpc.execute("Calc", "id"));
            Thread.sleep(1000);
        } else {
            conf.setNumWorkers(8);
            StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
        }
    }

In the code above, the DRPC request is:

    System.out.println("DRPC RESULT: " + drpc.execute("Calc", "id"));

The "id" should be same as the id emitted by the spout i.e I want to know which customer has active account using this id, so I need to send DRPC request for all the id's emitted by the spout. Now the DRPC is in main class, how can I pass the value emitted by the spout to the DRPC Request without specifying the id manually?

Can someone help, please?

EDITED WITH NEW INFO

Answer:

Update

Well, it's clearer now what your problem is, thanks.

So you need to issue DRPC requests for the same IDs that this same topology's spout is emitting.

The only way you could accomplish this is to persist the IDs you emit from your spout to a Storm-external persistent storage (for example, an RDBMS or a distributed hash map).
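One rough way to do that first step is to add a Trident function to the stream that writes every id it sees to the external store as a side effect (you could equally do it directly inside the spout, or via partitionPersist). PersistId and IdStore are made-up names, and IdStore.save is just a stub for whatever client (JDBC, Redis, ...) you actually use:

    import backtype.storm.tuple.Values;
    import storm.trident.operation.BaseFunction;
    import storm.trident.operation.TridentCollector;
    import storm.trident.tuple.TridentTuple;

    // Stub for the external storage client; the name and method are hypothetical.
    class IdStore {
        static void save(String id) {
            // e.g. INSERT the id into an RDBMS table, or SADD it into a Redis set
        }
    }

    public class PersistId extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String id = tuple.getString(0);
            IdStore.save(id);             // record the id outside Storm
            collector.emit(new Values()); // add no new fields, but keep the tuple flowing
        }
    }

You would hook it into the existing stream with something like .each(new Fields("id"), new PersistId(), new Fields()) before the aggregation.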

That way, after you submit your topology for execution on the Storm cluster, you can poll your persistent storage for new IDs and execute your DRPC request for each new ID.
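The polling side might then look something like the sketch below. fetchNewIds() is a made-up placeholder for querying whatever store you chose, and the DRPCClient(host, port) constructor is the backtype.storm-era one (newer org.apache.storm releases also take a config map), so treat this as an outline rather than drop-in code:

    import java.util.Collections;
    import java.util.List;

    import backtype.storm.utils.DRPCClient;

    public class DrpcIdPoller {

        // Hypothetical placeholder: return the ids the topology has persisted to
        // your external store (RDBMS table, Redis set, ...) since the last poll.
        private static List<String> fetchNewIds() {
            return Collections.emptyList(); // TODO: replace with a real query
        }

        public static void main(String[] args) throws Exception {
            // Adjust host/port to wherever your DRPC server runs.
            DRPCClient client = new DRPCClient("drpc-host", 3772);
            while (true) {
                for (String id : fetchNewIds()) {
                    // One DRPC request per id the spout emitted.
                    String result = client.execute("Calc", id);
                    System.out.println("DRPC RESULT for " + id + ": " + result);
                }
                Thread.sleep(1000); // poll interval
            }
        }
    }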

Original answer

I don't think I understand the question. Are you trying to execute Storm DRPC requests with the request's ID argument taken from that same DRPC topology's spout's output? I don't think this is an effective and intended use for a DRPC topology. You might be better off with an ordinary topology.

DRPC topologies are intended for finite computations, while ordinary topologies are used for continuous computation. A DRPC call takes in the name of the DRPC topology, and a set of input arguments for computing the result of the DRPC call. Ordinary Storm (or Trident) topologies just run indefinitely, computing a result of some sort and persisting it.

I hope this helps. If not, please reformulate your question better, since it's not really clear what your problem is.
