curator/src/main/java/zander/library/RemoteStore.java

446 lines
15 KiB
Java

package zander.library;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.ProtocolException;
import java.net.URI;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.net.PrintCommandListener;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.commons.net.ftp.FTPSClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zander.library.Manifest.ManifestParseException;
import zander.library.secrets.Secrets;
public class RemoteStore {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteStore.class);
private static final String MANIFEST_PATH = ".manifest";
public static interface FTPAction {
public void run() throws IOException;
}
private URI url;
private Secrets secrets;
private Manifest manifestCahce;
/** may be connected or disconnected */
private final FTPClient ftp;
private int connectionLevel = 0;
private boolean ignoreManifest = false;
private boolean passiveMode;
public RemoteStoreFileTypeHandler fileTypeHandler = new RemoteStoreFileTypeHandler();
public RemoteStore(URI url, Secrets secrets) throws IOException, ManifestParseException {
this.url = url;
if (!url.isAbsolute() || url.getScheme().equals("ftp")) {
ftp = new FTPClient();
} else if (url.getScheme().equals("ftps")) {
ftp = new FTPSClient("TLS", false);
((FTPSClient) ftp).setEndpointCheckingEnabled(true);
} else {
throw new ProtocolException("unknown protocol: '" + url.getScheme() + "'");
}
ftp.addProtocolCommandListener(new PrintCommandListener(System.out, true));
ftp.setListHiddenFiles(true);
this.secrets = secrets;
}
public URI getURL() {
return url;
}
public boolean isIgnoringManifest() {
return ignoreManifest;
}
public void setIgnoreManifest(boolean ignoreManifest) {
this.ignoreManifest = ignoreManifest;
}
public boolean isPassiveMode() {
return passiveMode;
}
public void setPassiveMode(boolean passiveMode) throws IOException {
this.passiveMode = passiveMode;
if (connectionLevel != 0) {
String modeString = passiveMode ? "passive" : "active";
LOGGER.info("Switching to " + modeString + " mode");
if (passiveMode) {
ftp.enterLocalPassiveMode();
} else {
ftp.enterLocalActiveMode();
}
checkFtpResponse("Failed to switch to " + modeString + " mode");
}
}
public void fetchManifest() throws IOException {
doFtpActions(() -> {
FTPFile info = fileInformation(MANIFEST_PATH);
if (info == null) {
manifestCahce = new Manifest();
uploadManifest();
} else {
ByteArrayOutputStream out = new ByteArrayOutputStream();
ftp.setFileType(FTP.ASCII_FILE_TYPE);
ftp.retrieveFile(resolvePath(MANIFEST_PATH), out);
LOGGER.info("Retrieved manifest ({} B)", out.size());
try {
manifestCahce = new Manifest(new String(out.toByteArray()));
} catch (ManifestParseException e) {
throw new IOException("Could not parse remote manifest");
}
}
});
}
public Manifest getManifest() throws IOException {
if (manifestCahce == null) {
fetchManifest();
}
return manifestCahce;
}
public void addBinaryFile(String path, byte[] data) throws IOException {
doFtpActions(() -> {
Manifest mf = getManifest();
createParentDirectories(path);
ftp.setFileType(FTP.BINARY_FILE_TYPE);
final ByteArrayInputStream in = new ByteArrayInputStream(data);
ftp.storeFile(resolvePath(path), in);
checkFtpResponse("Could not upload file: " + path);
LOGGER.info("Uploaded ascii file: {} ({} B)", path, data.length);
if (!ignoreManifest) {
mf.add(cleanPath(path), data);
uploadManifest();
}
});
}
public void addAsciiFile(String path, String data) throws IOException {
doFtpActions(() -> {
final Manifest mf = getManifest();
createParentDirectories(path);
ftp.setFileType(FTP.ASCII_FILE_TYPE);
final byte[] bytes = data.getBytes();
final ByteArrayInputStream in = new ByteArrayInputStream(bytes);
ftp.storeFile(resolvePath(path), in);
checkFtpResponse("Could not upload file: " + path);
LOGGER.info("Uploaded file: {} ({} B)", path, bytes.length);
if (!ignoreManifest) {
mf.add(cleanPath(path), data);
uploadManifest();
}
});
}
public void addFile(String path, byte[] data) throws IOException {
if (fileTypeHandler.isBinaryFile(path)) {
addBinaryFile(path, data);
} else {
addAsciiFile(path, new String(data));
}
}
public void deleteFile(String path) throws IOException {
doFtpActions(() -> {
Manifest mf = getManifest();
String rp = resolvePath(path);
recursiveDelete(rp);
LOGGER.info("Deleted file: {}", path);
deleteEmptyUpward(rp);
if (!ignoreManifest) {
mf.delete(path);
uploadManifest();
}
});
}
public void moveFile(String source, String target) throws IOException {
doFtpActions(() -> {
Manifest mf = getManifest();
String rs = resolvePath(source);
String rt = resolvePath(target);
FTPFile info = fileInformation(target);
if (info != null) {
recursiveDelete(rt);
}
ftp.rename(rs, rt);
LOGGER.info("Moved remote file '{}' to '{}'", source, target);
if (!ignoreManifest) {
mf.move(cleanPath(source), cleanPath(target));
uploadManifest();
}
});
}
public byte[] retrieveBinaryFile(String path) throws IOException {
AtomicReference<byte[]> data = new AtomicReference<byte[]>();
doFtpActions(() -> {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
ftp.setFileType(FTP.BINARY_FILE_TYPE);
ftp.retrieveFile(resolvePath(path), out);
checkFtpResponse("Could not retrieve binary file: " + path);
LOGGER.info("Retrieved binary file: {} ({} B)", path, out.size());
data.set(out.toByteArray());
});
return data.get();
}
public String retrieveAsciiFile(String path) throws IOException {
AtomicReference<String> data = new AtomicReference<String>();
doFtpActions(() -> {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
ftp.setFileType(FTP.ASCII_FILE_TYPE);
ftp.retrieveFile(resolvePath(path), out);
checkFtpResponse("Could not retrieve ascii file: " + path);
LOGGER.info("Retrieved ascii file: {} ({} B)", path, out.size());
data.set(new String(out.toByteArray()));
});
return data.get();
}
public byte[] retrieveFile(String path) throws IOException {
if (fileTypeHandler.isBinaryFile(path)) {
return retrieveBinaryFile(path);
} else {
return retrieveAsciiFile(path).getBytes();
}
}
public boolean fileExists(String path) throws IOException {
return fileInformation(path) != null;
}
private boolean isRootDir(String path) {
return path.matches("^(/+\\.{0,2})+$");
}
public FTPFile fileInformation(String path) throws IOException {
if (isRootDir(path)) {
return null;
}
AtomicReference<FTPFile> file = new AtomicReference<FTPFile>();
doFtpActions(() -> {
String pp = getParentPath(path);
if (pp == null) {
pp = getBasePath();
}
String name = getFileName(path);
FTPFile[] files = ftp.listFiles(pp);
System.out.println("STAT " + name + " in " + pp);
for (FTPFile f : files) {
System.out.println(
(f.isDirectory() ? "D " : "F ") +
f.getName());
if (f.getName().equals(name)) {
file.set(f);
}
}
});
return file.get();
}
public void openConnection() throws IOException {
if (connectionLevel++ == 0) {
int port = url.getPort();
String host = url.getHost();
ftp.connect(host == null ? "" : host, port == -1 ? FTPClient.DEFAULT_PORT : port);
if (!ftp.login(secrets.getUsername(), secrets.getPassword())) {
throw new IOException("Invalid ftp secrets! Username: '" + secrets.getUsername() + "'");
}
LOGGER.info("Logged info ftp as user '{}'", secrets.getUsername());
if (ftp instanceof FTPSClient) {
FTPSClient ftps = (FTPSClient) ftp;
ftps.execPBSZ(0);
checkFtpResponse("Failed to send PBSZ command");
ftps.execPROT("P");
checkFtpResponse("Failed to send PROT command");
}
setPassiveMode(passiveMode);
}
}
public void closeConnection() throws IOException {
if (connectionLevel == 0) {
return;
}
if (--connectionLevel == 0) {
try {
ftp.logout();
ftp.disconnect();
} catch (IOException e) {
ftp.disconnect();
throw e;
} finally {
LOGGER.info("Disconnected from ftp");
}
}
}
public void closeNow() throws IOException {
connectionLevel = 1;
closeConnection();
}
public void doFtpActions(FTPAction action) throws IOException {
try {
openConnection();
action.run();
} finally {
closeConnection();
}
}
/** path must be resolved */
private void deleteEmptyUpward(String path) throws IOException {
doFtpActions(() -> {
if ((path).equals(cleanPath(getBasePath()))) {
return;
}
FTPFile info = fileInformation(path);
if (info == null) {
return;
} else if (!info.isDirectory()) {
throw new IOException("Not a directory: " + path);
}
FTPFile[] children = ftp.mlistDir(path);
checkFtpResponse("Could not list directory: " + path);
if (children.length == 0) {
ftp.deleteFile(path);
checkFtpResponse("Could not delete directory: " + path);
LOGGER.info("Deleted empty directory: {}", path);
deleteEmptyUpward(getParentPath(path));
}
});
}
/** path must be resolved */
private void recursiveDelete(String path) throws IOException {
doFtpActions(() -> {
FTPFile info = fileInformation(path);
if (info == null) {
return;
} else if (info.isDirectory()) {
FTPFile[] children = ftp.mlistDir(path);
checkFtpResponse("Could not list directory: " + path);
for (FTPFile child : children) {
recursiveDelete(path + "/" + child.getName());
}
}
ftp.deleteFile(path);
checkFtpResponse("Could not delete file: " + path);
LOGGER.info("Deleted remote file: {}", path);
});
}
private void createParentDirectories(String path) throws IOException {
doFtpActions(() -> {
String pp = getParentPath(path);
if (pp == null) {
return;
}
FTPFile info = fileInformation(pp);
if (info == null) {
final StringBuilder pb = new StringBuilder(getBasePath());
String[] parts = pp.split("/");
for (String part : parts) {
pb.append(part);
pb.append("/");
ftp.makeDirectory(pb.toString());
}
info = fileInformation(pp);
// if (info == null || !info.isDirectory()) {
// throw new IOException("Failed to create directory: " + pp);
// }
LOGGER.info("Created directory: {}", pp);
} else if (!info.isDirectory()) {
throw new IOException("Not a directory: " + pp);
}
});
}
private void checkFtpResponse(String error) throws IOException {
if (ftp.isConnected()) {
int r = ftp.getReplyCode();
if (!FTPReply.isPositiveCompletion(r)) {
String replyString = ftp.getReplyString();
if (replyString.endsWith("\n")) {
replyString = replyString.substring(0, replyString.length() - 1);
}
if (replyString.endsWith(".")) {
replyString = replyString.substring(0, replyString.length() - 1);
}
LOGGER.info("{}: {}", error, replyString);
throw new IOException(error + ": " + replyString);
}
}
}
private void uploadManifest() throws IOException {
doFtpActions(() -> {
Manifest mf = getManifest();
byte[] bytes = mf.getData().getBytes();
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
ftp.setFileType(FTP.ASCII_FILE_TYPE);
ftp.storeFile(resolvePath(MANIFEST_PATH), in);
checkFtpResponse("Could not upload manifest");
LOGGER.info("Uploaded manifest ({} B)", bytes.length);
});
}
private String getBasePath() {
String base = url.getPath();
if (!base.endsWith("/")) {
base += "/";
}
return base;
}
private String resolvePath(String p) {
return getBasePath() + cleanPath(p);
}
private static String cleanPath(String p) {
if (!p.isEmpty() && p.charAt(0) == '/') {
p = p.substring(1);
}
if (p.isEmpty() && p.charAt(p.length() - 1) == '/') {
p = p.substring(0, p.length() - 1);
}
return p;
}
private static String getParentPath(String path) {
String cp = cleanPath(path);
int i = cp.lastIndexOf("/");
if (i == -1) {
return null;
}
String pp = cp.substring(0, i);
while (!pp.isEmpty() && pp.charAt(pp.length() - 1) == '/') {
pp = pp.substring(0, pp.length() - 1);
}
return pp;
}
private static String getFileName(String path) {
String cp = cleanPath(path);
int i = cp.lastIndexOf("/");
if (i == -1) {
return cp;
}
return cp.substring(i + 1);
}
}