How to Sync MySQL with Elasticsearch Using Logstash

In this tutorial, we’ll demonstrate how to use Logstash to sync data from MySQL to Elasticsearch. Logstash is a powerful data processing pipeline that enables you to ingest data from various sources, transform it, and then send it to your desired destination, such as Elasticsearch.

Prerequisites

Before we begin, ensure the following are in place:

  • MySQL installed and running, with a database containing the data we want to sync.
  • Elasticsearch installed and running.
  • Docker installed and running.

If you haven’t gone through our previous tutorials, make sure to check them out to get started with the basics.

Set Up Logstash with Docker

Running Logstash using Docker is a convenient way to manage and deploy Logstash instances in a containerized environment.

Step 1: Install Docker

If you haven’t already installed Docker, you can download and install it from the official Docker website.

Step 2: Download MySQL Connector/J

Download the platform-independent version of MySQL Connector/J from the MySQL Connector/J download page.

  1. Create a folder named logstash.
  2. Extract mysql-connector-j-x.x.x.jar from the downloaded ZIP archive (this tutorial uses mysql-connector-j-9.0.0.jar).
  3. Copy the JAR file into the logstash directory.

Step 3: Create Logstash Configuration File

Create a Logstash configuration file (e.g., mysql.conf) in the logstash directory. This file defines our input (MySQL) and output (Elasticsearch) configurations. Here’s an example:

input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://docker.for.mac.host.internal:3306/test"
    jdbc_user => "root"
    jdbc_password => ""
    jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-j-9.0.0.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    statement => "SELECT * FROM products"
    schedule => "* * * * *"
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "products"
    document_id => "%{id}"
  }
}

The schedule option in Logstash determines how often the input plugin (in this case, jdbc) should execute the SQL query defined in the statement. It uses a cron-like syntax to specify the frequency of data fetching and processing. Here’s a breakdown:

  • Syntax: schedule => "* * * * *"
  • Meaning: The cron expression * * * * * runs the query every minute. Each asterisk (*) represents a time unit:
    • Minute (*)
    • Hour (*)
    • Day of month (*)
    • Month (*)
    • Day of week (*)

We can customize the schedule to suit our specific syncing frequency requirements. For example, "*/5 * * * *" would run every 5 minutes.

The statement option in the jdbc input plugin specifies the SQL query that Logstash will execute against our MySQL database. We can customize this query to select specific columns or apply filters as needed:

statement => "SELECT id, name, price FROM products WHERE price > 100"

Step 4: Create Dockerfile for Logstash

Create a Dockerfile to build a custom Logstash Docker image with our configuration file:

# Use the official Logstash image from Docker Hub
FROM docker.elastic.co/logstash/logstash:8.14.1

# Copy your Logstash configuration file into the container
COPY mysql.conf /usr/share/logstash/mysql.conf

ENV LS_HOME /usr/share/logstash

# Install MySQL JDBC driver
RUN logstash-plugin install logstash-integration-jdbc

COPY mysql-connector-j-9.0.0.jar ${LS_HOME}/logstash-core/lib/jars/

# Start Logstash and load the configuration
CMD ["logstash", "-f", "/usr/share/logstash/mysql.conf"]

Save this Dockerfile in the logstash directory along with our mysql.conf configuration file and the mysql-connector-j-9.0.0.jar driver.

Step 5: Build Logstash Docker Image

Open a terminal, navigate to the directory containing our Dockerfile and mysql.conf, and build the Logstash Docker image:

docker build -t elasticsearch-logstash-image .

Step 6: Create Docker Network

To allow communication between the Logstash and Elasticsearch containers, create a Docker network:

docker network create my-network

Step 7: Run Logstash Docker Container

Once the Docker image is built, we can run Logstash as a Docker container:

docker run -d --name logstash --network my-network elasticsearch-logstash-image

Step 8: Verify Logstash Container

Check the logs of our running Logstash container to ensure it started successfully:

docker logs logstash

We should see Logstash logs indicating successful startup and data processing.

...
Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}

Combining Spring Data JPA and Elasticsearch

We’ll integrate Spring Data JPA repositories for CRUD operations and Elasticsearch queries for search functionality. Here’s how to modify our service, model, and controller:

Step 1: Add Maven Dependencies

Add the necessary dependencies to our pom.xml:

        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <version>8.4.0</version>
        </dependency>
        <dependency>
            <groupId>jakarta.persistence</groupId>
            <artifactId>jakarta.persistence-api</artifactId>
            <version>3.1.0</version>
        </dependency>

        <!-- Spring Data JPA -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
            <version>3.3.1</version>
        </dependency>

Step 2: Modify Product Entity for JPA

First, update our Product entity to use Spring Data JPA annotations for persistence:

@Entity
public class Product {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String name;
    private String description;
    private String category;
    private double price;

    // Getters and setters (omitted for brevity)

    // Constructors (omitted for brevity)
}

Step 3: Create a Spring Data JPA Repository

Create a repository interface that extends JpaRepository to handle CRUD operations for Product entities:

@Repository
public interface ProductRepository extends JpaRepository<Product, Long> {
    // Add custom query methods if needed
}
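
If we need custom lookups later, Spring Data JPA can also derive queries from method names. As a minimal sketch (the findByCategory method is a hypothetical addition, not part of the original code), filtering by the category field could look like this:

@Repository
public interface ProductRepository extends JpaRepository<Product, Long> {

    // Spring Data JPA derives "SELECT p FROM Product p WHERE p.category = ?1" from the method name
    List<Product> findByCategory(String category);
}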

Step 4: Modify ProductService to Use JPA

Now, update our ProductService to delegate CRUD operations to the ProductRepository:

@Service
public class ProductService {

    private final ProductRepository productRepository;

    @Autowired
    public ProductService(ProductRepository productRepository) {
        this.productRepository = productRepository;
    }

    // Create Product
    public Product createProduct(Product product) {
        return productRepository.save(product);
    }

    // Get Product by ID
    public Product getProduct(Long id) {
        Optional<Product> optionalProduct = productRepository.findById(id);
        return optionalProduct.orElse(null);
    }

    // Update Product
    public Product updateProduct(Product product) {
        // Check if product exists in database
        if (productRepository.existsById(product.getId())) {
            return productRepository.save(product);
        }
        return null; // Handle not found case
    }

    // Delete Product
    public boolean deleteProduct(Long id) {
        // Check if product exists in database
        if (productRepository.existsById(id)) {
            productRepository.deleteById(id);
            return true;
        }
        return false; // Handle not found case
    }

    // Additional methods if needed, e.g., findAll, findByCriteria, etc.
}
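
The comment above mentions optional extras such as findAll. As an illustrative sketch (the getAllProducts method is not part of the original code), listing every product inside ProductService is a one-liner thanks to the built-in JpaRepository methods:

    // Optional: list all products using the built-in JpaRepository.findAll()
    public List<Product> getAllProducts() {
        return productRepository.findAll();
    }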

Step 5: Modify ProductController

Next, update the ProductController:

@RestController
@RequestMapping("/products")
public class ProductController {

    @Autowired
    private ProductService productService;

    // Create Product
    @PostMapping
    public ResponseEntity<Product> createProduct(@RequestBody Product product) {
        Product createdProduct = productService.createProduct(product);
        return ResponseEntity.ok(createdProduct);
    }

    // Get Product by ID
    @GetMapping("/{id}")
    public ResponseEntity<Product> getProduct(@PathVariable Long id) {
        Product product = productService.getProduct(id);
        if (product != null) {
            return ResponseEntity.ok(product);
        } else {
            return ResponseEntity.notFound().build();
        }
    }

    // Update Product
    @PutMapping("/{id}")
    public ResponseEntity<Product> updateProduct(@PathVariable Long id, @RequestBody Product product) {
        product.setId(id);
        Product updatedProduct = productService.updateProduct(product);
        if (updatedProduct != null) {
            return ResponseEntity.ok(updatedProduct);
        } else {
            return ResponseEntity.notFound().build();
        }
    }

    // Delete Product
    @DeleteMapping("/{id}")
    public ResponseEntity<String> deleteProduct(@PathVariable Long id) {
        boolean isDeleted = productService.deleteProduct(id);
        if (isDeleted) {
            return ResponseEntity.ok("Product deleted successfully");
        } else {
            return ResponseEntity.notFound().build(); // Or handle differently as needed
        }
    }
}

Adding Elasticsearch Search/Filter Methods

To implement search or filter functionality using Elasticsearch in our ProductService, we’ll use the Elasticsearch Java High-Level REST Client to perform queries. Here’s how we can enhance our ProductService to include Elasticsearch queries for searching or filtering products based on certain criteria:

public List<Product> searchProductsByName(String name) throws IOException {
    SearchRequest searchRequest = new SearchRequest(INDEX);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.query(QueryBuilders.matchQuery("name", name));
    searchRequest.source(searchSourceBuilder);

    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    return mapSearchResponseToProducts(searchResponse);
}

// Helper method to map SearchResponse to Product objects
private List<Product> mapSearchResponseToProducts(SearchResponse searchResponse) {
    SearchHit[] searchHits = searchResponse.getHits().getHits();
    List<Product> products = new ArrayList<>();
    for (SearchHit hit : searchHits) {
        Map<String, Object> sourceAsMap = hit.getSourceAsMap();
        products.add(objectMapper.convertValue(sourceAsMap, Product.class));
    }
    return products;
}
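
The search code above relies on fields wired up in our earlier Elasticsearch tutorial: INDEX (the index name, here "products"), an ObjectMapper, and a RestHighLevelClient. As a minimal configuration sketch, assuming Elasticsearch is reachable at localhost:9200 (adjust the host and port to your environment), the client bean might look like this:

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticsearchClientConfig {

    // High-level client wrapping a low-level REST client pointed at the local Elasticsearch node
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        return new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));
    }
}

The ObjectMapper used in mapSearchResponseToProducts can simply be a Jackson ObjectMapper instance (for example, new ObjectMapper() or Spring Boot’s auto-configured bean).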

We also need a corresponding endpoint in the controller. Since searchProductsByName throws IOException, the handler declares it as well:

@GetMapping("/search")
public ResponseEntity<List<Product>> searchProducts(@RequestParam String query) throws IOException {
    List<Product> products = productService.searchProductsByName(query);
    return ResponseEntity.ok(products);
}

Test the Data Syncing

Start the Elasticsearch container:

docker run -d --name elasticsearch --network my-network -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.17.22

Now that the Logstash, Elasticsearch, and Spring Boot applications are running, we can test the CRUD operations using curl commands, as previously outlined.

Create a Product:

curl -X POST -H "Content-Type: application/json" -d '{
    "name": "Product A",
    "description": "This is a product A",
    "category": "CategoryA",
    "price": 99.99
}' http://localhost:8080/products

Response:

{"id":1,"category":"CategoryA","name":"Product A","description":"This is a product A","price":99.99}%   

Based on the logs from the Logstash Docker container, we can observe that Logstash is consistently syncing the products table at regular intervals.

2024-07-03 15:52:00 [2024-07-03T07:52:00,970][INFO ][logstash.inputs.jdbc     ][main][e9ccf327372b65a9353cccc8ffce13864c8b2a6a9b21885d89b3e1d562a4630d] (0.092263s) SELECT * FROM products
2024-07-03 15:53:01 [2024-07-03T07:53:01,434][INFO ][logstash.inputs.jdbc     ][main][e9ccf327372b65a9353cccc8ffce13864c8b2a6a9b21885d89b3e1d562a4630d] (0.019784s) SELECT * FROM products
2024-07-03 15:54:00 [2024-07-03T07:54:00,705][INFO ][logstash.inputs.jdbc     ][main][e9ccf327372b65a9353cccc8ffce13864c8b2a6a9b21885d89b3e1d562a4630d] (0.005036s) SELECT * FROM products

Let’s try to search for Product A:

curl -X GET "http://localhost:8080/products/search?query=Product%20A"

[
  {
    "id": 3,
    "category": "CategoryA",
    "name": "Product A",
    "description": "This is a product A",
    "price": 99.99
  },
  {
    "id": 1,
    "category": "CategoryB",
    "name": "Product C",
    "description": "This is a product C",
    "price": 50
  },
  {
    "id": 2,
    "category": "CategoryB",
    "name": "Product B",
    "description": "This is a product B",
    "price": 209
  }
]

Wait a minute! Why is more than one record returned? Let’s analyze the search response:

{
  "took": 14,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 0.5287602,
    "hits": [
      {
        "_index": "products",
        "_type": "_doc",
        "_id": "3",
        "_score": 0.5287602,
        "_source": {
          "@timestamp": "2024-07-03T08:04:00.415032301Z",
          "category": "CategoryA",
          "@version": "1",
          "id": 3,
          "price": 99.99,
          "description": "This is a product A",
          "name": "Product A"
        }
      },
      {
        "_index": "products",
        "_type": "_doc",
        "_id": "1",
        "_score": 0.026668247,
        "_source": {
          "@timestamp": "2024-07-03T08:04:00.413867717Z",
          "category": "CategoryB",
          "@version": "1",
          "id": 1,
          "price": 50,
          "description": "This is a product C",
          "name": "Product C"
        }
      },
      {
        "_index": "products",
        "_type": "_doc",
        "_id": "2",
        "_score": 0.026668247,
        "_source": {
          "@timestamp": "2024-07-03T08:04:00.414950217Z",
          "category": "CategoryB",
          "@version": "1",
          "id": 2,
          "price": 209,
          "description": "This is a product B",
          "name": "Product B"
        }
      }
    ]
  }
}

The search response indicates that Elasticsearch is correctly matching the document with the name “Product A” but it also returns other products with lower scores. This is expected behavior with the matchQuery because it performs a full-text search and includes documents with lower relevance scores.

To ensure we only get products that exactly match “Product A,” we can use the keyword subfield for exact matching. The keyword field is not analyzed, meaning it stores the exact string and is suitable for exact match queries.

Here’s how we can adjust our search method to query the name.keyword subfield instead:

public List<Product> searchProductsByName(String name) throws IOException {
    SearchRequest searchRequest = new SearchRequest(INDEX);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.query(QueryBuilders.matchQuery("name.keyword", name)); // Use "name.keyword" for exact match
    searchRequest.source(searchSourceBuilder);

    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    return mapSearchResponseToProducts(searchResponse);
}

Adjusting the Mapping (if needed)

Before we can use the keyword subfield, we need to ensure our Elasticsearch mapping includes a keyword subfield for name. If it is not already present, we need to reindex our data with the appropriate mapping:

{
  "mappings": {
    "properties": {
      "name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      ...
    }
  }
}

To check the mapping of the name field in our Elasticsearch index, run the following command:

curl -X GET "http://localhost:9200/products/_mapping?pretty"
...
"name": {
  "type": "text",
  "fields": {
    "keyword": {
      "type": "keyword",
      "ignore_above": 256
    }
  }
},
...

This means that name is stored as both text and keyword types:

  • text type: Analyzed field used for full-text search. This is suitable for natural language processing and allows Elasticsearch to tokenize and analyze the content for efficient searching.
  • keyword type: Exact value field. This is used for sorting, aggregations, and exact match queries. It stores the exact value without analyzing it, making it suitable for searches that require exact matches.
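
To make the difference concrete, here is a short sketch (using the same QueryBuilders API as in our service, with a hard-coded search term for illustration) contrasting the two approaches:

// Full-text search on the analyzed "name" field: the input is tokenized, so related documents are returned with lower scores
QueryBuilder fullText = QueryBuilders.matchQuery("name", "Product A");

// Exact match on the non-analyzed "name.keyword" subfield: only documents whose name is exactly "Product A" match
QueryBuilder exactMatch = QueryBuilders.termQuery("name.keyword", "Product A");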

Let’s try querying again. We should see the correct response this time:

[{"id":3,"category":"CategoryA","name":"Product A","description":"This is a product A","price":99.99}]%  

Using termQuery for Exact Match

In fact, to ensure we only get products that exactly match “Product A”, we should use a termQuery for an exact match. However, a termQuery only behaves as expected against a field that is not analyzed. Since our name field is of type text and therefore analyzed, we need to target the keyword subfield for exact matching; running a termQuery against the analyzed field will return an empty result:

public List<Product> searchProductsByName(String name) throws IOException {
    SearchRequest searchRequest = new SearchRequest(INDEX);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.query(QueryBuilders.termQuery("name", name));
    searchRequest.source(searchSourceBuilder);

    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    return mapSearchResponseToProducts(searchResponse);
}

If we now search for Product A with this version, we should expect an empty response:

curl -X GET "http://localhost:8080/products/search?query=Product%20A"
[]

Setting Elasticsearch Index Mappings

By default, if we never define a mapping for the fields, Elasticsearch creates a dynamic mapping with both text and keyword data types.

...
"category": {
  "type": "text",
  "fields": {
    "keyword": {
      "type": "keyword",
      "ignore_above": 256
    }
  }
},
"description": {
  "type": "text",
  "fields": {
    "keyword": {
      "type": "keyword",
      "ignore_above": 256
    }
  }
},
"name": {
  "type": "text",
  "fields": {
    "keyword": {
      "type": "keyword",
      "ignore_above": 256
    }
  }
},
...

However, it’s essential to define appropriate field types to optimize search and indexing performance. Here’s how we can configure mappings for the name, category, and description fields:

Step 1: Create Elasticsearch Index Template File

Create a JSON file (products_template.json) with the following content. This template defines mappings for the fields, such as “name” and “category” as “keyword” and “description” as “text”:

{
  "index_patterns": ["products_v2"],
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date"
      },
      "@version": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "category": {
        "type": "keyword",
      },
      "description": {
        "type": "text"
      },
      "id": {
        "type": "long"
      },
      "name": {
        "type": "keyword"
      },
      "price": {
        "type": "float"
      }
    }
  }
}

Step 2: Load the Template into Elasticsearch

We can load this template into Elasticsearch using the following curl command:

curl -X PUT "http://localhost:9200/_template/products_template" -H 'Content-Type: application/json' -d @products_template.json

This command uploads the index template (products_template.json) to Elasticsearch under the name products_template:

{"acknowledged":true}%

Step 3: Update Logstash Configuration to Use the Template

Modify our Logstash configuration file (mysql.conf) to instruct Logstash to use the index template products_template:

input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://docker.for.mac.host.internal:3306/test"
    jdbc_user => "root"
    jdbc_password => ""
    jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-j-9.0.0.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    statement => "SELECT * FROM products"
    schedule => "* * * * *"
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "products_v2"
    document_id => "%{id}"
    template_name => "products_template"
    template_overwrite => false
  }
}

By following these steps, new indices created by Logstash (products_v2) will follow the mappings defined in our products_template.json file. Existing indices will retain their current mappings unless explicitly updated.

Let’s restart the Logstash and Elasticsearch containers and then run the search query again:

curl -X GET "http://localhost:8080/products/search?query=Product%20A"

[{"id":3,"category":"CategoryA","name":"Product A","description":"This is a product A","price":99.99}]%

Update Record

Use curl to send a PUT request to update a product. Replace {id} with the actual ID of the product we want to update, and provide the updated JSON payload for the product:

curl -X PUT "http://localhost:8080/products/1" -H 'Content-Type: application/json' -d '
{
  "name": "Product A",
  "category": "Updated Category",
  "description": "Updated Description",
  "price": 199.99
}'

After updating, we can verify that the changes are reflected in Elasticsearch by running the search query again:

curl -X GET "http://localhost:8080/products/search?query=Product%20A"

[{"id":3,"category":"CategoryA","name":"Product A","description":"This is a product A","price":99.99},{"id":1,"category":"Updated Category","name":"Product A","description":"Updated Description","price":199.99}]%      

Conclusion

In this tutorial, we’ve covered how to utilize Logstash for synchronizing MySQL data with Elasticsearch and briefly explored index mapping templates and basic querying techniques. In the upcoming tutorial, we’ll dive deeper into understanding text and keyword mappings, as well as exploring more advanced query capabilities. These topics will enhance our understanding and proficiency in Elasticsearch integration. The full source code is available on GitHub.
