Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 1 addition & 21 deletions src/main/java/net/spy/memcached/MemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import net.spy.memcached.auth.AuthDescriptor;
import net.spy.memcached.auth.AuthThreadMonitor;
import net.spy.memcached.compat.SpyThread;
import net.spy.memcached.internal.BroadcastFuture;
import net.spy.memcached.internal.BulkFuture;
Expand Down Expand Up @@ -120,7 +118,7 @@
* }</pre>
*/
public class MemcachedClient extends SpyThread
implements MemcachedClientIF, ConnectionObserver {
implements MemcachedClientIF {

private volatile boolean running = true;
private volatile boolean shuttingDown = false;
Expand All @@ -132,16 +130,12 @@ public class MemcachedClient extends SpyThread

protected final Transcoder<Object> transcoder;

private final AuthDescriptor authDescriptor;

private final byte delimiter;

private static final String DEFAULT_MEMCACHED_CLIENT_NAME = "MemcachedClient";

private static final int GET_BULK_CHUNK_SIZE = 200;

private final AuthThreadMonitor authMonitor = new AuthThreadMonitor();

/**
* Get a memcached client operating on the specified memcached locations.
*
Expand Down Expand Up @@ -219,10 +213,6 @@ public MemcachedClient(ConnectionFactory cf, String name, List<InetSocketAddress
conn = cf.createConnection(name, addrs);
assert conn != null : "Connection factory failed to make a connection";
operationTimeout = cf.getOperationTimeout();
authDescriptor = cf.getAuthDescriptor();
if (authDescriptor != null) {
addObserver(this);
}
delimiter = cf.getDelimiter();
setName("Memcached IO over " + conn);
setDaemon(cf.isDaemon());
Expand Down Expand Up @@ -2160,16 +2150,6 @@ public boolean removeObserver(ConnectionObserver obs) {
return conn.removeObserver(obs);
}

public void connectionEstablished(MemcachedNode node, int reconnectCount) {
if (authDescriptor != null) {
authMonitor.authConnection(conn, opFact, authDescriptor, node);
}
}

public void connectionLost(MemcachedNode node) {
// Don't care.
}

/**
* Returns current MemcachedConnection
*
Expand Down
72 changes: 71 additions & 1 deletion src/main/java/net/spy/memcached/MemcachedConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;

import net.spy.memcached.auth.AuthDescriptor;
import net.spy.memcached.auth.AuthException;
import net.spy.memcached.compat.SpyObject;
import net.spy.memcached.compat.log.LoggerFactory;
Expand Down Expand Up @@ -102,6 +106,7 @@ public final class MemcachedConnection extends SpyObject {

private final OperationFactory opFactory;
private final ConnectionFactory connFactory;
private final AuthDescriptor authDescriptor;
private final Collection<ConnectionObserver> connObservers =
new ConcurrentLinkedQueue<>();
private final Set<MemcachedNode> nodesNeedVersionOp = new HashSet<>();
Expand Down Expand Up @@ -137,6 +142,7 @@ public MemcachedConnection(String name, ConnectionFactory f,
FailureMode fm, OperationFactory opfactory)
throws IOException {
this.connFactory = f;
authDescriptor = f.getAuthDescriptor();
connName = name;
connObservers.addAll(obs);
addedQueue = new ConcurrentLinkedQueue<>();
Expand Down Expand Up @@ -636,6 +642,69 @@ private MemcachedNode makeMemcachedNode(String name,
return qa;
}

private void prepareAuthentication(final MemcachedNode node) {
if (authDescriptor == null) {
return;
}

final SaslClient sc;
try {
sc = Sasl.createSaslClient(authDescriptor.getMechs(), null,
"memcached", node.getSocketAddress().toString(), null, authDescriptor.getCallback());
} catch (Exception e) {
throw new IllegalStateException("Can't create SaslClient", e);
}
if (sc == null) {
throw new IllegalStateException("SaslClient is null");
}

final OperationCallback cb = new OperationCallback() {
private boolean authDone = false;
private boolean mechDone = false;
private OperationStatus priorStatus = null;

@Override
public void receivedStatus(OperationStatus val) {
String msg = val.getMessage();
// If the status we found was SASL_OK or NOT_SUPPORTED, we're authDone.
if ("SASL_OK".equals(msg) || "NOT_SUPPORTED".equals(msg)) {
authDone = true;
node.authComplete(true);
getLogger().info("Authenticated to " + node.getSocketAddress());
} else if (!val.isSuccess()) {
authDone = true;
node.authComplete(false);
getLogger().error("Authentication failed to " + node.getSocketAddress() + ": " + msg);
} else if (!mechDone) {
mechDone = true;
} else {
// Get the prior status to create the correct operation.
priorStatus = val;
}
}

@Override
public void complete() {
if (authDone) {
return;
}

// NOTE: `this` keyword below is the OperationCallback object itself.
final Operation op;
if (priorStatus == null) {
op = opFactory.saslAuth(sc, this);
} else {
op = opFactory.saslStep(sc, KeyUtil.getKeyBytes(priorStatus.getMessage()), this);
}

insertOperation(node, op);
}
};

final Operation mechOp = opFactory.saslMechs(true, cb);
insertOperation(node, mechOp);
}

private void prepareVersionInfo(final MemcachedNode node) {
Operation op = opFactory.version(new OperationCallback() {
@Override
Expand Down Expand Up @@ -867,6 +936,7 @@ private void connected(MemcachedNode qa) {
for (ConnectionObserver observer : connObservers) {
observer.connectionEstablished(qa, rt);
}
prepareAuthentication(qa);
prepareVersionInfo(qa);
}

Expand Down Expand Up @@ -1422,7 +1492,7 @@ public void addOperation(final String key, final Operation o) {
addOperation(findNodeByKey(key, o), o);
}

public void insertOperation(final MemcachedNode node, final Operation o) {
private void insertOperation(final MemcachedNode node, final Operation o) {
if (!node.isConnected() && failureMode == FailureMode.Cancel) {
o.setHandlingNode(node);
o.cancel("inactive node");
Expand Down
109 changes: 0 additions & 109 deletions src/main/java/net/spy/memcached/auth/AuthThread.java

This file was deleted.

58 changes: 0 additions & 58 deletions src/main/java/net/spy/memcached/auth/AuthThreadMonitor.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,10 @@ public final boolean addOpToWriteQ(Operation op) {
public final void insertOp(Operation op) {
op.setHandlingNode(this);
op.initialize();
ArrayList<Operation> tmp = new ArrayList<>(
inputQueue.size() + 1);
tmp.add(op);
inputQueue.drainTo(tmp);
inputQueue.addAll(tmp);
if (!writeQ.offer(op)) {
op.cancel("write queue overflow");
return;
}
addOpCount.incrementAndGet();
}

Expand Down