How to Sync MySQL with Elasticsearch Using Logstash
In this tutorial, we’ll demonstrate how to use Logstash to sync data from MySQL to Elasticsearch. Logstash is a powerful data processing pipeline that enables you to ingest data from various sources, transform it, and then send it to your desired destination, such as Elasticsearch.
Prerequisites
Before we begin, ensure we have the following:
- MySQL is installed and running with a database containing the data we want to sync.
- Elasticsearch is installed and running.
- Docker is installed and running.
If you haven’t gone through our previous tutorials, make sure to check them out to get started with the basics.
Set Up Logstash with Docker
Running Logstash using Docker is a convenient way to manage and deploy Logstash instances in a containerized environment.
Step 1: Install Docker
If you haven’t already installed Docker, you can download and install it from the official Docker website.
Step 2: Download MySQL Connector/J
Download the platform-independent version of MySQL Connector/J from the MySQL Connector/J download page.
- Create a folder named logstash.
- Extract mysql-connector-java-x.x.x.jar from the downloaded ZIP archive.
- Copy the JAR file into the logstash directory.
Step 3: Create Logstash Configuration File
Create a Logstash configuration file (e.g., mysql.conf) in the logstash directory. This file will define our input (MySQL) and output (Elasticsearch) configurations. Here’s an example:
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://docker.for.mac.host.internal:3306/test"
jdbc_user => "root"
jdbc_password => ""
jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-j-9.0.0.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
statement => "SELECT * FROM products"
schedule => "* * * * *"
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "products"
document_id => "%{id}"
}
}
The schedule option in Logstash determines how often the input plugin (in this case, jdbc) should execute the SQL query defined in the statement. It uses a cron-like syntax to specify the frequency of data fetching and processing. Here’s a breakdown:
- Syntax: schedule => "* * * * *"
- Meaning: the cron expression * * * * * runs the query every minute. Each asterisk (*) represents a time unit, from left to right: minute, hour, day of month, month, and day of week.
We can customize the schedule to suit our specific syncing frequency requirements. For example, "*/5 * * * *" would run the query every 5 minutes.
The statement option in the jdbc input plugin specifies the SQL query that Logstash will execute against our MySQL database. We can customize this query to select specific columns or apply filters as needed:
statement => "SELECT id, name, price FROM products WHERE price > 100"
Step 4: Create Dockerfile for Logstash
Create a Dockerfile to build a custom Logstash Docker image with our configuration file:
# Use the official Logstash image from Docker Hub
FROM docker.elastic.co/logstash/logstash:8.14.1
# Copy your Logstash configuration file into the container
COPY mysql.conf /usr/share/logstash/mysql.conf
ENV LS_HOME /usr/share/logstash
# Install the JDBC integration plugin and copy in the MySQL JDBC driver
RUN logstash-plugin install logstash-integration-jdbc
COPY mysql-connector-j-9.0.0.jar ${LS_HOME}/logstash-core/lib/jars/
# Start Logstash and load the configuration
CMD ["logstash", "-f", "/usr/share/logstash/mysql.conf"]
Save this Dockerfile in the logstash directory along with our mysql.conf configuration file and the mysql-connector-java-x.x.x.jar file.
Step 5: Build Logstash Docker Image
Open a terminal, navigate to the directory containing our Dockerfile and mysql.conf, and build the Logstash Docker image:
docker build -t elasticsearch-logstash-image .
Step 6: Create Docker Network
To allow communication between the Logstash and Elasticsearch containers, create a Docker network:
docker network create my-network
Step 7: Run Logstash Docker Container
Once the Docker image is built, we can run Logstash as a Docker container:
docker run -d --name logstash --network my-network elasticsearch-logstash-image
Step 8: Verify Logstash Container
Check the logs of our running Logstash container to ensure it started successfully:
docker logs logstash
We should see Logstash logs indicating successful startup and data processing.
...
Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
Combining Spring Data JPA and Elasticsearch
We’ll integrate Spring Data JPA repositories for CRUD operations and Elasticsearch repositories for search functionality. Here’s how to modify our service, model, and controller:
Step 1: Add Maven Dependency
Add the necessary dependencies to our pom.xml:
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.4.0</version>
</dependency>
<dependency>
<groupId>jakarta.persistence</groupId>
<artifactId>jakarta.persistence-api</artifactId>
<version>3.1.0</version>
</dependency>
<!-- Spring Data JPA -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
<version>3.3.1</version>
</dependency>
Step 2: Modify the Product Entity for JPA
First, update our Product entity to use Spring Data JPA annotations for persistence:
@Entity
public class Product {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String name;
private String category; // used by the payloads, search examples, and index mappings below
private String description;
private double price;
// Getters and setters (omitted for brevity)
// Constructors (omitted for brevity)
}
Step 3: Create a Spring Data JPA Repository
Create a repository interface that extends JpaRepository to handle CRUD operations for Product entities:
@Repository
public interface ProductRepository extends JpaRepository<Product, Long> {
// Add custom query methods if needed
}
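As a quick illustration of the custom query methods mentioned in the comment above, Spring Data JPA can derive queries from method names. The two methods below are hypothetical examples of ours (not part of the original repository), assume the Product fields shown earlier, and would go inside ProductRepository:
// Derived query: products whose name exactly matches the given value
List<Product> findByName(String name);
// Derived query: products whose price is greater than the given value
List<Product> findByPriceGreaterThan(double price);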
Step 4: Modify ProductService to Use JPA
Now, update our ProductService to delegate CRUD operations to the ProductRepository:
@Service
public class ProductService {
private final ProductRepository productRepository;
@Autowired
public ProductService(ProductRepository productRepository) {
this.productRepository = productRepository;
}
// Create Product
public Product createProduct(Product product) {
return productRepository.save(product);
}
// Get Product by ID
public Product getProduct(Long id) {
Optional<Product> optionalProduct = productRepository.findById(id);
return optionalProduct.orElse(null);
}
// Update Product
public Product updateProduct(Product product) {
// Check if product exists in database
if (productRepository.existsById(product.getId())) {
return productRepository.save(product);
}
return null; // Handle not found case
}
// Delete Product
public boolean deleteProduct(Long id) {
// Check if product exists in database
if (productRepository.existsById(id)) {
productRepository.deleteById(id);
return true;
}
return false; // Handle not found case
}
// Additional methods if needed, e.g., findAll, findByCriteria, etc.
}
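As a minimal sketch of one of the “additional methods” mentioned in the comment, a findAll wrapper could look like this (the method name is our own choice and it assumes the same productRepository field):
// Get all Products
public List<Product> getAllProducts() {
    return productRepository.findAll();
}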
Step 5: Modify ProductController
Next, update the ProductController accordingly:
@RestController
@RequestMapping("/products")
public class ProductController {
@Autowired
private ProductService productService;
// Create Product
@PostMapping
public ResponseEntity<Product> createProduct(@RequestBody Product product) {
Product createdProduct = productService.createProduct(product);
return ResponseEntity.ok(createdProduct);
}
// Get Product by ID
@GetMapping("/{id}")
public ResponseEntity<Product> getProduct(@PathVariable Long id) {
Product product = productService.getProduct(id);
if (product != null) {
return ResponseEntity.ok(product);
} else {
return ResponseEntity.notFound().build();
}
}
// Update Product
@PutMapping("/{id}")
public ResponseEntity<Product> updateProduct(@PathVariable Long id, @RequestBody Product product) {
product.setId(id);
Product updatedProduct = productService.updateProduct(product);
if (updatedProduct != null) {
return ResponseEntity.ok(updatedProduct);
} else {
return ResponseEntity.notFound().build();
}
}
// Delete Product
@DeleteMapping("/{id}")
public ResponseEntity<String> deleteProduct(@PathVariable Long id) {
boolean isDeleted = productService.deleteProduct(id);
if (isDeleted) {
return ResponseEntity.ok("Product deleted successfully");
} else {
return ResponseEntity.notFound().build(); // Or handle differently as needed
}
}
}
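If we also add the getAllProducts sketch from the service step, a matching list endpoint could look like the following (hypothetical, not part of the original controller):
// Get all Products
@GetMapping
public ResponseEntity<List<Product>> getAllProducts() {
    return ResponseEntity.ok(productService.getAllProducts());
}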
Adding Elasticsearch Search/Filter Methods
To implement search or filter functionality using Elasticsearch in our ProductService, we’ll use the Elasticsearch Java High-Level REST Client to perform queries. Here’s how we can enhance our ProductService to include Elasticsearch queries for searching or filtering products based on certain criteria:
public List<Product> searchProductsByName(String name) throws IOException {
SearchRequest searchRequest = new SearchRequest(INDEX);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchQuery("name", name));
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
return mapSearchResponseToProducts(searchResponse);
}
// Helper method to map SearchResponse to Product objects
private List<Product> mapSearchResponseToProducts(SearchResponse searchResponse) {
SearchHit[] searchHits = searchResponse.getHits().getHits();
List<Product> products = new ArrayList<>();
for (SearchHit hit : searchHits) {
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
products.add(objectMapper.convertValue(sourceAsMap, Product.class));
}
return products;
}
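The snippets above reference a few members of ProductService that aren’t shown: an INDEX constant, a RestHighLevelClient named client, and a Jackson ObjectMapper named objectMapper. Below is a minimal sketch of how they could be wired up; the field names, host, and port are assumptions rather than code from the original project, and the elasticsearch-rest-high-level-client dependency is assumed to already be on the classpath from the earlier tutorials:
// Imports needed by the fields below
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
// Fields added to ProductService (illustrative values)
private static final String INDEX = "products";
private final ObjectMapper objectMapper = new ObjectMapper();
private final RestHighLevelClient client = new RestHighLevelClient(
        RestClient.builder(new HttpHost("localhost", 9200, "http")));
Since the Spring Boot application in this tutorial runs on the host while the Elasticsearch container publishes port 9200, pointing the client at localhost:9200 should work.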
We also need to create a corresponding endpoint in the controller. Because searchProductsByName declares IOException, the handler method declares it as well:
@GetMapping("/search")
public ResponseEntity<List<Product>> searchProducts(@RequestParam String query) throws IOException {
List<Product> products = productService.searchProductsByName(query);
return ResponseEntity.ok(products);
}
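The same pattern extends to other filters. As an example, here is a hedged sketch of a price filter for ProductService (the method name and parameter are our own, not from the original code) using a range query:
public List<Product> searchProductsByMinPrice(double minPrice) throws IOException {
    SearchRequest searchRequest = new SearchRequest(INDEX);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    // Only return documents whose price is greater than or equal to minPrice
    searchSourceBuilder.query(QueryBuilders.rangeQuery("price").gte(minPrice));
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    return mapSearchResponseToProducts(searchResponse);
}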
Test the Data Syncing
Start the Elasticsearch container:
docker run -d --name elasticsearch --network my-network -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.17.22
Now that Logstash, Elasticsearch, and the Spring Boot application are running, we can proceed to test the CRUD operations using curl commands as previously outlined.
Create a Product:
curl -X POST -H "Content-Type: application/json" -d '{
"name": "Product A",
"description": "This is a product A",
"category": "CategoryA",
"price": 99.99
}' http://localhost:8080/products
Response:
{"id":1,"category":"CategoryA","name":"Product A","description":"This is a product A","price":99.99}%
Based on the logs from the Logstash Docker container, we can observe that Logstash is consistently syncing the products table at regular intervals.
2024-07-03 15:52:00 [2024-07-03T07:52:00,970][INFO ][logstash.inputs.jdbc ][main][e9ccf327372b65a9353cccc8ffce13864c8b2a6a9b21885d89b3e1d562a4630d] (0.092263s) SELECT * FROM products
2024-07-03 15:53:01 [2024-07-03T07:53:01,434][INFO ][logstash.inputs.jdbc ][main][e9ccf327372b65a9353cccc8ffce13864c8b2a6a9b21885d89b3e1d562a4630d] (0.019784s) SELECT * FROM products
2024-07-03 15:54:00 [2024-07-03T07:54:00,705][INFO ][logstash.inputs.jdbc ][main][e9ccf327372b65a9353cccc8ffce13864c8b2a6a9b21885d89b3e1d562a4630d] (0.005036s) SELECT * FROM products
Let’s try searching for Product A:
curl -X GET "http://localhost:8080/products/search?query=Product%20A"
[
{
"id": 3,
"category": "CategoryA",
"name": "Product A",
"description": "This is a product A",
"price": 99.99
},
{
"id": 1,
"category": "CategoryB",
"name": "Product C",
"description": "This is a product C",
"price": 50
},
{
"id": 2,
"category": "CategoryB",
"name": "Product B",
"description": "This is a product B",
"price": 209
}
]
Wait a minute! Why are multiple records returned? Let’s analyse the search response:
{
"took": 14,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 3,
"relation": "eq"
},
"max_score": 0.5287602,
"hits": [
{
"_index": "products",
"_type": "_doc",
"_id": "3",
"_score": 0.5287602,
"_source": {
"@timestamp": "2024-07-03T08:04:00.415032301Z",
"category": "CategoryA",
"@version": "1",
"id": 3,
"price": 99.99,
"description": "This is a product A",
"name": "Product A"
}
},
{
"_index": "products",
"_type": "_doc",
"_id": "1",
"_score": 0.026668247,
"_source": {
"@timestamp": "2024-07-03T08:04:00.413867717Z",
"category": "CategoryB",
"@version": "1",
"id": 1,
"price": 50,
"description": "This is a product C",
"name": "Product C"
}
},
{
"_index": "products",
"_type": "_doc",
"_id": "2",
"_score": 0.026668247,
"_source": {
"@timestamp": "2024-07-03T08:04:00.414950217Z",
"category": "CategoryB",
"@version": "1",
"id": 2,
"price": 209,
"description": "This is a product B",
"name": "Product B"
}
}
]
}
}
The search response indicates that Elasticsearch is correctly matching the document with the name “Product A”, but it also returns other products with lower scores. This is expected behavior with the matchQuery, because it performs a full-text search: the query string “Product A” is analyzed into the tokens product and a, so any document whose name contains either token is included, just with a lower relevance score.
To ensure we only get products that exactly match “Product A”, we can use the keyword subfield for exact matching. The keyword field is not analyzed, meaning it stores the exact string and is suitable for exact match queries. Here’s how we can adjust our search method to query the name.keyword subfield instead:
public List<Product> searchProductsByName(String name) throws IOException {
System.out.println(name);
SearchRequest searchRequest = new SearchRequest(INDEX);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchQuery("name.keyword", name)); // Use "name.keyword" for exact match
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
System.out.println("Search response: " + searchResponse.toString());
return mapSearchResponseToProducts(searchResponse);
}
Adjusting the Mapping (if needed)
Before we are able to use the keyword subfield, we need to ensure our Elasticsearch mapping includes a keyword subfield for name. If it isn’t already present, we need to reindex our data with the appropriate mapping:
{
"mappings": {
"properties": {
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
...
}
}
}
To check the mapping of the name field in our Elasticsearch index, we can run the following command:
curl -X GET "http://localhost:9200/products/_mapping?pretty"
...
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
...
This means that name is stored as both text and keyword types:
- text type: an analyzed field used for full-text search. This is suitable for natural language processing and allows Elasticsearch to tokenize and analyze the content for efficient searching.
- keyword type: an exact-value field used for sorting, aggregations, and exact match queries. It stores the exact value without analyzing it, making it suitable for searches that require exact matches.
Let’s try querying again. We should see the correct response this time:
[{"id":3,"category":"CategoryA","name":"Product A","description":"This is a product A","price":99.99}]%
Using termQuery for Exact Match
In fact, to ensure we only get products that exactly match “Product A”, we could use a termQuery for an exact match. However, termQuery requires that the field is not analyzed. Since our name field is mapped as text and therefore analyzed, we have to query the keyword subfield for exact matching; querying the analyzed field directly, as below, returns an empty result:
public List<Product> searchProductsByName(String name) throws IOException {
SearchRequest searchRequest = new SearchRequest(INDEX);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("name", name));
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
return mapSearchResponseToProducts(searchResponse);
}
If we search for Product A now, we should expect an empty response:
curl -X GET "http://localhost:8080/products/search?query=Product%20A"
[]
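If we prefer to keep termQuery, pointing it at the keyword subfield restores the exact match. Only the query line of the method above changes (this is our own variation, consistent with the mapping shown earlier):
// Exact, non-analyzed match against the keyword subfield
searchSourceBuilder.query(QueryBuilders.termQuery("name.keyword", name));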
Setting Elasticsearch Index Mappings
By default, if we never define mappings for the fields, Elasticsearch will create a dynamic mapping that stores string fields as both text and keyword data types.
...
"category": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"description": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
...
However, it’s essential to define appropriate field types to optimize search and indexing performance. Here’s how we can configure mappings for the name, category, and description fields:
Step 1: Create Elasticsearch Index Template File
Create a JSON file (products_template.json) with the following content. This template defines mappings for fields such as name as keyword and description as text:
{
"index_patterns": ["products_v2"],
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"@version": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"category": {
"type": "keyword",
},
"description": {
"type": "text"
},
"id": {
"type": "long"
},
"name": {
"type": "keyword"
},
"price": {
"type": "float"
}
}
}
}
Step 2: Load the Template into Elasticsearch
We can load this template into Elasticsearch using the following curl command:
curl -X PUT "http://localhost:9200/_template/products_template" -H 'Content-Type: application/json' -d @products_template.json
This command uploads the index template (products_template.json) to Elasticsearch under the name products_template:
{"acknowledged":true}%
Step 3: Update Logstash Configuration to Use the Template
Modify our Logstash configuration file (mysql.conf) to instruct Logstash to use the index template products_template:
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://docker.for.mac.host.internal:3306/test"
jdbc_user => "root"
jdbc_password => ""
jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-j-9.0.0.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
statement => "SELECT * FROM products"
schedule => "* * * * *"
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "products_v2"
document_id => "%{id}"
template_name => "products_template"
template_overwrite => false
}
}
By following these steps, new indices created by Logstash (products_v2) will follow the mappings defined in our products_template.json file. Existing indices will retain their current mappings unless explicitly updated.
Let’s restart the Logstash and Elasticsearch containers, point the INDEX constant in our search service at the new products_v2 index, and then run the search query again:
curl -X GET "http://localhost:8080/products/search?query=Product%20A"
[{"id":3,"category":"CategoryA","name":"Product A","description":"This is a product A","price":99.99}]%
Update Record
Use curl to send a PUT request to update a product. Replace {id} with the actual ID of the product we want to update, and provide the updated JSON payload for the product:
curl -X PUT "http://localhost:8080/products/1" -H 'Content-Type: application/json' -d '
{
"name": "Product A",
"category": "Updated Category",
"description": "Updated Description",
"price": 199.99
}'
After updating, we can verify that the changes are reflected in Elasticsearch by querying again with curl:
curl -X GET "http://localhost:8080/products/search?query=Product%20A"
[{"id":3,"category":"CategoryA","name":"Product A","description":"This is a product A","price":99.99},{"id":1,"category":"Updated Category","name":"Product A","description":"Updated Description","price":199.99}]%
Conclusion
In this tutorial, we’ve covered how to utilize Logstash for synchronizing MySQL data with Elasticsearch and briefly explored index mapping templates and basic querying techniques. In the upcoming tutorial, we’ll dive deeper into text and keyword mappings, as well as more advanced query capabilities. These topics will enhance our understanding and proficiency in Elasticsearch integration. The full source code is available on GitHub.