Asynchronous work with the file system in Vert.x
Vert.x is a toolkit for building reactive applications on the JVM that scale and process large volumes of data in real time. It is a polyglot platform: besides Java, it supports Kotlin, Groovy, Scala, and JavaScript. This article uses Java.
One of Vert.x's key features is a set of abstractions and APIs for asynchronous work with the network, the file system, and other resources. With them you can build scalable web applications, microservices, network utilities, and much more.
File system
In Vert.x, all work with the file system goes through the FileSystem interface, obtained from vertx.fileSystem().
To create a file, use the createFile method, which asynchronously creates a new, empty file:
FileSystem fs = vertx.fileSystem();
fs.createFile("myfile.txt", result -> {
    if (result.succeeded()) {
        System.out.println("File created successfully.");
    } else {
        System.err.println("Error creating file: " + result.cause());
    }
});
Similarly, to delete a file, use the delete method:
fs.delete("myfile.txt", result -> {
    if (result.succeeded()) {
        System.out.println("File deleted successfully.");
    } else {
        System.err.println("Error deleting file: " + result.cause());
    }
});
To read the contents of a file, Vert.x provides the readFile method. It passes the contents to the handler as a Buffer object, which can easily be converted to a string or a byte array:
fs.readFile("myfile.txt", result -> {
    if (result.succeeded()) {
        Buffer buffer = result.result();
        System.out.println("File content: " + buffer.toString());
    } else {
        System.err.println("Error reading file: " + result.cause());
    }
});
To write to a file, use the writeFile method, which takes a file path and the data as a Buffer:
Buffer data = Buffer.buffer("Hello, Vert.x!");
fs.writeFile("myfile.txt", data, result -> {
    if (result.succeeded()) {
        System.out.println("Data written to file successfully.");
    } else {
        System.err.println("Error writing to file: " + result.cause());
    }
});
AsyncFile
AsyncFile in Vert.x is an interface for working with an open file asynchronously. It lets you read and write without blocking the thread that issues the operation.
Basic methods:
read(Buffer buffer, int offset, long position, int length, Handler<AsyncResult<Buffer>> handler): asynchronously reads data from the file into buffer. offset is the position in the buffer where the data will be written, position is the position in the file where reading starts, and length is the number of bytes to read.
write(Buffer buffer, long position, Handler<AsyncResult<Void>> handler): asynchronously writes the data from buffer to the file, starting at position.
flush(Handler<AsyncResult<Void>> handler): asynchronously flushes written data to the underlying file.
close(Handler<AsyncResult<Void>> handler): closes the file asynchronously. Once the file is closed, any attempt to read from or write to it results in an error.
Reading a file asynchronously might look like this:
FileSystem fileSystem = vertx.fileSystem();
String filePath = "path/to/your/file.txt";
fileSystem.open(filePath, new OpenOptions(), openRes -> {
    if (openRes.succeeded()) {
        AsyncFile file = openRes.result();
        Buffer buffer = Buffer.buffer(1024); // initial size hint for the read buffer
        // Read up to 1024 bytes from the start of the file into the start of the buffer
        file.read(buffer, 0, 0, 1024, readRes -> {
            if (readRes.succeeded()) {
                System.out.println("File content: " + buffer.toString());
            } else {
                System.err.println("Error reading file: " + readRes.cause().getMessage());
            }
            // Close the file whether or not the read succeeded
            file.close(closeRes -> {});
        });
    } else {
        System.err.println("Could not open file: " + openRes.cause().getMessage());
    }
});
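The example above reads only the first 1024 bytes. As a rough sketch of how the position parameter could be used to read a whole file chunk by chunk, the helper below issues one positional read after another. ChunkedReader, readAll and CHUNK_SIZE are illustrative names, not part of Vert.x, and the sketch assumes the callback-style read of Vert.x 4 and that a chunk shorter than requested means the end of the file was reached.

```java
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.AsyncFile;

// Hypothetical helper: reads an already opened AsyncFile to the end.
public final class ChunkedReader {
    private static final int CHUNK_SIZE = 1024; // illustrative chunk size

    private ChunkedReader() {}

    /** Collects the whole file by reading CHUNK_SIZE bytes at a time. */
    public static Future<Buffer> readAll(AsyncFile file) {
        Promise<Buffer> promise = Promise.promise();
        readChunk(file, 0L, Buffer.buffer(), promise);
        return promise.future();
    }

    private static void readChunk(AsyncFile file, long position,
                                  Buffer collected, Promise<Buffer> promise) {
        // A fresh buffer per chunk, filled from offset 0
        file.read(Buffer.buffer(CHUNK_SIZE), 0, position, CHUNK_SIZE, res -> {
            if (res.failed()) {
                promise.fail(res.cause());
                return;
            }
            Buffer chunk = res.result();
            collected.appendBuffer(chunk);
            if (chunk.length() < CHUNK_SIZE) {
                // Assumption: a short chunk means there is nothing left to read
                promise.complete(collected);
            } else {
                // Continue right after the bytes just read
                readChunk(file, position + chunk.length(), collected, promise);
            }
        });
    }
}
```

The caller stays responsible for closing the file once the returned Future completes.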
Asynchronous file writing:
Buffer data = Buffer.buffer("This is some data to be written to the file.");
fileSystem.open(filePath, new OpenOptions().setWrite(true), openRes -> {
    if (openRes.succeeded()) {
        AsyncFile file = openRes.result();
        // Write the buffer at position 0, i.e. from the start of the file
        file.write(data, 0, writeRes -> {
            if (writeRes.succeeded()) {
                System.out.println("Data written successfully!");
                file.flush(flushRes -> {
                    if (flushRes.succeeded()) {
                        System.out.println("Data flushed to disk.");
                    }
                    file.close(closeRes -> {});
                });
            } else {
                System.err.println("Failed to write data: " + writeRes.cause().getMessage());
                // Release the file handle on failure as well
                file.close(closeRes -> {});
            }
        });
    } else {
        System.err.println("Could not open file: " + openRes.cause().getMessage());
    }
});
To open a file asynchronously, use the open method of FileSystem. In Vert.x 4 the overload without a handler returns a Future<AsyncFile>; the overload used here takes a callback that receives the opened AsyncFile:
FileSystem fs = vertx.fileSystem();
String path = "path/to/your/file.txt";
fs.open(path, new OpenOptions(), result -> {
    if (result.succeeded()) {
        AsyncFile file = result.result();
        // the AsyncFile is ready to use
    } else {
        // error handling
        System.err.println("Error opening file: " + result.cause().getMessage());
    }
});
Processing results via Future
In Vert.x, Future<T> represents a result that will become available later. It either succeeds with a result of type T or fails with an error.
Let's create a simple Future:
Future<String> future = Future.future(promise -> {
    // asynchronous operation
    vertx.setTimer(1000, id -> promise.complete("Operation completed"));
});
To process the result, Future provides the onSuccess, onFailure, and compose methods.
onSuccess is called if the Future completes successfully, and onFailure is called if it fails:
future.onSuccess(result -> {
    System.out.println("Result: " + result);
}).onFailure(error -> {
    System.err.println("Failed: " + error.getMessage());
});
compose lets you build a chain of asynchronous operations in which each operation starts only after the previous one has completed successfully:
Future<String> future1 = Future.future(promise -> {
    vertx.setTimer(1000, id -> promise.complete("First operation"));
});
future1.compose(result -> {
    // runs after future1 has completed successfully
    return Future.future(promise -> vertx.setTimer(1000, id -> promise.complete(result + ", second operation")));
}).onSuccess(result -> System.out.println("Result: " + result));
Example with reading a file:
FileSystem fs = vertx.fileSystem();
String path = "path/to/your/file.txt";
Future<Buffer> readFileFuture = fs.readFile(path);
readFileFuture.onSuccess(buffer -> {
    System.out.println("File content: " + buffer.toString());
}).onFailure(Throwable::printStackTrace);
Let's say there are two asynchronous operations: the first one reads a file, and the second one writes the contents to another file:
String readPath = "path/to/read.txt";
String writePath = "path/to/write.txt";
fs.readFile(readPath).compose(buffer -> {
    // once the file has been read, write its contents to another file
    return fs.writeFile(writePath, buffer);
}).onSuccess(v -> System.out.println("File was successfully copied"))
  .onFailure(Throwable::printStackTrace);
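The same chaining style can be applied to AsyncFile. The sketch below opens a file, reads its first bytes, closes the file regardless of the outcome, and maps the Buffer to a String. It assumes the Future-returning open, read and close variants of Vert.x 4; HeadReader and readHead are hypothetical names chosen for illustration.

```java
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.OpenOptions;

// Hypothetical helper: returns the first maxBytes bytes of a file as text.
public final class HeadReader {
    private HeadReader() {}

    public static Future<String> readHead(Vertx vertx, String path, int maxBytes) {
        // Read-only options, so a missing file fails instead of being created
        OpenOptions readOnly = new OpenOptions().setWrite(false).setCreate(false);
        return vertx.fileSystem()
            .open(path, readOnly)
            .compose(file -> file
                // read up to maxBytes from position 0 into a fresh buffer
                .read(Buffer.buffer(maxBytes), 0, 0L, maxBytes)
                // close the file on success and on failure
                .onComplete(ar -> file.close())
                .map(buffer -> buffer.toString()));
    }
}
```

A caller would attach onSuccess and onFailure to the returned Future in the same way as in the examples above.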
The results of several Futures can be combined with CompositeFuture:
Future<String> future1 = Future.succeededFuture("Hello");
Future<String> future2 = Future.succeededFuture("World");
CompositeFuture.all(future1, future2).onSuccess(composite -> {
    String result = composite.resultAt(0) + " " + composite.resultAt(1);
    System.out.println(result); // "Hello World"
});
Pump and CompositeFuture
Pump in Vert.x is a utility that transfers data from a ReadStream to a WriteStream while handling back pressure automatically:
FileSystem fs = vertx.fileSystem();
String sourcePath = "path/to/source/file";
String destPath = "path/to/dest/file";
fs.open(sourcePath, new OpenOptions().setRead(true), readResult -> {
    if (readResult.succeeded()) {
        AsyncFile readFile = readResult.result();
        fs.open(destPath, new OpenOptions().setWrite(true), writeResult -> {
            if (writeResult.succeeded()) {
                AsyncFile writeFile = writeResult.result();
                Pump pump = Pump.pump(readFile, writeFile);
                // When the source has been read to the end, close both files
                readFile.endHandler(v -> {
                    readFile.close();
                    writeFile.close();
                });
                pump.start();
            } else {
                System.err.println("Failed to open destination file: " + writeResult.cause());
                // The source was already opened, so release it
                readFile.close();
            }
        });
    } else {
        System.err.println("Failed to open source file: " + readResult.cause());
    }
});
Pump.pump(readFile, writeFile) creates a Pump instance that automatically reads from readFile and writes to writeFile, managing back pressure between the two streams.
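ReadStream also offers pipeTo, which covers the same use case with less wiring. The sketch below is one possible way to copy a file with it, assuming the Vert.x 4 variant of pipeTo that returns a Future<Void> and ends the destination when the source is exhausted; PipeCopy and copy are hypothetical names.

```java
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.file.FileSystem;
import io.vertx.core.file.OpenOptions;

// Hypothetical helper: copies a file through a back-pressure-aware pipe.
public final class PipeCopy {
    private PipeCopy() {}

    public static Future<Void> copy(Vertx vertx, String sourcePath, String destPath) {
        FileSystem fs = vertx.fileSystem();
        // Source: read-only, must already exist
        OpenOptions readOnly = new OpenOptions().setWrite(false).setCreate(false);
        // Destination: created if missing, emptied if present
        OpenOptions overwrite = new OpenOptions()
            .setWrite(true).setCreate(true).setTruncateExisting(true);

        return fs.open(sourcePath, readOnly).compose(source ->
            fs.open(destPath, overwrite)
                // pipeTo pauses and resumes the source as the destination fills and drains
                .compose(dest -> source.pipeTo(dest))
                // the pipe ends the destination; the source is closed here
                .onComplete(ar -> source.close()));
    }
}
```

Compared with the Pump version, there is no explicit endHandler: completion and failure both arrive through the returned Future.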
CompositeFuture lets you group several asynchronous operations and process their results as a whole:
FileSystem fs = vertx.fileSystem();
List<Future> futures = new ArrayList<>();
// paths of the files to read
List<String> filePaths = Arrays.asList("path/to/file1", "path/to/file2", "path/to/file3");
for (String path : filePaths) {
    // readFile without a handler already returns a Future<Buffer>
    Future<Buffer> future = fs.readFile(path);
    futures.add(future);
}
CompositeFuture.all(futures).onComplete(ar -> {
    if (ar.succeeded()) {
        for (int i = 0; i < filePaths.size(); i++) {
            System.out.println("Content of " + filePaths.get(i) + ": " + ar.result().resultAt(i));
        }
    } else {
        System.err.println("Failed to read one or more files: " + ar.cause());
    }
});
CompositeFuture.all(futures) combines multiple asynchronous file reads. The result of each operation is available via ar.result().resultAt(i), where i is the index of the operation in the source list.
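CompositeFuture.all fails as soon as any one operation fails, so the error branch cannot tell which files were read. If a per-file outcome is needed, CompositeFuture.join waits for every operation to finish. The sketch below assumes the Vert.x 4 CompositeFuture API (join, succeeded(int), cause(int), resultAt(int)); ReadReport and readAll are hypothetical names.

```java
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.FileSystem;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: reads several files and reports each outcome separately.
public final class ReadReport {
    private ReadReport() {}

    /** Maps each path to "<n> bytes" or to "failed: <message>". */
    public static Future<Map<String, String>> readAll(Vertx vertx, List<String> paths) {
        FileSystem fs = vertx.fileSystem();
        List<Future> reads = new ArrayList<>(); // raw type, as CompositeFuture expects in Vert.x 4
        for (String path : paths) {
            reads.add(fs.readFile(path));
        }
        CompositeFuture all = CompositeFuture.join(reads);
        Promise<Map<String, String>> promise = Promise.promise();
        // join completes only after every read has finished, successfully or not
        all.onComplete(ar -> {
            Map<String, String> report = new LinkedHashMap<>();
            for (int i = 0; i < paths.size(); i++) {
                if (all.succeeded(i)) {
                    Buffer content = all.resultAt(i);
                    report.put(paths.get(i), content.length() + " bytes");
                } else {
                    report.put(paths.get(i), "failed: " + all.cause(i).getMessage());
                }
            }
            promise.complete(report);
        });
        return promise.future();
    }
}
```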
A small example
Let's implement a server that supports uploading and downloading files over HTTP.
Create a new Maven project and add the Vert.x Web dependency to pom.xml:
<dependencies>
    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-web</artifactId>
        <version>4.2.1</version> <!-- Use the current version -->
    </dependency>
</dependencies>
Create a class FileServerVerticle that extends AbstractVerticle. In its start method, we define the routes for uploading and downloading files:
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.FileUpload;
public class FileServerVerticle extends AbstractVerticle {
    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        Router router = Router.router(vertx);
        // directory where uploaded files are stored
        String uploadDir = "uploads";
        // handler for file uploads
        router.route("/upload").handler(BodyHandler.create().setUploadsDirectory(uploadDir));
        router.post("/upload").handler(this::handleFileUpload);
        // handler for file downloads
        router.get("/download/:fileName").handler(this::handleFileDownload);
        vertx.createHttpServer().requestHandler(router).listen(8888, http -> {
            if (http.succeeded()) {
                startPromise.complete();
                System.out.println("HTTP server started on port 8888");
            } else {
                startPromise.fail(http.cause());
            }
        });
    }

    private void handleFileUpload(RoutingContext context) {
        for (FileUpload fileUpload : context.fileUploads()) {
            System.out.println("Received file: " + fileUpload.fileName());
            // file processing logic goes here;
            // fileUpload.uploadedFileName() is the path where BodyHandler stored the file
        }
        context.response().setStatusCode(200).end("File uploaded");
    }

    private void handleFileDownload(RoutingContext context) {
        String fileName = context.pathParam("fileName");
        String fileLocation = "uploads/" + fileName;
        context.response().sendFile(fileLocation, result -> {
            if (result.failed()) {
                context.response().setStatusCode(404).end("File not found");
            }
        });
    }
}
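handleFileDownload builds the file location by concatenating a request parameter, so a crafted name containing .. could in principle point outside the uploads directory. One possible guard, using only java.nio.file, is sketched below; UploadPaths and resolveInside are hypothetical names and not part of Vert.x.

```java
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;

// Hypothetical helper: resolves a requested name strictly inside a base directory.
public final class UploadPaths {
    private UploadPaths() {}

    /** Returns the resolved path, or empty if it would escape baseDir. */
    public static Optional<Path> resolveInside(String baseDir, String fileName) {
        try {
            Path base = Paths.get(baseDir).toAbsolutePath().normalize();
            // normalize() collapses "." and ".." segments before the check
            Path candidate = base.resolve(fileName).normalize();
            boolean inside = candidate.startsWith(base) && !candidate.equals(base);
            return inside ? Optional.of(candidate) : Optional.empty();
        } catch (InvalidPathException e) {
            // names the file system cannot represent are rejected as well
            return Optional.empty();
        }
    }
}
```

In the download handler, an empty result could be answered with a 404 before sendFile is called, and the resolved path passed to sendFile otherwise.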
Create the main class that deploys the verticle:
import io.vertx.core.Vertx;

public class Main {
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(new FileServerVerticle(), res -> {
            if (res.succeeded()) {
                System.out.println("FileServerVerticle deployed successfully.");
            } else {
                System.err.println("Failed to deploy FileServerVerticle: " + res.cause());
            }
        });
    }
}
Start the server, then upload a file with curl:
curl -F "file=@path/to/your/file.txt" http://localhost:8888/upload