Asynchronous work with the file system in Vert.x

Vert.x is an ecosystem to build reactive applications on the JVM that can scale and process huge amounts of data in real time. This is a polyglot platform that supports not only Java, but also Kotlin, Groovy, Scala, and js. In the context of the article we will work in Java

One of the features of vert.x is set of abstractions And API for asynchronous work with the network, file system and other resources. With it, you can easily create scalable web applications, microservices, network utilities and much more.

File system

In Vert.x, all work with the file system is done through the class FileSystem.

To create a file you can use the method createFilewhich asynchronously creates a new file:

FileSystem fs = vertx.fileSystem();
fs.createFile("myfile.txt", result -> {
  if (result.succeeded()) {
    System.out.println("Файл успешно создан.");
  } else {
    System.err.println("Ошибка при создании файла: " + result.cause());
  }
});

Similarly, to delete a file, use the method delete:

fs.delete("myfile.txt", result -> {
  if (result.succeeded()) {
    System.out.println("Файл успешно удален.");
  } else {
    System.err.println("Ошибка при удалении файла: " + result.cause());
  }
});

There is a method to read the contents of the Vert.x file readFilewhich returns the contents of the file as an object Bufferwhich can easily be converted to a string or byte array:

fs.readFile("myfile.txt", result -> {
  if (result.succeeded()) {
    Buffer buffer = result.result();
    System.out.println("Содержимое файла: " + buffer.toString());
  } else {
    System.err.println("Ошибка при чтении файла: " + result.cause());
  }
});

To write to a file, use the method writeFilewhich takes a file path and data in the form Buffer:

Buffer data = Buffer.buffer("Hello, Vert.x!");
fs.writeFile("myfile.txt", data, result -> {
  if (result.succeeded()) {
    System.out.println("Данные успешно записаны в файл.");
  } else {
    System.err.println("Ошибка при записи в файл: " + result.cause());
  }
});

AsyncFile

AsyncFile in Vert.x is an interface for asynchronously working with files. It allows you to perform various operations with files: reading and writing, asynchronously, i.e. without blocking the thread in which the operation is performed.

Basic methods:

  • read(Buffer buffer, int offset, long position, int length, Handler<AsyncResult<Buffer>> handler): Asynchronously reads data from a file to Buffer. offset indicates the position in the buffer where the data will be written, position is the position in the file from which reading will begin, and length — number of bytes to read.

  • write(Buffer buffer, long position, Handler<AsyncResult<Void>> handler): Asynchronously writes data from Buffer to the file starting from the specified one position.

  • flush(Handler<AsyncResult<Void>> handler): Asynchronously flushes data from a buffer to a file

  • close(Handler<AsyncResult<Void>> handler): Closes the file asynchronously. Once the file is closed, any attempt to read or write it will result in an error.

Reading a file asynchronously might look like this:

FileSystem fileSystem = vertx.fileSystem();
String filePath = "path/to/your/file.txt";

fileSystem.open(filePath, new OpenOptions(), openRes -> {
  if (openRes.succeeded()) {
    AsyncFile file = openRes.result();
    Buffer buffer = Buffer.buffer(1024); // размер буфера для чтения
    file.read(buffer, 0, 0, 1024, readRes -> {
      if (readRes.succeeded()) {
        System.out.println("File content: " + buffer.toString());
        file.close(voidAsyncResult -> {});
      } else {
        System.err.println("Error reading file: " + readRes.cause().getMessage());
      }
    });
  } else {
    System.err.println("Could not open file: " + openRes.cause().getMessage());
  }
});

Asynchronous file writing:

Buffer data = Buffer.buffer("This is some data to be written to the file.");
fileSystem.open(filePath, new OpenOptions().setWrite(true), openRes -> {
  if (openRes.succeeded()) {
    AsyncFile file = openRes.result();
    file.write(data, 0, writeRes -> {
      if (writeRes.succeeded()) {
        System.out.println("Data written successfully!");
        file.flush(flushRes -> {
          if (flushRes.succeeded()) {
            System.out.println("Data flushed to disk.");
          }
          file.close(voidAsyncResult -> {});
        });
      } else {
        System.err.println("Failed to write data: " + writeRes.cause().getMessage());
      }
    });
  } else {
    System.err.println("Could not open file: " + openRes.cause().getMessage());
  }
});

To open a file asynchronously, use the method open class FileSystem. Method returns Future<AsyncFile>which allows you to work with the file asynchronously:

FileSystem fs = vertx.fileSystem();
String path = "path/to/your/file.txt";

fs.open(path, new OpenOptions(), result -> {
  if (result.succeeded()) {
    AsyncFile file = result.result();
    // можно работать с AsyncFile
  } else {
    // обработка ошибки
    System.err.println("Error opening file: " + result.cause().getMessage());
  }
});

Processing results via Future

In Vert.x Future<T> is used to represent a result that will be available later. It may succeed with a result like T or fail with an error.

Let's create a simple one Future:

Future<String> future = Future.future(promise -> {
  // асинхронная операция
  vertx.setTimer(1000, id -> promise.complete("Operation completed"));
});

To process the results Future has methods onSuccess, onFailureAnd compose.

onSuccess called if Future completed successfully and onFailure – in case of error:

future.onSuccess(result -> {
  System.out.println("Result: " + result);
}).onFailure(error -> {
  System.err.println("Failed: " + error.getMessage());
});

compose allows you to create a chain of asynchronous operations, where each subsequent operation begins after the successful completion of the previous one:

Future<String> future1 = Future.future(promise -> {
  vertx.setTimer(1000, id -> promise.complete("First operation"));
});

