Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan authored and 王聃 committed Jan 22, 2025
1 parent 5d61a00 commit 230b993
Showing 1 changed file with 24 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,11 @@

import com.google.common.net.InetAddresses;
import io.netty.channel.EventLoopGroup;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.pegasus.base.error_code;
import org.apache.pegasus.base.rpc_address;
import org.apache.pegasus.operator.client_operator;
Expand Down Expand Up @@ -269,35 +265,40 @@ void resolveHost(String hostPort) throws IllegalArgumentException {
}

Set<rpc_address> newSet = new TreeSet<>(Arrays.asList(addrs));
Set<rpc_address> oldSet = new TreeSet<>();
for (ReplicaSession meta : metaList) {
oldSet.add(meta.getAddress());
}
Set<rpc_address> oldSet =
metaList.stream()
.map(ReplicaSession::getAddress)
.collect(Collectors.toCollection(TreeSet::new));

// fast path: do nothing if meta list is unchanged.
if (newSet.equals(oldSet)) {
return;
}

// removed metas
Set<rpc_address> removed = new HashSet<>(oldSet);
removed.removeAll(newSet);
for (rpc_address addr : removed) {
logger.info("meta server {} was removed", addr);
for (int i = 0; i < metaList.size(); i++) {
if (metaList.get(i).getAddress().equals(addr)) {
ReplicaSession session = metaList.remove(i);
session.closeSession();
}
Set<rpc_address> removedSet = new HashSet<>(oldSet);
removedSet.removeAll(newSet);

Iterator<ReplicaSession> iterator = metaList.iterator();
while (iterator.hasNext()) {
ReplicaSession session = iterator.next();
rpc_address addr = session.getAddress();
if (!removedSet.contains(addr)) {
continue;
}

session.closeSession();
iterator.remove();
logger.info("meta server {} was removed", addr);
}

// newly added metas
Set<rpc_address> added = new HashSet<>(newSet);
added.removeAll(oldSet);
for (rpc_address addr : added) {
logger.info("meta server {} was added", addr);
Set<rpc_address> addedSet = new HashSet<>(newSet);
addedSet.removeAll(oldSet);

for (rpc_address addr : addedSet) {
metaList.add(clusterManager.getReplicaSession(addr));
logger.info("meta server {} was added", addr);
}
}

Expand Down

0 comments on commit 230b993

Please sign in to comment.