future1.compose(result -> {
  // код выполнится после future1
  return Future.future(promise -> vertx.setTimer(1000, id -> promise.complete(result + ", second operation")));
}).onSuccess(result -> System.out.println("Result: " + result));

Example with reading a file:

FileSystem fs = vertx.fileSystem();
String path = "path/to/your/file.txt";

Future<Buffer> readFileFuture = fs.readFile(path);

readFileFuture.onSuccess(buffer -> {
  System.out.println("File content: " + buffer.toString());
}).onFailure(Throwable::printStackTrace);

Let's say there are two asynchronous operations: the first one reads a file, and the second one writes the contents to another file:

String readPath = "path/to/read.txt";
String writePath = "path/to/write.txt";

fs.readFile(readPath).compose(buffer -> {
  // после успешного чтения файла записываем содержимое в другой файл
  return fs.writeFile(writePath, buffer);
}).onSuccess(v -> System.out.println("File was successfully copied"))
  .onFailure(Throwable::printStackTrace);

Future can be used to compose the results of multiple asynchronous operations:

Future<String> future1 = Future.succeededFuture("Hello");
Future<String> future2 = Future.succeededFuture("World");

CompositeFuture.all(future1, future2).onSuccess(composite -> {
  String result = composite.resultAt(0) + " " + composite.resultAt(1);
  System.out.println(result); // "Hello World"
});

Pump and CompositeFuture

Pump in Vert.x is a utility that helps transfer data from ReadStream V WriteStreamautomatically controlling the back pressure:

FileSystem fs = vertx.fileSystem();
String sourcePath = "path/to/source/file";
String destPath = "path/to/dest/file";

fs.open(sourcePath, new OpenOptions().setRead(true), readResult -> {
  if (readResult.succeeded()) {
    AsyncFile readFile = readResult.result();
    fs.open(destPath, new OpenOptions().setWrite(true), writeResult -> {
      if (writeResult.succeeded()) {
        AsyncFile writeFile = writeResult.result();
        Pump pump = Pump.pump(readFile, writeFile);
        readFile.endHandler(v -> writeFile.close());
        writeFile.endHandler(v -> readFile.close());
        pump.start();
      } else {
        System.err.println("Failed to open destination file: " + writeResult.cause());
      }
    });
  } else {
    System.err.println("Failed to open source file: " + readResult.cause());
  }
});

Pump.pump(readFile, writeFile) creates an instance Pumpwhich automatically reads from readFile and writes to writeFileby controlling back pressure between streams

CompositeFuture allows you to group several asynchronous operations and process their results as a single whole:

FileSystem fs = vertx.fileSystem();
List<Future> futures = new ArrayList<>();

// путь к файлам, которые нужно прочитать
List<String> filePaths = Arrays.asList("path/to/file1", "path/to/file2", "path/to/file3");

for (String path : filePaths) {
  Future<Buffer> future = fs.readFile(path).future();
  futures.add(future);
}

CompositeFuture.all(futures).onComplete(ar -> {
  if (ar.succeeded()) {
    for (int i = 0; i < filePaths.size(); i++) {
      System.out.println("Content of " + filePaths.get(i) + ": " + ar.result().resultAt(i));
    }
  } else {
    System.err.println("Failed to read one or more files: " + ar.cause());
  }
});

CompositeFuture.all(futures) used to combine multiple asynchronous file read operations. The results of each operation are available via ar.result().resultAt(i)Where i — index of the operation in the source list.

A small example

Let's implement a server that will support uploading and downloading files via HTTP:

Create a new Maven project and add the Vert.x Web dependency to pom.xml:

<dependencies>
    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-web</artifactId>
        <version>4.2.1</version> <!-- Используйте актуальную версию -->
    </dependency>
</dependencies>

Create a class FileServerVerticlewhich expands AbstractVerticle. In method start We define routes for uploading and downloading files:

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.file.FileUpload;

public class FileServerVerticle extends AbstractVerticle {

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    Router router = Router.router(vertx);

    // директория для сохранения загруженных файлов
    String uploadDir = "uploads";

    // обработчик для загрузки файлов
    router.route("/upload").handler(BodyHandler.create().setUploadsDirectory(uploadDir));
    router.post("/upload").handler(this::handleFileUpload);

    // обработчик для скачивания файлов
    router.get("/download/:fileName").handler(this::handleFileDownload);

    vertx.createHttpServer().requestHandler(router).listen(8888, http -> {
      if (http.succeeded()) {
        startPromise.complete();
        System.out.println("HTTP server started on port 8888");
      } else {
        startPromise.fail(http.cause());
      }
    });
  }

  private void handleFileUpload(RoutingContext context) {
    for (FileUpload fileUpload : context.fileUploads()) {
      System.out.println("Received file: " + fileUpload.fileName());
      // логика обработки файла
    }
    context.response().setStatusCode(200).end("File uploaded");
  }

  private void handleFileDownload(RoutingContext context) {
    String fileName = context.pathParam("fileName");
    String fileLocation = "uploads/" + fileName;
    context.response().sendFile(fileLocation, result -> {
      if (result.failed()) {
        context.response().setStatusCode(404).end("File not found");
      }
    });
  }
}

Create the main class to launch the vertical:

import io.vertx.core.Vertx;

public class Main {

  public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();
    vertx.deployVerticle(new FileServerVerticle(), res -> {
      if (res.succeeded()) {
        System.out.println("FileServerVerticle deployed successfully.");
      } else {
        System.err.println("Failed to deploy FileServerVerticle: " + res.cause());
      }
    });
  }
}

Start the server using curl:

curl -F "file=@path/to/your/file.txt" http://localhost:8888/upload

In anticipation of the start specializations Java developer I would like to recommend you several free webinars on the following topics:

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *